ConcurrentQueue with multithreading

Naveen kumar · Jan 4, 2017

I am new to multithreading. I need to add a number of strings to a queue and process them with multiple threads. I am using ConcurrentQueue, which is thread-safe.

This is what I have tried, but not all of the items added to the concurrent queue are processed; only the first 4 items are.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Program
{
    ConcurrentQueue<string> iQ = new ConcurrentQueue<string>();
    static void Main(string[] args)
    {
        new Program().run();
    }

    void run()
    {
        int threadCount = 4;
        Task[] workers = new Task[threadCount];

        for (int i = 0; i < threadCount; ++i)
        {
            int workerId = i;
            Task task = new Task(() => worker(workerId));
            workers[i] = task;
            task.Start();
        }

        for (int i = 0; i < 100; i++)
        {
            iQ.Enqueue("Item" + i);
        }

        Task.WaitAll(workers);
        Console.WriteLine("Done.");

        Console.ReadLine();
    }

    void worker(int workerId)
    {
        Console.WriteLine("Worker {0} is starting.", workerId);
        string op;
        if(iQ.TryDequeue(out op))
        {
            Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
        }

        Console.WriteLine("Worker {0} is stopping.", workerId);
    }


}

Answer

Sefe · Jan 4, 2017

There are a couple of issues with your implementation. The first and most obvious one is that the worker method dequeues at most one item and then stops:

    if(iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    }

It should be:

    while(iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    }
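
A minimal sketch of what this change does on its own, assuming the queue is completely filled before any worker starts (which is not the order used in the question's program). The names `DrainDemo` and `DrainPrefilled` are hypothetical, and `Task.Run` is used here in place of `new Task(...)` followed by `Start()` purely for brevity:

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class DrainDemo
{
    // Hypothetical helper: fills the queue first, then lets the workers drain it.
    // Returns how many items the workers processed in total.
    public static int DrainPrefilled(int threadCount, int itemCount)
    {
        var queue = new ConcurrentQueue<string>();
        for (int i = 0; i < itemCount; i++)
        {
            queue.Enqueue("Item" + i);
        }

        int processed = 0;
        var workers = new Task[threadCount];
        for (int i = 0; i < threadCount; i++)
        {
            workers[i] = Task.Run(() =>
            {
                string op;
                // Keep dequeuing until TryDequeue reports that the queue is empty.
                while (queue.TryDequeue(out op))
                {
                    Interlocked.Increment(ref processed);
                }
            });
        }

        Task.WaitAll(workers);
        return processed;
    }
}
```

Because every item is already in the queue when the workers start, `DrainDemo.DrainPrefilled(4, 100)` returns 100: each item is dequeued exactly once by one of the workers.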

That, however, won't be enough to make your program work properly. If your workers dequeue faster than the main thread enqueues, they will find the queue empty and stop while the main thread is still enqueueing. You need to signal the workers when they can stop. You can define a boolean field, doneEnqueueing, that is set to true once enqueueing is done:

for (int i = 0; i < 100; i++)
{
    iQ.Enqueue("Item" + i);
}
Volatile.Write(ref doneEnqueueing, true);

The workers will check the value:

void worker(int workerId)
{
    Console.WriteLine("Worker {0} is starting.", workerId);
    do
    {
        string op;
        // Drain everything that is currently in the queue.
        while (iQ.TryDequeue(out op))
        {
            Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
        }
        // Wait until more items arrive or the main thread signals that it is done.
        SpinWait.SpinUntil(() => Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0));
    }
    // Stop only once enqueueing has finished and the queue is empty.
    while (!Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0));
    Console.WriteLine("Worker {0} is stopping.", workerId);
}
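
Putting the pieces together, here is one possible sketch of the whole program. It rests on a few assumptions not shown above: `doneEnqueueing` is declared as a plain `bool` instance field, the class name `QueueProcessor`, the `Run` signature and the `processedCount` counter are hypothetical additions for illustration, and `iQ.IsEmpty` is used instead of `iQ.Count > 0` for the emptiness check.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class QueueProcessor
{
    readonly ConcurrentQueue<string> iQ = new ConcurrentQueue<string>();
    bool doneEnqueueing;   // assumed declaration; set once all items are enqueued
    int processedCount;    // hypothetical counter, only used to verify the result

    // Starts the workers, enqueues the items, then signals completion.
    // Returns the total number of items the workers processed.
    public int Run(int threadCount, int itemCount)
    {
        var workers = new Task[threadCount];
        for (int i = 0; i < threadCount; ++i)
        {
            int workerId = i;
            workers[i] = Task.Run(() => Worker(workerId));
        }

        for (int i = 0; i < itemCount; i++)
        {
            iQ.Enqueue("Item" + i);
        }
        // Tell the workers that no more items will arrive.
        Volatile.Write(ref doneEnqueueing, true);

        Task.WaitAll(workers);
        return processedCount;
    }

    void Worker(int workerId)
    {
        Console.WriteLine("Worker {0} is starting.", workerId);
        do
        {
            string op;
            // Drain everything that is currently in the queue.
            while (iQ.TryDequeue(out op))
            {
                Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
                Interlocked.Increment(ref processedCount);
            }
            // Spin until more items arrive or enqueueing is finished.
            SpinWait.SpinUntil(() => Volatile.Read(ref doneEnqueueing) || !iQ.IsEmpty);
        }
        // Exit only when enqueueing is finished and nothing is left to process.
        while (!Volatile.Read(ref doneEnqueueing) || !iQ.IsEmpty);
        Console.WriteLine("Worker {0} is stopping.", workerId);
    }
}
```

Calling `new QueueProcessor().Run(4, 100)` from `Main` returns 100. The flag is written only after the last `Enqueue`, so a worker that sees the flag set and the queue empty can be sure every item has been taken by some worker.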