The C++ framework for developing highly scalable, high performance servers on Windows platforms.

Example Servers - Thread Pool Simple Protocol Server

This example shows you how to build a server which works with a simple, CR LF terminated, line based protocol and uses a thread pool to perform all of its business logic. You might want to do this if your business logic involves long and slow operations or if it blocks for resources, such as database connections, etc. The basic structure is very similar to the Simple Protocol Server example and you should read about that first so that you have a good understanding of how everything fits together. This document only covers the differences between the Simple Protocol Server example and this example.

This example is shipped with all licensed versions of The Server Framework and it requires the core server framework libraries (see here for licensing options). You can always download the latest version of this example from here; and although you will need the correct libraries to be able to build it you can look at the example code and see how it works and perhaps get ideas from it. A compiled, unicode release, build of this example is available on request if you require it for performance analysis of the framework.

This server includes two thread pools: the standard I/O thread pool that all of the other example servers use to drive their overlapped I/O and a separate thread pool that is used for the processing of business logic requests. The reason for this is that the I/O thread pool is of a fixed size so that, in versions of Windows prior to Windows Vista, we don't need to worry about tracking outstanding I/O requests on a per thread basis to determine when it's safe to shut down a thread. In these versions of Windows, when a thread that has outstanding overlapped I/O requests terminates, all of those outstanding requests are cancelled. To be able to make an I/O thread pool that expands and contracts due to user demand and blocking calls would require that we tracked outstanding I/O requests. Instead we keep the I/O pool at a fixed size and have a separate pool of threads for our potentially blocking business logic operations. We pass data from one pool to the other via a thread safe queue, which is implemented using an I/O completion port. When we dispatch a work item from an I/O thread to the business logic pool the dispatch functionality uses a timeout to determine if it needs to spawn a new thread. If the business logic pool is busy with blocking calls the new work item will time out and a new thread will be spawned. Likewise, monitoring occurs to reduce the size of the pool when threads are idle. See JetByteTools::Win32::CThreadPool for more details.
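The expand-on-timeout idea can be sketched in isolation. The class below is a hypothetical, much-simplified analogue of JetByteTools::Win32::CThreadPool built on standard C++ primitives rather than an I/O completion port: Dispatch() queues a work item and, if no worker takes it within the dispatch timeout, grows the pool by one thread. The names and structure here are illustrative only; idle-thread reaping and the minimum/maximum thread limits are omitted for brevity.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical, simplified sketch of an expand-on-timeout pool; the real
// JetByteTools::Win32::CThreadPool uses an I/O completion port as its queue.
class ExpandingPool
{
   public:

      explicit ExpandingPool(
         const std::chrono::milliseconds dispatchTimeout)
         :  m_dispatchTimeout(dispatchTimeout),
            m_stop(false)
      {
      }

      ~ExpandingPool()
      {
         {
            std::lock_guard<std::mutex> lock(m_mutex);

            m_stop = true;
         }

         m_workAvailable.notify_all();

         for (auto &thread : m_threads)
         {
            thread.join();
         }
      }

      void Dispatch(
         std::function<void()> work)
      {
         std::unique_lock<std::mutex> lock(m_mutex);

         m_queue.push_back(std::move(work));

         m_workAvailable.notify_one();

         // If no worker has taken the item by the time the dispatch timeout
         // expires then the pool is busy (or blocked) and we expand it.

         if (!m_workTaken.wait_for(lock, m_dispatchTimeout,
            [this]() { return m_queue.empty(); }))
         {
            m_threads.emplace_back(&ExpandingPool::WorkerLoop, this);
         }
      }

   private:

      void WorkerLoop()
      {
         for (;;)
         {
            std::unique_lock<std::mutex> lock(m_mutex);

            m_workAvailable.wait(lock,
               [this]() { return m_stop || !m_queue.empty(); });

            if (m_queue.empty())    // m_stop is set and the queue is drained
            {
               return;
            }

            std::function<void()> work = std::move(m_queue.front());

            m_queue.pop_front();

            m_workTaken.notify_all();

            lock.unlock();

            work();                 // may block without stalling the caller
         }
      }

      const std::chrono::milliseconds m_dispatchTimeout;

      std::mutex m_mutex;
      std::condition_variable m_workAvailable;
      std::condition_variable m_workTaken;
      std::deque<std::function<void()>> m_queue;
      std::vector<std::thread> m_threads;
      bool m_stop;
};
```

Note that Dispatch() blocks its caller for at most the dispatch timeout, which is why the real framework keeps this queue separate from the fixed-size I/O pool.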

So, this server has a CThreadPool class which is a fairly complex beast. It inherits from several interfaces so that we can plug it in to the JetByteTools::Win32::CThreadPool member variable that it holds and which actually does all of the work of managing the pool of threads.

 class CThreadPool :
    private JetByteTools::Win32::IThreadPoolWorkerThreadFactory,
    private JetByteTools::Win32::IIOCPWorkerThreadCallback,
    private JetByteTools::Win32::IMonitorThreadPool
 {
    public:

       typedef JetByteTools::Win32::CThreadPool::ThreadCount ThreadCount;


       CThreadPool(
          JetByteTools::Win32::IProvideUserData &userDataProvider,
          const ThreadCount initialThreads,
          const ThreadCount minThreads,
          const ThreadCount maxThreads,
          const ThreadCount maxDormantThreads,
          const JetByteTools::Milliseconds poolMaintPeriod,
          const JetByteTools::Milliseconds dispatchTimeout);

       void Start();

       void WaitForShutdownToComplete(
          const JetByteTools::Milliseconds forceTerminationAfter = INFINITE);

IThreadPoolWorkerThreadFactory is responsible for creating and destroying the thread pool's per thread callbacks; these are instances of IIOCPWorkerThreadCallback, which actually do the work that the thread pool thread has to do. Our thread pool has no special per thread requirements for its callbacks and so simply returns itself as the instance of IIOCPWorkerThreadCallback each time that it is asked to create a callback.
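The "factory returns itself" idea can be sketched as follows. The interface names below are hypothetical stand-ins for the framework's IThreadPoolWorkerThreadFactory and IIOCPWorkerThreadCallback; the point is simply that a pool with no per thread state can hand out the same callback instance, itself, for every worker thread.

```cpp
#include <cassert>

// Hypothetical stand-ins for the framework's factory and callback interfaces.
struct IWorkerThreadCallback
{
   virtual void Process() = 0;

   virtual ~IWorkerThreadCallback() {}
};

struct IWorkerThreadCallbackFactory
{
   virtual IWorkerThreadCallback &CreateWorkerThreadCallback() = 0;

   virtual void DestroyWorkerThreadCallback(
      IWorkerThreadCallback &callback) = 0;

   virtual ~IWorkerThreadCallbackFactory() {}
};

// A pool with no special per thread requirements can act as both factory
// and callback and simply return a reference to itself every time.
class CStatelessPool :
   public IWorkerThreadCallbackFactory,
   public IWorkerThreadCallback
{
   public:

      IWorkerThreadCallback &CreateWorkerThreadCallback() override
      {
         return *this;
      }

      void DestroyWorkerThreadCallback(
         IWorkerThreadCallback & /*callback*/) override
      {
         // Nothing to destroy; we only handed out a reference to ourself.
      }

      void Process() override
      {
         // Per work item processing would go here.
      }
};
```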

The main work of each worker thread is done in the CThreadPool::Process() function, shown below:

 void CThreadPool::Process(
    const ULONG_PTR completionKey,
    const DWORD operation,
    OVERLAPPED *pOverlapped)
 {
    CSmartStreamSocket socket(reinterpret_cast<IStreamSocket *>(completionKey));

    try
    {
       switch(operation)
       {
          case ConnectionEstablished :

             OnConnectionEstablished(socket.GetRef());

          break;

          case ReadCompleted :
          {
             CSmartBuffer buffer(static_cast<IBuffer *>(pOverlapped));

             ProcessMessage(socket.GetRef(), buffer.GetRef());
          }
          break;
       }
    }
    catch(const CException &e)
    {
       OutputEx(_T("Process - Exception - ") + e.GetDetails());
       socket->Shutdown();
    }
    catch(...)
    {
       OutputEx(_T("Process - Unexpected exception"));
       socket->Shutdown();
    }
 }

As you can see, we simply access the data that has been passed to the thread and dispatch the required operation. We have several dispatch operations that the I/O threads can use to dispatch work items to this thread pool.

 void CThreadPool::DispatchConnectionEstablished(
    IStreamSocket &socket,
    const IAddress &address)
 {
    // Allocate per connection data

    CPerConnectionData *pData = new CPerConnectionData(CAddressRenderer::AsString(address, true));

    // And store our data in the socket...

    socket.SetUserPointer(m_userDataIndex, pData);

    DoDispatch(socket, 0, ConnectionEstablished);
 }

 void CThreadPool::DispatchReadCompleted(
    IStreamSocket &socket,
    IBuffer &buffer)
 {
    DoDispatch(socket, &buffer, ReadCompleted);
 }

All of these result in a call to DoDispatch(), which dispatches the work item to the worker threads via the work item queue. Note that DispatchConnectionEstablished() allocates and associates per connection user data and that this is cleaned up in OnSocketReleased().
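The per connection data lifecycle can be sketched like this. The socket class and free functions below are hypothetical simplifications: the socket exposes indexed user-pointer slots (the real framework hands out an index via IProvideUserData), data is allocated when the connection is established and deleted when the last reference to the socket is released.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical socket exposing indexed user-pointer slots, standing in for
// the framework's IStreamSocket::SetUserPointer()/GetUserPointer() support.
class CFakeSocket
{
   public:

      explicit CFakeSocket(
         const size_t numUserDataSlots)
         :  m_userData(numUserDataSlots, nullptr)
      {
      }

      void SetUserPointer(
         const size_t index,
         void *pData)
      {
         m_userData.at(index) = pData;
      }

      void *GetUserPointer(
         const size_t index) const
      {
         return m_userData.at(index);
      }

   private:

      std::vector<void *> m_userData;
};

struct CPerConnectionData
{
   explicit CPerConnectionData(
      const std::string &connectionDetails)
      :  m_connectionDetails(connectionDetails)
   {
   }

   const std::string m_connectionDetails;
};

// Allocated when the connection is established...

void OnConnectionEstablished(
   CFakeSocket &socket,
   const size_t userDataIndex,
   const std::string &address)
{
   socket.SetUserPointer(userDataIndex, new CPerConnectionData(address));
}

// ...and cleaned up when the last reference to the socket goes away.

void OnSocketReleased(
   CFakeSocket &socket,
   const size_t userDataIndex)
{
   delete static_cast<CPerConnectionData *>(
      socket.GetUserPointer(userDataIndex));

   socket.SetUserPointer(userDataIndex, nullptr);
}
```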

The work items that are dispatched by the code above are handled by the handlers shown below which CThreadPool::Process() dispatches to from its switch statement.

 void CThreadPool::OnConnectionEstablished(
    IStreamSocket &socket)
 {
    CPerConnectionData &data = GetPerConnectionData(socket);

    std::string welcomeMessage("+OK POP3 server ready - ");

    welcomeMessage = welcomeMessage + CStringConverter::TtoA(data.GetConnectionDetails()) + "\r\n";

    socket.Write(welcomeMessage.c_str(), GetStringLengthAsDWORD(welcomeMessage));

    socket.TryRead();
 }

 void CThreadPool::ProcessMessage(
    IStreamSocket &socket,
    const IBuffer &buffer) const
 {

And ProcessMessage() is where the main protocol handling work is done. It's similar to the version in the Simple Protocol Server example, but you can uncomment a Sleep() call to make the processing take a long time so that the thread pool will expand its number of threads to deal with the work load.
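The kind of CR LF framing that ProcessMessage() performs can be sketched as below. This is not the example's actual code; it is a hypothetical helper that appends newly read bytes to a per connection accumulator and extracts each complete, terminated line, leaving any partial line behind for the next read completion.

```cpp
#include <string>
#include <vector>

// Hypothetical sketch of CR LF line framing: newly read bytes are appended
// to a per connection accumulator and complete lines are split out; any
// trailing partial line stays in the accumulator for the next read.
std::vector<std::string> ExtractLines(
   std::string &accumulator,
   const std::string &newData)
{
   accumulator += newData;

   std::vector<std::string> lines;

   size_t pos;

   while (std::string::npos != (pos = accumulator.find("\r\n")))
   {
      lines.push_back(accumulator.substr(0, pos));

      accumulator.erase(0, pos + 2);    // skip past the CR LF terminator
   }

   return lines;
}
```

Because TCP is a stream, a single read completion may contain a fraction of a line or several lines at once, which is why the accumulator is needed.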

The important thing about dispatching work items to the thread pool is that when you dispatch a work item you increment the reference counts of the socket and the buffer that you put into the queue, and when you finish processing the item you decrement them. This ensures that the objects remain valid until you are done with them; failure to do this will cause access violations. In this example DoDispatch() deals with incrementing the reference counts.

 void CThreadPool::DoDispatch(
    IStreamSocket &socket,
    IBuffer *pBuffer,
    DispatchEvents event)
 {
    socket.AddRef();

    if (pBuffer)
    {
       pBuffer->AddRef();
    }
    else if (event != ConnectionClosing && event != ConnectionEstablished)
    {
       throw CException(_T("CThreadPool::DoDispatch()"), _T("Unexpected: Only ConnectionClosing and ConnectionEstablished events can have a null pBuffer"));
    }

    m_threadPool.Dispatch(
       reinterpret_cast<ULONG_PTR>(&socket),
       event,
       pBuffer);
 }

And the smart socket and buffer objects in CThreadPool::Process() deal with decrementing the reference counts when the work is complete.

If the server happens to be shut down whilst there are work items in the queue to the business logic thread pool then the resources used by those work items need to be released. To allow for this the IThreadPoolWorkerThreadFactory interface provides a DisposeOfQueuedItemsAfterShutdown() method that is called once for each work item that is still in the queue after the thread pool is shut down. Our implementation of this is fairly straightforward: we release the resources without doing any of the work.

 void CThreadPool::DisposeOfQueuedItemsAfterShutdown(
    const ULONG_PTR completionKey,
    const DWORD operation,
    OVERLAPPED *pOverlapped)
 {
    Output(_T("DisposeOfQueuedItemsAfterShutdown"));

    reinterpret_cast<IStreamSocket *>(completionKey)->Release();

    if (operation == ReadCompleted)
    {
       static_cast<IBuffer *>(pOverlapped)->Release();
    }
 }
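The reference counting contract that ties DoDispatch(), Process() and DisposeOfQueuedItemsAfterShutdown() together can be sketched with a hypothetical refcounted object and smart wrapper standing in for the framework's sockets, buffers, CSmartStreamSocket and CSmartBuffer. A reference is taken before the item is queued and released exactly once, either by the smart wrapper when the worker thread finishes with the object or by the shutdown cleanup code. (A real implementation would use an interlocked or atomic count and destroy the object when the count hits zero.)

```cpp
// Hypothetical refcounted object; stands in for the framework's reference
// counted sockets and buffers. A plain count is used for clarity only.
class CRefCounted
{
   public:

      CRefCounted() : m_count(1) {}

      void AddRef() { ++m_count; }

      void Release() { --m_count; }

      long Count() const { return m_count; }

   private:

      long m_count;
};

// Hypothetical smart wrapper mirroring CSmartStreamSocket/CSmartBuffer:
// it releases the queued reference when it goes out of scope, so the
// worker thread cannot forget to do so, even if an exception is thrown.
class CSmart
{
   public:

      explicit CSmart(CRefCounted *pObject) : m_pObject(pObject) {}

      ~CSmart() { m_pObject->Release(); }

      CRefCounted &GetRef() { return *m_pObject; }

   private:

      CSmart(const CSmart &);             // non-copyable
      CSmart &operator=(const CSmart &);

      CRefCounted *m_pObject;
};
```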


Our server object simply dispatches work items to the thread pool.

Generated on Sun Sep 12 19:06:45 2021 for The Server Framework - v7.4 by doxygen 1.5.3