What is the best way to wait on multiple condition variables in C++11?

Julian picture Julian · Dec 7, 2014 · Viewed 21.8k times · Source

First a little context: I'm in the process of learning about threading in C++11 and for this purpose, I'm trying to build a small actor class, essentially (I left the exception handling and propagation stuff out) like so:

class actor {
    private: std::atomic<bool> stop;
    private: std::condition_variable interrupt;
    private: std::thread actor_thread;
    private: message_queue incoming_msgs;

    public: actor() 
    : stop(false), 
      actor_thread([&]{ run_actor(); })
    {}

    public: virtual ~actor() {
        // if the actor is destroyed, we must ensure the thread dies too
        stop = true;
        // to this end, we have to interrupt the actor thread which is most probably
        // waiting on the incoming_msgs queue:
        interrupt.notify_all();
        actor_thread.join();
    }

    private: virtual void run_actor() {
        try {
            while(!stop)
                // wait for new message and process it
                // but interrupt the waiting process if interrupt is signaled:
                process(incoming_msgs.wait_and_pop(interrupt));
        } 
        catch(interrupted_exception) {
            // ...
        }
    };

    private: virtual void process(const message&) = 0;
    // ...
};

Every actor runs in its own actor_thread, waits on a new incoming message on incoming_msgs and -- when a message arrives -- processes it.

The actor_thread is created together with the actor and has to die together with it, which is why I need some kind of interrupt mechanism in the message_queue::wait_and_pop(std::condition_variable interrupt).

Essentially, I require that wait_and_pop blocks until either a) a new message arrives or b) until the interrupt is fired, in which case -- ideally -- an interrupted_exception is to be thrown.

The arrival of a new message in the message_queue is presently modeled also by a std::condition_variable new_msg_notification:

// ...
// in class message_queue:
message wait_and_pop(std::condition_variable& interrupt) {
    std::unique_lock<std::mutex> lock(mutex);

    // How to interrupt the following, when interrupt fires??
    new_msg_notification.wait(lock,[&]{
        return !queue.empty();
    });
    auto msg(std::move(queue.front()));
    queue.pop();
    return msg;
}

To cut the long story short, the question is this: How do I interrupt the waiting for a new message in new_msg_notification.wait(...) when the interrupt is triggered (without introducing a time-out)?

Alternatively, the question may be read as: How do I wait until any one of two std::condition_variables are signaled?

One naive approach seems to be not to use std::condition_variable at all for the interrupt and instead just use an atomic flag std::atomic<bool> interrupted and then busy wait on new_msg_notification with a very small time-out until either a new message has arrived or until true==interrupted. However, I would very much like to avoid busy waiting.


EDIT:

From the comments and the answer by pilcrow, it looks like there are basically two approaches possible.

  1. Enqueue a special "Terminate" message, as proposed by Alan, mukunda and pilcrow. I decided against this option because I have no idea about the size of the queue at the time I want the actor to terminate. It may very well be (as it is mostly the case when I want something to quickly terminate) that there are thousands of messages left to process in the queue and it seems unacceptable to wait for them to be processed until finally the terminate message gets its turn.
  2. Implement a custom version of a condition variable, that may be interrupted by another thread by forwarding the notification to the condition variable that the first thread is waiting on. I opted for this approach.

For those of you interested, my implementation goes as follows. The condition variable in my case is actually a semaphore (because I like them more and because I liked the exercise of doing so). I equipped this semaphore with an associated interrupt which can be obtained from the semaphore via semaphore::get_interrupt(). If now one thread blocks in semaphore::wait(), another thread has the possibility to call semaphore::interrupt::trigger() on the interrupt of the semaphore, causing the first thread to unblock and propagate an interrupt_exception.

struct
interrupt_exception {};

class
semaphore {
    public: class interrupt;
    private: mutable std::mutex mutex;

    // must be declared after our mutex due to construction order!
    private: interrupt* informed_by;
    private: std::atomic<long> counter;
    private: std::condition_variable cond;

    public: 
    semaphore();

    public: 
    ~semaphore() throw();

    public: void 
    wait();

    public: interrupt&
    get_interrupt() const { return *informed_by; }

    public: void
    post() {
        std::lock_guard<std::mutex> lock(mutex);
        counter++;
        cond.notify_one(); // never throws
    }

    public: unsigned long
    load () const {
        return counter.load();
    }
};

