Sunday, August 29, 2004

 

Some notes on IOCP Thread Pooling in C# - 1

I.
IOCP Thread Pooling in C#
By William Kennedy
From http://www.devarticles.com/c/a/C-Sharp/IOCP-Thread-Pooling-in-C-sharp-Part-I/
http://www.devarticles.com/c/a/C-Sharp/IOCP-Thread-Pooling-in-C-sharp-Part-II/

Continuum Technology Center
Introduction

When building server-based applications in C#, it is important to be able to create thread pools. Thread pools allow our server to queue and perform work in the most efficient and scalable way possible. Without thread pooling we are left with two options: perform all of the work on a single thread, or spawn a new thread every time some piece of work needs to be done. For this article, work is defined as an event that requires the processing of code. Work may or may not be associated with data, and it is our job to process all of the work our server receives as efficiently and quickly as possible.

As a general rule, if you can accomplish all of the required work with a single thread, use only a single thread. Having multiple threads perform work at the same time does not necessarily mean our application is getting more work done, or getting it done faster. There are several reasons for this.

For example, if you spawn multiple threads that access the same resource protected by a synchronization object, such as a monitor, these threads will serialize and line up waiting for the resource to become available. As each thread tries to access the resource, it may block and wait for the owning thread to release it. At that point the waiting threads are put to sleep and get no work done. In fact, they create more work for the operating system: it must schedule other threads to run and, once the resource becomes available, decide which waiting thread gets it next. If the threads that need to perform work are sleeping because they are waiting for the resource, we have actually created a performance problem. In this case it would be more efficient to queue up the work and have a single thread process the queue.
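The “single thread processing a queue” alternative can be sketched in a few lines of C#. This is a minimal illustration that assumes .NET 4.0 or later for BlockingCollection<T> (which did not exist when this article was written); the class name SingleConsumerQueue is hypothetical. Any number of threads may call Post, but only the one worker thread ever touches the results list, so the list needs no lock.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: producers hand work to ONE consumer thread through a queue,
// so the shared resource (the Processed list) is only touched by that thread.
public sealed class SingleConsumerQueue : IDisposable
{
    private readonly BlockingCollection<int> _queue = new BlockingCollection<int>();
    private readonly Thread _worker;

    // Written only by the worker thread; safe to read after Dispose has joined it.
    public readonly List<int> Processed = new List<int>();

    public SingleConsumerQueue()
    {
        _worker = new Thread(() =>
        {
            // Blocks until an item arrives; ends after CompleteAdding once the queue is empty.
            foreach (int item in _queue.GetConsumingEnumerable())
                Processed.Add(item * 2); // stand-in for real work
        });
        _worker.Start();
    }

    // Thread-safe: may be called from any number of producer threads.
    public void Post(int item) => _queue.Add(item);

    // Stop accepting work, let the worker drain the queue, then wait for it.
    public void Dispose()
    {
        _queue.CompleteAdding();
        _worker.Join();
        _queue.Dispose();
    }
}
```

Because only one thread consumes the queue, items are processed in the order they were posted, and no thread ever sleeps waiting for a lock on the results.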

Threads that start waiting for a resource before other threads are not guaranteed to be given the resource first. In diagram A, thread 1 requests access to the resource before thread 2, and thread 2 requests access before thread 3. The operating system, however, gives the resource to thread 1 first, then thread 3, and then thread 2. As a result, work is performed in an unpredictable order, one of many subtle issues that arise in multi-threaded applications.

If pieces of work can be performed independently of each other, we could spawn a new thread to process each one. The problem is that an operating system like Windows has severe performance problems when a large number of threads are created or running at the same time, all waiting for access to the CPU. Windows must manage and schedule all of these threads, and compared to the UNIX operating system, it does not hold up well. If large amounts of work are issued to the server, this model will most likely overload the operating system, and system performance will degrade drastically.

The following paper is a case study comparing thread performance between Windows NT and Solaris:

http://www.usenix.org/publications/library/proceedings/usenix-nt98/full_papers/zabatta/zabatta_html/zabatta.html

In the .NET Framework, the “System.Threading” namespace provides a ThreadPool class. Unfortunately, it exposes only static members, so our server can have only a single, process-wide thread pool. That isn’t the only issue: the ThreadPool class does not let us set the concurrency level of the pool. The concurrency level is the most important setting when configuring a thread pool. It defines how many threads in the pool may be in an “active state” at the same time. If we set this parameter correctly, we get the most efficient thread pool for the work being processed.

Imagine we have a thread pool with 4 threads and a concurrency level of 1, and three pieces of work are queued up for processing in the pool. Since the concurrency level is 1, only a single thread from the pool is activated and given work from the queue. Even though two more pieces of work are waiting, no other threads are activated. If the concurrency level were set to 2, another thread would have been activated immediately and given work from the queue. In diagram B, thread 1 is running, all of the other threads are sleeping, and two pieces of work are queued.

So the question arises: why have more than 1 thread in the pool if the concurrency level is set to 1? If thread 1 in diagram B goes to sleep (for example, by blocking on I/O or a lock) before it completes its work, another thread from the pool will be activated. While thread 1 sleeps, there are 0 threads “active” in the pool, so the concurrency level allows a new thread to be activated. In diagram C, thread 1 is sleeping and thread 4 is running, with one piece of work queued.

Eventually thread 1 wakes up, possibly while thread 4 is still active. We then have 2 threads active in the pool, even though the concurrency level is set to 1. In diagram D, thread 1 and thread 4 are both running and one piece of work is still queued.

The last piece of work in the queue will need to wait until both threads return to a sleeping state, because the concurrency level is set to 1. As we can see, even though the concurrency level restricts how many threads the pool activates, we could have more active threads than the concurrency level allows. It all depends on the state of the threads in the pool and how fast they can complete the work they are processing.
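A rough managed analogue can make the idea of a concurrency level concrete. The sketch below is an illustration under stated assumptions, not an IOCP: it starts more threads than the concurrency level and uses SemaphoreSlim (.NET 4.0+) to limit how many of them do work at once. Note one important difference from the behaviour described above: a semaphore does not let another thread in when an active thread blocks, and it never allows more active threads than the limit, whereas an IOCP does both.

```csharp
using System;
using System.Threading;

// Illustration only (not an IOCP): threadCount threads exist, but a SemaphoreSlim
// lets at most concurrencyLevel of them run work at the same time.
public static class ConcurrencyGateDemo
{
    // Returns the highest number of threads observed doing work simultaneously.
    public static int Run(int threadCount, int concurrencyLevel, int workItems)
    {
        var gate = new SemaphoreSlim(concurrencyLevel, concurrencyLevel);
        var sync = new object();
        int active = 0, maxObserved = 0, remaining = workItems;

        var threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++)
        {
            threads[t] = new Thread(() =>
            {
                // Each decrement that stays >= 0 claims one work item.
                while (Interlocked.Decrement(ref remaining) >= 0)
                {
                    gate.Wait(); // sleep until a "concurrency slot" is free
                    try
                    {
                        lock (sync) { active++; maxObserved = Math.Max(maxObserved, active); }
                        Thread.Sleep(5); // stand-in for the real work
                        lock (sync) { active--; }
                    }
                    finally
                    {
                        gate.Release();
                    }
                }
            });
            threads[t].Start();
        }

        foreach (Thread th in threads) th.Join();
        gate.Dispose();
        return maxObserved;
    }
}
```

With 4 threads and a concurrency level of 2, the reported peak should never exceed 2, even though all 4 threads are competing for work.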

A good rule of thumb is to set the concurrency level to match the number of CPUs in the system. If the machine our server runs on has only one CPU, then only one thread can be executing at any given time, and getting another thread onto the CPU requires a context switch. We want to keep the number of active threads low to maximize performance. This rule also scales: as the number of CPUs increases, we can increase the concurrency level, because there is a CPU available to execute each additional thread. This is a general rule and is always a good starting point for configuring our thread pools.
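Following this rule of thumb, a pool could default its concurrency level to the processor count. The helper below is hypothetical (it is not part of the article’s class) and simply mirrors what passing 0 to CreateIoCompletionPort asks Windows to do. Note that Environment.ProcessorCount reports logical processors, so hyper-threaded cores are counted individually.

```csharp
using System;

// Hypothetical helper: 0 means "one active thread per logical processor".
public static class ConcurrencyDefaults
{
    public static int Resolve(int requested)
    {
        if (requested < 0) throw new ArgumentOutOfRangeException(nameof(requested));
        return requested == 0 ? Environment.ProcessorCount : requested;
    }
}
```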

The bottom line is: if a CPU is available and there is work to perform, activate a thread. If no CPU is available, do not activate a thread. One other thing: we need to be careful not to create a situation where the threads in the pool are constantly put to sleep for long periods of time while processing work. Each time an active thread sleeps, the pool activates another one, so all of the threads in the pool can end up active at once, defeating the efficiency of the pool and hurting the performance of the server.

The rest of this article shows how to add IOCP thread pools to your C# server-based applications. How to configure the thread pools for your specific application is not covered; use the general rules discussed above as a starting point.

System Requirements

A basic understanding of C# is required to follow the examples and the classes, including the basic concepts of types, properties, threading, synchronization, and delegates.

Defining the Problem

IOCP thread support is not available to C# developers through the “System.Threading” namespace, so we need to call the Win32 API functions exported by Kernel32.dll. Because the declarations below use pointer parameters, this requires us to write unsafe code. This is not really a problem, but it is something that needs to be discussed. Let’s take a look at the Win32 API calls we need to implement an IOCP thread pool.

[DllImport("Kernel32", CharSet=CharSet.Auto)]

private unsafe static extern UInt32 CreateIoCompletionPort(UInt32 hFile, UInt32 hExistingCompletionPort, UInt32* puiCompletionKey, UInt32 uiNumberOfConcurrentThreads);

This Win32 API call creates an IOCP thread pool. The first argument is always set to INVALID_HANDLE_VALUE, which is 0xFFFFFFFF in a 32-bit process. This tells the operating system that this IOCP thread pool is not linked to a device. The second argument is always set to 0, because there is no existing IOCP thread pool; we are creating one for the first time. The third argument is always null. We do not require a completion key because we have not associated this IOCP thread pool with a device, and Windows ignores the key in this case. The last argument is the important one: it defines the concurrency level of the thread pool. If we pass 0, the operating system sets the concurrency level to match the number of CPUs in the machine. This option gives us our best chance to be scalable and take advantage of every CPU present in the machine. The call returns a handle to the newly created IOCP thread pool, or NULL (0) if it fails. Note that these declarations assume a 32-bit process: Win32 handles and the completion key (a ULONG_PTR) are pointer-sized, so in a 64-bit process they should be declared as IntPtr and UIntPtr rather than UInt32.

[DllImport("Kernel32", CharSet=CharSet.Auto)]

private unsafe static extern Boolean CloseHandle(UInt32 hObject);

This Win32 API call closes our thread pool. The only argument is the handle to the IOCP thread pool. It returns TRUE (nonzero) on success, or FALSE if the handle cannot be closed.

[DllImport("Kernel32", CharSet=CharSet.Auto)]

private unsafe static extern Boolean PostQueuedCompletionStatus(UInt32 hCompletionPort, UInt32 uiSizeOfArgument, UInt32* puiUserArg, OVERLAPPED* pOverlapped);

This Win32 API call posts work to the IOCP thread pool queue; other threads in our application make this call. The first argument is the handle to the IOCP thread pool. The second argument is the size of the data we are posting to the queue. The third argument is a value, or a reference to an object or data structure, that we are posting with the work. The last argument is always null. Diagram E shows how the data is associated with the posted work.

In diagram E, we have two threads actively processing posted work and one piece of work on the queue waiting for its data to be processed. The thing to note here is that each piece of work was given a reference to its specific data. I am calling this variable pData to help describe what is happening in the IOCP thread pool. In the Win32 documentation this pointer-sized value is the completion key (the dwCompletionKey parameter of PostQueuedCompletionStatus); how the pool stores it internally is undocumented.

When we make this API call in a C++ application, we can pass the address of any object in memory we wish, as in diagram E. In C#, we don’t have the same luxury because of the managed heap. The managed heap is a contiguous region of address space that contains all of the memory allocated for reference-type objects. The heap maintains a pointer that indicates where the next object will be allocated, and allocations proceed contiguously from that point. This is very different from the C-runtime heap.

The C-runtime heap uses a linked list of data structures to track available memory blocks. To allocate memory, it must walk the linked list until it finds a large enough free block; then the free block must be split and the linked list adjusted. As a result, objects allocated consecutively in a C++ application could end up anywhere on the heap. With the managed heap, objects allocated consecutively in a C# application are normally allocated next to each other. The catch is that the managed heap must be compacted to make sure it does not run out of space, and that is the job of garbage collection.

For more information on garbage collection, try these links:

http://msdn.microsoft.com/library/default.asp?url=/library/en-us/cpguide/html/cpconautomaticmemorymanagement.asp

http://msdn.microsoft.com/msdnmag/issues/1100/GCI/default.aspx

http://msdn.microsoft.com/msdnmag/issues/1200/GCI2/default.aspx

In diagram F, we have allocated four objects on the managed heap. Imagine that the managed heap has allocated memory for these objects at addresses FDE0, FDD0, FDC0, and FDB0. This would mean the value of pClass1 is FDE0, the value of pClass2 is FDD0, the value of pClass3 is FDC0, and the value of pClass4 is FDB0.

MyClass pClass1 = new MyClass();
MyClass pClass2 = new MyClass();
MyClass pClass3 = new MyClass();
MyClass pClass4 = new MyClass();

Now we write the following code, which leaves the Class 2 object unreachable.

pClass2 = null;

Diagram G shows what happens to the managed heap after garbage collection takes place and the managed heap is compacted.

The Class 2 object has been removed from the managed heap, and the Class 3 and Class 4 objects have been moved. Now the value of pClass3 is FDD0 and the value of pClass4 is FDC0: the addresses held by these references have changed. The garbage collector updates every managed reference so that it points to the correct object after the heap is compacted.

So what does this mean for our IOCP thread pool implementation? If we pass the address of a managed object as the data for the work, there is a chance the address is no longer valid when a thread in the pool is chosen to work on the data.

In diagram H, we have passed the address of the Class 3 object as the data for the work posted to the IOCP thread pool. This object is at address FDC0. Before the work is given to thread 1, the Class 2 object becomes unreachable. Then the garbage collection process runs and the managed heap is compacted. Now, in diagram I, the work has been given to thread 1 for processing. The value of pData is still FDC0, but the Class 3 object is no longer at address FDC0; it is at address FDD0. The thread will perform the work, but on the Class 4 object (which now occupies FDC0) instead of Class 3.

The garbage collector cannot change the value of pData, as it does with other references, because pData is not a managed variable. It is owned by the IOCP thread pool and exists outside the scope of the CLR, so the garbage collector has no knowledge of it and no access to it. The variable is set during the unsafe call to PostQueuedCompletionStatus.

Unfortunately, pinning the objects we want to pass as the data for the work posted to the IOCP thread pool is not a possible solution with the fixed keyword. Pinning prevents an object from being moved on the managed heap during garbage collection. To pin an object in C#, we use the fixed keyword, but fixed pins an object only for the duration of a single block within one method, so there is no way to use it to pin an object in one thread and unpin it in a different thread. Here is a quick example of pinning.

Int32[] iArray = new Int32[5] {12, 34, 56, 78, 90};

unsafe
{
    // iArray cannot be moved by the garbage collector until this block exits
    fixed (Int32* piArray = iArray)
    {
        // Do Something
    }
}
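For completeness, the framework does offer a way to hold on to a managed object across threads: the GCHandle type. This is a different technique from the one this article uses, shown only as a hedged sketch. A GCHandle of type Normal keeps the object alive, and GCHandle.ToIntPtr returns a value that stays valid even when the garbage collector moves the object, because it identifies the handle rather than the object’s address. Unlike fixed, the handle can be created on one thread and freed on another. (GCHandleType.Pinned can also pin blittable objects across threads, at the cost of fragmenting the heap.)

```csharp
using System;
using System.Runtime.InteropServices;
using System.Threading;

// Sketch of an alternative to the article's approach: pass a GCHandle value,
// not an object address, across threads.
public static class GcHandleDemo
{
    public static string RoundTrip(string payload)
    {
        // Thread A: wrap the object; this IntPtr could travel as a completion key.
        IntPtr key = GCHandle.ToIntPtr(GCHandle.Alloc(payload, GCHandleType.Normal));

        string received = null;
        var worker = new Thread(() =>
        {
            // Thread B: recover the object, then release the handle exactly once.
            GCHandle handle = GCHandle.FromIntPtr(key);
            received = (string)handle.Target;
            handle.Free();
        });
        worker.Start();
        worker.Join();
        return received;
    }
}
```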

The safest thing we can do is pass a value to the IOCP thread pool. This value could be an index into a managed array that holds a reference to an object on the managed heap. If the garbage collector compacts the heap, it updates the references stored in the array, but the index values do not change. Diagrams J and K show one way to properly pass data for the work posted to the IOCP thread pool: after the heap is compacted, the references stored in the array (the pData values in the diagrams) change, but their index positions do not.
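The index-based approach might be packaged as a small table that hands out integer slots. SlotTable<T> is a hypothetical helper written for illustration, not code from the article: Add stores an object and returns the integer to post with PostQueuedCompletionStatus, and the worker thread calls Take with the integer it receives to recover the object and free the slot. Only the integer ever leaves managed code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: maps small integers to managed objects. The GC may move
// the objects freely; it updates the array entries, and the slot numbers never change.
public sealed class SlotTable<T> where T : class
{
    private readonly T[] _slots;
    private readonly Stack<int> _free = new Stack<int>();
    private readonly object _sync = new object();

    public SlotTable(int capacity)
    {
        _slots = new T[capacity];
        for (int i = capacity - 1; i >= 0; i--) _free.Push(i); // slot 0 is handed out first
    }

    // Store an object and return the integer to post to the pool.
    public int Add(T item)
    {
        if (item == null) throw new ArgumentNullException(nameof(item));
        lock (_sync)
        {
            if (_free.Count == 0) throw new InvalidOperationException("Table is full.");
            int slot = _free.Pop();
            _slots[slot] = item;
            return slot;
        }
    }

    // Called by the worker thread: fetch the object and release its slot.
    public T Take(int slot)
    {
        lock (_sync)
        {
            T item = _slots[slot];
            if (item == null) throw new InvalidOperationException("Slot is empty.");
            _slots[slot] = null;
            _free.Push(slot);
            return item;
        }
    }
}
```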

[DllImport("Kernel32", CharSet=CharSet.Auto)]

private unsafe static extern Boolean GetQueuedCompletionStatus(UInt32 hCompletionPort, UInt32* pSizeOfArgument, UInt32* puiUserArg, OVERLAPPED** ppOverlapped, UInt32 uiMilliseconds);

The final Win32 API call adds threads to the IOCP thread pool. Any thread that makes this call becomes part of the IOCP thread pool. It is a blocking call, and it returns when the IOCP thread pool chooses the thread to perform work. The first argument is the handle to the IOCP thread pool. The second argument receives the size of the data associated with the work, and the third argument receives the data value or data reference associated with the work; both values were provided when the work was posted. The fourth argument is the address of a pointer of type OVERLAPPED, which receives the OVERLAPPED pointer that was posted with the work (null in our case). The last argument is the time in milliseconds the thread should wait to be activated to perform work. We always pass INFINITE, or 0xFFFFFFFF.

These are the Win32 API calls we need to add IOCP thread pool support to our C# server-based applications. We need to encapsulate them using .NET threads and minimize the sections of unsafe code. We also need to prevent the application developer from passing a reference into the IOCP thread pool, by restricting them to passing only integer values.
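For reference, here is a sketch of pointer-size-safe declarations for the same four functions. This is an alternative to the article’s declarations, not the code used in the rest of the article: HANDLE maps to IntPtr, ULONG_PTR (the completion key) to UIntPtr, and DWORD to uint, so the declarations work in both 32-bit and 64-bit processes without any unsafe blocks. Only the declarations are shown; actually calling them requires Windows.

```csharp
using System;
using System.Runtime.InteropServices;

// Sketch: pointer-size-safe P/Invoke declarations for the IOCP functions.
public static class IocpNative
{
    public static readonly IntPtr INVALID_HANDLE_VALUE = new IntPtr(-1); // (HANDLE)-1
    public const uint INFINITE = 0xFFFFFFFF;

    [DllImport("kernel32.dll", SetLastError = true)]
    public static extern IntPtr CreateIoCompletionPort(
        IntPtr fileHandle, IntPtr existingCompletionPort,
        UIntPtr completionKey, uint numberOfConcurrentThreads);

    [DllImport("kernel32.dll", SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    public static extern bool PostQueuedCompletionStatus(
        IntPtr completionPort, uint numberOfBytesTransferred,
        UIntPtr completionKey, IntPtr overlapped);

    // The key and OVERLAPPED pointer come back through "out" parameters.
    [DllImport("kernel32.dll", SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    public static extern bool GetQueuedCompletionStatus(
        IntPtr completionPort, out uint numberOfBytesTransferred,
        out UIntPtr completionKey, out IntPtr overlapped, uint milliseconds);

    [DllImport("kernel32.dll", SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    public static extern bool CloseHandle(IntPtr handle);
}
```

With these declarations, an integer posted as the completion key is read back on the worker side from the UIntPtr out parameter, with no pointer casts involved.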

Defining the Solution
We will build a class that encapsulates a single IOCP thread pool. The application developer will be able to instantiate as many thread pools as needed. During construction, the application developer will be able to set the concurrency level of the thread pool, set the minimum and maximum number of threads in the pool, and provide a method to be called when work posted to the thread pool needs to be processed. The application developer will also be able to post work, with data, into the IOCP thread pool.

Component Design and Coding

Start by adding a new class to your C# project. Remove all of the code provided by the Visual Studio .NET wizard. Then add the following namespaces. The System.Runtime.InteropServices namespace is required to access the Win32 API methods from the Kernel32 DLL.

using System;
using System.Threading;
using System.Runtime.InteropServices;

Don’t forget to change the project properties to allow unsafe code blocks (the /unsafe compiler option). Open the project properties and select Configuration Properties; under Build, in the Code Generation section, you will see “Allow unsafe code blocks”. Set this to true.

Next, add the namespace. You will notice that I have defined a two-level namespace. This is useful when you are building a class library with many different classes.

namespace Continuum.Threading
{


The PostQueuedCompletionStatus and GetQueuedCompletionStatus Win32 API functions both use a pointer to the Win32 OVERLAPPED structure. Because this structure is passed to unmanaged code, we need to make sure it is laid out in memory exactly as it would be in our C++ applications. This is done with the StructLayout attribute: setting it to “LayoutKind.Sequential” lays the fields out in declaration order, using the same alignment rules as the C++ compiler.

The structure requires members that are pointers. The only way to add pointers to this structure is to use the unsafe keyword. We can still use the FCL types when defining the structure. This is very important because we can make sure the structure is identical to the C++ version.

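// Structures
//=============================================================================
/// This is the WIN32 OVERLAPPED structure
[StructLayout(LayoutKind.Sequential, CharSet=CharSet.Auto)]
public unsafe struct OVERLAPPED
{
    UInt32* ulpInternal;       // ULONG_PTR Internal (pointer-sized)
    UInt32* ulpInternalHigh;   // ULONG_PTR InternalHigh (pointer-sized)
    Int32 lOffset;             // DWORD Offset
    Int32 lOffsetHigh;         // DWORD OffsetHigh
    IntPtr hEvent;             // HANDLE is pointer-sized, so IntPtr rather than UInt32
}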

Now it is time to define the IOCP thread pool class. I am using the sealed keyword in the definition of this class. The sealed keyword tells the compiler that this class cannot be inherited. If you know there is no reason for a class to be inherited, use the sealed keyword; it can also enable certain run-time optimizations for the class.

// Classes
//=============================================================================
/// This class provides the ability to create a thread pool to manage work. The
/// class abstracts the Win32 IOCompletionPort API so it requires the use of
/// unmanaged code. Unfortunately the .NET framework does not provide this functionality
public sealed class IOCPThreadPool
{

The first section of the IOCP thread pool class is the Win32 function prototypes. These are the same ones described earlier.


// Win32 Function Prototypes

/// Win32Func: Create an IO Completion Port Thread Pool
[DllImport("Kernel32", CharSet=CharSet.Auto)]
private unsafe static extern UInt32 CreateIoCompletionPort(UInt32 hFile, UInt32 hExistingCompletionPort, UInt32* puiCompletionKey, UInt32 uiNumberOfConcurrentThreads);

/// Win32Func: Closes an IO Completion Port Thread Pool
[DllImport("Kernel32", CharSet=CharSet.Auto)]
private unsafe static extern Boolean CloseHandle(UInt32 hObject);

/// Win32Func: Posts a context based event into an IO Completion Port Thread Pool
[DllImport("Kernel32", CharSet=CharSet.Auto)]
private unsafe static extern Boolean PostQueuedCompletionStatus(UInt32 hCompletionPort, UInt32 uiSizeOfArgument, UInt32* puiUserArg, OVERLAPPED* pOverlapped);

/// Win32Func: Waits on a context based event from an IO Completion Port Thread Pool.
/// All threads in the pool wait in this Win32 Function
[DllImport("Kernel32", CharSet=CharSet.Auto)]
private unsafe static extern Boolean GetQueuedCompletionStatus(UInt32 hCompletionPort, UInt32* pSizeOfArgument, UInt32* puiUserArg, OVERLAPPED** ppOverlapped, UInt32 uiMilliseconds);

The next section is the constants section. Here we need to define the Win32 constants required for the Win32 API calls we are going to make later.

// Constants

/// SimTypeConst: This represents the Win32 Invalid Handle Value Macro

private const UInt32 INVALID_HANDLE_VALUE = 0xffffffff;

/// SimTypeConst: This represents the Win32 INFINITE Macro

private const UInt32 INIFINITE = 0xffffffff;

/// SimTypeConst: This tells the IOCP Function to shutdown

private const Int32 SHUTDOWN_IOCPTHREAD = 0x7fffffff;

The delegate function type section is where we define any delegate functions. We need one delegate function type to define the signature of the function we will call when work needs to be processed.


// Delegate Function Types

/// DelType: This is the type of user function to be supplied for the thread pool

public delegate void USER_FUNCTION(Int32 iValue);

These private properties maintain the application developer’s settings. The most interesting one is the GetUserFunction property, which holds a reference to the method supplied by the application developer. We use this property to call the application developer’s method.


// Private Properties

private UInt32 m_hHandle;
/// SimType: Contains the IO Completion Port Thread Pool handle for this instance
private UInt32 GetHandle { get { return m_hHandle; } set { m_hHandle = value; } }

private Int32 m_uiMaxConcurrency;
/// SimType: The maximum number of threads that may be running at the same time
private Int32 GetMaxConcurrency { get { return m_uiMaxConcurrency; } set { m_uiMaxConcurrency = value; } }

private Int32 m_iMinThreadsInPool;
/// SimType: The minimal number of threads the thread pool maintains
private Int32 GetMinThreadsInPool { get { return m_iMinThreadsInPool; } set { m_iMinThreadsInPool = value; } }

private Int32 m_iMaxThreadsInPool;
/// SimType: The maximum number of threads the thread pool maintains
private Int32 GetMaxThreadsInPool { get { return m_iMaxThreadsInPool; } set { m_iMaxThreadsInPool = value; } }

private Object m_pCriticalSection;
/// RefType: A serialization object to protect the class state
private Object GetCriticalSection { get { return m_pCriticalSection; } set { m_pCriticalSection = value; } }

private USER_FUNCTION m_pfnUserFunction;
/// DelType: A reference to a user specified function to be called by the thread pool
private USER_FUNCTION GetUserFunction { get { return m_pfnUserFunction; } set { m_pfnUserFunction = value; } }

// volatile: written by Dispose and read by the pool threads
private volatile Boolean m_bDisposeFlag;
/// SimType: Flag to indicate if the class is disposing
private Boolean IsDisposed { get { return m_bDisposeFlag; } set { m_bDisposeFlag = value; } }



These public properties are used to determine whether new threads need to be added to the thread pool, and they also provide statistics about the thread pool. We use the Interlocked class to make each increment and decrement atomic. This is the least expensive way to serialize access to a single integer, because no lock is taken.


// Public Properties

private Int32 m_iCurThreadsInPool;
/// SimType: The current number of threads in the thread pool
public Int32 GetCurThreadsInPool { get { return m_iCurThreadsInPool; } set { m_iCurThreadsInPool = value; } }
/// SimType: Increment current number of threads in the thread pool
private Int32 IncCurThreadsInPool() { return Interlocked.Increment(ref m_iCurThreadsInPool); }
/// SimType: Decrement current number of threads in the thread pool
private Int32 DecCurThreadsInPool() { return Interlocked.Decrement(ref m_iCurThreadsInPool); }

private Int32 m_iActThreadsInPool;
/// SimType: The current number of active threads in the thread pool
public Int32 GetActThreadsInPool { get { return m_iActThreadsInPool; } set { m_iActThreadsInPool = value; } }
/// SimType: Increment current number of active threads in the thread pool
private Int32 IncActThreadsInPool() { return Interlocked.Increment(ref m_iActThreadsInPool); }
/// SimType: Decrement current number of active threads in the thread pool
private Int32 DecActThreadsInPool() { return Interlocked.Decrement(ref m_iActThreadsInPool); }

private Int32 m_iCurWorkInPool;
/// SimType: The current number of Work posted in the thread pool
public Int32 GetCurWorkInPool { get { return m_iCurWorkInPool; } set { m_iCurWorkInPool = value; } }
/// SimType: Increment current number of Work posted in the thread pool
private Int32 IncCurWorkInPool() { return Interlocked.Increment(ref m_iCurWorkInPool); }
/// SimType: Decrement current number of Work posted in the thread pool
private Int32 DecCurWorkInPool() { return Interlocked.Decrement(ref m_iCurWorkInPool); }
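To see why these counters go through Interlocked, the following sketch (an illustration, not part of the pool class) has several threads increment one shared integer. Interlocked.Increment makes each read-modify-write atomic, so no increment is lost; a plain counter++ in the same loop can lose updates when threads interleave.

```csharp
using System;
using System.Threading;

// Illustration: atomic increments from several threads never lose an update.
public static class InterlockedCounterDemo
{
    public static int Count(int threads, int incrementsPerThread)
    {
        int counter = 0;
        var workers = new Thread[threads];
        for (int t = 0; t < threads; t++)
        {
            workers[t] = new Thread(() =>
            {
                for (int i = 0; i < incrementsPerThread; i++)
                    Interlocked.Increment(ref counter); // "counter++" here could lose updates
            });
            workers[t].Start();
        }
        foreach (Thread w in workers) w.Join(); // Join makes the final value visible
        return counter;
    }
}
```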

The constructor does several things. It initializes the class state and then creates the IOCP thread pool with a call to CreateIoCompletionPort. Notice that the call is within the scope of the unsafe keyword; this is required because we are passing pointers to the Win32 API function. Finally, it creates the minimum number of threads specified by the application developer. Notice that we use the .NET threading classes to create the threads. One might think we need the unsafe Win32 CreateThread function, because these threads will be calling the GetQueuedCompletionStatus Win32 API function, but ordinary managed threads can make that call through P/Invoke.


// Constructor, Finalize, and Dispose

//********************************************************************
/// Constructor
/// SimType: Max number of running threads allowed
/// SimType: Min number of threads in the pool
/// SimType: Max number of threads in the pool
/// DelType: Reference to a function to call to perform work
/// Unhandled Exception
public IOCPThreadPool(Int32 iMaxConcurrency, Int32 iMinThreadsInPool, Int32 iMaxThreadsInPool, USER_FUNCTION pfnUserFunction)
{
    try
    {
        // Set initial class state
        GetMaxConcurrency = iMaxConcurrency;
        GetMinThreadsInPool = iMinThreadsInPool;
        GetMaxThreadsInPool = iMaxThreadsInPool;
        GetUserFunction = pfnUserFunction;

        // Init the thread counters
        GetCurThreadsInPool = 0;
        GetActThreadsInPool = 0;
        GetCurWorkInPool = 0;

        // Initialize the Monitor Object
        GetCriticalSection = new Object();

        // Set the disposing flag to false
        IsDisposed = false;

        unsafe
        {
            // Create an IO Completion Port for Thread Pool use
            GetHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, null, (UInt32) GetMaxConcurrency);
        }

        // Test to make sure the IO Completion Port was created
        if (GetHandle == 0)
            throw new Exception("Unable To Create IO Completion Port");

        // Allocate and start the Minimum number of threads specified
        ThreadStart tsThread = new ThreadStart(IOCPFunction);
        for (Int32 iThread = 0; iThread < GetMinThreadsInPool; ++iThread)
        {
            // Create a thread and start it
            Thread thThread = new Thread(tsThread);
            thThread.Name = "IOCP " + thThread.GetHashCode();
            thThread.Start();

            // Increment the thread pool count
            IncCurThreadsInPool();
        }
    }
    catch (Exception ex)
    {
        // Keep the original failure as the inner exception instead of discarding it
        throw new Exception("Unhandled Exception", ex);
    }
}





The finalize method is required only to guarantee that the IOCP thread pool handle is closed. As a general rule, add a finalize method only if a class allocates a resource outside the scope of the .NET Framework; otherwise, do not add one. A finalize method makes the garbage collector spend more time releasing the memory for the object, because the object must survive at least one extra collection while its finalizer runs.





//********************************************************************
/// Finalize called by the GC
~IOCPThreadPool()
{
    if (!IsDisposed)
        Dispose();
}
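The finalizer above simply calls Dispose. For comparison, here is a sketch of the conventional .NET dispose pattern for a class that owns a native resource. NativeResourceOwner and its ReleaseNativeHandle method are hypothetical stand-ins (ReleaseNativeHandle plays the role of CloseHandle). The point is that cleanup runs at most once, and GC.SuppressFinalize removes the finalization cost after an explicit Dispose.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of the standard dispose pattern for a native resource owner.
public sealed class NativeResourceOwner : IDisposable
{
    private int _disposed; // 0 = live, 1 = disposed
    public int ReleaseCount { get; private set; }

    public void Dispose()
    {
        Dispose(disposing: true);
        GC.SuppressFinalize(this); // the finalizer no longer needs to run
    }

    ~NativeResourceOwner() => Dispose(disposing: false);

    private void Dispose(bool disposing)
    {
        // Interlocked.Exchange ensures cleanup runs at most once, even if two threads race.
        if (Interlocked.Exchange(ref _disposed, 1) != 0) return;
        if (disposing)
        {
            // Only here is it safe to touch other managed objects (e.g. stop worker threads).
        }
        ReleaseNativeHandle(); // always release the unmanaged resource
    }

    // Hypothetical stand-in for CloseHandle.
    private void ReleaseNativeHandle() => ReleaseCount++;
}
```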





The dispose method does not return until all of the threads in the pool have terminated. We can’t use the Abort method to kill the threads in the pool, because a thread blocked in the GetQueuedCompletionStatus Win32 API function is running unmanaged code, and the CLR cannot interrupt it until it returns to managed code. So instead we post work into the IOCP thread pool, once per thread, passing SHUTDOWN_IOCPTHREAD as the data; this tells the thread that receives it to terminate. Then we wait in a polling loop, sleeping 100 milliseconds at a time, until all of the threads have terminated. The last step is to close the IOCP thread pool handle.





//********************************************************************
/// Called when the object will be shutdown. This
/// function will wait for all of the work to be completed
/// inside the queue before completing
public void Dispose()
{
    try
    {
        // Flag that we are disposing this object
        IsDisposed = true;

        // Get the current number of threads in the pool
        Int32 iCurThreadsInPool = GetCurThreadsInPool;

        // Shutdown all threads in the pool: post one shutdown event per thread
        for (Int32 iThread = 0; iThread < iCurThreadsInPool; ++iThread)
        {
            unsafe
            {
                PostQueuedCompletionStatus(GetHandle, 4, (UInt32*) SHUTDOWN_IOCPTHREAD, null);
            }
        }

        // Wait here until all the threads are gone
        while (GetCurThreadsInPool != 0) Thread.Sleep(100);

        unsafe
        {
            // Close the IOCP Handle
            CloseHandle(GetHandle);
        }
    }
    catch
    {
    }

    // The handle has been closed, so the finalizer no longer needs to run
    GC.SuppressFinalize(this);
}
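The shutdown technique (one sentinel per thread, posted after any real work, followed by a polling wait) can be tried without Win32. In this sketch a BlockingCollection<int> stands in for the completion port; that substitution is an assumption made for illustration, since a BlockingCollection is not an IOCP and has no concurrency level. Because the collection is FIFO, all real work is dequeued before any sentinel, and each worker exits only after finishing the item it was processing.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Illustration of sentinel-based shutdown; BlockingCollection stands in for the IOCP.
public static class SentinelShutdownDemo
{
    // Same sentinel value as SHUTDOWN_IOCPTHREAD (0x7fffffff).
    private const int SHUTDOWN = int.MaxValue;

    // Returns how many real work items were processed before all workers exited.
    public static int Run(int workerCount, int workItems)
    {
        var port = new BlockingCollection<int>(); // FIFO (ConcurrentQueue) by default
        int processed = 0, liveThreads = workerCount;

        for (int w = 0; w < workerCount; w++)
        {
            new Thread(() =>
            {
                while (true)
                {
                    int value = port.Take(); // blocks, like GetQueuedCompletionStatus
                    if (value == SHUTDOWN) break;
                    Interlocked.Increment(ref processed); // stand-in for the user function
                }
                Interlocked.Decrement(ref liveThreads);
            }).Start();
        }

        for (int i = 0; i < workItems; i++) port.Add(i);          // real work first
        for (int w = 0; w < workerCount; w++) port.Add(SHUTDOWN); // then one sentinel per worker

        // Poll until every worker has exited, as the article's Dispose does.
        while (Volatile.Read(ref liveThreads) != 0) Thread.Sleep(10);
        port.Dispose();
        return Volatile.Read(ref processed);
    }
}
```

Calling Run(3, 50) should return 50: every real work item is processed before the three workers exit.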





The only private method is IOCPFunction. This method runs on each pool thread and makes the thread part of the IOCP thread pool by calling the GetQueuedCompletionStatus Win32 API function. When GetQueuedCompletionStatus returns, we check whether we are being asked to shut down the thread. The third argument receives the data associated with the posted work. If the data is not SHUTDOWN_IOCPTHREAD, real work has been posted into the IOCP thread pool and this thread has been chosen to process it. We then call the application developer’s user function, since the application developer is the only one who knows what needs to be done. Once that is complete, the method checks whether a new thread should be added to the pool by comparing the number of active threads with the number of threads in the pool.





        // Private Methods

        //********************************************************************
        /// IOCP Worker Function that calls the specified user function
        private void IOCPFunction()
        {
            UInt32 uiNumberOfBytes;
            Int32 iValue;

            try
            {
                while (true)
                {
                    unsafe
                    {
                        OVERLAPPED* pOv;

                        // Wait for an event
                        GetQueuedCompletionStatus(GetHandle, &uiNumberOfBytes, (UInt32*) &iValue, &pOv, INIFINITE);
                    }

                    // Decrement the number of events in queue
                    DecCurWorkInPool();

                    // Was this thread told to shut down
                    if (iValue == SHUTDOWN_IOCPTHREAD)
                        break;

                    // Increment the number of active threads
                    IncActThreadsInPool();

                    try
                    {
                        // Call the user function
                        GetUserFunction(iValue);
                    }
                    catch
                    {
                    }

                    // Get a lock
                    Monitor.Enter(GetCriticalSection);

                    try
                    {
                        // If we have fewer than max threads currently in the pool
                        if (GetCurThreadsInPool < GetMaxThreadsInPool)
                        {
                            // Should we add a new thread to the pool
                            if (GetActThreadsInPool == GetCurThreadsInPool)
                            {
                                if (IsDisposed == false)
                                {
                                    // Create a thread and start it
                                    ThreadStart tsThread = new ThreadStart(IOCPFunction);
                                    Thread thThread = new Thread(tsThread);
                                    thThread.Name = "IOCP " + thThread.GetHashCode();
                                    thThread.Start();

                                    // Increment the thread pool count
                                    IncCurThreadsInPool();
                                }
                            }
                        }
                    }
                    catch
                    {
                    }

                    // Release the lock
                    Monitor.Exit(GetCriticalSection);

                    // Decrement the number of active threads
                    DecActThreadsInPool();
                }
            }
            catch
            {
            }

            // Decrement the thread pool count
            DecCurThreadsInPool();
        }
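The worker loop above relies on a simple protocol. Each packet carries either real work or the reserved value SHUTDOWN_IOCPTHREAD, and Dispose posts one shutdown packet per thread. What follows is a minimal, portable sketch of that same protocol. It assumes a managed BlockingCollection<int> in place of the completion port, so it runs without kernel32. The class name SentinelWorkerPool is hypothetical, and unlike IOCPFunction the sketch does not grow the pool.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical managed analogue of IOCPFunction/Dispose: a BlockingCollection
// stands in for the completion port, and a reserved value tells a worker to exit.
public sealed class SentinelWorkerPool : IDisposable
{
    // Same reserved value the article uses for SHUTDOWN_IOCPTHREAD.
    private const int Shutdown = 0x7fffffff;

    private readonly BlockingCollection<int> _queue = new BlockingCollection<int>();
    private readonly List<Thread> _workers = new List<Thread>();
    private readonly Action<int> _userFunction;

    public SentinelWorkerPool(int threadCount, Action<int> userFunction)
    {
        _userFunction = userFunction;
        for (int i = 0; i < threadCount; ++i)
        {
            var thread = new Thread(WorkerLoop) { Name = "Worker " + i, IsBackground = true };
            _workers.Add(thread);
            thread.Start();
        }
    }

    // Counterpart of PostEvent(Int32): queue one item of work.
    public void Post(int value)
    {
        if (value == Shutdown)
            throw new ArgumentOutOfRangeException(nameof(value), "This value is reserved for shutdown.");
        _queue.Add(value);
    }

    private void WorkerLoop()
    {
        while (true)
        {
            // Blocks until an item arrives, like GetQueuedCompletionStatus with INFINITE.
            int value = _queue.Take();

            // Was this thread told to shut down?
            if (value == Shutdown)
                break;

            try
            {
                _userFunction(value);
            }
            catch (Exception)
            {
                // Keep the worker alive when the user function throws.
            }
        }
    }

    public void Dispose()
    {
        // One shutdown value per worker; each worker consumes exactly one and exits.
        for (int i = 0; i < _workers.Count; ++i)
            _queue.Add(Shutdown);

        // Wait for every worker to finish instead of polling with Thread.Sleep(100).
        foreach (Thread thread in _workers)
            thread.Join();

        _queue.Dispose();
    }
}
```

The queue is first-in, first-out, so the shutdown values line up behind any work already posted. As a result, every posted item is processed before the workers exit. Join takes the place of the Thread.Sleep(100) polling loop.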





The last two public methods are the PostEvent overloads. The first takes an integer as an argument and the second takes no argument at all. The integer is the data the application developer wishes to pass with the work posted into the IOCP thread pool. In the PostQueuedCompletionStatus Win32 API call, the third argument (the completion key) is where we pass the data value, and the worker receives it through the third argument of GetQueuedCompletionStatus. Since this value is always an integer, we pass four as the second argument, the "number of bytes transferred" field, which for a posted packet is simply handed back to the worker. The parameterless overload posts a null key, so the user function receives 0. As in IOCPFunction, we then check whether a new thread needs to be added to the pool.





        // Public Methods

        //********************************************************************
        /// Posts an event into the IOCP thread pool
        /// SimType: A value to be passed with the event
        /// Unhandled Exception
        public void PostEvent(Int32 iValue)
        {
            try
            {
                // Only add work if we are not disposing
                if (IsDisposed == false)
                {
                    unsafe
                    {
                        // Post an event into the IOCP Thread Pool
                        PostQueuedCompletionStatus(GetHandle, 4, (UInt32*) iValue, null);
                    }

                    // Increment the number of items of work
                    IncCurWorkInPool();

                    // Get a lock
                    Monitor.Enter(GetCriticalSection);

                    try
                    {
                        // If we have fewer than max threads currently in the pool
                        if (GetCurThreadsInPool < GetMaxThreadsInPool)
                        {
                            // Should we add a new thread to the pool
                            if (GetActThreadsInPool == GetCurThreadsInPool)
                            {
                                if (IsDisposed == false)
                                {
                                    // Create a thread and start it
                                    ThreadStart tsThread = new ThreadStart(IOCPFunction);
                                    Thread thThread = new Thread(tsThread);
                                    thThread.Name = "IOCP " + thThread.GetHashCode();
                                    thThread.Start();

                                    // Increment the thread pool count
                                    IncCurThreadsInPool();
                                }
                            }
                        }
                    }
                    catch
                    {
                    }

                    // Release the lock
                    Monitor.Exit(GetCriticalSection);
                }
            }
            catch (Exception)
            {
                // Rethrow without resetting the stack trace
                throw;
            }
            catch
            {
                throw new Exception("Unhandled Exception");
            }
        }





        //********************************************************************
        /// Posts an event with no data into the IOCP thread pool
        /// Unhandled Exception
        public void PostEvent()
        {
            try
            {
                // Only add work if we are not disposing
                if (IsDisposed == false)
                {
                    unsafe
                    {
                        // Post an event into the IOCP Thread Pool
                        PostQueuedCompletionStatus(GetHandle, 0, null, null);
                    }

                    // Increment the number of items of work
                    IncCurWorkInPool();

                    // Get a lock
                    Monitor.Enter(GetCriticalSection);

                    try
                    {
                        // If we have fewer than max threads currently in the pool
                        if (GetCurThreadsInPool < GetMaxThreadsInPool)
                        {
                            // Should we add a new thread to the pool
                            if (GetActThreadsInPool == GetCurThreadsInPool)
                            {
                                if (IsDisposed == false)
                                {
                                    // Create a thread and start it
                                    ThreadStart tsThread = new ThreadStart(IOCPFunction);
                                    Thread thThread = new Thread(tsThread);
                                    thThread.Name = "IOCP " + thThread.GetHashCode();
                                    thThread.Start();

                                    // Increment the thread pool count
                                    IncCurThreadsInPool();
                                }
                            }
                        }
                    }
                    catch
                    {
                    }

                    // Release the lock
                    Monitor.Exit(GetCriticalSection);
                }
            }
            catch (Exception)
            {
                // Rethrow without resetting the stack trace
                throw;
            }
            catch
            {
                throw new Exception("Unhandled Exception");
            }
        }
    }
}
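IOCPFunction and both PostEvent overloads repeat the same three nested checks before starting a thread. The sketch below isolates that rule in one pure method; the PoolGrowthPolicy name is hypothetical. Callers would still need to evaluate it while holding GetCriticalSection, because the thread counters change concurrently.

```csharp
// Hypothetical helper: the growth rule that IOCPFunction and both PostEvent
// overloads evaluate between Monitor.Enter and Monitor.Exit.
public static class PoolGrowthPolicy
{
    // True when a new worker should be started: the pool is not disposing,
    // it is below its maximum size, and every existing thread is busy.
    public static bool ShouldAddThread(int curThreadsInPool, int actThreadsInPool,
                                       int maxThreadsInPool, bool isDisposed)
    {
        if (isDisposed)
            return false;
        if (curThreadsInPool >= maxThreadsInPool)
            return false;
        return actThreadsInPool == curThreadsInPool;
    }
}
```

Inside the pool, each of the three blocks would then reduce to one call of ShouldAddThread inside a `lock (GetCriticalSection)` statement. C#'s lock is shorthand for Monitor.Enter and Monitor.Exit wrapped in try/finally, which also guarantees the lock is released if an exception escapes.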

We have now completed the implementation of the IOCP thread pool class. Now it is time to test it.

The Sample Application

Start by adding a new class to your C# project. Remove all of the code generated by the Visual Studio .NET wizard, then add the following code. In Main, an IOCP thread pool is created and a single piece of work is posted to it, along with the data value 10. The main thread is then put to sleep, which gives an IOCP thread time to wake up and process the posted work. Finally, Main disposes the IOCP thread pool. The IOCP thread function displays the value of the data passed into the IOCP thread pool.


using System;
using System.Threading; // Included for the Thread.Sleep call
using Continuum.Threading;

namespace Sample
{
    //=============================================================================
    /// Sample class for the threading class
    public class UtilThreadingSample
    {
        //*****************************************************************************
        /// Test Method
        static void Main()
        {
            // Create the MSSQL IOCP Thread Pool
            IOCPThreadPool pThreadPool = new IOCPThreadPool(0, 5, 10, new IOCPThreadPool.USER_FUNCTION(IOCPThreadFunction));

            pThreadPool.PostEvent(10);

            Thread.Sleep(100);

            pThreadPool.Dispose();
        }

        //********************************************************************
        /// Function to be called by the IOCP thread pool. Called when
        /// a command is posted for processing by the SocketManager
        /// The value provided by the thread posting the event
        static public void IOCPThreadFunction(Int32 iValue)
        {
            try
            {
                Console.WriteLine("Value: {0}", iValue);
            }
            catch (Exception pException)
            {
                Console.WriteLine(pException.Message);
            }
        }
    }
}

When you run the sample application, you should see the following output:

Value: 10

On your own, change Main to call the PostEvent method several times and see how the IOCP thread pool performs.
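To post several events and time them, Main needs to know when the last item has been handled, rather than guessing with Thread.Sleep(100). The following is a minimal sketch that assumes a CountdownEvent, which the user function signals once per item. To keep it runnable on any platform, the built-in .NET ThreadPool stands in for IOCPThreadPool.PostEvent. PostSeveralSample is a hypothetical name.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;

public static class PostSeveralSample
{
    // Posts `count` values and waits until the user function has handled all of
    // them. The .NET ThreadPool stands in for IOCPThreadPool.PostEvent here.
    public static int[] Run(int count, out TimeSpan elapsed)
    {
        var seen = new ConcurrentBag<int>();

        // Not disposed on purpose: a worker may still be returning from Signal()
        // when Wait() wakes up.
        var done = new CountdownEvent(count);

        // Stand-in for the article's IOCPThreadFunction: record the value, then signal.
        WaitCallback userFunction = state =>
        {
            seen.Add((int)state);
            done.Signal();
        };

        Stopwatch timer = Stopwatch.StartNew();
        for (int i = 0; i < count; ++i)
            ThreadPool.QueueUserWorkItem(userFunction, i); // with the article's class: pThreadPool.PostEvent(i)

        // Block until exactly `count` items were processed.
        if (!done.Wait(TimeSpan.FromSeconds(10)))
            throw new TimeoutException("Not all posted work was processed.");
        elapsed = timer.Elapsed;

        return seen.ToArray();
    }
}
```

With the article's class, the same CountdownEvent would be signaled at the end of IOCPThreadFunction. Main would then call Dispose only after Wait returns.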

Review
This class provides the IOCP thread pooling support your server-based software requires. By using this class, you can build efficient and robust servers that can scale as the amount of work increases. Though there are some limitations due to the nature of the managed heap and unsafe code, these limitations can easily be overcome with a little ingenuity.

II.
From http://blog.csdn.net/zhengyun_ustc/archive/2005/04/13/346030.aspx

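[C#] I/O completion port class definition and test example
Adapted from William Kennedy's code. The difference is that he defined his own OVERLAPPED structure, whereas here we use System.Threading.NativeOverlapped directly.

Below is a section of my earlier Win32 IOCP notes. If you already understand IOCP, you can skip ahead to the C# test example that follows:

Compiled by: 郑昀@UltraPower

We use the I/O Completion Port (hereafter IOCP) processing mechanism.

Simply put, when the service application initializes, it should first create an I/O completion port. When a request arrives, we package the received data and send it to the IOCP with PostQueuedCompletionStatus. We then need to create a number of threads (7 threads per CPU; more than that is pointless) to process the messages sent to the IOCP port. The implementation steps are roughly as follows:

1. First, call CreateIoCompletionPort in the main thread to create the IOCP.

The first three parameters of CreateIoCompletionPort are only needed when associating a device with a completion port.

Here we only need to pass INVALID_HANDLE_VALUE, NULL and 0.

The fourth parameter tells the port the maximum number of threads that may run concurrently. Setting it to 0 makes it default to the number of CPUs in the machine.

2. Our ThreadFun thread function performs some initialization and then enters a loop that ends only when the service process terminates.

Inside the loop it calls GetQueuedCompletionStatus. This places the current thread's ID in a queue of waiting threads, so the I/O completion port kernel object always knows which threads are waiting to process completed I/O requests.

If no completion packet appears on the port within the time specified by IDLE_THREAD_TIMEOUT, the loop moves on to its next iteration. Here we set IDLE_THREAD_TIMEOUT to 1 second.

When an entry appears in the port's I/O completion queue, the port wakes this thread from the waiting-thread queue. The thread then receives the information in the completed I/O entry: the number of bytes transferred, the completion key and the address of the OVERLAPPED structure.

In our program we can use a smart pointer, a BSTR or an int to receive the value of this OVERLAPPED address, extract the message from it, and then process the message on this thread.

The first parameter of GetQueuedCompletionStatus, hCompletionPort, specifies which port to monitor. Here we pass the port handle previously returned by CreateIoCompletionPort.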
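Steps 1 and 2 map onto three kernel32 calls. The sketch below is an illustration built on stated assumptions, not the author's code. It declares HANDLE and ULONG_PTR as the pointer-sized IntPtr and UIntPtr rather than UInt32. It posts a single packet and then reads it back in a loop that uses the 1-second IDLE_THREAD_TIMEOUT from the text, treating WAIT_TIMEOUT (258) as "go round again". IocpSteps and RoundTrip are hypothetical names, and the method returns null on non-Windows systems instead of calling kernel32.

```csharp
using System;
using System.ComponentModel;
using System.Runtime.InteropServices;

// Hypothetical sketch of steps 1 and 2 with pointer-sized P/Invoke types. Windows only.
public static class IocpSteps
{
    private static readonly IntPtr INVALID_HANDLE_VALUE = new IntPtr(-1);
    private const uint IDLE_THREAD_TIMEOUT = 1000; // 1 second, as in the text
    private const int WAIT_TIMEOUT = 258;

    [DllImport("kernel32.dll", SetLastError = true)]
    private static extern IntPtr CreateIoCompletionPort(IntPtr fileHandle, IntPtr existingCompletionPort,
                                                        UIntPtr completionKey, uint numberOfConcurrentThreads);

    [DllImport("kernel32.dll", SetLastError = true)]
    private static extern bool PostQueuedCompletionStatus(IntPtr completionPort, uint numberOfBytesTransferred,
                                                          UIntPtr completionKey, IntPtr overlapped);

    [DllImport("kernel32.dll", SetLastError = true)]
    private static extern bool GetQueuedCompletionStatus(IntPtr completionPort, out uint numberOfBytesTransferred,
                                                         out UIntPtr completionKey, out IntPtr overlapped,
                                                         uint milliseconds);

    [DllImport("kernel32.dll", SetLastError = true)]
    private static extern bool CloseHandle(IntPtr handle);

    // Posts one packet carrying `key` and reads it back; returns null off Windows.
    public static ulong? RoundTrip(ulong key)
    {
        if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
            return null;

        // Step 1: a port not associated with any file; 0 = one concurrent thread per CPU.
        IntPtr port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, IntPtr.Zero, UIntPtr.Zero, 0);
        if (port == IntPtr.Zero)
            throw new Win32Exception(Marshal.GetLastWin32Error());

        try
        {
            if (!PostQueuedCompletionStatus(port, 0, new UIntPtr(key), IntPtr.Zero))
                throw new Win32Exception(Marshal.GetLastWin32Error());

            // Step 2: the worker loop. On a timeout just go round again
            // (a real service would also check a stop flag here).
            while (true)
            {
                if (GetQueuedCompletionStatus(port, out _, out UIntPtr got, out _, IDLE_THREAD_TIMEOUT))
                    return got.ToUInt64();

                int error = Marshal.GetLastWin32Error();
                if (error != WAIT_TIMEOUT)
                    throw new Win32Exception(error);
            }
        }
        finally
        {
            CloseHandle(port);
        }
    }
}
```

Using UIntPtr for the key lets it hold a full pointer-sized value in a 64-bit process. With a UInt32* parameter, by contrast, GetQueuedCompletionStatus writes 8 bytes into a 4-byte variable.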

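Points to note:

First, the number of threads in the pool is limited and depends on the number of CPUs.

Second, IOCP is a near-ideal sleep/wake mechanism for threads. When a thread has no task to process, it goes to sleep and uses no CPU until the kernel wakes it.

Third, the thread that most recently finished is the one woken when the next task arrives, so threads that are called rarely are likely to keep being called rarely.

Test code: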

using System;
using System.Threading; // Included for the Thread.Sleep call
using IOCPThreading;    // Namespace of the IOCPThreadPool class below
using System.Runtime.InteropServices;

namespace IOCPDemo
{
    //=============================================================================
    /// Sample class for the threading class
    public class UtilThreadingSample
    {
        //*****************************************************************************
        /// Test Method
        static void Main()
        {
            // Create the MSSQL IOCP Thread Pool
            IOCPThreadPool pThreadPool = new IOCPThreadPool(0, 10, 20, new IOCPThreadPool.USER_FUNCTION(IOCPThreadFunction));

            //for(int i =1;i<10000;i++)
            {
                pThreadPool.PostEvent(1234);
            }

            Thread.Sleep(100);

            pThreadPool.Dispose();
        }

        //********************************************************************
        /// Function to be called by the IOCP thread pool. Called when
        /// a command is posted for processing by the SocketManager
        /// The value provided by the thread posting the event
        static public void IOCPThreadFunction(int iValue)
        {
            try
            {
                Console.WriteLine("Value: {0}", iValue.ToString());
                Thread.Sleep(3000);
            }
            catch (Exception pException)
            {
                Console.WriteLine(pException.Message);
            }
        }
    }
}


Class code:
using System;
using System.Threading;
using System.Runtime.InteropServices;

namespace IOCPThreading
{
    [StructLayout(LayoutKind.Sequential, CharSet=CharSet.Auto)]
    public sealed class IOCPThreadPool
    {
        // Note: HANDLE and ULONG_PTR are pointer-sized in Win32, so these UInt32-based
        // signatures are only correct in a 32-bit (x86) process.
        [DllImport("Kernel32", CharSet=CharSet.Auto)]
        private unsafe static extern UInt32 CreateIoCompletionPort(UInt32 hFile, UInt32 hExistingCompletionPort, UInt32* puiCompletionKey, UInt32 uiNumberOfConcurrentThreads);

        [DllImport("Kernel32", CharSet=CharSet.Auto)]
        private unsafe static extern Boolean CloseHandle(UInt32 hObject);

        [DllImport("Kernel32", CharSet=CharSet.Auto)]
        private unsafe static extern Boolean PostQueuedCompletionStatus(UInt32 hCompletionPort, UInt32 uiSizeOfArgument, UInt32* puiUserArg, System.Threading.NativeOverlapped* pOverlapped);

        [DllImport("Kernel32", CharSet=CharSet.Auto)]
        private unsafe static extern Boolean GetQueuedCompletionStatus(UInt32 hCompletionPort, UInt32* pSizeOfArgument, UInt32* puiUserArg, System.Threading.NativeOverlapped** ppOverlapped, UInt32 uiMilliseconds);

        private const UInt32 INVALID_HANDLE_VALUE = 0xffffffff;
        private const UInt32 INIFINITE = 0xffffffff;
        private const Int32 SHUTDOWN_IOCPTHREAD = 0x7fffffff;

        public delegate void USER_FUNCTION(int iValue);

        private UInt32 m_hHandle;
        private UInt32 GetHandle { get { return m_hHandle; } set { m_hHandle = value; } }

        private Int32 m_uiMaxConcurrency;
        private Int32 GetMaxConcurrency { get { return m_uiMaxConcurrency; } set { m_uiMaxConcurrency = value; } }

        private Int32 m_iMinThreadsInPool;
        private Int32 GetMinThreadsInPool { get { return m_iMinThreadsInPool; } set { m_iMinThreadsInPool = value; } }

        private Int32 m_iMaxThreadsInPool;
        private Int32 GetMaxThreadsInPool { get { return m_iMaxThreadsInPool; } set { m_iMaxThreadsInPool = value; } }

        private Object m_pCriticalSection;
        private Object GetCriticalSection { get { return m_pCriticalSection; } set { m_pCriticalSection = value; } }

        private USER_FUNCTION m_pfnUserFunction;
        private USER_FUNCTION GetUserFunction { get { return m_pfnUserFunction; } set { m_pfnUserFunction = value; } }

        private Boolean m_bDisposeFlag;
        /// SimType: Flag to indicate if the class is disposing
        private Boolean IsDisposed { get { return m_bDisposeFlag; } set { m_bDisposeFlag = value; } }

        private Int32 m_iCurThreadsInPool;
        /// SimType: The current number of threads in the thread pool
        public Int32 GetCurThreadsInPool { get { return m_iCurThreadsInPool; } set { m_iCurThreadsInPool = value; } }
        /// SimType: Increment current number of threads in the thread pool
        private Int32 IncCurThreadsInPool() { return Interlocked.Increment(ref m_iCurThreadsInPool); }
        /// SimType: Decrement current number of threads in the thread pool
        private Int32 DecCurThreadsInPool() { return Interlocked.Decrement(ref m_iCurThreadsInPool); }

        private Int32 m_iActThreadsInPool;
        /// SimType: The current number of active threads in the thread pool
        public Int32 GetActThreadsInPool { get { return m_iActThreadsInPool; } set { m_iActThreadsInPool = value; } }
        /// SimType: Increment current number of active threads in the thread pool
        private Int32 IncActThreadsInPool() { return Interlocked.Increment(ref m_iActThreadsInPool); }
        /// SimType: Decrement current number of active threads in the thread pool
        private Int32 DecActThreadsInPool() { return Interlocked.Decrement(ref m_iActThreadsInPool); }

        private Int32 m_iCurWorkInPool;
        /// SimType: The current number of Work posted in the thread pool
        public Int32 GetCurWorkInPool { get { return m_iCurWorkInPool; } set { m_iCurWorkInPool = value; } }
        /// SimType: Increment current number of Work posted in the thread pool
        private Int32 IncCurWorkInPool() { return Interlocked.Increment(ref m_iCurWorkInPool); }
        /// SimType: Decrement current number of Work posted in the thread pool
        private Int32 DecCurWorkInPool() { return Interlocked.Decrement(ref m_iCurWorkInPool); }

        public IOCPThreadPool(Int32 iMaxConcurrency, Int32 iMinThreadsInPool, Int32 iMaxThreadsInPool, USER_FUNCTION pfnUserFunction)
        {
            try
            {
                // Set initial class state
                GetMaxConcurrency = iMaxConcurrency;
                GetMinThreadsInPool = iMinThreadsInPool;
                GetMaxThreadsInPool = iMaxThreadsInPool;
                GetUserFunction = pfnUserFunction;

                // Init the thread counters
                GetCurThreadsInPool = 0;
                GetActThreadsInPool = 0;
                GetCurWorkInPool = 0;

                // Initialize the Monitor Object
                GetCriticalSection = new Object();

                // Set the disposing flag to false
                IsDisposed = false;

                unsafe
                {
                    // Create an IO Completion Port for Thread Pool use
                    GetHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, null, (UInt32) GetMaxConcurrency);
                }

                // Test to make sure the IO Completion Port was created
                if (GetHandle == 0)
                    throw new Exception("Unable To Create IO Completion Port");

                // Allocate and start the Minimum number of threads specified
                Int32 iStartingCount = GetCurThreadsInPool;

                ThreadStart tsThread = new ThreadStart(IOCPFunction);

                for (Int32 iThread = 0; iThread < GetMinThreadsInPool; ++iThread)
                {
                    // Create a thread and start it
                    Thread thThread = new Thread(tsThread);
                    thThread.Name = "IOCP " + thThread.GetHashCode();
                    thThread.Start();

                    // Increment the thread pool count
                    IncCurThreadsInPool();
                }
            }
            catch
            {
                throw new Exception("Unhandled Exception");
            }
        }

        ~IOCPThreadPool()
        {
            if (!IsDisposed)
                Dispose();
        }

        public void Dispose()
        {
            try
            {
                // Flag that we are disposing this object
                IsDisposed = true;

                // Get the current number of threads in the pool
                Int32 iCurThreadsInPool = GetCurThreadsInPool;

                // Shut down all threads in the pool
                for (Int32 iThread = 0; iThread < iCurThreadsInPool; ++iThread)
                {
                    unsafe
                    {
                        bool bret = PostQueuedCompletionStatus(GetHandle, 4, (UInt32*) SHUTDOWN_IOCPTHREAD, null);
                    }
                }

                // Wait here until all the threads are gone
                while (GetCurThreadsInPool != 0) Thread.Sleep(100);

                unsafe
                {
                    // Close the IOCP Handle
                    CloseHandle(GetHandle);
                }
            }
            catch
            {
            }
        }
        private void IOCPFunction()
        {
            UInt32 uiNumberOfBytes;
            Int32 iValue;

            try
            {
                while (true)
                {
                    unsafe
                    {
                        System.Threading.NativeOverlapped* pOv;

                        // Wait for an event
                        GetQueuedCompletionStatus(GetHandle, &uiNumberOfBytes, (UInt32*) &iValue, &pOv, INIFINITE);
                    }

                    // Decrement the number of events in queue
                    DecCurWorkInPool();

                    // Was this thread told to shut down
                    if (iValue == SHUTDOWN_IOCPTHREAD)
                        break;

                    // Increment the number of active threads
                    IncActThreadsInPool();

                    try
                    {
                        // Call the user function
                        GetUserFunction(iValue);
                    }
                    catch (Exception)
                    {
                        // Swallow the exception, as the original article does: rethrowing here
                        // would end this worker thread without calling DecActThreadsInPool().
                    }

                    // Get a lock
                    Monitor.Enter(GetCriticalSection);

                    try
                    {
                        // If we have fewer than max threads currently in the pool
                        if (GetCurThreadsInPool < GetMaxThreadsInPool)
                        {
                            // Should we add a new thread to the pool
                            if (GetActThreadsInPool == GetCurThreadsInPool)
                            {
                                if (IsDisposed == false)
                                {
                                    // Create a thread and start it
                                    ThreadStart tsThread = new ThreadStart(IOCPFunction);
                                    Thread thThread = new Thread(tsThread);
                                    thThread.Name = "IOCP " + thThread.GetHashCode();
                                    thThread.Start();

                                    // Increment the thread pool count
                                    IncCurThreadsInPool();
                                }
                            }
                        }
                    }
                    catch
                    {
                    }

                    // Release the lock
                    Monitor.Exit(GetCriticalSection);

                    // Decrement the number of active threads
                    DecActThreadsInPool();
                }
            }
            catch (Exception)
            {
                // Any other unexpected failure ends this worker thread
            }

            // Decrement the thread pool count
            DecCurThreadsInPool();
        }

        public void PostEvent(int iValue)
        {
            try
            {
                // Only add work if we are not disposing
                if (IsDisposed == false)
                {
                    unsafe
                    {
                        // Post an event into the IOCP Thread Pool
                        PostQueuedCompletionStatus(GetHandle, 4, (UInt32*) iValue, null);
                    }

                    // Increment the number of items of work
                    IncCurWorkInPool();

                    // Get a lock
                    Monitor.Enter(GetCriticalSection);

                    try
                    {
                        // If we have fewer than max threads currently in the pool
                        if (GetCurThreadsInPool < GetMaxThreadsInPool)
                        {
                            // Should we add a new thread to the pool
                            if (GetActThreadsInPool == GetCurThreadsInPool)
                            {
                                if (IsDisposed == false)
                                {
                                    // Create a thread and start it
                                    ThreadStart tsThread = new ThreadStart(IOCPFunction);
                                    Thread thThread = new Thread(tsThread);
                                    thThread.Name = "IOCP " + thThread.GetHashCode();
                                    thThread.Start();

                                    // Increment the thread pool count
                                    IncCurThreadsInPool();
                                }
                            }
                        }
                    }
                    catch
                    {
                    }

                    // Release the lock
                    Monitor.Exit(GetCriticalSection);
                }
            }
            catch (Exception)
            {
                // Rethrow without resetting the stack trace
                throw;
            }
            catch
            {
                throw new Exception("Unhandled Exception");
            }
        }

        public void PostEvent()
        {
            try
            {
                // Only add work if we are not disposing
                if (IsDisposed == false)
                {
                    unsafe
                    {
                        // Post an event into the IOCP Thread Pool
                        PostQueuedCompletionStatus(GetHandle, 0, null, null);
                    }

                    // Increment the number of items of work
                    IncCurWorkInPool();

                    // Get a lock
                    Monitor.Enter(GetCriticalSection);

                    try
                    {
                        // If we have fewer than max threads currently in the pool
                        if (GetCurThreadsInPool < GetMaxThreadsInPool)
                        {
                            // Should we add a new thread to the pool
                            if (GetActThreadsInPool == GetCurThreadsInPool)
                            {
                                if (IsDisposed == false)
                                {
                                    // Create a thread and start it
                                    ThreadStart tsThread = new ThreadStart(IOCPFunction);
                                    Thread thThread = new Thread(tsThread);
                                    thThread.Name = "IOCP " + thThread.GetHashCode();
                                    thThread.Start();

                                    // Increment the thread pool count
                                    IncCurThreadsInPool();
                                }
                            }
                        }
                    }
                    catch
                    {
                    }

                    // Release the lock
                    Monitor.Exit(GetCriticalSection);
                }
            }
            catch
            {
                throw new Exception("Unhandled Exception");
            }
        }
    }
}

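In VC++, I use I/O completion ports in almost every Windows Service I write. How to use I/O completion ports from C#, however, is rarely discussed. William Kennedy's three articles, "IOCP Thread Pooling in C#", are very helpful for implementing this mechanism. Their only shortcoming is that they can only push int values into the completion port. In VC++, by contrast, you can cast an interface pointer, a BSTR string and so on to OVERLAPPED*. I tried Marshal.PtrToStructure/StructureToPtr and StringToBSTR many times, but never managed to pass a string through the I/O completion port.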
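One way around the int-only limitation, sketched here on the assumption that producer and consumer live in the same process, is to keep the string or object in a table. Only an integer token then travels through PostEvent(int), and the user function looks up the payload by that token. PayloadRegistry is a hypothetical helper, and its token range deliberately excludes SHUTDOWN_IOCPTHREAD (0x7fffffff).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper: the pool only carries an Int32, so park the real payload
// (a string, an object, ...) in a table and post the integer token instead.
public sealed class PayloadRegistry<T>
{
    private readonly ConcurrentDictionary<int, T> _items = new ConcurrentDictionary<int, T>();
    private int _nextToken;

    // Stores the payload and returns the token to pass to PostEvent(int).
    public int Register(T payload)
    {
        while (true)
        {
            // Mask keeps tokens in [0, 0x3fffffff], well clear of SHUTDOWN_IOCPTHREAD.
            int token = Interlocked.Increment(ref _nextToken) & 0x3fffffff;
            if (_items.TryAdd(token, payload))
                return token;
        }
    }

    // Called from the user function: removes and returns the payload for a token.
    public T Take(int token)
    {
        if (!_items.TryRemove(token, out T payload))
            throw new ArgumentException("Unknown token " + token, nameof(token));
        return payload;
    }
}
```

GCHandle offers an alternative. GCHandle.Alloc(obj) followed by GCHandle.ToIntPtr yields an IntPtr, and GCHandle.FromIntPtr(...).Target turns it back into the object; the handle must then be released with Free(). That IntPtr is pointer-sized, though, so it only fits the Int32 used by these classes in a 32-bit process.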

3.



