The C++ framework for developing highly scalable, high performance servers on Windows platforms.

Example Servers - WebSockets Binary Echo Server

This example shows you how to build a WebSockets server that processes binary messages from a HyBi WebSocket connection. It uses our WebSocket Tools Library to implement the WebSocket protocol. The basic structure of the server is very similar to the Basic Echo Server and Simple Text WebSocket Echo Server examples; you should read about those first and have a good understanding of how everything fits together. This document only covers the differences between those examples and this one.

This example requires the "WebSockets" licensing option of The Server Framework and it requires libraries that only ship with that option (see here for licensing options). You can always download the latest version of this example from here. Although you will need the correct libraries to be able to build it, you can still look at the example code, see how it works and perhaps get ideas from it. A compiled Unicode release build of this example is available on request if you require it for performance analysis of the framework.

Once again this server uses a server collection (see here for more details) to manage the five endpoints that it exposes. There are three different servers, two of which are each exposed with two different configuration options.

The first endpoint is a server that only supports the HyBi version of the WebSockets protocol and only supports binary messages. This is very similar to the HyBi server in the Simple Text WebSocket Server except that the OnData() functions only work with binary messages.

 void CSocketServer::OnData(
    IWebSocket & /*socket*/,
    const _tstring & /*text*/,
    const MessageStatus /*status*/,
    const __int64 /*messageBytesOutstanding*/)
 {
    // For this to be called you need to specify CProtocolHandler::DispatchTextAsStrings in the flags that are passed
    // during construction

    throw CException(
       _T("CSocketServer::OnData()"),
       _T("Unexpected: we don't handle text frames"));
 }

 void CSocketServer::OnData(
    IWebSocket &socket,
    IBuffer &buffer,
    const MessageType type,
    const MessageStatus status,
    const __int64 messageBytesOutstanding)
 {
    if (status != MessageStatusComplete)
    {
       throw CException(
          _T("CSocketServer::OnData()"),
          _T("Unexpected: message larger than buffer size"));
    }

    if (messageBytesOutstanding != 0)
    {
       throw CException(
          _T("CSocketServer::OnData()"),
          _T("Unexpected: message larger than buffer size: ") + ToString(messageBytesOutstanding) + _T(" outstanding"));
    }

    //DEBUG_ONLY(Output(_T("OnData - ") + ToString(buffer.GetUsed()) + _T(" bytes\r\n") + DumpData(buffer.GetMemory(), buffer.GetUsed())));

    if (type == MessageTypeText)
    {
       socket.TryWriteText(buffer);
    }
    else
    {
       socket.TryWriteBinary(buffer);
    }

    socket.TryRead();
 }

The next server demonstrates how to accumulate messages that are larger than your I/O buffer size. This may be required if your messages vary in length and most of them are small, so that it doesn't make sense to increase the buffer size just to accommodate the few that are larger than a single buffer. Note that this assumes that your server requires complete messages before it can begin processing them; if that's not the case then you may be able to process parts of the message as they arrive.

To be able to accumulate messages we need to have some per-connection data to store the accumulated message in. We define a class for this here.

 class CPerConnectionData
 {
    public :

       CPerConnectionData(
          const IIndexedOpaqueUserData::UserDataIndex nextBufferIndex);

       ~CPerConnectionData();

       void OnData(
          IWebSocket &socket,
          IBuffer &buffer,
          const MessageType type,
          const MessageStatus status,
          const __int64 messageBytesOutstanding);

    private :

       void AddBuffer(
          IBuffer &buffer);

       void EchoMessage(
          IWebSocket &socket);

       __int64 m_messageSize;

       MessageType m_messageType;

       CBufferChain m_messageBuffers;

       /// No copies do not implement
       CPerConnectionData(const CPerConnectionData &rhs);
       /// No copies do not implement
       CPerConnectionData &operator=(const CPerConnectionData &rhs);
 };

We use a JetByteTools::IO::CBufferChain to keep track of the buffers that form the message. The JetByteTools::IO::CBufferChain is a specialised invasive container which, unlike STL containers, does not require any memory management to add and remove items. To make the message easy to process we store details about the message type and the total size. Note that theoretically the total message size could be infinite, as a message could consist of an infinite sequence of fragments (see here); to avoid potential denial of service attacks you should limit the sizes of messages that you accept.
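
To illustrate what "invasive" means here, the following is a minimal sketch and not the framework's implementation. Buffer and BufferChain are hypothetical stand-ins for the real buffer and CBufferChain types, and the only assumption is that each buffer carries its own link pointer. Because the link lives inside the item, adding to and removing from the chain only rewires pointers and never allocates.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for a buffer; the link pointer lives inside the item.
struct Buffer
{
   explicit Buffer(std::size_t usedBytes) : used(usedBytes) {}

   std::size_t used;
   Buffer *pNext = nullptr;    // intrusive link, only meaningful while queued
};

// Hypothetical FIFO chain; Add() and GetNext() never allocate memory.
class BufferChain
{
   public :

      bool IsEmpty() const { return m_pHead == nullptr; }

      // Appends the buffer to the end of the chain.
      void Add(Buffer &buffer)
      {
         buffer.pNext = nullptr;

         if (m_pTail)
         {
            m_pTail->pNext = &buffer;
         }
         else
         {
            m_pHead = &buffer;
         }

         m_pTail = &buffer;
      }

      // Removes and returns the oldest buffer, or nullptr if the chain is empty.
      Buffer *GetNext()
      {
         Buffer *pBuffer = m_pHead;

         if (pBuffer)
         {
            m_pHead = pBuffer->pNext;

            if (!m_pHead)
            {
               m_pTail = nullptr;
            }

            pBuffer->pNext = nullptr;
         }

         return pBuffer;
      }

   private :

      Buffer *m_pHead = nullptr;
      Buffer *m_pTail = nullptr;
};
```

The trade-off of this design is that an item can only be in one such chain at a time, since it has a single link pointer.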

Our OnData() callback now passes all of the work off to the per-connection data.

 void CMessageAccumulatingSocketServer::OnData(
    IWebSocket &socket,
    IBuffer &buffer,
    const MessageType type,
    const MessageStatus status,
    const __int64 messageBytesOutstanding)
 {
    CPerConnectionData *pConnectionData = reinterpret_cast<CPerConnectionData *>(socket.GetUserPointer(m_userDataIndex));

    pConnectionData->OnData(socket, buffer, type, status, messageBytesOutstanding);
 }

The per-connection data's OnData() then does the following.

 void CPerConnectionData::OnData(
    IWebSocket &socket,
    IBuffer &buffer,
    const MessageType type,
    const MessageStatus status,
    const __int64 messageBytesOutstanding)
 {
    if (m_messageSize == 0)
    {
       m_messageType = type;
    }
    else if (m_messageType != type)
    {
       throw CException(_T("CPerConnectionData::OnData()"), _T("Unexpected frame type"));
    }

    if (status == MessageStatusComplete && messageBytesOutstanding == 0)
    {
       AddBuffer(buffer);

       EchoMessage(socket);

       socket.TryRead();
    }
    else if (buffer.GetSize() == buffer.GetUsed())
    {
       AddBuffer(buffer);

       socket.TryRead();
    }
    else
    {
       socket.TryRead(&buffer);
    }
 }

This server can be configured either to accumulate messages in the WebSocket protocol handler or to pass data through to the OnData() callback as it arrives. If we need complete messages before we can process them then accumulating in the WebSocket protocol handler is sensible, as it means that we have less work to do for messages that are smaller than the buffer size. If, however, we could process messages incrementally then we should not have the protocol handler accumulate messages, as this could delay our processing. The example server configures both kinds of endpoint and, for the code shown above, it doesn't make a lot of difference. If the protocol handler is accumulating messages then we'll receive full buffers except for the final buffer. If the protocol handler is not accumulating then we'll also use the final branch of the if to accumulate more data into the space that's left in the buffer.
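
The three branches of CPerConnectionData::OnData() can be summarised as a pure decision function. This is only a sketch: Action and Decide() are hypothetical names that are not part of the framework, and the comments assume that TryRead() with no argument reads into a fresh buffer whereas TryRead(&buffer) continues to fill the one supplied.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical names for the three outcomes of the if/else chain.
enum class Action
{
   StoreAndEcho,        // final piece: add to the chain, echo, then read again
   StoreAndReadNew,     // buffer is full: add to the chain, then read again
   ReadIntoSameBuffer   // space remains: keep filling the same buffer
};

Action Decide(
   const bool messageComplete,
   const std::int64_t messageBytesOutstanding,
   const std::size_t bufferSize,
   const std::size_t bufferUsed)
{
   if (messageComplete && messageBytesOutstanding == 0)
   {
      return Action::StoreAndEcho;
   }

   if (bufferSize == bufferUsed)
   {
      return Action::StoreAndReadNew;
   }

   return Action::ReadIntoSameBuffer;
}
```

With an accumulating protocol handler only the first two outcomes should occur; the third is the one that matters when data is passed through as it arrives.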

 void CPerConnectionData::AddBuffer(
    IBuffer &buffer)
 {
    const IBuffer::BufferSize used = buffer.GetUsed();

    if (used || m_messageBuffers.IsEmpty())
    {
       m_messageSize += used;

       m_messageBuffers.Add(buffer);
    }
 }

CPerConnectionData::AddBuffer() is where we could impose message size limits if we wanted to. Be aware that this example, as presented, could suffer from overflow in m_messageSize.
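
As a sketch of how such a limit might be imposed without risking overflow, the hypothetical helper below checks the remaining headroom before adding. TryAccumulate() and maxMessageSize are illustrative names and are not part of the framework. AddBuffer() could call something like this and reject the message when it returns false.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: adds 'used' to 'messageSize' only if the result stays
// within 'maxMessageSize'. Returns false and leaves 'messageSize' unchanged
// if the limit would be exceeded or any input is out of range.
bool TryAccumulate(
   std::int64_t &messageSize,
   const std::int64_t used,
   const std::int64_t maxMessageSize)
{
   if (used < 0 || messageSize < 0 || messageSize > maxMessageSize)
   {
      return false;
   }

   // Compare against the remaining headroom rather than computing
   // messageSize + used, which is the addition that could overflow.
   if (used > maxMessageSize - messageSize)
   {
      return false;
   }

   messageSize += used;

   return true;
}
```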

 void CPerConnectionData::EchoMessage(
    IWebSocket &socket)
 {
    CSmartBuffer buffer(m_messageBuffers.GetNext());

    socket.StartMessage(m_messageType, m_messageSize, buffer.GetRef());

    buffer = m_messageBuffers.GetNext();

    while (buffer.Get())
    {
       socket.SendMessageData(buffer.GetRef());

       buffer = m_messageBuffers.GetNext();
    }

    m_messageSize = 0;
 }

Once we have a complete message we echo it using the "large message API". This allows us to send a single message in a series of buffers. We start by calling JetByteTools::WebSocket::HyBi::IWebSocket::StartMessage() and specifying the type of the message (text or binary), the message size and the first data buffer. We can then continue the message by calling JetByteTools::WebSocket::HyBi::IWebSocket::SendMessageData() and passing data buffers until the message is complete.
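
For background, an RFC 6455 frame header carries the payload length ahead of the payload, which is presumably why the size must be known before the first buffer is sent. The sketch below builds such a header for an unmasked, server to client, frame. BuildFrameHeader() is a hypothetical helper written for illustration; it is not part of the WebSocket Tools Library and it assumes, without confirmation from the framework, that the whole message is sent as a single frame.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper: builds an unmasked RFC 6455 frame header.
// opcode: 0x0 continuation, 0x1 text, 0x2 binary.
// RFC 6455 requires the top bit of a 64-bit length to be zero; that is not
// checked here.
std::vector<std::uint8_t> BuildFrameHeader(
   const bool fin,
   const std::uint8_t opcode,
   const std::uint64_t payloadLength)
{
   std::vector<std::uint8_t> header;

   header.push_back(static_cast<std::uint8_t>((fin ? 0x80 : 0x00) | (opcode & 0x0F)));

   if (payloadLength <= 125)
   {
      // Short form: the length fits in the 7-bit field.
      header.push_back(static_cast<std::uint8_t>(payloadLength));
   }
   else if (payloadLength <= 0xFFFF)
   {
      // 126 signals a 16-bit length in network byte order.
      header.push_back(126);
      header.push_back(static_cast<std::uint8_t>(payloadLength >> 8));
      header.push_back(static_cast<std::uint8_t>(payloadLength & 0xFF));
   }
   else
   {
      // 127 signals a 64-bit length in network byte order.
      header.push_back(127);

      for (int shift = 56; shift >= 0; shift -= 8)
      {
         header.push_back(static_cast<std::uint8_t>((payloadLength >> shift) & 0xFF));
      }
   }

   return header;
}
```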

The final server in this example demonstrates the "explicit fragment API". This allows you to send a message as a series of fragments. You may wish to do this if you don't know how big the message will be when you start sending; you can then use the message fragmentation support in the WebSocket protocol to send message streams that are, potentially, infinitely long.

Rather than accumulating a complete message we echo as much as we have back to the client as a fragment. When we get to the end of a message we send a fragment with the FIN bit set and the client recognises that the message is now complete.

When a new message arrives we call JetByteTools::WebSocket::HyBi::IWebSocket::StartFragmentedMessage() and specify the message type, the size of the fragment and the first data buffer. Note that in this example each fragment is sent with a single call to either JetByteTools::WebSocket::HyBi::IWebSocket::StartFragmentedMessage() or JetByteTools::WebSocket::HyBi::IWebSocket::StartNewFragment(). This is just a convenience for us in this example; we could send fragments in multiple pieces, much like we did with the "large message API", by following the first call with subsequent calls to JetByteTools::WebSocket::HyBi::IWebSocket::SendMessageData() and passing data buffers until the fragment is complete.

When we finally get to the end of our message we call JetByteTools::WebSocket::HyBi::IWebSocket::StartNewFragment() one final time and pass true for the, normally defaulted, flag to specify that this is the final fragment.

 void CFragmentEchoingSocketServer::OnData(
    IWebSocket &socket,
    IBuffer &buffer,
    const MessageType type,
    const MessageStatus status,
    const __int64 messageBytesOutstanding)
 {
    const bool echoing = ToBool(socket.GetUserData(m_userDataIndex));

    if (echoing)
    {
       if (status == MessageStatusComplete && messageBytesOutstanding == 0)
       {
          socket.StartNewFragment(buffer.GetUsed(), buffer, true);

          socket.SetUserData(m_userDataIndex, false);
       }
       else
       {
          socket.StartNewFragment(buffer.GetUsed(), buffer);
       }
    }
    else
    {
       socket.StartFragmentedMessage(type, buffer.GetUsed(), buffer);

       socket.SetUserData(m_userDataIndex, true);
    }

    socket.TryRead();
 }
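
At the protocol level, RFC 6455 marks the first fragment of a message with the message's opcode, marks later fragments with the continuation opcode (0x0) and sets the FIN bit only on the last fragment. The sketch below tracks that sequence with a single flag, much as the echoing flag does above. FragmentSequencer is a hypothetical class for illustration only and says nothing about how the framework builds its frames.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical per-connection state that mirrors the 'echoing' flag.
class FragmentSequencer
{
   public :

      // Returns the first header byte (FIN bit plus opcode) for the next fragment.
      // messageOpcode: 0x1 text, 0x2 binary; only used for the first fragment.
      std::uint8_t Next(
         const std::uint8_t messageOpcode,
         const bool isFinal)
      {
         // Fragments after the first always use the continuation opcode.
         const std::uint8_t opcode = m_inMessage
            ? static_cast<std::uint8_t>(0x0)
            : static_cast<std::uint8_t>(messageOpcode & 0x0F);

         // The final fragment ends the message; the next call starts a new one.
         m_inMessage = !isFinal;

         return static_cast<std::uint8_t>((isFinal ? 0x80 : 0x00) | opcode);
      }

   private :

      bool m_inMessage = false;
};
```

A message that fits in a single fragment is simply a first fragment that is also final, so it carries both the message opcode and the FIN bit.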

Note that both the "large message API" and the "explicit fragment API" work with both text and binary messages. This example simply restricts itself to binary messages for simplicity.

Generated on Sun Sep 12 19:06:45 2021 for The Server Framework - v7.4 by doxygen 1.5.3