C++: Syncing threads in the most elegant way

Hawk89 · Apr 29, 2013

I am trying to solve the following problem. I know there are multiple solutions, but I'm looking for the most elegant one (the least code).

I have 4 threads. Three of them each write a unique value (0, 1, or 2) to a volatile integer variable in an infinite loop; the fourth thread reads the value of this variable and prints it to stdout, also in an infinite loop.

I'd like to synchronize the threads so that the thread that writes 0 runs, then the "print" thread, then the thread that writes 1, then the print thread again, and so on. What I expect to see in the output of the "print" thread is a sequence of zeros, then a sequence of 1s, then 2s, then 0s again, and so on.

What is the most elegant and easy way to synchronize these threads?

This is the program code:

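#include <windows.h>
#include <iostream>
using namespace std;

volatile int value;   // shared variable; no synchronization yet
int thid[4];

DWORD WINAPI ThreadProc(LPVOID param);   // defined below

int main() {
    HANDLE handle[4];
    for (int ii = 0; ii < 4; ii++) {
        thid[ii] = ii;
        handle[ii] = CreateThread(NULL, 0, ThreadProc, &thid[ii], 0, NULL);
    }
    // Keep the process alive; if main returns, all threads are terminated
    WaitForMultipleObjects(4, handle, TRUE, INFINITE);
    return 0;
}

DWORD WINAPI ThreadProc(LPVOID param) {
    int h = *((int*)param);

    switch (h) {
        case 3:
            // thread 3 is the "print" thread
            while (true) {
                cout << value << endl;
            }
            break;
        default:
            // threads 0..2 are the writers
            while (true) {
                // setting a unique value to the volatile variable
                value = h;
            }
            break;
    }
    return 0;
}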

Answer

Gabriel · Jul 14, 2013

Your problem can be solved with the producer-consumer pattern. I was inspired by the Wikipedia article, so here is the link if you want more details.

https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem

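I used a random number generator to produce the values written to the shared variable, but you can change that part.

Here is the code. It can be improved in terms of style (for example, by using C++11 <random> for the random numbers), but it gives the write-then-print alternation you expect: since max_products is 1, every produced value is consumed (printed) before the next one is produced.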

#include <iostream>
#include <sstream>
#include <vector>
#include <stack>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <chrono>
#include <stdlib.h>     /* srand, rand */
using namespace std;

//random number generation
std::mutex mutRand;//mutex for random number generation (given that the random generator is not thread safe).
int GenerateNumber()
{
    std::lock_guard<std::mutex> lk(mutRand);
    return rand() % 3;
}

// print function for "thread safe" printing using a stringstream
void print(ostream& s) { cout << s.rdbuf(); cout.flush(); s.clear(); }

//      Constants
//
const int num_producers = 3;                //the three producers of random numbers
const int num_consumers = 1;                //the only consumer
const int producer_delay_to_produce = 10;   // in milliseconds
const int consumer_delay_to_consume = 30;   // in milliseconds

const int consumer_max_wait_time = 200;     // in milliseconds - max time that a consumer waits for a product to be produced.

const int max_production = 1;              // Number of produce() calls per producer (produce() loops forever, so this limit is never reached)
const int max_products = 1;                // Maximum number of products that can be stored

//
//      Variables
//
atomic<int> num_producers_working(0);       // When there's no producer working the consumers will stop, and the program will stop.
stack<int> products;                        // The products stack, here we will store our products
mutex xmutex;                               // Our mutex, without this mutex our program will cry

condition_variable is_not_full;             // to indicate that our stack is not full between the thread operations
condition_variable is_not_empty;            // to indicate that our stack is not empty between the thread operations

//
//      Functions
//

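//      Produce function: producer_id produces products in an endless loop, as in the question.
//      The lock is taken and released once per iteration; the function never returns.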
void produce(int producer_id)
{
    while (true)
    {
        unique_lock<mutex> lock(xmutex);
        int product;

        is_not_full.wait(lock, [] { return products.size() != max_products; });
        product = GenerateNumber();
        products.push(product);

        print(stringstream() << "Producer " << producer_id << " produced " << product << "\n");
        is_not_empty.notify_all();
    }

}

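//      Consume function: consumer_id consumes (prints) products in an endless loop.
//      If no product arrives within consumer_max_wait_time, it simply waits again.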
void consume(int consumer_id)
{
    while (true)
    {
        unique_lock<mutex> lock(xmutex);
        int product;

        if(is_not_empty.wait_for(lock, chrono::milliseconds(consumer_max_wait_time),
                [] { return products.size() > 0; }))
        {
                product = products.top();
                products.pop();

                print(stringstream() << "Consumer " << consumer_id << " consumed " << product << "\n");
                is_not_full.notify_all();
        }
    }

}

//      Producer function, this is the body of a producer thread
void producer(int id)
{
        ++num_producers_working;
        for(int i = 0; i < max_production; ++i)
        {
                produce(id);
                this_thread::sleep_for(chrono::milliseconds(producer_delay_to_produce));
        }

        print(stringstream() << "Producer " << id << " has exited\n");
        --num_producers_working;
}

//      Consumer function, this is the body of a consumer thread
void consumer(int id)
{
        // Wait until there is any producer working
        while(num_producers_working == 0) this_thread::yield();

        while(num_producers_working != 0 || products.size() > 0)
        {
                consume(id);
                this_thread::sleep_for(chrono::milliseconds(consumer_delay_to_consume));
        }

        print(stringstream() << "Consumer " << id << " has exited\n");
}

//
//      Main
//

int main()
{
        vector<thread> producers_and_consumers;

        // Create producers
        for(int i = 0; i < num_producers; ++i)
                producers_and_consumers.push_back(thread(producer, i));

        // Create consumers
        for(int i = 0; i < num_consumers; ++i)
                producers_and_consumers.push_back(thread(consumer, i));

        // Wait for consumers and producers to finish
        for(auto& t : producers_and_consumers)
                t.join();
        return 0;
}
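
On the style point raised before the listing, here is a rough sketch of what a C++11 <random> version of GenerateNumber() might look like. The name GenerateNumberCpp11 is made up for this example and is not part of the listing, and it assumes a compiler with thread_local support. Each thread gets its own engine, so the mutRand mutex would no longer be needed:

```cpp
#include <random>

// Hypothetical replacement for GenerateNumber() from the listing.
// Assumption: the compiler supports C++11 thread_local.
int GenerateNumberCpp11()
{
    // One engine per thread, seeded once on first use in that thread,
    // so no lock is required around the generator.
    thread_local std::mt19937 engine(std::random_device{}());

    // Uniform integers in the inclusive range [0, 2], replacing rand() % 3.
    std::uniform_int_distribution<int> dist(0, 2);
    return dist(engine);
}
```

In produce(), the call product = GenerateNumber(); would then become product = GenerateNumberCpp11();.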

Hope that helps. Tell me if you need more info or if you disagree with something :-)
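
If you need the strict order from the question (writer 0, print, writer 1, print, writer 2, print, and so on), the listing above is not enough on its own: it pairs each write with a print but does not control which producer goes next. One possible variation, sketched here under my own assumptions, is a shared turn counter protected by a mutex and a condition variable. The names run_rounds, turn, and seen are hypothetical, the loops are bounded so the example terminates, and the values are collected in a vector instead of being sent to stdout:

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: runs `rounds` full cycles of
// writer 0 -> printer -> writer 1 -> printer -> writer 2 -> printer
// and returns the values in the order the printer observed them.
std::vector<int> run_rounds(int rounds)
{
    const int num_writers = 3;
    std::mutex m;
    std::condition_variable cv;
    int value = 0;           // shared variable, only touched while holding m
    int turn = 0;            // even: writer turn/2 may write; odd: printer may read
    std::vector<int> seen;   // stands in for stdout so the order can be checked

    auto writer = [&](int id) {
        for (int r = 0; r < rounds; ++r) {
            std::unique_lock<std::mutex> lock(m);
            cv.wait(lock, [&] { return turn == 2 * id; });
            value = id;
            ++turn;               // hand over to the printer
            cv.notify_all();
        }
    };

    auto printer = [&] {
        for (int i = 0; i < rounds * num_writers; ++i) {
            std::unique_lock<std::mutex> lock(m);
            cv.wait(lock, [&] { return turn % 2 == 1; });
            seen.push_back(value);
            turn = (turn + 1) % (2 * num_writers);   // next writer, wrapping to 0
            cv.notify_all();
        }
    };

    std::vector<std::thread> threads;
    for (int id = 0; id < num_writers; ++id)
        threads.emplace_back(writer, id);
    threads.emplace_back(printer);
    for (auto& t : threads)
        t.join();
    return seen;
}
```

To get the behaviour from the question, replace seen.push_back(value) with cout << value << endl and turn the bounded for loops into while (true). Because value is only accessed under the mutex, it no longer needs to be volatile.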

And Good Bastille Day to all French people!