Efficiently waiting for all tasks in a threadpool to finish

Syntactic Fructose · May 27, 2014 · Viewed 7.7k times

I currently have a program with x workers in my threadpool. During the main loop, y tasks are assigned to the workers, but after the tasks are sent out I must wait for all of them to finish before proceeding with the program. I believe my current solution is inefficient. There must be a better way to wait for all tasks to finish, but I am not sure how to go about it.

// called in main after all tasks are enqueued to 
// std::deque<std::function<void()>> tasks
void ThreadPool::waitFinished()
{
    while(!tasks.empty()) //check if there are any tasks in queue waiting to be picked up
    {
        //do literally nothing
    }
}

More information:

threadpool structure

//worker thread objects
class Worker {
    public:
        Worker(ThreadPool& s): pool(s) {}
        void operator()();
    private:
        ThreadPool &pool;
};

//thread pool
class ThreadPool {
    public:
        ThreadPool(size_t);
        template<class F>
        void enqueue(F f);   
        void waitFinished();
        ~ThreadPool();
    private:
        friend class Worker;
        //keeps track of threads so we can join
        std::vector< std::thread > workers;
        //task queue
        std::deque< std::function<void()> > tasks;
        //sync
        std::mutex queue_mutex;
        std::condition_variable condition;
        bool stop;
};

or here's a gist of my threadpool.hpp

example of what I want to use waitFinished() for:

while(running)
    //....
    for all particles alive
        push particle position function to threadpool
    end for

    threadPool.waitFinished();

    push new particle position data into openGL buffer
end while

This way I can send hundreds of thousands of particle position tasks to be done in parallel, wait for them to finish, and put the new data into the OpenGL position buffers.

Answer

WhozCraig · May 27, 2014

This is one way to do what you're trying to do. Using two condition variables on the same mutex is not for the faint of heart unless you know what is going on internally. The atomic processed member isn't strictly needed; I included it only to show how many items had finished after each batch.
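
To make the "two condition variables, one mutex" idea concrete outside the pool, here is a minimal sketch of the same pattern in a bounded queue. BoundedQueue and sumThroughQueue are made-up names for illustration and are not part of the pool code. The point it tries to show: one mutex guards the shared state, and each condition variable stands for one distinct event, so a notification wakes only the kind of waiter that cares about it.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical bounded queue: one mutex guards the data, and each
// condition variable represents one distinct "state changed" event.
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t cap) : capacity(cap) { assert(cap > 0); }

    void push(int v) {
        std::unique_lock<std::mutex> lock(m);
        // Producers sleep on not_full until there is room.
        not_full.wait(lock, [this] { return items.size() < capacity; });
        items.push_back(v);
        not_empty.notify_one();   // wakes a consumer only
    }

    int pop() {
        std::unique_lock<std::mutex> lock(m);
        // Consumers sleep on not_empty until there is data.
        not_empty.wait(lock, [this] { return !items.empty(); });
        int v = items.front();
        items.pop_front();
        not_full.notify_one();    // wakes a producer only
        return v;
    }

private:
    std::mutex m;
    std::condition_variable not_empty;
    std::condition_variable not_full;
    std::deque<int> items;
    std::size_t capacity;
};

// Push 1..n through a queue of capacity 2 and return the sum received.
int sumThroughQueue(int n) {
    BoundedQueue q(2);
    std::thread producer([&] { for (int i = 1; i <= n; ++i) q.push(i); });
    int sum = 0;
    for (int i = 0; i < n; ++i) sum += q.pop();
    producer.join();
    return sum;
}
```

Both waits use the predicate overload of wait, so spurious wakeups and notifications that arrive before the wait starts are handled by re-checking the state under the lock.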

The sample workload function in this generates 100,000 random int values, then sorts them (gotta heat my office one way or another). waitFinished will not return until the queue is empty and no threads are busy.
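
The "no threads are busy" half of that condition matters because a worker removes a task from the queue before running it, so an empty queue alone does not mean the work is done. The sketch below is a stripped-down, hypothetical model (MiniPool, Observation and observe are illustrative names, not the pool itself) with a single task, assuming the same claim-then-run order a worker would use.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical one-task model of the pool; names are illustrative only.
struct MiniPool {
    std::deque<std::function<void()>> tasks;
    std::mutex m;
    std::condition_variable cv_finished;
    unsigned busy = 0;

    // What a worker does for one task: claim it, run it unlocked, report back.
    void runOne() {
        std::unique_lock<std::mutex> lock(m);
        if (tasks.empty()) return;
        auto fn = std::move(tasks.front());
        tasks.pop_front();      // the queue may be empty from here on...
        ++busy;                 // ...but the task has not run yet
        lock.unlock();
        fn();
        lock.lock();
        assert(busy > 0);
        --busy;
        cv_finished.notify_all();
    }

    void waitFinished() {
        std::unique_lock<std::mutex> lock(m);
        cv_finished.wait(lock, [this] { return tasks.empty() && busy == 0; });
    }
};

struct Observation {
    bool queueEmptyWhileRunning;  // sampled from inside the task
    bool doneAfterWait;           // sampled right after waitFinished()
};

Observation observe() {
    MiniPool pool;
    bool emptyDuringTask = false;
    bool done = false;
    pool.tasks.emplace_back([&] {
        {
            // The worker released the lock before calling us, so we can take it.
            std::lock_guard<std::mutex> guard(pool.m);
            emptyDuringTask = pool.tasks.empty();
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        done = true;
    });
    std::thread worker([&] { pool.runOne(); });
    pool.waitFinished();
    Observation result{emptyDuringTask, done};
    worker.join();
    return result;
}
```

In this model the task sees an empty queue while it is still running, which is exactly the window in which a wait on tasks.empty() alone would return too early. Because busy is changed under the same mutex that guards the queue, the waiter can never observe "queue empty" and "busy == 0" in between a pop and the matching increment.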

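#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <deque>
#include <functional>
#include <iostream>
#include <iterator>
#include <mutex>
#include <random>
#include <thread>
#include <utility>
#include <vector>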

//thread pool
class ThreadPool
{
public:
    ThreadPool(unsigned int n = std::thread::hardware_concurrency());

    template<class F> void enqueue(F&& f);
    void waitFinished();
    ~ThreadPool();

    unsigned int getProcessed() const { return processed; }

private:
    std::vector< std::thread > workers;
    std::deque< std::function<void()> > tasks;
    std::mutex queue_mutex;
    std::condition_variable cv_task;
    std::condition_variable cv_finished;
    std::atomic_uint processed;
    unsigned int busy;
    bool stop;

    void thread_proc();
};

ThreadPool::ThreadPool(unsigned int n)
    : processed()
    , busy()
    , stop()
{
    for (unsigned int i=0; i<n; ++i)
        workers.emplace_back(std::bind(&ThreadPool::thread_proc, this));
}

ThreadPool::~ThreadPool()
{
    // set stop-condition
    std::unique_lock<std::mutex> latch(queue_mutex);
    stop = true;
    cv_task.notify_all();
    latch.unlock();

    // all threads terminate, then we're done.
    for (auto& t : workers)
        t.join();
}

void ThreadPool::thread_proc()
{
    while (true)
    {
        std::unique_lock<std::mutex> latch(queue_mutex);
        cv_task.wait(latch, [this](){ return stop || !tasks.empty(); });
        if (!tasks.empty())
        {
            // got work. set busy.
            ++busy;

            // pull from queue
            auto fn = std::move(tasks.front());
            tasks.pop_front();

            // release lock. run async
            latch.unlock();

            // run function outside context
            fn();
            ++processed;

            latch.lock();
            --busy;
            cv_finished.notify_one();
        }
        else if (stop)
        {
            break;
        }
    }
}

// generic function push
template<class F>
void ThreadPool::enqueue(F&& f)
{
    std::unique_lock<std::mutex> lock(queue_mutex);
    tasks.emplace_back(std::forward<F>(f));
    cv_task.notify_one();
}

// waits until the queue is empty and no worker is busy.
void ThreadPool::waitFinished()
{
    std::unique_lock<std::mutex> lock(queue_mutex);
    cv_finished.wait(lock, [this](){ return tasks.empty() && (busy == 0); });
}

// a cpu-busy task.
void work_proc()
{
    std::random_device rd;
    std::mt19937 rng(rd());

    // build a vector of random numbers
    std::vector<int> data;
    data.reserve(100000);
    std::generate_n(std::back_inserter(data), data.capacity(), [&](){ return rng(); });
    std::sort(data.begin(), data.end(), std::greater<int>());
}

int main()
{
    ThreadPool tp;

    // run five batches of 100 items
    for (int x=0; x<5; ++x)
    {
        // queue 100 work tasks
        for (int i=0; i<100; ++i)
            tp.enqueue(work_proc);

        tp.waitFinished();
        std::cout << tp.getProcessed() << '\n';
    }

    // destructor will close down thread pool
    return EXIT_SUCCESS;
}

Output

100
200
300
400
500
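
Applied to the particle loop from the question, one frame's update might look roughly like the sketch below. Everything in it is an assumption made for illustration: the Particle layout, the stepParticles helper, and the choice to enqueue one task per chunk of particles rather than one per particle (which keeps queue traffic down when there are hundreds of thousands of them). InlinePool is a hypothetical stand-in that runs each task immediately so the sketch compiles on its own; any pool with the same enqueue()/waitFinished() shape, such as the ThreadPool above, should slot in.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical particle layout; the question does not show the real one.
struct Particle { float x, y, vx, vy; };

// Stand-in with the same enqueue()/waitFinished() shape as a thread pool.
// It runs each task immediately, so nothing here is actually parallel.
struct InlinePool {
    template <class F> void enqueue(F&& f) { std::forward<F>(f)(); }
    void waitFinished() {}
};

// Advance all particles by dt, one task per chunk of `chunk` particles.
template <class Pool>
void stepParticles(Pool& pool, std::vector<Particle>& ps, float dt, std::size_t chunk)
{
    assert(chunk > 0);
    for (std::size_t begin = 0; begin < ps.size(); begin += chunk) {
        const std::size_t end = std::min(begin + chunk, ps.size());
        // Each task owns a disjoint index range, so the tasks do not
        // need to lock anything while updating positions.
        pool.enqueue([&ps, begin, end, dt] {
            for (std::size_t i = begin; i < end; ++i) {
                ps[i].x += ps[i].vx * dt;
                ps[i].y += ps[i].vy * dt;
            }
        });
    }
    // Block until every chunk is done before the caller reads the data,
    // e.g. to copy it into an OpenGL buffer.
    pool.waitFinished();
}
```

The tasks capture the vector by reference, which is only safe because stepParticles does not return until waitFinished() has confirmed that every task is complete.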

Best of luck.