How to cancel GetConsumingEnumerable() on BlockingCollection

Spud picture Spud · Feb 2, 2012 · Viewed 9.1k times · Source

In the following code I'm using the CancellationToken to wake up the GetConsumingEnumerable() when the producer is not producing and I want to break out of the foreach and exit the Task. But I dont see IsCancellationRequested being logged and my Task.Wait(timeOut) waits for the full timeOut period. What am I doing wrong?

userToken.Task = Task.Factory.StartNew(state =>
{
    userToken.CancelToken = new CancellationTokenSource();

    foreach (var broadcast in userToken.BroadcastQueue.GetConsumingEnumerable(userToken.CancelToken.Token))
    {
        if (userToken.CancelToken.IsCancellationRequested)
        {
            Log.Write("BroadcastQueue IsCancellationRequested");
            break;
            ...
        }
    }

    return 0;
}, "TaskSubscribe", TaskCreationOptions.LongRunning);

later...

UserToken.CancelToken.Cancel();          
try
{
    task.Wait(timeOut);
}
catch (AggregateException ar)
{
    Log.Write("AggregateException " + ar.InnerException, MsgType.InfoMsg);
}
catch (OperationCanceledException)
{
    Log.Write("BroadcastQueue Cancelled", MsgType.InfoMsg);
}

Answer

Anthony picture Anthony · Sep 27, 2012

You could use CompleteAdding() which signifies that no more items will be added to the collection. If GetConsumingEnumerable is used, the foreach will end gracefully as it will know there's no point in waiting for more items.

Basically once you have finished adding items to the BlockingCollection just do:

myBlockingCollection.CompleteAdding() 

Any threads which are doing a foreach loop with GetConsumingEnumerable will stop looping.