Is BlockingCollection<T> a good choice for a single-producer, single-consumer FIFO queue?

Oleg Vazhnev · Apr 11, 2012

I need a single-producer, single-consumer FIFO queue because:

  • I need to process messages in the order they are received.
  • Processing must be asynchronous, because the caller should not wait while I process a message.
  • Processing of the next message should start only after processing of the previous one has finished. Sometimes messages arrive faster than I can process them. On average I can keep up with all messages, but occasionally I have to queue a batch of them.

So I think it is much like TCP/IP: there is one producer and one consumer, and messages SOMETIMES arrive faster than they can be processed, so they have to be queued. Order IS important, and the caller is not at all interested in what I do with the messages.

This sounds easy enough, and I could probably use a plain Queue for it, but I would like to use BlockingCollection because I don't want to write any code with ManualResetEvent and the like.

How suitable is BlockingCollection for this task? Or can you suggest something else?

Answer

sll · Apr 11, 2012

BlockingCollection<T> wraps an IProducerConsumerCollection<T> implementation (by default a ConcurrentQueue<T>, which gives FIFO ordering) and adds blocking on top of it, so it fits your requirements well.

You can create two Tasks: one for the asynchronous producer and another for the consumer worker. The former adds items to the BlockingCollection, and the latter consumes them in FIFO order as soon as they become available.

Producer-consumer sample application using TPL Tasks and BlockingCollection:

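using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class ProducerConsumer
{
    // Backed by a ConcurrentQueue<string> by default, so items come out in FIFO order.
    private static readonly BlockingCollection<string> queue = new BlockingCollection<string>();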

    static void Main(string[] args)
    {
        Start();
    }

    public static void Start()
    {
        var producerWorker = Task.Factory.StartNew(() => RunProducer());
        var consumerWorker = Task.Factory.StartNew(() => RunConsumer());

        Task.WaitAll(producerWorker, consumerWorker);
    }

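    private static void RunProducer()
    {
        int itemsCount = 100;

        while (itemsCount-- > 0)
        {
            queue.Add(itemsCount + " - " + Guid.NewGuid().ToString());
            Thread.Sleep(250);
        }

        // Signal that no more items will arrive; without this the consumer's
        // foreach never ends and Task.WaitAll in Start() blocks forever.
        queue.CompleteAdding();
    }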

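    private static void RunConsumer()
    {
        // Blocks while the queue is empty and exits once CompleteAdding()
        // has been called and all remaining items have been consumed.
        foreach (var item in queue.GetConsumingEnumerable())
        {
            Console.WriteLine(DateTime.Now.ToString("HH:mm:ss.ffff") + " | " + item);
        }
    }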
}

IProducerConsumerCollection<T>, as described in the MSDN documentation:

Defines methods to manipulate thread-safe collections intended for producer/consumer usage. This interface provides a unified representation for producer/consumer collections so that higher level abstractions such as System.Collections.Concurrent.BlockingCollection<T> can use the collection as the underlying storage mechanism.
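
To make the "underlying storage mechanism" part concrete, here is a minimal sketch that passes a ConcurrentQueue<int> to the BlockingCollection constructor explicitly. That is the same storage the parameterless constructor picks by default, so with one producer and one consumer the items should come out in the order they were added. The class name FifoDemo, the Run helper and the item count are made up for illustration and are not part of the original sample.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical demo class; the name and the item count are illustrative only.
static class FifoDemo
{
    // Runs one producer and one consumer and returns the items
    // in the order in which the consumer received them.
    public static List<int> Run(int count)
    {
        // ConcurrentQueue<T> is supplied explicitly as the underlying storage;
        // swapping in ConcurrentStack<T> here would give LIFO order instead.
        using (var queue = new BlockingCollection<int>(new ConcurrentQueue<int>()))
        {
            // Only the consumer task writes to this list, and it is read
            // only after Task.WaitAll, so no extra locking is needed.
            var consumed = new List<int>();

            var consumer = Task.Factory.StartNew(() =>
            {
                foreach (var item in queue.GetConsumingEnumerable())
                {
                    consumed.Add(item);
                }
            });

            var producer = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < count; i++)
                {
                    queue.Add(i);
                }

                // Lets the consumer's foreach finish once the queue drains.
                queue.CompleteAdding();
            });

            Task.WaitAll(producer, consumer);
            return consumed;
        }
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(5))); // prints "0, 1, 2, 3, 4"
    }
}
```

If the producer must never be blocked, leave the collection unbounded as above. Passing a capacity to the constructor would make Add wait whenever the queue is full, which may or may not be what you want for bursts of messages.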