Ensuring task execution order in threadpool

nc3b picture nc3b · Aug 25, 2011 · Viewed 27.3k times · Source

I have been reading about the thread-pool pattern and I can't seem to find the usual solution for the following problem.

I sometimes want tasks to be executed serially. For example, I read chunks of text from a file and for some reason I need the chunks to be processed in that order. So basically I want to eliminate concurrency for some of the tasks.

Consider this scenario where the tasks with * need to be processed in the order they were pushed in. The other tasks can be processed in any order.

push task1
push task2
push task3   *
push task4   *
push task5
push task6   *
....
and so on

In the context of a thread-pool, without this constraint, a single queue of pending tasks works fine but clearly here it doesn't.

I thought about having some of the threads operate on a thread-specific queue and the others on the "global" queue. Then, in order to execute some of the tasks serially, I simply have to push them onto a queue where a single thread looks. It does sounds a bit clumsy.

So, the real question in this long story: how would you solve this ? How would you ensure those tasks are ordered?

EDIT

As a more general problem, suppose the scenario above becomes

push task1
push task2   **
push task3   *
push task4   *
push task5
push task6   *
push task7   **
push task8   *
push task9
....
and so on

What I mean is that the tasks within a group should be executed sequentially, but the groups themselves can mix. So you can have 3-2-5-4-7 for example.

One other thing to note is that I don't have access to all the tasks in a group upfront (and I can't wait for all of them to arrive before starting the group).

Thank you for your time.

Answer

Tim Lloyd picture Tim Lloyd · Sep 1, 2011

Something like the following will allow serial and parallel tasks to be queued, where serial tasks will be executed one after the other, and parallel tasks will be executed in any order, but in parallel. This gives you the ability to serialize tasks where necessary, also have parallel tasks, but do this as tasks are received i.e. you do not need to know about the entire sequence up-front, execution order is maintained dynamically.

internal class TaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Queue<QTask> _tasks = new Queue<QTask>();
    private int _runningTaskCount;

    public void Queue(bool isParallel, Action task)
    {
        lock (_syncObj)
        {
            _tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });
        }

        ProcessTaskQueue();
    }

    public int Count
    {
        get{lock (_syncObj){return _tasks.Count;}}
    }

    private void ProcessTaskQueue()
    {
        lock (_syncObj)
        {
            if (_runningTaskCount != 0) return;

            while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
            {
                QTask parallelTask = _tasks.Dequeue();

                QueueUserWorkItem(parallelTask);
            }

            if (_tasks.Count > 0 && _runningTaskCount == 0)
            {
                QTask serialTask = _tasks.Dequeue();

                QueueUserWorkItem(serialTask);
            }
        }
    }

    private void QueueUserWorkItem(QTask qTask)
    {
        Action completionTask = () =>
        {
            qTask.Task();

            OnTaskCompleted();
        };

        _runningTaskCount++;

        ThreadPool.QueueUserWorkItem(_ => completionTask());
    }

    private void OnTaskCompleted()
    {
        lock (_syncObj)
        {
            if (--_runningTaskCount == 0)
            {
                ProcessTaskQueue();
            }
        }
    }

    private class QTask
    {
        public Action Task { get; set; }
        public bool IsParallel { get; set; }
    }
}

Update

To handle task groups with serial and parallel task mixes, a GroupedTaskQueue can manage a TaskQueue for each group. Again, you do not need to know about groups up-front, it is all dynamically managed as tasks are received.

internal class GroupedTaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();
    private readonly string _defaultGroup = Guid.NewGuid().ToString();

    public void Queue(bool isParallel, Action task)
    {
        Queue(_defaultGroup, isParallel, task);
    }

    public void Queue(string group, bool isParallel, Action task)
    {
        TaskQueue queue;

        lock (_syncObj)
        {
            if (!_queues.TryGetValue(group, out queue))
            {
                queue = new TaskQueue();

                _queues.Add(group, queue);
            }
        }

        Action completionTask = () =>
        {
            task();

            OnTaskCompleted(group, queue);
        };

        queue.Queue(isParallel, completionTask);
    }

    private void OnTaskCompleted(string group, TaskQueue queue)
    {
        lock (_syncObj)
        {
            if (queue.Count == 0)
            {
                _queues.Remove(group);
            }
        }
    }
}