Replacing Socket.ReceiveAsync with NetworkStream.ReadAsync (awaitable)

Josh picture Josh · Mar 16, 2012 · Viewed 15.8k times · Source

I have an application that makes a couple hundred TCP connections at the same time, and receives a constant stream of data from them.

 private void startReceive()
    {
        SocketAsyncEventArgs e = new SocketAsyncEventArgs();
        e.Completed += receiveCompleted;
        e.SetBuffer(new byte[1024], 0, 1024);
        if (!Socket.ReceiveAsync(e)) { receiveCompleted(this, e); }  
    }

    void receiveCompleted(object sender, SocketAsyncEventArgs e)
    {
        ProcessData(e);

        if (!Socket.ReceiveAsync(e)) { receiveCompleted(this, e); }
    }

My attempts led to something like this:

private async void StartReceive()
    {
        byte[] Buff = new byte[1024];
        int recv = 0;
        while (Socket.Connected)
        {
            recv = await NetworkStream.ReadAsync(Buff, 0, 1024);
            ProcessData(Buff,recv);
        }
    }

The issue I had was the method calling StartReceive() would block, and not get to the accompanying StartSend() method called afterStartReceive(). Creating a new task forStartReceive()would just end up with 300-ish threads, and it seems to do so just by callingStartReceive()` anyways.

What would be the correct method of implementing the new async and await keywords on my existing code while using a NetworkStream so it is using the thread pool that Socket.SendAsync() and Socket.ReceiveAsync() are using to avoid having to have hundreds of threads/tasks?

Is there any performance advantage of using networkstream in this manner over i/o completion ports with beginreceive?

Answer

Stephen Cleary picture Stephen Cleary · Mar 17, 2012

You're changing two things at once here: the asynchronous style (SocketAsyncEventArgs to Task/async) and the level of abstraction (Socket to NetworkStream).

Since you're already comfortable with Socket, I recommend just changing the asynchronous style, and continue using the Socket class directly.

The Async CTP doesn't give Socket any async-compatible methods (which is weird; I assume they were left out by mistake and will be added in .NET 4.5).

It's not that hard to create your own ReceiveAsyncTask extension method (and similar wrappers for other operations) if you use my AsyncEx library:

public static Task<int> ReceiveAsyncTask(this Socket socket,
    byte[] buffer, int offset, int size)
{
  return AsyncFactory<int>.FromApm(socket.BeginReceive, socket.EndReceive,
      buffer, offset, size, SocketFlags.None);
}

Once you do that, your StartReceive can be written as such:

private async Task StartReceive()
{
  try
  {
    var buffer = new byte[1024];
    while (true)
    {
      var bytesReceived = await socket.ReceiveAsyncTask(buffer, 0, 1024)
          .ConfigureAwait(false);
      ProcessData(buffer, bytesReceived);
    }
  }
  catch (Exception ex)
  {
    // Handle errors here
  }
}

Now, to address many minor points:

  • await doesn't spawn a new thread. I wrote up an async/await intro on my blog, as have many others. async/await allows concurrency, but that doesn't necessarily imply multithreading.
  • Hundreds of threads can be problematic. Hundreds of tasks, though, are not a problem at all; the thread pool and BCL are designed to handle many, many tasks.
  • async/await is not a brand new form of asynchronous processing; it's just an easier way to express asynchronous processing. It still uses IOCPs underneath. async/await has slightly lower performance than the lower-level methods; its appeal is the ease of writing and composing asynchronous methods.
  • A very busy system can see some increased GC pressure when it switches to async/await. Stephen Toub on the Parallel Team wrote up some example socket-specific awaitables that can help with that issue. (I recommend using the straightforward pattern first, and only using the performance-enhanced approach if you find it necessary; still, it's good to know it's out there if you do end up needing it).
  • Async methods should return Task unless you really need them to return void. Task is awaitable, so your method is composable (and more easily testable); void is more like "fire and forget".
  • You can call ConfigureAwait(false) to tell the rest of the async method to execute on a thread pool thread. I use this in my example above so that ProcessData is executed in a thread pool thread, just like it was when using SocketAsyncEventArgs.
  • Socket.Connected is useless. You need to send data to detect if the connection is still valid.