Windows 8 Registered I/O - Multi threaded RIO IOCP UDP Example Server

This article presents the fourth in my series of example servers using the Windows 8 Registered I/O Networking extensions, RIO. This example server, like the last one, uses the I/O Completion Port notification method to handle RIO completions, but where the last example used only a single thread to service the IOCP this one uses multiple threads to scale the processing of completions. I've been looking at the Windows 8 Registered I/O Networking Extensions since October when they first made an appearance as part of the Windows 8 Developer Preview. Whilst exploring and understanding the new API I spent some time putting together some simple UDP servers using the various notification styles that RIO provides. I then put together some equally simple UDP servers using the "traditional" APIs so that I could compare performance. This series of blog posts describes each of the example servers in turn. You can find an index to all of the articles about the Windows 8 Registered I/O example servers here.

Using an I/O Completion Port for RIO completions

As I mentioned back in October, there are three ways to receive completion notifications from RIO: polling, event driven and via an I/O Completion Port. Using an IOCP for RIO completions allows you to easily scale your completion handling across multiple threads, as we do here; this is the first of my example servers that allows more than one thread to be used to process completions.
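
For reference, the notification style is selected when the completion queue is created. The sketch below (based on the earlier examples in this series, and assuming an event handle hEvent that you have already created) shows how the polled and event driven variants differ from the IOCP variant that we build below; treat it as an approximation rather than the exact code from those examples.
   // Polled - no notification structure is supplied and you simply call
   // RIODequeueCompletion() in a loop...

   g_queue = g_rio.RIOCreateCompletionQueue(RIO_PENDING_RECVS, 0);

   // Event driven - RIONotify() causes the event to be set when completions
   // are available...

   RIO_NOTIFICATION_COMPLETION completionType;

   completionType.Type = RIO_EVENT_COMPLETION;
   completionType.Event.EventHandle = hEvent;
   completionType.Event.NotifyReset = TRUE;

   g_queue = g_rio.RIOCreateCompletionQueue(RIO_PENDING_RECVS, &completionType);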

Creating an IOCP driven RIO completion queue

We start by initialising things in the same way that we did with the earlier example RIO servers. In fact, this initialisation is identical to the previous IOCP example except for one thing.
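
If you haven't read the earlier articles, CreateRIOSocket() creates a UDP socket with the WSA_FLAG_REGISTERED_IO flag set, binds it to the server port and obtains the RIO function table that we access via g_rio. A sketch of the important parts follows; the names g_socket and PORT are assumptions and the details are an approximation rather than the exact code from the download.
   g_socket = ::WSASocket(
      AF_INET,
      SOCK_DGRAM,
      IPPROTO_UDP,
      0,
      0,
      WSA_FLAG_REGISTERED_IO);

   if (g_socket == INVALID_SOCKET)
   {
      ErrorExit("WSASocket");
   }

   sockaddr_in addr;

   ::ZeroMemory(&addr, sizeof(addr));

   addr.sin_family = AF_INET;
   addr.sin_port = htons(PORT);
   addr.sin_addr.s_addr = INADDR_ANY;

   if (SOCKET_ERROR == ::bind(g_socket, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)))
   {
      ErrorExit("bind");
   }

   GUID functionTableId = WSAID_MULTIPLE_RIO;

   DWORD dwBytes = 0;

   if (0 != ::WSAIoctl(
      g_socket,
      SIO_GET_MULTIPLE_EXTENSION_FUNCTION_POINTERS,
      &functionTableId,
      sizeof(GUID),
      &g_rio,
      sizeof(g_rio),
      &dwBytes,
      0,
      0))
   {
      ErrorExit("WSAIoctl");
   }
The main function itself looks like this: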
int _tmain(int argc, _TCHAR* argv[])
{
   SetupTiming("RIO IOCP UDP");

   InitialiseWinsock();

   CreateRIOSocket();

   g_hIOCP = ::CreateIoCompletionPort(
      INVALID_HANDLE_VALUE,
      0,
      0,
      0);

   OVERLAPPED overlapped;

   RIO_NOTIFICATION_COMPLETION completionType;

   completionType.Type = RIO_IOCP_COMPLETION;
   completionType.Iocp.IocpHandle = g_hIOCP;
   completionType.Iocp.CompletionKey = (void*)1;
   completionType.Iocp.Overlapped = &overlapped;

   g_queue = g_rio.RIOCreateCompletionQueue(
      RIO_PENDING_RECVS,
      &completionType);

   if (g_queue == RIO_INVALID_CQ)
   {
      ErrorExit("RIOCreateCompletionQueue");
   }
With the previous design we passed 0 as the completion key. This server passes 1. This is an arbitrary change, made purely so that we can post completions with a key of 0 to cause all of the threads waiting on the completion queue to shut down. This is a common idiom with normal, non-RIO, IOCP designs, where the completion key is usually a pointer to "per device" (per socket) data. A RIO design with multiple completion queues would likely use the completion key for "per queue" data.

Creating a RIO request queue

Creating the request queue and posting our receives is identical to the polled example. The only difference is how we handle the completions.
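
For those who haven't seen it, the sketch below shows roughly what that code does. RIO_PENDING_RECVS comes from the shared Constants.h, EXTENDED_RIO_BUF is simply a RIO_BUF with some extra per-operation context, and AllocateBufferSlice() is a hypothetical stand-in for the buffer slicing that the shared code performs over a single large buffer registered with RIORegisterBuffer(); treat this as an outline rather than the exact code from the download.
   g_requestQueue = g_rio.RIOCreateRequestQueue(
      g_socket,               // the socket created in CreateRIOSocket()
      RIO_PENDING_RECVS,      // max outstanding receives
      1,                      // max buffers per receive (must be 1)
      0,                      // max outstanding sends - we never send
      1,                      // max buffers per send (must be 1)
      g_queue,                // receive completion queue
      g_queue,                // send completion queue
      0);                     // socket context

   if (g_requestQueue == RIO_INVALID_RQ)
   {
      ErrorExit("RIOCreateRequestQueue");
   }

   // Post the initial receives; each EXTENDED_RIO_BUF describes a slice of the
   // single large registered buffer.

   for (DWORD i = 0; i < RIO_PENDING_RECVS; ++i)
   {
      EXTENDED_RIO_BUF *pBuffer = AllocateBufferSlice();

      if (!g_rio.RIOReceive(g_requestQueue, pBuffer, 1, 0, pBuffer))
      {
         ErrorExit("RIOReceive");
      }
   }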

Starting our worker threads

This example is the first to use more than just a single thread. Because of this we need to have a way to start and manage our worker threads and for them to communicate with each other and the main thread.
   CreateIOCPThreads(NUM_IOCP_THREADS);

   INT notifyResult = g_rio.RIONotify(g_queue);

   if (notifyResult != ERROR_SUCCESS)
   {
      ErrorExit("RIONotify", notifyResult);
   }

   WaitForProcessingStarted();
First we call CreateIOCPThreads(), shown below, which creates some events that the threads will use to communicate and then creates and starts the threads themselves. As with the earlier examples, we use globals for convenience and not as an example of good design.
inline void CreateIOCPThreads(
   const DWORD numThreads)
{
   g_hStartedEvent = ::CreateEvent(0, TRUE, FALSE, 0);

   if (0 == g_hStartedEvent)
   {
      ErrorExit("CreateEvent");
   }

   g_hStoppedEvent = ::CreateEvent(0, TRUE, FALSE, 0);

   if (0 == g_hStoppedEvent)
   {
      ErrorExit("CreateEvent");
   }

   // Start our worker threads

   for (DWORD i = 0; i < numThreads; ++i)
   {
      unsigned int notUsed;

      const uintptr_t result = ::_beginthreadex(
         0,
         0,
         ThreadFunction,
         0,
         0,
         &notUsed);

      if (result == 0)
      {
         ErrorExit("_beginthreadex", errno);
      }

      g_threads.push_back(reinterpret_cast<HANDLE>(result));
   }

   cout << numThreads << " threads running" << endl;
}
The main thread then calls RIONotify() to enable notifications and then waits for the first datagram to be processed before it starts the timer.
inline void WaitForProcessingStarted()
{
   if (WAIT_OBJECT_0 != ::WaitForSingleObject(
      g_hStartedEvent,
      INFINITE))
   {
      ErrorExit("WaitForSingleObject");
   }

   StartTiming();
}

Calling RIODequeueCompletion() and processing results

This example's processing loop is similar to the previous examples, especially the single threaded IOCP example server. It's slightly more complicated due to the fact that it's being run on a separate thread.
unsigned int __stdcall ThreadFunction(
   void *pV)
{
   DWORD numberOfBytes = 0;

   ULONG_PTR completionKey = 0;

   OVERLAPPED *pOverlapped = 0;

   const DWORD recvFlags = 0;

   if (!::GetQueuedCompletionStatus(
      g_hIOCP,
      &numberOfBytes,
      &completionKey,
      &pOverlapped,
      INFINITE))
   {
      ErrorExit("GetQueuedCompletionStatus");
   }

   int workValue = 0;

   if (completionKey == 1)
   {
      RIORESULT results[RIO_MAX_RESULTS];

      bool done = false;

      ::SetEvent(g_hStartedEvent);

      ULONG numResults = g_rio.RIODequeueCompletion(
         g_queue,
         results,
         RIO_MAX_RESULTS);

      if (0 == numResults ||
          RIO_CORRUPT_CQ == numResults)
      {
         ErrorExit("RIODequeueCompletion");
      }

      INT notifyResult = g_rio.RIONotify(g_queue);

      if (notifyResult != ERROR_SUCCESS)
      {
         ErrorExit("RIONotify", notifyResult);
      }

      do
      {
         for (DWORD i = 0; i < numResults; ++i)
         {
            EXTENDED_RIO_BUF *pBuffer = reinterpret_cast<EXTENDED_RIO_BUF *>(results[i].RequestContext);

            if (results[i].BytesTransferred == EXPECTED_DATA_SIZE)
            {
               ::InterlockedIncrement(&g_packets);

               workValue += DoWork(g_workIterations);

               ::EnterCriticalSection(&g_criticalSection);

               if (!g_rio.RIOReceive(
                  g_requestQueue,
                  pBuffer,
                  1,
                  recvFlags,
                  pBuffer))
               {
                  ErrorExit("RIOReceive");
               }

               ::LeaveCriticalSection(&g_criticalSection);


               done = false;
            }
            else
            {
               done = true;
            }
         }

         if (!done)
         {
            if (!::GetQueuedCompletionStatus(
               g_hIOCP,
               &numberOfBytes,
               &completionKey,
               &pOverlapped,
               INFINITE))
            {
               ErrorExit("GetQueuedCompletionStatus");
            }

            if (completionKey == 0)
            {
               done = true;
            }
            else
            {
               numResults = g_rio.RIODequeueCompletion(
                  g_queue,
                  results,
                  RIO_MAX_RESULTS);

               if (0 == numResults ||
                   RIO_CORRUPT_CQ == numResults)
               {
                  ErrorExit("RIODequeueCompletion");
               }

               INT notifyResult = g_rio.RIONotify(g_queue);

               if (notifyResult != ERROR_SUCCESS)
               {
                  ErrorExit("RIONotify", notifyResult);
               }
            }
         }
      }
      while (!done);
   }

   ::SetEvent(g_hStoppedEvent);

   return workValue;
}
The first thing we do is wait for a completion. Once we have one we dequeue the results and then call RIONotify() to allow more completions to occur. It's important to realise that until we call RIONotify() no further completions will be posted to the I/O Completion Port; this effectively acts as synchronisation around the calls to RIODequeueCompletion(). With this design only one thread can ever be calling RIODequeueCompletion() at a time, which is a good thing, as the documentation for RIODequeueCompletion() states that this is a requirement for users of the API.

An update to the original RIO IOCP server design

Updated: 29/08/2012 - The original RIO IOCP server design that I presented back in March had a bug in it, which is now fixed. The bug was due to a lack of detail in the RIO API documentation and an assumption on my part. It seems that calling RIOReceive() on a given request queue is not thread safe. With our IOCP server design, where we call RIONotify() as soon as we have dequeued the available completions and then call RIOReceive() each time we have finished with a datagram and can issue another read into the buffer that is now available, it is likely that multiple threads will call RIOReceive() on the same request queue at the same time. I've witnessed failures due to the permitted number of outstanding reads being exceeded, and also performance degradation. Both of these issues are fixed by locking around the calls to RIOReceive(); in a design which used multiple request queues you would have one lock per queue. The locking causes some inter-thread contention but the API does not appear to be usable without it. It would be useful if the documentation were explicit about this.
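
The critical section itself isn't shown in the snippets above; a minimal sketch of how it might be set up and torn down follows (the download does this in the shared code, so the details may differ).
   // In the shared globals...

   CRITICAL_SECTION g_criticalSection;

   // During initialisation, before the worker threads are started; a spin count
   // may reduce contention when the lock is only held briefly...

   ::InitializeCriticalSectionAndSpinCount(&g_criticalSection, 4000);

   // And once all of the worker threads have been shut down...

   ::DeleteCriticalSection(&g_criticalSection);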

Remember that this example is made more complex due to the way we profile the servers. See the explanation in the completion handling section of the polled RIO server example for details of why this is.

Shutting down and displaying results

Whilst our worker threads are processing datagrams our main thread is waiting for the performance test to end.
   WaitForProcessingStopped();

   StopIOCPThreads();

   PrintTimings();

   return 0;
}
Our thread function's main loop can exit in two ways: first, because a datagram arrives that isn't of the expected size, which signals the end of the performance test; and second, if GetQueuedCompletionStatus() returns a completion key of 0, which means that the main thread has posted completions to request that we shut down. This means that when the first "shutdown" datagram arrives, the thread that processes it will shut down and set the g_hStoppedEvent event. The main thread is waiting for this event; it wakes when the event is set and shuts the rest of the worker threads down. Once all of the threads have terminated the main thread displays details of the datagrams received and the test timings.
inline void WaitForProcessingStopped()
{
   if (WAIT_OBJECT_0 != ::WaitForSingleObject(
      g_hStoppedEvent,
      INFINITE))
   {
      ErrorExit("WaitForSingleObject");
   }

   StopTiming();
}

inline void StopIOCPThreads()
{
   // Tell all threads to exit

   for (Threads::const_iterator it = g_threads.begin(),
      end = g_threads.end();
      it != end;
      ++it)
   {
      if (0 == ::PostQueuedCompletionStatus(
         g_hIOCP,
         0,
         0,
         0))
      {
         ErrorExit("PostQueuedCompletionStatus");
      }
   }

   cout << "Threads stopping" << endl;

   // Wait for all threads to exit

   for (Threads::const_iterator it = g_threads.begin(),
      end = g_threads.end();
      it != end;
      ++it)
   {
      HANDLE hThread = *it;

      if (WAIT_OBJECT_0 != ::WaitForSingleObject(
         hThread,
         INFINITE))
      {
         ErrorExit("WaitForSingleObject");
      }

      ::CloseHandle(hThread);
   }   

   cout << "Threads stopped" << endl;
}

Unexpected performance issues...

The slight problem with this design is that in some scenarios it's not as performant as we might like it to be. Being able to scale out across multiple threads is a plus point, but the operations that we have to perform to achieve that scaling are considerably more expensive. This is more of an issue when we're looking for a general purpose solution which works as well for low throughput and fast processing of each datagram as it does for high throughput and/or slow processing. Luckily there are a couple of things we can do to fix this, but we'll look at those once we've done some performance comparisons and seen the problems first hand.

The code for this example can be downloaded from here. This code requires Visual Studio 11, but would work with earlier compilers if you have a Windows SDK that supports RIO. Note that Shared.h and Constants.h contain helper functions and tuning constants for ALL of the examples and so there will be code in there that is not used by this example. You should be able to unzip each example into the same directory structure so that they all share the same shared headers. This allows you to tune all of the examples the same so that any performance comparisons make sense.

Join in

Comments and suggestions are more than welcome. I'm learning as I go here and I'm quite likely to have made some mistakes or come up with some erroneous conclusions, so feel free to put me straight and help make these examples better.

6 Comments

Hello Len, great post as always!
I have some questions:

1) Since you use locking for `RIOReceive()`, does `RIODequeueCompletion` need to be protected with a CRITICAL_SECTION or a slim reader-writer lock too?

Quoting the MSDN page of that API:

"Note For purposes of efficiency, access to the completion queues (RIO_CQ structs) and request queues (RIO_RQ structs) are not protected by synchronization primitives. If you need to access a completion or request queue from multiple threads, access should be coordinated by a critical section, slim reader write lock or similar mechanism. This locking is not needed for access by a single thread. Different threads can access separate requests/completion queues without locks. The need for synchronization occurs only when multiple threads try to access the same queue. Synchronization is also required if multiple threads issue sends and receives on the same socket because the send and receive operations use the socket’s request queue."

So, why don't you use a locking mechanism when you call `RIODequeueCompletion`?
Isn't it needed there, since multiple threads can receive the IOCP packet which indicates that there is something in the RIO queue that can be dequeued, and since MSDN says "Different threads can access separate requests/completion queues without locks."?

Maybe you don't use locking here because of the IOCP: since no further notification can be posted to the IOCP handle until you call `RIONotify()` again, you can be pretty sure that only one thread calls `RIODequeueCompletion()` at a time.
2) Is that the reason?

If that is true, then using only 1 RIO completion queue in a multithreaded IOCP environment won't really be multithreaded, since only 1 of the N threads will dequeue RIO completions.
So, you have to use N RIO completion queues, one for each IOCP thread. In that case you won't need locking for `RIODequeueCompletion()`, because every IOCP thread will have its own RIO completion queue, and MSDN clearly says "This locking is not needed for access by a single thread.".
3) Is this assumption correct?

4) Do you need locking for `RIODequeueCompletion()` when multiple threads dequeue from the same RIO completion queue with the other 2 methods, polled and event driven loops? I really believe so, and MSDN says it clearly.
---

Sorry for all of these questions, but the RIO APIs are still little used and little documented, and you seem to be the only one who has tried them extensively. Thanks for your blog and effort :-)

Each queue needs its own synchronisation. So we lock around calls to RIOReceive() as that's called from multiple threads and we have to protect the request queue. The completion queue is only accessed from a single thread at a time. We wait on the IOCP with multiple threads but the RIO design means that once a notification has been posted to the IOCP no more notifications will be posted until you call RIONotify() again. So, effectively, only one thread can ever access the completion queue at a time and so we don't need explicit locking.

The designs that I presented use multiple threads to process the completions but they never allow multiple threads to dequeue completions at the same time - thus they're safe.

If you were to come up with a polled design that uses multiple polling threads then YES you would need some form of synchronisation. In my opinion this would completely defeat the object of the polling design. Likewise the event driven method would also require synchronisation but is also (IMHO) designed for single threaded use anyway...

You can see the results of the various design trade offs on the results page and the IOCP based designs are more efficient and use more threads.

I'm pleased that they updated the MSDN documentation to state explicitly that locking was required as this took me a long time to track down when I was originally investigating the API.

I totally agree with your point that if we have multiple threads (event or poll) which read from the same RIO completion queue (so we need locking), that will completely nullify the benefits of RIO, and even of multithreading in general, I'd say.
So a RIO completion queue should always be single threaded; or rather, only 1 thread should read completions from it at a time.

I also agree that the best results come from N RIO completion queues, serviced inside IOCP loops. Then you associate each of your RIO request queues with one of those N RIO completion queues.

I said N because, if you use only 1 RIO completion queue, you won't really exploit the multithreading benefits within the IOCP loops. With N, each thread of the IOCP thread pool will service one, and only one, RIO completion queue's requests at a time, when there are RIO requests pending.

The only drawback is that you have to distribute your sockets' RIO request queues across the N RIO completion queues you create. E.g. one completion queue may have fewer associated request queues (fewer sockets) than another at any point in time.

I also believe that having N RIO completion queues (event or poll), with only 1 thread looping on each of them, could also be OK, so you do not need locking there. However, you still get the aforementioned drawback of distribution.

That said, a question is bugging me:
Let's consider a UDP server using IOCP: you allocate N OVERLAPPED structures and make them pending with WSARecvFrom(), waiting for data to arrive. When datagrams arrive, the multithreaded IOCP facility will complete them, and you'll get N threads working on N completed UDP reads, for the *same* UDP socket, at the *same* time.

With RIO you can't get this. If your UDP server socket can be associated with only 1 RIO request queue, and that request queue is associated with only 1 RIO completion queue, which is single threaded as we said before, you won't be able to process N UDP packets at the same time on N threads, but only 1 on 1 thread. In this scenario it's not even worth having N pending UDP reads, just 1, I guess.

At first glance, I thought you would be able to associate multiple RIO request queues with the same socket, so that multiple RIO completion queues (each single threaded) could have processed multiple UDP packets from the same UDP socket on several threads. But this seems not to be possible: MSDN says nothing about it, but some tests tell me that the association of RIO request queues to sockets is 1:1.

What am I missing?

I don't think your first conclusion leads to your second.

Yes, you can only manipulate a completion queue from one thread without explicit synchronisation, and yes, adding such synchronisation could impair performance, but the IOCP design already gives a way to achieve this and still allow multiple threads to process completions. You simply have a single completion queue attached to a single IOCP with multiple threads; each thread retrieves a batch of completions and then calls RIONotify() to allow another IOCP thread to step in and retrieve more completions if necessary. Mixing this with polling may make it more efficient when the workload is lower. See the designs here: http://www.serverframework.com/asynchronousevents/2012/08/winsock-registered-io-io-completion-port-performance.html that use GQCSEx().

This then moves the problem from completion processing to handling the issuing of new receives to the same request queue... With TCP there's certainly scope to split sockets across multiple request queues. With UDP and a single server-side socket probably the best you can do is some kind of lock-free queue that's used to pass requests from the IOCP threads to a dedicated thread which issues requests without synchronisation. I'm assuming here that there's no need to synchronise multiple request queues just because they are associated with the same completion queue.

The complexity (certainly from trying to write generic framework code) is how you determine the queue sizes and split things accordingly. With UDP it's reasonably easy; with TCP, not so much.

Is there any way to avoid getting send completion notifications and just do a normal send using RIOSend() and RIO buffers? Please reply!

If you didn't get a send completion notification how would you know that you could re-use the buffer that you had sent in?
