Windows 8 Registered I/O - Multi threaded RIO IOCP UDP Example Server

This article presents the fourth in my series of example servers using the Windows 8 Registered I/O Networking Extensions, RIO. This example server, like the last one, uses the I/O Completion Port notification method to handle RIO completions, but where the last example used only a single thread to service the IOCP, this one uses multiple threads to scale the load. I’ve been looking at the Windows 8 Registered I/O Networking Extensions since October when they first made an appearance as part of the Windows 8 Developer Preview. Whilst exploring and understanding the new API I spent some time putting together some simple UDP servers using the various notification styles that RIO provides. I then put together some equally simple UDP servers using the “traditional” APIs so that I could compare performance. This series of blog posts describes each of the example servers in turn. You can find an index to all of the articles about the Windows 8 Registered I/O example servers here.

Using an I/O Completion Port for RIO completions

As I mentioned back in October, there are three ways to receive completion notifications from RIO: polling, event driven and via an I/O Completion Port. Using an IOCP for RIO completions allows you to easily scale your completion handling across multiple threads, as we do here, and this is the first of my example servers that allows more than one thread to be used to process completions.

Creating an IOCP driven RIO completion queue

We start by initialising things in the same way that we did with the earlier example RIO servers. In fact, this initialisation is identical to the previous IOCP example except for one thing.

int _tmain(int argc, _TCHAR* argv[])
{
   SetupTiming("RIO IOCP UDP");

   InitialiseWinsock();

   CreateRIOSocket();

   g_hIOCP = ::CreateIoCompletionPort(
      INVALID_HANDLE_VALUE,
      0,
      0,
      0);
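
   // RIONotify() will post a completion to this IOCP, using the OVERLAPPED
   // and completion key configured below, when results are available on the
   // RIO completion queue. The key of 1 lets the worker threads tell these
   // RIO notifications apart from the key of 0 that we post when asking them
   // to shut down.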

   OVERLAPPED overlapped;

   RIO_NOTIFICATION_COMPLETION completionType;

   completionType.Type = RIO_IOCP_COMPLETION;
   completionType.Iocp.IocpHandle = g_hIOCP;
   completionType.Iocp.CompletionKey = (void*)1;
   completionType.Iocp.Overlapped = &overlapped;

   g_queue = g_rio.RIOCreateCompletionQueue(
      RIO_PENDING_RECVS,
      &completionType);

   if (g_queue == RIO_INVALID_CQ)
   {
      ErrorExit("RIOCreateCompletionQueue");
   }

With the previous design we passed 0 as the completion key. This server passes 1. This is an arbitrary change purely to allow us to post completions with a key of 0 to cause all of the threads waiting on the I/O Completion Port to shut down. This is a common idiom with normal, non-RIO, IOCP designs, where it’s more usual for the completion key to be a pointer to “per device” data, such as a structure representing the socket or connection. A RIO design with multiple completion queues would likely use the completion key for “per queue” data.

Creating a RIO request queue

Creating the request queue and posting our receives is identical to the polled example. The only difference is how we handle the completions.
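
For reference, here’s a rough sketch of what that setup looks like. This is not a copy of the code in Shared.h; the socket global (g_s here), the constants RIO_PENDING_SENDS and RECV_BUFFER_SIZE, and the buffer allocation details are assumptions based on the earlier polled example, so treat it as an outline rather than the real thing.

   // Sketch only: bind the socket to our completion queue for both receives
   // and sends, register a single large buffer and carve it into one slice
   // per pending receive, then post a receive for each slice. Each
   // EXTENDED_RIO_BUF is passed as both the buffer and the request context so
   // that a completion can be matched back to its slice later.

   g_requestQueue = g_rio.RIOCreateRequestQueue(
      g_s,                     // the socket created by CreateRIOSocket()
      RIO_PENDING_RECVS,       // max outstanding receives
      1,                       // max receive data buffers per request
      RIO_PENDING_SENDS,       // max outstanding sends (this server never sends)
      1,                       // max send data buffers per request
      g_queue,                 // receive completion queue
      g_queue,                 // send completion queue
      NULL);                   // socket context

   if (g_requestQueue == RIO_INVALID_RQ)
   {
      ErrorExit("RIOCreateRequestQueue");
   }

   const DWORD totalBufferSize = RECV_BUFFER_SIZE * RIO_PENDING_RECVS;

   char *pBufferMemory = reinterpret_cast<char *>(
      ::VirtualAlloc(0, totalBufferSize, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE));

   if (0 == pBufferMemory)
   {
      ErrorExit("VirtualAlloc");
   }

   RIO_BUFFERID bufferId = g_rio.RIORegisterBuffer(pBufferMemory, totalBufferSize);

   if (bufferId == RIO_INVALID_BUFFERID)
   {
      ErrorExit("RIORegisterBuffer");
   }

   DWORD offset = 0;

   for (DWORD i = 0; i < RIO_PENDING_RECVS; ++i)
   {
      EXTENDED_RIO_BUF *pRecvBuf = new EXTENDED_RIO_BUF();

      pRecvBuf->BufferId = bufferId;
      pRecvBuf->Offset = offset;
      pRecvBuf->Length = RECV_BUFFER_SIZE;

      offset += RECV_BUFFER_SIZE;

      if (!g_rio.RIOReceive(g_requestQueue, pRecvBuf, 1, 0, pRecvBuf))
      {
         ErrorExit("RIOReceive");
      }
   }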

Starting our worker threads

This example is the first to use more than just a single thread. Because of this we need a way to start and manage our worker threads, and for them to communicate with each other and with the main thread.

   CreateIOCPThreads(NUM_IOCP_THREADS);

   INT notifyResult = g_rio.RIONotify(g_queue);

   if (notifyResult != ERROR_SUCCESS)
   {
      ErrorExit("RIONotify", notifyResult);
   }

   WaitForProcessingStarted();

First we call CreateIOCPThreads(), which is shown below. This creates the events that the threads will use to communicate and then creates and starts the threads themselves. As with the earlier examples, we use globals for convenience and not as an example of good design.

inline void CreateIOCPThreads(
   const DWORD numThreads)
{
   g_hStartedEvent = ::CreateEvent(0, TRUE, FALSE, 0);

   if (0 == g_hStartedEvent)
   {
      ErrorExit("CreateEvent");
   }

   g_hStoppedEvent = ::CreateEvent(0, TRUE, FALSE, 0);

   if (0 == g_hStoppedEvent)
   {
      ErrorExit("CreateEvent");
   }

   // Start our worker threads

   for (DWORD i = 0; i < numThreads; ++i)
   {
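      // Use _beginthreadex() rather than CreateThread() so that the C runtime
      // is correctly initialised for each worker thread.
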
      unsigned int notUsed;

      const uintptr_t result = ::_beginthreadex(
         0,
         0,
         ThreadFunction,
         0,
         0,
         &notUsed);

      if (result == 0)
      {
         ErrorExit("_beginthreadex", errno);
      }

      g_threads.push_back(reinterpret_cast<HANDLE>(result));
   }

   cout << numThreads << " threads running" << endl;
}

The main thread then calls RIONotify() to enable notifications and waits for the first datagram to be processed before starting the timer.

inline void WaitForProcessingStarted()
{
   if (WAIT_OBJECT_0 != ::WaitForSingleObject(
      g_hStartedEvent,
      INFINITE))
   {
      ErrorExit("WaitForSingleObject");
   }

   StartTiming();
}

Calling RIODequeueCompletion() and processing results

This example’s processing loop is similar to the previous examples, especially the single threaded IOCP example server. It’s slightly more complicated due to the fact that it’s being run on a separate thread.

unsigned int __stdcall ThreadFunction(
   void *pV)
{
   DWORD numberOfBytes = 0;

   ULONG_PTR completionKey = 0;

   OVERLAPPED *pOverlapped = 0;

   const DWORD recvFlags = 0;

   if (!::GetQueuedCompletionStatus(
      g_hIOCP,
      &numberOfBytes,
      &completionKey,
      &pOverlapped,
      INFINITE))
   {
      ErrorExit("GetQueuedCompletionStatus");
   }

   int workValue = 0;

   if (completionKey == 1)
   {
      RIORESULT results[RIO_MAX_RESULTS];

      bool done = false;

      ::SetEvent(g_hStartedEvent);

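      // Dequeue everything that's currently available and then call
      // RIONotify() so that further completions can be posted to the IOCP.
      // Until RIONotify() is called no other thread will be woken for this
      // completion queue, which effectively serialises the calls to
      // RIODequeueCompletion().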
      ULONG numResults = g_rio.RIODequeueCompletion(
         g_queue,
         results,
         RIO_MAX_RESULTS);

      if (0 == numResults ||
          RIO_CORRUPT_CQ == numResults)
      {
         ErrorExit("RIODequeueCompletion");
      }

      INT notifyResult = g_rio.RIONotify(g_queue);

      if (notifyResult != ERROR_SUCCESS)
      {
         ErrorExit("RIONotify", notifyResult);
      }

      do
      {
         for (DWORD i = 0; i < numResults; ++i)
         {
            EXTENDED_RIO_BUF *pBuffer = reinterpret_cast<EXTENDED_RIO_BUF *>(results[i].RequestContext);

            if (results[i].BytesTransferred == EXPECTED_DATA_SIZE)
            {
               ::InterlockedIncrement(&g_packets);

               workValue += DoWork(g_workIterations);

               ::EnterCriticalSection(&g_criticalSection);

               if (!g_rio.RIOReceive(
                  g_requestQueue,
                  pBuffer,
                  1,
                  recvFlags,
                  pBuffer))
               {
                  ErrorExit("RIOReceive");
               }

               ::LeaveCriticalSection(&g_criticalSection);

               done = false;
            }
            else
            {
               done = true;
            }
         }

         if (!done)
         {
            if (!::GetQueuedCompletionStatus(
               g_hIOCP,
               &numberOfBytes,
               &completionKey,
               &pOverlapped,
               INFINITE))
            {
               ErrorExit("GetQueuedCompletionStatus");
            }

            if (completionKey == 0)
            {
               done = true;
            }
            else
            {
               numResults = g_rio.RIODequeueCompletion(
                  g_queue,
                  results,
                  RIO_MAX_RESULTS);

               if (0 == numResults ||
                   RIO_CORRUPT_CQ == numResults)
               {
                  ErrorExit("RIODequeueCompletion");
               }

               INT notifyResult = g_rio.RIONotify(g_queue);

               if (notifyResult != ERROR_SUCCESS)
               {
                  ErrorExit("RIONotify", notifyResult);
               }
            }
         }
      }
      while (!done);
   }

   ::SetEvent(g_hStoppedEvent);

   return workValue;
}

The first thing we do is wait for a completion. Once we have one we dequeue the results and then call RIONotify() to allow further completions to occur. It’s important to realise that until we call RIONotify() no further completions will be posted to the I/O Completion Port, and that this effectively acts as synchronisation around the calls to RIODequeueCompletion(). With this design only one thread can ever be calling RIODequeueCompletion() at a time, which is a good thing as the documentation for RIODequeueCompletion() states that this is a requirement for users of the API.

An update to the original RIO IOCP server design

Updated: 29/08/2012 - The original RIO IOCP server design that I presented back in March had a bug in it, which is now fixed. The bug was due to a lack of detail in the RIO API documentation and an assumption on my part. It seems that calling RIOReceive() on a given request queue is not thread safe. With our IOCP server design, where we call RIONotify() as soon as we have dequeued the available completions and then call RIOReceive() each time we have finished with a datagram and can issue another read into the buffer that is now available, it is likely that multiple threads will be calling RIOReceive() on the same request queue at the same time. I’ve witnessed failures due to the permitted number of pending reads being exceeded, and also performance degradation. Both of these issues are fixed by locking around the calls to RIOReceive(); in a design which used multiple request queues you would have one lock per queue. The locking causes some inter-thread contention but the API does not appear to be usable without it. It would be useful if the documentation were explicit about this.
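
For a design with more than one request queue, something like the following sketch would keep each lock with the queue that it protects. This class is hypothetical; the names LockedRequestQueue and PostReceive() are mine and don’t appear in the downloadable code, which simply uses a single global CRITICAL_SECTION because it only has one request queue.

// A hypothetical helper (not part of the example code) that pairs a request
// queue with its own lock so that calls to RIOReceive() are serialised per
// queue. The example itself just uses a single global CRITICAL_SECTION as it
// only has one request queue.

class LockedRequestQueue
{
   public :

      LockedRequestQueue(
         RIO_EXTENSION_FUNCTION_TABLE &rio,
         RIO_RQ requestQueue)
         :  m_rio(rio),
            m_requestQueue(requestQueue)
      {
         ::InitializeCriticalSection(&m_criticalSection);
      }

      ~LockedRequestQueue()
      {
         ::DeleteCriticalSection(&m_criticalSection);
      }

      bool PostReceive(
         EXTENDED_RIO_BUF *pBuffer)
      {
         ::EnterCriticalSection(&m_criticalSection);

         const BOOL ok = m_rio.RIOReceive(
            m_requestQueue,
            pBuffer,
            1,
            0,
            pBuffer);

         ::LeaveCriticalSection(&m_criticalSection);

         return (FALSE != ok);
      }

   private :

      RIO_EXTENSION_FUNCTION_TABLE &m_rio;

      RIO_RQ m_requestQueue;

      CRITICAL_SECTION m_criticalSection;
};

Keeping the lock and the queue together like this makes it much harder to accidentally call RIOReceive() without holding the correct lock when there’s more than one request queue in play.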

Remember that this example is made more complex due to the way we profile the servers. See the explanation in the completion handling section of the polled RIO server example for details of why this is.

Shutting down and displaying results

Whilst our worker threads are processing datagrams our main thread is waiting for the performance test to end.

   WaitForProcessingStopped();

   StopIOCPThreads();

   PrintTimings();

   return 0;
}

Our thread function’s main loop can exit in two ways: firstly, because a datagram arrives that isn’t of the expected size, which signals the end of the performance test; and secondly, if GetQueuedCompletionStatus() returns a completion key of 0, which means that the main thread has posted completions to request that we shut down. This means that when the first “shutdown” datagram arrives, the first thread to begin processing it will shut down and set the g_hStoppedEvent event. The main thread, which is waiting on this event, will wake when it is set and shut the rest of the worker threads down. Once all of the threads have terminated the main thread will display details of the datagrams received and the test timings.

inline void WaitForProcessingStopped()
{
   if (WAIT_OBJECT_0 != ::WaitForSingleObject(
      g_hStoppedEvent,
      INFINITE))
   {
      ErrorExit("WaitForSingleObject");
   }

   StopTiming();
}

inline void StopIOCPThreads()
{
   // Tell all threads to exit

   for (Threads::const_iterator it = g_threads.begin(),
      end = g_threads.end();
      it != end;
      ++it)
   {
      if (0 == ::PostQueuedCompletionStatus(
         g_hIOCP,
         0,
         0,
         0))
      {
         ErrorExit("PostQueuedCompletionStatus");
      }
   }

   cout << "Threads stopping" << endl;

   // Wait for all threads to exit

   for (Threads::const_iterator it = g_threads.begin(),
      end = g_threads.end();
      it != end;
      ++it)
   {
      HANDLE hThread = *it;

      if (WAIT_OBJECT_0 != ::WaitForSingleObject(
         hThread,
         INFINITE))
      {
         ErrorExit("WaitForSingleObject");
      }

      ::CloseHandle(hThread);
   }

   cout << "Threads stopped" << endl;
}

Unexpected performance issues…

The slight problem with this design is that in some scenarios it’s not actually as performant as we might like it to be. The fact that we can scale out across multiple threads is a plus point, but the operations that we have to perform to achieve that scaling are considerably more expensive, and that’s a problem. It matters more when we’re looking for a general purpose solution that works as well for low throughput with fast processing of each datagram as it does for high throughput and/or slow processing. Luckily there are a couple of things we can do to fix this, but we’ll look at those once we’ve done some performance comparisons and seen the problems first hand.

The code for this example can be downloaded from here. This code requires Visual Studio 11, but would work with earlier compilers if you have a Windows SDK that supports RIO. Note that Shared.h and Constants.h contain helper functions and tuning constants for ALL of the examples and so there will be code in there that is not used by this example. You should be able to unzip each example into the same directory structure so that they all share the same shared headers. This allows you to tune all of the examples the same so that any performance comparisons make sense.

Join in

Comments and suggestions are more than welcome. I’m learning as I go here and I’m quite likely to have made some mistakes or come up with some erroneous conclusions; feel free to put me straight and help make these examples better.

Code is here

Code - updated 15th April 2023

Full source can be found here on GitHub.

This isn’t production code; error handling is simply “panic and run away”.

This code is licensed with the MIT license.