How to multithread queue processing

Chris Redford · May 14, 2014 · Viewed 12.5k times

I understood C++ containers to be thread-safe by default. I must be using queue incorrectly across threads, because for this code:

#include <thread>
using std::thread;
#include <iostream>
using std::cout;
using std::endl;
#include <queue>
using std::queue;
#include <string>
using std::string;
using std::to_string;
#include <functional>
using std::ref;


void fillWorkQueue(queue<string>& itemQueue) {
    int size = 40000;
    for(int i = 0; i < size; i++)
        itemQueue.push(to_string(i));
}

void doWork(queue<string>& itemQueue) {
    while(!itemQueue.empty()) {
        itemQueue.pop();
    }   
}

void singleThreaded() {
    queue<string> itemQueue;
    fillWorkQueue(itemQueue);
    doWork(itemQueue);
    cout << "done\n";
}

void multiThreaded() {
    queue<string> itemQueue;
    fillWorkQueue(itemQueue);
    thread t1(doWork, ref(itemQueue));
    thread t2(doWork, ref(itemQueue));
    t1.join();
    t2.join();
    cout << "done\n";
}

int main() {
    cout << endl;

    // Single Threaded
    cout << "singleThreaded\n";
    singleThreaded();
    cout << endl;

    // Multi Threaded
    cout << "multiThreaded\n";
    multiThreaded();
    cout << endl;
}

I'm getting:

singleThreaded
done

multiThreaded
main(32429,0x10e530000) malloc: *** error for object 0x7fe4e3883e00: pointer being freed was not allocated
*** set a breakpoint in malloc_error_break to debug
make: *** [run] Abort trap: 6

What am I doing wrong here?

EDIT

Apparently I misread the link above. Is there a thread-safe queue implementation available that does what I am trying to do? I know this is a common thread organization strategy.

Answer

Anton · May 14, 2014

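As pointed out in the comments, STL containers are not thread-safe for read-write operations: concurrent reads are fine, but here both threads call empty() and pop() on the same queue without any synchronization, which is a data race. Instead, try the concurrent_queue class from TBB or PPL, e.g.: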

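#include <tbb/concurrent_queue.h>  // TBB; with PPL, use <concurrent_queue.h> and concurrency::concurrent_queue
using tbb::concurrent_queue;

void doWork(concurrent_queue<string>& itemQueue) {
    string result;
    // try_pop checks for an item and removes it in one thread-safe step,
    // so there is no gap between an empty() check and a pop()
    while(itemQueue.try_pop(result)) {
        // `result` now holds the popped item
    }
}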