This question is very similar to a previous one here: race-condition in pthread_once()?
It is essentially the same issue: the lifetime of a std::promise ending during a call to promise::set_value (i.e. after the associated future has been flagged, but before pthread_once has executed).
So I know that my usage has this issue, and that I therefore cannot use it in this way. However, I think this is non-obvious. (In the wise words of Scott Meyers: make interfaces easy to use correctly and hard to use incorrectly.)
I present an exemplar below:
- A dispatcher thread (dispatcher) which spins on a queue, popping a 'job' (a std::function) and executing it.
- A utility class, synchronous_job, which blocks the calling thread until the 'job' has been executed on the dispatcher thread.
- The std::promise and std::future are members of synchronous_job: once the future is set, the blocked calling thread continues, which results in the synchronous_job popping off the stack and being destructed.
- The dispatcher thread was context switched whilst inside promise::set_value; the future is flagged, but the call to pthread_once hasn't executed, and the pthread stack is somehow corrupted, meaning next time around: deadlock.

I would expect a call to promise::set_value to be atomic; the fact that it needs to do more work after it has flagged the future will inevitably lead to this kind of issue when using these classes in this manner.
So my question is: how can I achieve this kind of synchronisation using std::promise and std::future, keeping their lifetime associated with the class which provides this synchronisation mechanism?
@Jonathan Wakely, could you perhaps use some RAII-style class internally which sets the condition_variable in its destructor after it flags the future? This would mean that even if the promise is destructed in the midst of a call to set_value, the additional work of setting the condition variable would complete correctly. Just an idea, not sure if you can use it...
A full working example below, and the stack trace of the deadlocked app after:
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

struct dispatcher
{
    dispatcher()
    {
        // All other members are already constructed by the time the body runs.
        _thread = std::thread(&dispatcher::loop, this);
    }

    void post(std::function<void()> job)
    {
        std::unique_lock<std::mutex> l(_mtx);
        _jobs.push(job);
        _cnd.notify_one();
    }

private:
    void loop()
    {
        for (;;)
        {
            std::function<void()> job;
            {
                // Wait for a job, then take it off the queue under the lock.
                std::unique_lock<std::mutex> l(_mtx);
                while (_jobs.empty())
                    _cnd.wait(l);
                job.swap(_jobs.front());
                _jobs.pop();
            }
            // Run the job outside the lock.
            job();
        }
    }

    std::thread _thread;
    std::mutex _mtx;
    std::condition_variable _cnd;
    std::queue<std::function<void()>> _jobs;
};
//-------------------------------------------------------------
struct synchronous_job
{
    synchronous_job(std::function<void()> job, dispatcher& d)
        : _job(job)
        , _d(d)
        , _f(_p.get_future())
    {
    }

    void run()
    {
        _d.post(std::bind(&synchronous_job::cb, this));
        _f.wait();
    }

private:
    void cb()
    {
        _job();
        // The waiting thread can wake up and destroy *this (and so _p)
        // while set_value() is still executing on the dispatcher thread.
        _p.set_value();
    }

    std::function<void()> _job;
    dispatcher& _d;
    std::promise<void> _p;
    std::future<void> _f;
};
//-------------------------------------------------------------
struct test
{
    test()
        : _count(0)
    {
    }

    void run()
    {
        // job lives on the stack and is destroyed as soon as run() returns.
        synchronous_job job(std::bind(&test::cb, this), _d);
        job.run();
    }

private:
    void cb()
    {
        std::cout << ++_count << std::endl;
    }

    int _count;
    dispatcher _d;
};
//-------------------------------------------------------------
int main()
{
    test t;
    for (;;)
    {
        t.run();
    }
}
The stack trace of the deadlocked app:
Thread 1 (main thread)
#0 0x00007fa112ed750c in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1 0x00007fa112a308ec in __gthread_cond_wait (__mutex=<optimized out>, __cond=<optimized out>) at /hostname/tmp/syddev/Build/gcc-4.6.2/gcc-build/x86_64-unknown-linux-gnu/libstdc++-v3/include/x86_64-unknown-linux-gnu/bits/gthr-default.h:846
#2 std::condition_variable::wait (this=<optimized out>, __lock=...) at ../../../../libstdc++-v3/src/condition_variable.cc:56
#3 0x00000000004291d9 in std::condition_variable::wait<std::__future_base::_State_base::wait()::{lambda()#1}>(std::unique_lock<std::mutex>&, std::__future_base::_State_base::wait()::{lambda()#1}) (this=0x78e050, __lock=..., __p=...) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/condition_variable:93
#4 0x00000000004281a8 in std::__future_base::_State_base::wait (this=0x78e018) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:331
#5 0x000000000042a2d6 in std::__basic_future<void>::wait (this=0x7fff0ae515c0) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:576
#6 0x0000000000428dd8 in synchronous_job::run (this=0x7fff0ae51580) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:60
#7 0x0000000000428f97 in test::run (this=0x7fff0ae51660) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:83
#8 0x0000000000427ad6 in main () at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:99
Thread 2 (dispatcher thread)
#0 0x00007fa112ed8b5b in pthread_once () from /lib64/libpthread.so.0
#1 0x0000000000427946 in __gthread_once (__once=0x78e084, __func=0x4272d0 <__once_proxy@plt>) at /hostname/sdk/gcc470/suse11/x86_64/bin/../lib/gcc/x86_64-unknown-linux-gnu/4.7.0/../../../../include/c++/4.7.0/x86_64-unknown-linux-gnu/bits/gthr-default.h:718
#2 0x000000000042948b in std::call_once<void (std::__future_base::_State_base::*)(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&), std::__future_base::_State_base* const, std::reference_wrapper<std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()> >, std::reference_wrapper<bool> >(std::once_flag&, void (std::__future_base::_State_base::*&&)(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&), std::__future_base::_State_base* const&&, std::reference_wrapper<std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()> >&&, std::reference_wrapper<bool>&&) (__once=..., __f=
@0x7fa111ff6be0: (void (std::__future_base::_State_base::*)(std::__future_base::_State_base * const, std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter>()> &, bool &)) 0x42848a <std::__future_base::_State_base::_M_do_set(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&)>) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/mutex:819
#3 0x000000000042827d in std::__future_base::_State_base::_M_set_result(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>, bool) (this=0x78e018, __res=..., __ignore_failure=false) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:362
#4 0x00000000004288d5 in std::promise<void>::set_value (this=0x7fff0ae515a8) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:1206
#5 0x0000000000428e2a in synchronous_job::cb (this=0x7fff0ae51580) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:66
#6 0x000000000042df53 in std::_Mem_fn<void (synchronous_job::*)()>::operator() (this=0x78c6e0, __object=0x7fff0ae51580) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:554
#7 0x000000000042d77c in std::_Bind<std::_Mem_fn<void (synchronous_job::*)()> (synchronous_job*)>::__call<void, , 0ul>(std::tuple<>&&, std::_Index_tuple<0ul>) (this=0x78c6e0, __args=...) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:1156
#8 0x000000000042cb28 in std::_Bind<std::_Mem_fn<void (synchronous_job::*)()> (synchronous_job*)>::operator()<, void>() (this=0x78c6e0) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:1215
#9 0x000000000042b772 in std::_Function_handler<void (), std::_Bind<std::_Mem_fn<void (synchronous_job::*)()> (synchronous_job*)> >::_M_invoke(std::_Any_data const&) (__functor=...) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:1926
#10 0x0000000000429f2c in std::function<void ()>::operator()() const (this=0x7fa111ff6da0) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:2311
#11 0x0000000000428c3c in dispatcher::loop (this=0x7fff0ae51668) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:39
std::promise is just like any other object: you can only access it from one thread at a time. In this case, you are calling set_value() and destroying the object from separate threads without sufficient synchronization: nowhere in the spec does it say that set_value will not touch the promise object after making the future ready.
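As a rough illustration of that rule (not taken from the question's code), the sketch below hands ownership of the promise to the thread that sets it, so set_value() and the promise's destruction happen on the same thread. The helper name double_on_worker is hypothetical, and a plain std::thread stands in for the dispatcher.

```cpp
#include <cassert>
#include <future>
#include <thread>
#include <utility>

// Hypothetical helper: doubles `x` on a worker thread and returns the result.
int double_on_worker(int x)
{
    std::promise<int> p;
    std::future<int> f = p.get_future();
    assert(f.valid());

    // The promise is moved into the worker, so the thread that calls
    // set_value() is also the thread that later destroys the promise.
    std::thread worker(
        [](std::promise<int> owned, int v) { owned.set_value(v * 2); },
        std::move(p), x);

    int result = f.get(); // blocks until the worker has set the value
    worker.join();
    return result;
}
```

The waiting thread only ever touches the future, so it cannot destroy the promise out from under set_value().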
However, since this future is used for a one-shot synchronization, there is no need for the promise to be a member at all: create the promise/future pair right in run(), and pass the promise to the thread:
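struct synchronous_job
{
    synchronous_job(std::function<void()> job, dispatcher& d)
        : _job(job)
        , _d(d)
    {
    }

    void run()
    {
        // The promise/future pair lives only for the duration of this call.
        std::promise<void> p;
        std::future<void> f = p.get_future();
        _d.post(
            [&]{
                // Move the promise into cb(), so it is destroyed on the
                // dispatcher thread once set_value() has returned.
                cb(std::move(p));
            });
        f.wait();
    }

private:
    void cb(std::promise<void> p)
    {
        _job();
        p.set_value();
    }

    std::function<void()> _job;
    dispatcher& _d;
};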
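For anyone who wants to try the idea end to end, here is a minimal sketch that terminates instead of looping forever. It is a variation on the code above, not the same code: the promise is held in a std::shared_ptr owned by the posted closure rather than moved through a reference capture, and the dispatcher gets a stop flag so it can be joined. The names stoppable_dispatcher, run_synchronously and count_synchronously are hypothetical and do not appear in the question.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical stoppable variant of the question's dispatcher.
class stoppable_dispatcher
{
public:
    stoppable_dispatcher()
        : _stop(false)
        , _thread(&stoppable_dispatcher::loop, this)
    {
    }

    ~stoppable_dispatcher()
    {
        {
            std::lock_guard<std::mutex> l(_mtx);
            _stop = true;
        }
        _cnd.notify_one();
        _thread.join();
    }

    void post(std::function<void()> job)
    {
        {
            std::lock_guard<std::mutex> l(_mtx);
            _jobs.push(std::move(job));
        }
        _cnd.notify_one();
    }

private:
    void loop()
    {
        for (;;)
        {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> l(_mtx);
                _cnd.wait(l, [this] { return _stop || !_jobs.empty(); });
                if (_jobs.empty())
                    return; // stop requested and nothing left to run
                job = std::move(_jobs.front());
                _jobs.pop();
            }
            job();
        }
    }

    std::mutex _mtx;
    std::condition_variable _cnd;
    std::queue<std::function<void()>> _jobs;
    bool _stop;
    std::thread _thread; // declared last: the members above exist before it starts
};

// Blocks the caller until `job` has run on the dispatcher thread.
void run_synchronously(stoppable_dispatcher& d, std::function<void()> job)
{
    // The closure holds its own shared_ptr copy, so the promise stays alive
    // until set_value() has returned, whenever the waiting thread wakes up.
    auto p = std::make_shared<std::promise<void>>();
    std::future<void> f = p->get_future();
    assert(f.valid());
    d.post([p, job] {
        job();
        p->set_value();
    });
    f.wait();
}

// Runs `iterations` synchronous jobs and returns the final counter value.
int count_synchronously(int iterations)
{
    stoppable_dispatcher d;
    int count = 0;
    for (int i = 0; i < iterations; ++i)
        run_synchronously(d, [&count] { ++count; });
    return count;
}
```

Calling count_synchronously(1000) from main() should return 1000 without hanging. The shared_ptr is used here because std::function requires a copyable callable, and a std::promise is move-only.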