Multi-threaded C++ Message Passing

grouma picture grouma · Aug 4, 2012 · Viewed 10.3k times · Source

I am tasked to modify a synchronous C program so that it can run in parallel. The goal is to have it be as portable as possible as it is an open source program that many people use. Because of this, I thought it would be best to wrap the program in a C++ layer so that I could take advantage of the portable boost libraries. I have already done this and everything seems to work as expected.

The problem I am having is deciding on what is the best approach to pass messages between the threads. Luckily, the architecture of the program is that of a multiple producer and single consumer. Even better, the order of the messages is not important. I have read that single-producer/single-consumer (SPSC) queues would benefit from this architecture. Those experienced with multi-threaded programming have any advice? I'm quite new to this stuff. Also any code examples using boost to implement SPSC would be greatly appreciated.

Answer

bytemaster picture bytemaster · Aug 4, 2012

Below is the technique I used for my Cooperative Multi-tasking / Multi-threading library (MACE) http://bytemaster.github.com/mace/. It has the benefit of being lock-free except for when the queue is empty.

struct task {
   boost::function<void()> func;
   task* next;
};


boost::mutex                     task_ready_mutex;
boost::condition_variable        task_ready;
boost::atomic<task*>             task_in_queue;

// this can be called from any thread
void thread::post_task( task* t ) {
     // atomically post the task to the queue.
     task* stale_head = task_in_queue.load(boost::memory_order_relaxed);
     do { t->next = stale_head;
     } while( !task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) );

   // Because only one thread can post the 'first task', only that thread will attempt
   // to aquire the lock and therefore there should be no contention on this lock except
   // when *this thread is about to block on a wait condition.  
    if( !stale_head ) { 
        boost::unique_lock<boost::mutex> lock(task_ready_mutex);
        task_ready.notify_one();
    }
}

// this is the consumer thread.
void process_tasks() {
  while( !done ) {
   // this will atomically pop everything that has been posted so far.
   pending = task_in_queue.exchange(0,boost::memory_order_consume);
   // pending is a linked list in 'reverse post order', so process them
   // from tail to head if you want to maintain order.

   if( !pending ) { // lock scope
      boost::unique_lock<boost::mutex> lock(task_ready_mutex);                
      // check one last time while holding the lock before blocking.
      if( !task_in_queue ) task_ready.wait( lock );
   }
 }