I'm wondering if there exists an implementation/wrapper for ConcurrentQueue, similar to BlockingCollection where taking from the collection does not block, but is instead asynchronous and will cause an async await until an item is placed in the queue.
I've come up with my own implementation, but it does not seem to be performing as expected. I'm wondering if I'm reinventing something that already exists.
Here's my implementation:
public class MessageQueue<T>
{
ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
ConcurrentQueue<TaskCompletionSource<T>> waitingQueue =
new ConcurrentQueue<TaskCompletionSource<T>>();
object queueSyncLock = new object();
public void Enqueue(T item)
{
queue.Enqueue(item);
ProcessQueues();
}
public async Task<T> Dequeue()
{
TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
waitingQueue.Enqueue(tcs);
ProcessQueues();
return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
}
private void ProcessQueues()
{
TaskCompletionSource<T> tcs=null;
T firstItem=default(T);
while (true)
{
bool ok;
lock (queueSyncLock)
{
ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
if (ok)
{
waitingQueue.TryDequeue(out tcs);
queue.TryDequeue(out firstItem);
}
}
if (!ok) break;
tcs.SetResult(firstItem);
}
}
}
I don't know of a lock-free solution, but you can take a look at the new Dataflow library, part of the Async CTP. A simple BufferBlock<T>
should suffice, e.g.:
BufferBlock<int> buffer = new BufferBlock<int>();
Production and consumption are most easily done via extension methods on the dataflow block types.
Production is as simple as:
buffer.Post(13);
and consumption is async-ready:
int item = await buffer.ReceiveAsync();
I do recommend you use Dataflow if possible; making such a buffer both efficient and correct is more difficult than it first appears.