ConcurrentQueue and Parallel.ForEach

Zroq · Jun 11, 2016 · Viewed 7.5k times

I have a ConcurrentQueue with a list of URLs whose source I need to fetch. When I use Parallel.ForEach with the ConcurrentQueue object as the input parameter, the Pop method doesn't return anything (it should return a string).

I'm using Parallel.ForEach with MaxDegreeOfParallelism set to four. I really need to limit the number of concurrent threads. Is using a queue together with Parallel.ForEach redundant?

Thanks in advance.

// On the main class
var items = await engine.FetchPageWithNumberItems(result);
// Enqueue List of items
itemQueue.EnqueueList(items);
var crawl = Task.Run(() => { engine.CrawlItems(itemQueue); });

// On the Engine class
public void CrawlItems(ItemQueue itemQueue)
{
    Parallel.ForEach(
        itemQueue,
        new ParallelOptions { MaxDegreeOfParallelism = 4 },
        item =>
        {
            var worker = new Worker();
            // Pop doesn't return anything
            worker.Url = itemQueue.Pop();
            /* Some work */
        });
}

// Item Queue
class ItemQueue : ConcurrentQueue<string>
{
    private ConcurrentQueue<string> queue = new ConcurrentQueue<string>();

    public string Pop()
    {
        string value = String.Empty;
        if (this.queue.Count == 0)
            throw new Exception();
        this.queue.TryDequeue(out value);
        return value;
    }

    public void Push(string item)
    {
        this.queue.Enqueue(item);
    }

    public void EnqueueList(List<string> list)
    {
        list.ForEach(this.queue.Enqueue);
    }
}

Answer

svick · Jul 10, 2016

You don't need ConcurrentQueue<T> if all you're going to do is to first add items to it from a single thread and then iterate it in Parallel.ForEach(). A normal List<T> would be enough for that.
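
As a rough illustration of that point, here is a minimal sketch that iterates a plain List<string> with Parallel.ForEach and the same MaxDegreeOfParallelism of four. The class name CrawlSketch and the processed collection are hypothetical stand-ins for the question's Engine and Worker code; the real per-URL work would replace the single Add call.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class CrawlSketch
{
    // Hypothetical replacement for CrawlItems: takes a plain list, no queue.
    public static ConcurrentBag<string> CrawlItems(List<string> urls)
    {
        // Only used here to record which URLs were handled (illustration only).
        var processed = new ConcurrentBag<string>();

        Parallel.ForEach(
            urls,
            new ParallelOptions { MaxDegreeOfParallelism = 4 },
            url =>
            {
                // Parallel.ForEach passes each element to the lambda,
                // so there is nothing to Pop(); use the parameter directly.
                processed.Add(url);
            });

        return processed;
    }
}
```

Because the list is filled before the loop starts and is only read inside it, no thread-safe collection is needed for the input. MaxDegreeOfParallelism already caps how many items are processed at once.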

Also, your implementation of ItemQueue is very suspicious:

  • It inherits from ConcurrentQueue<string> and also contains another ConcurrentQueue<string>. That doesn't make much sense, is confusing and inefficient.

  • The methods on ConcurrentQueue<T> were designed very carefully to be thread-safe. Your Pop() isn't thread-safe. What could happen is that you check Count, notice it's 1, then call TryDequeue() and not get any value (i.e. value will be null), because another thread removed the item from the queue in the time between the two calls.
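
Both points can be sketched in code. In the question's ItemQueue, Push and EnqueueList write to the inner queue field, while Parallel.ForEach enumerates the inherited (and therefore still empty) base queue, so the loop body has nothing to run on. The sketch below is one possible rework, not the only one: it keeps a single inner queue without inheritance and replaces Pop with a hypothetical TryPop that relies on the return value of TryDequeue instead of checking Count first. The Drain helper and its worker count are assumptions added for illustration.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class ItemQueue
{
    // One queue only: wrap it rather than also inheriting from it.
    private readonly ConcurrentQueue<string> queue = new ConcurrentQueue<string>();

    public void Push(string item)
    {
        queue.Enqueue(item);
    }

    public void EnqueueList(IEnumerable<string> items)
    {
        foreach (var item in items)
            queue.Enqueue(item);
    }

    // A single atomic call: returns false if the queue is empty,
    // so there is no gap between a Count check and the dequeue.
    public bool TryPop(out string value)
    {
        return queue.TryDequeue(out value);
    }

    // Hypothetical helper: a fixed number of workers drain the queue.
    public static int Drain(ItemQueue items, int workers)
    {
        int processed = 0;
        Parallel.For(0, workers,
            new ParallelOptions { MaxDegreeOfParallelism = workers },
            _ =>
            {
                string url;
                while (items.TryPop(out url))
                {
                    // Real work on url would go here.
                    Interlocked.Increment(ref processed);
                }
            });
        return processed;
    }
}
```

Calling ItemQueue.Drain(itemQueue, 4) would process every queued URL with at most four workers, and each URL is handed to exactly one worker because TryDequeue removes it atomically.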