First, a little context: I'm in the process of learning about threading in C++11 and, for this purpose, I'm trying to build a small actor class, essentially (I left out the exception handling and propagation) like so:
class actor {
    private: std::atomic<bool> stop;
    private: std::condition_variable interrupt;
    // the queue must be declared (and thus constructed) before the thread that uses it
    private: message_queue incoming_msgs;
    private: std::thread actor_thread;
    public: actor()
    : stop(false),
      actor_thread([this]{ run_actor(); })
    {}
    public: virtual ~actor() {
        // if the actor is destroyed, we must ensure the thread dies too
        stop = true;
        // to this end, we have to interrupt the actor thread which is most probably
        // waiting on the incoming_msgs queue:
        interrupt.notify_all();
        actor_thread.join();
    }
    private: virtual void run_actor() {
        try {
            while(!stop)
                // wait for new message and process it
                // but interrupt the waiting process if interrupt is signaled:
                process(incoming_msgs.wait_and_pop(interrupt));
        }
        catch(const interrupted_exception&) {
            // ...
        }
    }
    private: virtual void process(const message&) = 0;
    // ...
};
Every actor runs in its own actor_thread, waits for a new incoming message on incoming_msgs and, when a message arrives, processes it.
The actor_thread is created together with the actor and has to die together with it, which is why I need some kind of interrupt mechanism in message_queue::wait_and_pop(std::condition_variable& interrupt).
Essentially, I require that wait_and_pop blocks until either
a) a new message arrives or
b) the interrupt is fired, in which case, ideally, an interrupted_exception is thrown.
The arrival of a new message in the message_queue is presently also modeled by a std::condition_variable, new_msg_notification:
// ...
// in class message_queue:
message wait_and_pop(std::condition_variable& interrupt) {
    std::unique_lock<std::mutex> lock(mutex);
    // How to interrupt the following, when interrupt fires??
    new_msg_notification.wait(lock,[&]{
        return !queue.empty();
    });
    auto msg(std::move(queue.front()));
    queue.pop();
    return msg;
}
To cut a long story short, the question is this: How do I interrupt the waiting for a new message in new_msg_notification.wait(...) when the interrupt is triggered (without introducing a time-out)?
Alternatively, the question may be read as: How do I wait until any one of two std::condition_variables is signaled?
One naive approach seems to be not to use std::condition_variable at all for the interrupt and instead just use an atomic flag std::atomic<bool> interrupted and then busy wait on new_msg_notification with a very small time-out until either a new message has arrived or until true==interrupted. However, I would very much like to avoid busy waiting.
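For reference, the polling approach described above might look roughly like the following. This is only a sketch under assumptions: polling_queue, its members, the 5 ms time-out and the demo function are hypothetical names and values, and int stands in for the message type.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical queue used only to illustrate polling; int stands in for message.
class polling_queue {
public:
    void push(int value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(value);
        }
        new_msg_.notify_one();
    }

    // Returns true and fills `out` when a message arrives,
    // or false once `interrupted` is observed to be set.
    bool wait_and_pop(int& out, const std::atomic<bool>& interrupted) {
        std::unique_lock<std::mutex> lock(mutex_);
        // wait_for returns the predicate's value; false means the time-out expired.
        while (!new_msg_.wait_for(lock, std::chrono::milliseconds(5),
                                  [this] { return !items_.empty(); })) {
            if (interrupted.load()) return false;
        }
        out = items_.front();
        items_.pop();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable new_msg_;
    std::queue<int> items_;
};

// Demo: the waiter only notices the flag on its next time-out.
bool polling_waiter_was_interrupted() {
    polling_queue q;
    std::atomic<bool> interrupted(false);
    bool got_message = true;
    std::thread waiter([&] {
        int value = 0;
        got_message = q.wait_and_pop(value, interrupted);
    });
    interrupted = true;  // no notification is sent, so there is no immediate wake-up
    waiter.join();
    return !got_message;
}
```

The drawbacks are visible in the sketch: the thread wakes up every few milliseconds even when nothing happens, and the interruption latency is bounded only by the chosen time-out.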
From the comments and the answer by pilcrow, it looks like there are basically two possible approaches: enqueue a special "all done" message, or make the wait on the queue itself interruptible.
For those of you interested, my implementation goes as follows. The condition variable in my case is actually a semaphore (because I like them more and because I liked the exercise of doing so). I equipped this semaphore with an associated interrupt, which can be obtained from the semaphore via semaphore::get_interrupt(). If one thread now blocks in semaphore::wait(), another thread can call semaphore::interrupt::trigger() on the interrupt of the semaphore, causing the first thread to unblock and propagate an interrupt_exception.
struct
interrupt_exception {};

class
semaphore {
    public: class interrupt;
    private: mutable std::mutex mutex;
    // must be declared after our mutex due to construction order!
    private: interrupt* informed_by;
    private: std::atomic<long> counter;
    private: std::condition_variable cond;
    public:
    semaphore();
    public:
    ~semaphore() throw();
    public: void
    wait();
    public: interrupt&
    get_interrupt() const { return *informed_by; }
    public: void
    post() {
        std::lock_guard<std::mutex> lock(mutex);
        counter++;
        cond.notify_one(); // never throws
    }
    public: unsigned long
    load () const {
        return counter.load();
    }
};

class
semaphore::interrupt {
    private: semaphore *forward_posts_to;
    private: std::atomic<bool> triggered;
    public:
    // initializers are listed in declaration order
    interrupt(semaphore *forward_posts_to) : forward_posts_to(forward_posts_to), triggered(false) {
        assert(forward_posts_to);
        std::lock_guard<std::mutex> lock(forward_posts_to->mutex);
        forward_posts_to->informed_by = this;
    }
    public: void
    trigger() {
        assert(forward_posts_to);
        // the guard must be a named object; an unnamed temporary would unlock immediately
        std::lock_guard<std::mutex> lock(forward_posts_to->mutex);
        triggered = true;
        forward_posts_to->cond.notify_one(); // never throws
    }
    public: bool
    is_triggered () const throw() {
        return triggered.load();
    }
    public: void
    reset () throw() {
        triggered.store(false);
    }
};

// both must be defined here because otherwise semaphore::interrupt is an incomplete type
semaphore::semaphore() : informed_by(new interrupt(this)), counter(0L) {}
semaphore::~semaphore() throw() {
    delete informed_by;
}

void
semaphore::wait() {
    std::unique_lock<std::mutex> lock(mutex);
    if(0L==counter) {
        cond.wait(lock,[&]{
            // an exception thrown from the predicate propagates out of wait()
            if(informed_by->is_triggered())
                throw interrupt_exception();
            return counter>0;
        });
    }
    counter--;
}
Using this semaphore, my message queue implementation now looks like this (the semaphore replaces the std::condition_variable; a std::mutex is still needed to protect the std::queue itself, because push and wait_and_pop run on different threads):
class
message_queue {
    private: std::queue<message> queue;
    // guards queue only; the semaphore has its own internal mutex
    private: std::mutex queue_mutex;
    private: semaphore new_msg_notification;
    public: void
    push(message&& msg) {
        {
            std::lock_guard<std::mutex> lock(queue_mutex);
            queue.push(std::move(msg));
        }
        new_msg_notification.post();
    }
    public: message
    wait_and_pop() {
        // may throw interrupt_exception
        new_msg_notification.wait();
        std::lock_guard<std::mutex> lock(queue_mutex);
        auto msg(std::move(queue.front()));
        queue.pop();
        return msg;
    }
    public: semaphore::interrupt&
    get_interrupt() const { return new_msg_notification.get_interrupt(); }
};
My actor is now able to interrupt its thread with very low latency. The implementation presently looks like this:
class
actor {
    private: message_queue
    incoming_msgs;
    /// must be declared after incoming_msgs due to construction order!
    private: semaphore::interrupt&
    interrupt;
    /// must be declared before my_thread, which may write to it as soon as it starts
    private: std::exception_ptr
    exception;
    private: std::thread
    my_thread;
    public:
    actor()
    : interrupt(incoming_msgs.get_interrupt()), my_thread(
        [this]{
            try {
                run_actor();
            }
            catch(...) {
                exception = std::current_exception();
            }
        })
    {}
    private: virtual void
    run_actor() {
        while(!interrupt.is_triggered())
            process(incoming_msgs.wait_and_pop());
    }
    private: virtual void
    process(const message&) = 0;
    public: void
    notify(message&& msg_in) {
        incoming_msgs.push(std::move(msg_in));
    }
    public: virtual
    ~actor() throw (interrupt_exception) {
        interrupt.trigger();
        my_thread.join();
        if(exception)
            std::rethrow_exception(exception);
    }
};
You ask,
What is the best way to wait on multiple condition variables in C++11?
You can't, and must redesign. One thread may wait on only one condition variable (and its associated mutex) at a time. In this regard the Windows facilities for synchronization are rather richer than those of the "POSIX-style" family of synchronization primitives.
The typical approach with thread-safe queues is to enqueue a special "all done!" message, or to design a "breakable" (or "shutdown-able") queue. In the latter case, the queue's internal condition variable then protects a complex predicate: either an item is available or the queue has been broken.
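A "breakable" queue along these lines could be sketched as follows. This is a minimal illustration under assumptions, not the asker's message_queue: breakable_queue, shutdown() and the demo function are hypothetical names, and the sketch chooses to hand out any items still queued before reporting that the queue is broken.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical "breakable" queue; all names here are illustrative.
template <typename T>
class breakable_queue {
public:
    // Enqueue an item and wake one waiter.
    void push(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(std::move(item));
        }
        changed_.notify_one();
    }

    // Mark the queue as broken and wake every waiter.
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            broken_ = true;
        }
        changed_.notify_all();
    }

    // Block until an item is available or the queue is broken.
    // Returns false only when the queue is broken and already drained.
    bool wait_and_pop(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        // One condition variable, one compound predicate.
        changed_.wait(lock, [this] { return broken_ || !items_.empty(); });
        if (items_.empty()) return false;  // woken by shutdown(), nothing left
        out = std::move(items_.front());
        items_.pop();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable changed_;
    std::queue<T> items_;
    bool broken_ = false;
};

// Demo: a worker drains the queue until it is shut down.
std::vector<std::string> run_breakable_queue_demo() {
    breakable_queue<std::string> q;
    std::vector<std::string> seen;
    std::thread worker([&] {
        std::string msg;
        while (q.wait_and_pop(msg)) seen.push_back(msg);
    });
    q.push("hello");
    q.push("world");
    q.shutdown();
    worker.join();  // the worker has finished, so reading seen is safe
    return seen;
}
```

With such a queue, an owner like the actor's destructor would call shutdown() and then join the worker thread. If pending items should be dropped on shutdown instead, the check after the wait can test broken_ first.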
In a comment you observe that
a notify_all() will have no effect if there is no one waiting
That's true but probably not relevant. wait()ing on a condition variable also implies checking a predicate, and checking it before actually blocking for a notification. So, a worker thread busy processing a queue item that "misses" a notify_all() will see, the next time it inspects the queue condition, that the predicate (a new item is available, or, the queue is all done) has changed.
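A small sketch of that behaviour, with hypothetical names (late_waiter_sees_state_change, done): the state is changed and notify_all() is called while nobody is waiting, yet a thread that starts waiting afterwards does not block, because the predicate overload of wait() evaluates the predicate first.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Returns true if a waiter that starts after the notification still gets through.
bool late_waiter_sees_state_change() {
    std::mutex m;
    std::condition_variable cv;
    bool done = false;

    // Change the state and notify while nobody is waiting:
    // the notification itself has no effect.
    {
        std::lock_guard<std::mutex> lock(m);
        done = true;
    }
    cv.notify_all();

    bool woke = false;
    std::thread waiter([&] {
        std::unique_lock<std::mutex> lock(m);
        // Equivalent to: while (!done) cv.wait(lock);
        // so the predicate is checked before the thread ever blocks.
        cv.wait(lock, [&] { return done; });
        woke = true;
    });
    waiter.join();
    return woke;
}
```

The important part is that the state change happens under the same mutex the waiter uses; the notification is only a hint to re-check that state.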