Windows 8 Registered I/O - Simple RIO Polled UDP Example Server

I’ve been looking at the Windows 8 Registered I/O Networking Extensions since October when they first made an appearance as part of the Windows 8 Developer Preview. Whilst exploring and understanding the new API I spent some time putting together some simple UDP servers using the various notification styles that RIO provides. I then put together some equally simple UDP servers using the “traditional” APIs so that I could compare performance. This series of blog posts describes each of the example servers in turn. You can find an index to all of the articles about the Windows 8 Registered I/O example servers here.

Polling RIO for completions

As I mentioned back in October, there are three ways to receive completion notifications from RIO; polling, event driven and via an I/O Completion Port. The first is the simplest though it burns CPU time even when no datagrams are being received.

At its simplest a polled RIO server obtains datagrams to process like this:

   RIORESULT results[RIO_MAX_RESULTS];

   ULONG numResults = 0;

   do
   {
      numResults = g_rio.RIODequeueCompletion(
         g_queue,
         results,
         RIO_MAX_RESULTS);

      if (0 == numResults)
      {
         YieldProcessor();
      }
      else if (RIO_CORRUPT_CQ == numResults)
      {
         ErrorExit("RIODequeueCompletion");
      }
   }
   while (0 == numResults);

You then loop over the results array and process each result in turn before looping back to dequeue more completions.

Getting to the point where you can call RIODequeueCompletion() takes a bit of setting up though…

Creating a RIO completion queue

The examples are each stand alone but can share two common header files. The first, Constants.h, contains all constants that are used to tune the examples. The second, Shared.h, contains inline helper functions which hide some of the complexity and allow the individual example programs to focus on the area of the API that they’re demonstrating. We use several of these helper functions as we prepare to create our RIO completion queue.
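For reference, a Constants.h along these lines would support everything this example uses. The names are the ones that appear in the code below, but the values shown here are illustrative assumptions, not necessarily the ones used for the published results:

```cpp
// Constants.h (sketch) - the names are the ones the example code uses;
// the values are illustrative assumptions only.

typedef unsigned long DWORD;   // stand-in so this sketch is self-contained

static const unsigned short PORT = 5050;         // port the example binds to

static const DWORD RIO_PENDING_RECVS = 100000;   // receives kept pending; also sizes the CQ

static const DWORD RECV_BUFFER_SIZE = 1024;      // size of each buffer slice

static const DWORD EXPECTED_DATA_SIZE = 1024;    // datagram size used for the test proper

static const DWORD RIO_MAX_RESULTS = 1000;       // completions dequeued per RIODequeueCompletion() call

static const bool USE_LARGE_PAGES = false;       // allocate with MEM_LARGE_PAGES if true

static const DWORD g_workIterations = 0;         // per-datagram 'processing overhead' for DoWork()
```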

int _tmain(int argc, _TCHAR* argv[])
{
   SetupTiming("RIO polled UDP");

   InitialiseWinsock();

   CreateRIOSocket();

   g_queue = g_rio.RIOCreateCompletionQueue(RIO_PENDING_RECVS, 0);

   if (g_queue == RIO_INVALID_CQ)
   {
      ErrorExit("RIOCreateCompletionQueue");
   }

So that we can compare the performance of these examples we first call SetupTiming(), which prepares us for calling StartTiming() and StopTiming() later in the program. SetupTiming() locks the thread to a single CPU using SetThreadAffinityMask(), as recommended by the documentation for QueryPerformanceFrequency(). Once this is done we call QueryPerformanceFrequency() and store the resulting value for use by StopTiming().
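The timing helpers themselves aren't shown here. The real ones use SetThreadAffinityMask() and the QueryPerformanceCounter() family as described above; a portable sketch of the same idea, using std::chrono instead (the implementation below is my assumption, only the names match the calls above), might look like this:

```cpp
#include <chrono>
#include <iostream>

static std::chrono::steady_clock::time_point g_start;
static double g_elapsedSeconds = 0.0;

inline void SetupTiming(const char *pProgramName)
{
   // The real code pins the thread to one CPU and caches the value of
   // QueryPerformanceFrequency() here; steady_clock needs no such setup.
   std::cout << pProgramName << std::endl;
}

inline void StartTiming()
{
   g_start = std::chrono::steady_clock::now();
}

inline void StopTiming()
{
   const auto stop = std::chrono::steady_clock::now();

   g_elapsedSeconds = std::chrono::duration<double>(stop - g_start).count();
}
```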

With that done we initialise Winsock and then create our RIO socket. Note that these examples use quite a few global variables for convenience. This isn't how I would suggest you write production code, but it makes much of the example code simpler and allows us to focus on the RIO API. CreateRIOSocket() creates a socket and assigns it to g_s, binds the socket to PORT (a constant defined in Constants.h) and then initialises the RIO API function table so that we can use it through g_rio.

inline void CreateRIOSocket()
{
   g_s = CreateSocket(WSA_FLAG_REGISTERED_IO);

   Bind(g_s, PORT);

   InitialiseRIO(g_s);
}

inline SOCKET CreateSocket(
   const DWORD flags = 0)
{
   g_s = ::WSASocket(
      AF_INET,
      SOCK_DGRAM,
      IPPROTO_UDP,
      NULL,
      0,
      flags);

   if (g_s == INVALID_SOCKET)
   {
      ErrorExit("WSASocket");
   }

   return g_s;
}

inline void InitialiseRIO(
   SOCKET s)
{
   GUID functionTableId = WSAID_MULTIPLE_RIO;

   DWORD dwBytes = 0;

   if (0 != WSAIoctl(
      s,
      SIO_GET_MULTIPLE_EXTENSION_FUNCTION_POINTER,
      &functionTableId,
      sizeof(GUID),
      (void**)&g_rio,
      sizeof(g_rio),
      &dwBytes,
      0,
      0))
   {
      ErrorExit("WSAIoctl");
   }
}

As you can see, we check all API calls for failure and report errors via our ErrorExit() functions.

Finally we can create the RIO completion queue. We use the tunable constant RIO_PENDING_RECVS to specify how large the queue should be.

Creating a RIO request queue

Setting up a RIO request queue seems to be one of the few places where RIO API changes between the Windows 8 Developer Preview and the Windows 8 Server Beta are visible to us. With the Developer Preview we could pass any value for maxReceiveDataBuffers and maxSendDataBuffers, whereas in the beta RIOCreateRequestQueue() only accepts a value of 1 for each. See my earlier posting on RIO for more details; the documentation and the API are now in sync, and the API currently doesn't support scatter/gather I/O.

   ULONG maxOutstandingReceive = RIO_PENDING_RECVS;
   ULONG maxReceiveDataBuffers = 1;
   ULONG maxOutstandingSend = 0;
   ULONG maxSendDataBuffers = 1;

   void *pContext = 0;

   g_requestQueue = g_rio.RIOCreateRequestQueue(
      g_s,
      maxOutstandingReceive,
      maxReceiveDataBuffers,
      maxOutstandingSend,
      maxSendDataBuffers,
      g_queue,
      g_queue,
      pContext);

   if (g_requestQueue == RIO_INVALID_RQ)
   {
      ErrorExit("RIOCreateRequestQueue");
   }

   PostRIORecvs(RECV_BUFFER_SIZE, RIO_PENDING_RECVS);

Once the request queue has been created we can post some read requests. Note that we specify the size of the buffers to use and the number of receives that we want to have pending; both of these values can be changed easily in Constants.h.

Registering buffers and posting RIO read requests

Before we can issue some read requests we need to register some I/O buffers. The PostRIORecvs() function is complicated by the fact that we're trying to keep things simple (!) and by the fact that the example is tunable for different buffer sizes and numbers of pending receives. We loop, allocating and registering buffers and then slicing each buffer into buffer slices. We use an extended RIO_BUF structure so that we can pass an "operation" code with each buffer slice. These examples don't use the operation code, but a real server might need to pass additional information with each I/O request, especially if it's using a single completion queue for both reads and writes. We deliberately leak our EXTENDED_RIO_BUF structures, but this isn't much of a problem in this example as they're in use from this point until the program exits.

inline void PostRIORecvs(
   const DWORD recvBufferSize,
   const DWORD pendingRecvs)
{
   DWORD totalBuffersAllocated = 0;

   while (totalBuffersAllocated < pendingRecvs)
   {
      DWORD bufferSize = 0;

      DWORD receiveBuffersAllocated = 0;

      char *pBuffer = AllocateBufferSpace(
         recvBufferSize,
         pendingRecvs,
         bufferSize,
         receiveBuffersAllocated);

      totalBuffersAllocated += receiveBuffersAllocated;

      RIO_BUFFERID id = g_rio.RIORegisterBuffer(
         pBuffer,
         static_cast<DWORD>(bufferSize));

      if (id == RIO_INVALID_BUFFERID)
      {
         ErrorExit("RIORegisterBuffer");
      }

      DWORD offset = 0;

      const DWORD recvFlags = 0;

      EXTENDED_RIO_BUF *pBufs = new EXTENDED_RIO_BUF[receiveBuffersAllocated];

      for (DWORD i = 0; i < receiveBuffersAllocated; ++i)
      {
         // now split into buffer slices and post our recvs

         EXTENDED_RIO_BUF *pBuffer = pBufs + i;

         pBuffer->operation = 0;
         pBuffer->BufferId = id;
         pBuffer->Offset = offset;
         pBuffer->Length = recvBufferSize;

         offset += recvBufferSize;

         if (!g_rio.RIOReceive(g_requestQueue, pBuffer, 1, recvFlags, pBuffer))
         {
            ErrorExit("RIOReceive");
         }
      }

      if (totalBuffersAllocated != pendingRecvs)
      {
         cout << totalBuffersAllocated << " receives pending" << endl;
      }
   }

   cout << totalBuffersAllocated << " total receives pending" << endl;
}
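The EXTENDED_RIO_BUF type used above isn't shown; it simply derives from RIO_BUF and adds the operation code. A self-contained sketch of the type and of the slicing arithmetic, with a stand-in RIO_BUF definition so that it compiles without the Windows headers, might look like this:

```cpp
// Stand-in for the real RIO_BUF from mswsock.h, for illustration only.
typedef int RIO_BUFFERID;

struct RIO_BUF
{
   RIO_BUFFERID BufferId;
   unsigned long Offset;
   unsigned long Length;
};

// The extended structure adds an operation code so that the context
// pointer passed to RIOReceive()/RIOSend() can identify the request.
struct EXTENDED_RIO_BUF : public RIO_BUF
{
   unsigned long operation;
};

// Slice one registered buffer into fixed-size pieces, as PostRIORecvs() does.
void SliceBuffer(
   const RIO_BUFFERID id,
   const unsigned long sliceSize,
   const unsigned long numSlices,
   EXTENDED_RIO_BUF *pBufs)
{
   unsigned long offset = 0;

   for (unsigned long i = 0; i < numSlices; ++i)
   {
      pBufs[i].operation = 0;
      pBufs[i].BufferId = id;
      pBufs[i].Offset = offset;
      pBufs[i].Length = sliceSize;

      offset += sliceSize;
   }
}
```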

Before we can register our I/O buffer we need to allocate the memory that we will be using. As I mentioned back in October when I was first looking at the RIO API, it’s important to allocate your I/O buffer memory in a particular way so that you use memory efficiently for RIO’s registered I/O buffers.

inline char *AllocateBufferSpace(
   const DWORD recvBufferSize,
   const DWORD pendingRecvs,
   DWORD &bufferSize,
   DWORD &receiveBuffersAllocated)
{
   const DWORD preferredNumaNode = 0;

   const SIZE_T largePageMinimum = USE_LARGE_PAGES ? ::GetLargePageMinimum() : 0;

   SYSTEM_INFO systemInfo;

   ::GetSystemInfo(&systemInfo);

   const unsigned __int64 granularity = (largePageMinimum == 0 ? systemInfo.dwAllocationGranularity : largePageMinimum);

   const unsigned __int64 desiredSize = static_cast<unsigned __int64>(recvBufferSize) * pendingRecvs;

   unsigned __int64 actualSize = RoundUp(desiredSize, granularity);

   if (actualSize > std::numeric_limits<DWORD>::max())
   {
      actualSize = (std::numeric_limits<DWORD>::max() / granularity) * granularity;
   }

   receiveBuffersAllocated = std::min<DWORD>(pendingRecvs, static_cast<DWORD>(actualSize / recvBufferSize));

   bufferSize = static_cast<DWORD>(actualSize);

   char *pBuffer = reinterpret_cast<char *>(VirtualAllocExNuma(
      GetCurrentProcess(),
      0,
      bufferSize,
      MEM_COMMIT |
      MEM_RESERVE  |
      (largePageMinimum != 0 ? MEM_LARGE_PAGES : 0),
      PAGE_READWRITE,
      preferredNumaNode));

   if (pBuffer == 0)
   {
      ErrorExit("VirtualAlloc");
   }

   return pBuffer;
}

Our allocation function is again slightly more complex than it needs to be, but that complexity allows us to explore various options simply by changing our configuration constants; you can ignore things like the USE_LARGE_PAGES flag and the fact that we're allocating on a preferred NUMA node unless you're interested in the details and your hardware supports these features. The important thing is that we allocate in terms of the system's allocation granularity and that we use a variant of VirtualAlloc() to do so. Once again, in the name of simplicity, we leak these buffers (which will be in use for the life of the program) and allow process exit to clean them up.
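The RoundUp() helper used above isn't shown; it rounds a size up to the next multiple of the allocation granularity. A portable sketch of it, together with the clamping and slicing arithmetic from AllocateBufferSpace(), might look like this (the helper names and exact shape are my assumption):

```cpp
#include <algorithm>

typedef unsigned long long uint64;

// Round size up to the next multiple of granularity (granularity must be non-zero).
inline uint64 RoundUp(
   const uint64 size,
   const uint64 granularity)
{
   return ((size + granularity - 1) / granularity) * granularity;
}

// Work out how many sliceSize buffers fit in an allocation that is a whole
// number of granularity units and no larger than a DWORD can describe.
inline unsigned long BuffersThatFit(
   const unsigned long sliceSize,
   const unsigned long buffersWanted,
   const uint64 granularity)
{
   const uint64 desiredSize = static_cast<uint64>(sliceSize) * buffersWanted;

   uint64 actualSize = RoundUp(desiredSize, granularity);

   const uint64 maxSize = 0xFFFFFFFFULL;   // std::numeric_limits<DWORD>::max()

   if (actualSize > maxSize)
   {
      actualSize = (maxSize / granularity) * granularity;
   }

   return std::min<unsigned long>(
      buffersWanted,
      static_cast<unsigned long>(actualSize / sliceSize));
}
```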

Calling RIODequeueCompletion() and processing results

We finally have queues created, buffers registered and reads pending. Processing these reads in our simple polled RIO server is fairly straightforward. First we enter a polling loop for completions and spin until completions are available. Once we have at least one completion we call StartTiming() to start our performance timing and then process the completion results. Our performance tests are simple: we send a number of datagrams of EXPECTED_DATA_SIZE and then indicate that the test is complete by sending a series of datagrams of a different size. Once our servers receive an unexpectedly sized datagram they consider the test to be complete and shut down. Thus our main completion loop is the do/while loop below. We process datagrams, issue new reads and then dequeue more results. Once we're done we stop our timer and display details about the time taken and the number of datagrams that we processed.

   bool done = false;

   DWORD recvFlags = 0;

   RIORESULT results[RIO_MAX_RESULTS];

   ULONG numResults = 0;

   do
   {
      numResults = g_rio.RIODequeueCompletion(
         g_queue,
         results,
         RIO_MAX_RESULTS);

      if (0 == numResults)
      {
         YieldProcessor();
      }
      else if (RIO_CORRUPT_CQ == numResults)
      {
         ErrorExit("RIODequeueCompletion");
      }
   }
   while (0 == numResults);

   StartTiming();

   int workValue = 0;

   do
   {
      for (DWORD i = 0; i < numResults; ++i)
      {
         EXTENDED_RIO_BUF *pBuffer = reinterpret_cast<EXTENDED_RIO_BUF *>(results[i].RequestContext);

         if (results[i].BytesTransferred == EXPECTED_DATA_SIZE)
         {
            g_packets++;

            workValue += DoWork(g_workIterations);

            if (!g_rio.RIOReceive(
               g_requestQueue,
               pBuffer,
               1,
               recvFlags,
               pBuffer))
            {
               ErrorExit("RIOReceive");
            }

            done = false;
         }
         else
         {
            done = true;
         }
      }

      if (!done)
      {
         do
         {
            numResults = g_rio.RIODequeueCompletion(
               g_queue,
               results,
               RIO_MAX_RESULTS);

            if (0 == numResults)
            {
               YieldProcessor();
            }
            else if (RIO_CORRUPT_CQ == numResults)
            {
               ErrorExit("RIODequeueCompletion");
            }
         }
         while (0 == numResults);
      }
   }
   while (!done);

   StopTiming();

   PrintTimings();

   return workValue;
}

The DoWork() function above can be used to add 'processing overhead' to each datagram. This is configured using g_workIterations, which is defined in Constants.h. With this set to 0 there is no overhead and we can compare how quickly each API can receive datagrams. Setting larger values will affect how the various multi-threaded examples perform and can be useful if you're unable to saturate the test machine's network interfaces.
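DoWork() itself isn't shown here; any deterministic busy loop will do. A stand-in along these lines (my assumption, not the actual implementation) works because the caller accumulates the return value, which stops the compiler optimising the loop away:

```cpp
// Burn a configurable amount of CPU per datagram; the return value is
// accumulated by the caller so the loop can't be optimised away.
inline int DoWork(const int workIterations)
{
   int result = 0;

   for (int i = 0; i < workIterations; ++i)
   {
      result += i % 7;
   }

   return result;
}
```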

The code for this example can be downloaded from here. This code requires Visual Studio 11, but would work with earlier compilers if you have a Windows SDK that supports RIO. Note that Shared.h and Constants.h contain helper functions and tuning constants for ALL of the examples and so there will be code in there that is not used by this example. You should be able to unzip each example into the same directory structure so that they all share the same shared headers. This allows you to tune all of the examples the same so that any performance comparisons make sense.

Join in

Comments and suggestions are more than welcome. I’m learning as I go here and I’m quite likely to have made some mistakes or come up with some erroneous conclusions, feel free to put me straight and help make these examples better.

Code - updated 15th April 2023

Full source can be found here on GitHub.

This isn’t production code, error handling is simply “panic and run away”.

This code is licensed with the MIT license.