How can I rewrite the code so that it completes only when BOTH transform blocks have completed? I thought completion meant that a block is marked complete AND its output queue is empty.
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public class Test
{
    private BroadcastBlock<int> broadCastBlock;
    private TransformBlock<int, string> transformBlock1;
    private TransformBlock<int, string> transformBlock2;
    private ActionBlock<string> processorBlock;

    public Test()
    {
        // Clones each incoming item to every linked target.
        broadCastBlock = new BroadcastBlock<int>(i =>
        {
            return i;
        });
        transformBlock1 = new TransformBlock<int, string>(i =>
        {
            Console.WriteLine("1 input count: " + transformBlock1.InputCount);
            Thread.Sleep(50);
            return ("1_" + i);
        });
        transformBlock2 = new TransformBlock<int, string>(i =>
        {
            Console.WriteLine("2 input count: " + transformBlock2.InputCount);
            Thread.Sleep(20);
            return ("2_" + i);
        });
        processorBlock = new ActionBlock<string>(i =>
        {
            Console.WriteLine(i);
        });
        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        const int numElements = 100;
        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.SendAsync(i);
        }
        //mark completion
        broadCastBlock.Complete();
        processorBlock.Completion.Wait();
        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}
I edited the code to print the input buffer count of each transform block. Clearly, all 100 items are streamed to each of the transform blocks. But as soon as one of the transform blocks finishes, the processor block stops accepting items, and the transform block that is still running simply flushes its input buffer.
The issue is exactly what casperOne said in his answer. Once the first transform block completes, the processor block goes into “finishing mode”: it will process remaining items in its input queue, but it won't accept any new items.
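To see that "finishing mode" in isolation, here is a minimal sketch that completes an ActionBlock by hand instead of through a link. The class name FinishingModeSketch and its Run method are made up for illustration; only Post, Complete and Completion come from the library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the question's code.
public static class FinishingModeSketch
{
    public static (bool AcceptedBefore, bool AcceptedAfter, List<string> Processed) Run()
    {
        var processed = new List<string>();
        var processorBlock = new ActionBlock<string>(s => processed.Add(s));

        // Accepted: the block is still open for input.
        bool acceptedBefore = processorBlock.Post("1_1");

        // This is what a link with PropagateCompletion = true does
        // as soon as its source block completes.
        processorBlock.Complete();

        // Declined: a completed block refuses new items.
        bool acceptedAfter = processorBlock.Post("2_1");

        // Items that were already queued are still processed before completion.
        processorBlock.Completion.Wait();
        return (acceptedBefore, acceptedAfter, processed);
    }
}
```

Calling FinishingModeSketch.Run() should show the first Post accepted, the second one declined, and only "1_1" processed.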
There is a simpler fix than splitting your processor block in two, though: don't set PropagateCompletion on the links from the transform blocks to the processor block, and instead complete the processor block manually once both transform blocks have completed:
Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion)
    .ContinueWith(_ => processorBlock.Complete());
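Put together, the pipeline from the question could look roughly like the sketch below. The class name FanInCompletionSketch, the RunAsync helper and the result queue are assumptions added so the outcome can be inspected; the delays are shortened and the sleeps replaced with Task.Delay. It also assumes the System.Threading.Tasks.Dataflow package is referenced.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper around the question's pipeline.
public static class FanInCompletionSketch
{
    public static async Task<ConcurrentQueue<string>> RunAsync(int numElements)
    {
        // Collects what the processor block receives (illustration only).
        var seen = new ConcurrentQueue<string>();

        var broadCastBlock = new BroadcastBlock<int>(i => i);
        var transformBlock1 = new TransformBlock<int, string>(async i =>
        {
            await Task.Delay(5);
            return "1_" + i;
        });
        var transformBlock2 = new TransformBlock<int, string>(async i =>
        {
            await Task.Delay(2);
            return "2_" + i;
        });
        var processorBlock = new ActionBlock<string>(s => seen.Enqueue(s));

        // Completion still propagates from the broadcast block to both transforms.
        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        broadCastBlock.LinkTo(transformBlock1, propagate);
        broadCastBlock.LinkTo(transformBlock2, propagate);

        // No PropagateCompletion here, so neither transform block can
        // complete the processor block on its own.
        transformBlock1.LinkTo(processorBlock);
        transformBlock2.LinkTo(processorBlock);

        // Complete the processor block only after BOTH transform blocks are done.
        Task completeProcessor = Task
            .WhenAll(transformBlock1.Completion, transformBlock2.Completion)
            .ContinueWith(t => processorBlock.Complete());

        for (int i = 1; i <= numElements; i++)
        {
            await broadCastBlock.SendAsync(i);
        }
        broadCastBlock.Complete();

        await processorBlock.Completion;
        return seen;
    }
}
```

With this wiring, awaiting FanInCompletionSketch.RunAsync(100) should yield 200 results, 100 from each transform block. One thing to keep in mind: the continuation calls Complete() even if a transform block faults, so the fault does not reach the processor block; if that matters, the continuation would need to inspect the antecedent task and fault the processor block itself.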