Say I have 10N items that I need to fetch over HTTP. The code starts N tasks to get the data, and each task fetches 10 items in sequence. I put the items in a ConcurrentQueue<Item>. After that, the items are processed one by one in a thread-unsafe method.
async Task<Item> GetItemAsync()
{
    // fetch one item from the internet
}

async Task DoWork()
{
    var tasks = new List<Task>();
    var items = new ConcurrentQueue<Item>();
    var handles = new List<ManualResetEvent>();
    for (int i = 0; i < N; i++)
    {
        var handle = new ManualResetEvent(false);
        handles.Add(handle);
        tasks.Add(Task.Factory.StartNew(async delegate
        {
            for (int j = 0; j < 10; j++)
            {
                var item = await GetItemAsync();
                items.Enqueue(item);
            }
            handle.Set();
        }));
    }
    // begin to process the items when any handle is set
    WaitHandle.WaitAny(handles.ToArray());
    while (true)
    {
        if (all handles are set && items collection is empty) //***
            break;
        // in other words: all tasks are really completed
        Item item;
        while (items.TryDequeue(out item))
        {
            AThreadUnsafeMethod(item); // process items one by one
        }
    }
}
I don't know what condition can be placed in the if statement marked ***. I can't use the Task.IsCompleted property here, because I use await in the task, so the task is completed very soon. And a bool[] that indicates whether each task has run to the end looks really ugly, because I think ManualResetEvent can do the same work. Can anyone give me a suggestion?
Well, you could build this yourself, but I think it's tons easier with TPL Dataflow.
Something like:
// Requires: using System.Linq; using System.Threading.Tasks;
// using System.Threading.Tasks.Dataflow;
static async Task DoWork()
{
    // By default, ActionBlock uses MaxDegreeOfParallelism == 1,
    // so AThreadUnsafeMethod is not called in parallel.
    var block = new ActionBlock<Item>(AThreadUnsafeMethod);

    // Start off N tasks, each asynchronously acquiring 10 items.
    // Each item is sent to the block as it is received.
    // Select needs a lambda here; Task.Run unwraps the async delegate.
    var tasks = Enumerable.Range(0, N).Select(_ => Task.Run(
        async () =>
        {
            for (int i = 0; i != 10; ++i)
                block.Post(await GetItemAsync());
        })).ToArray();

    // Complete the block when all tasks have completed.
    Task.WhenAll(tasks).ContinueWith(_ => { block.Complete(); });

    // Wait for the block to complete.
    await block.Completion;
}
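If you would rather build it yourself with the ConcurrentQueue, here is a rough sketch of how the condition marked *** could look. It is only an illustration under some assumptions: int stands in for Item, GetItemAsync and AThreadUnsafeMethod are hypothetical stand-ins, N is fixed at 3, and it is written as a top-level program (C# 9 or later). The key point is to start the producers with Task.Run, which unwraps the async delegate, so the task only completes after all 10 awaits have finished. With Task.Factory.StartNew you get a Task<Task> whose outer task finishes at the first await, which is why IsCompleted looked useless.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int N = 3;                         // hypothetical number of producer tasks
var items = new ConcurrentQueue<int>();  // int stands in for Item
var processed = new List<int>();         // deliberately not thread-safe
int nextId = 0;

// Hypothetical stand-in for the HTTP fetch.
async Task<int> GetItemAsync()
{
    await Task.Delay(5);
    return Interlocked.Increment(ref nextId);
}

// Stand-in for AThreadUnsafeMethod; only the consumer loop below calls it.
void AThreadUnsafeMethod(int value) => processed.Add(value);

// Task.Run unwraps the async delegate, so each task completes
// only after its 10 items have been fetched and enqueued.
Task[] producers = Enumerable.Range(0, N).Select(_ => Task.Run(async () =>
{
    for (int j = 0; j < 10; j++)
        items.Enqueue(await GetItemAsync());
})).ToArray();
Task allProduced = Task.WhenAll(producers);

// Single consumer: drain the queue until every producer is done
// and nothing is left to process.
while (true)
{
    while (items.TryDequeue(out int item))
        AThreadUnsafeMethod(item);

    // Check completion first: once the producers are done,
    // nothing can be enqueued any more, so IsEmpty is reliable.
    if (allProduced.IsCompleted && items.IsEmpty)
        break;

    await Task.Delay(10); // avoid a hot spin while waiting for more items
}

await allProduced; // observe any producer exception
Console.WriteLine(processed.Count); // prints 30
```

The polling delay is the ugly part of this version. The Dataflow block avoids it because it wakes up as items arrive, which is one reason I would still prefer the ActionBlock approach.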