class
semaphore::interrupt {
    private: semaphore *forward_posts_to;
    private: std::atomic<bool> triggered;

    public:
    interrupt(semaphore *forward_posts_to) : triggered(false), forward_posts_to(forward_posts_to) {
        assert(forward_posts_to);
        std::lock_guard<std::mutex> lock(forward_posts_to->mutex);
        forward_posts_to->informed_by = this;
    }

    public: void
    trigger() {
        assert(forward_posts_to);
        std::lock_guard<std::mutex>(forward_posts_to->mutex);

        triggered = true;
        forward_posts_to->cond.notify_one(); // never throws
    }

    public: bool
    is_triggered () const throw() {
        return triggered.load();
    }

    public: void
    reset () throw() {
        return triggered.store(false);
    }
};

semaphore::semaphore()  : counter(0L), informed_by(new interrupt(this)) {}

// must be declared here because otherwise semaphore::interrupt is an incomplete type
semaphore::~semaphore() throw()  {
    delete informed_by;
}

void
semaphore::wait() {
    std::unique_lock<std::mutex> lock(mutex);
    if(0L==counter) {
        cond.wait(lock,[&]{
            if(informed_by->is_triggered())
                throw interrupt_exception();
            return counter>0;
        });
    }
    counter--;
}

Using this semaphore, my message queue implementation now looks like this (using the semaphore instead of the std::condition_variable I could get rid of the std::mutex:

class
message_queue {    
    private: std::queue<message> queue;
    private: semaphore new_msg_notification;

    public: void
    push(message&& msg) {
        queue.push(std::move(msg));
        new_msg_notification.post();
    }

    public: const message
    wait_and_pop() {
        new_msg_notification.wait();
        auto msg(std::move(queue.front()));
        queue.pop();
        return msg;
    }

    public: semaphore::interrupt&
    get_interrupt() const { return new_msg_notification.get_interrupt(); }
};

My actor, is now able to interrupt its thread with very low latency in its thread. The implementation presently like this:

class
actor {
    private: message_queue
    incoming_msgs;

    /// must be declared after incoming_msgs due to construction order!
    private: semaphore::interrupt&
    interrupt;

    private: std::thread
    my_thread;

    private: std::exception_ptr
    exception;

    public:
    actor()
    : interrupt(incoming_msgs.get_interrupt()), my_thread(
        [&]{
            try {
                run_actor();
            }
            catch(...) {
                exception = std::current_exception();
            }
        })
    {}

    private: virtual void
    run_actor() {
        while(!interrupt.is_triggered())
            process(incoming_msgs.wait_and_pop());
    };

    private: virtual void
    process(const message&) = 0;

    public: void
    notify(message&& msg_in) {
        incoming_msgs.push(std::forward<message>(msg_in));
    }

    public: virtual
    ~actor() throw (interrupt_exception) {
        interrupt.trigger();
        my_thread.join();
        if(exception)
            std::rethrow_exception(exception);
    }
};

Answer

pilcrow picture pilcrow · Dec 7, 2014

You ask,

What is the best way to wait on multiple condition variables in C++11?

You can't, and must redesign. One thread may wait on only one condition variable (and its associated mutex) at a time. In this regard the Windows facilities for synchronization are rather richer than those of the "POSIX-style" family of synchronization primitives.

The typical approach with thread-safe queues is to enqueue a special "all done!" message, or to design a "breakable" (or "shutdown-able") queue. In the latter case, the queue's internal condition variable then protects a complex predicate: either an item is available or the queue has been broken.

In a comment you observe that

a notify_all() will have no effect if there is no one waiting

That's true but probably not relevant. wait()ing on a condition variable also implies checking a predicate, and checking it before actually blocking for a notification. So, a worker thread busy processing a queue item that "misses" a notify_all() will see, the next time it inspects the queue condition, that the predicate (a new item is available, or, the queue is all done) has changed.