I have an application at $work where I have to move between two real-time threads that are scheduled at different frequencies. (The actual scheduling is beyond my control.) The application is hard real-time-ish (one of the threads has to drive a hardware interface), so the data transfer between the threads should be lock-free and wait-free to the extent possible.
It is important to note that only one block of data needs to be transferred: because the two threads run at different rates, there will be times when two iterations of the faster thread are completed between two wakeups of the slower thread; in this case it is OK to overwrite the data in the write buffer so that the slower thread gets only the latest data.
In other words, instead of a queue a double buffered solution suffices. The two buffers are allocated during initialization, and the reader and write threads can call methods of the class to get pointers to one of these buffers.
C++ code:
#include <mutex>
template <typename T>
class ProducerConsumerDoubleBuffer {
public:
ProducerConsumerDoubleBuffer() {
m_write_busy = false;
m_read_idx = m_write_idx = 0;
}
~ProducerConsumerDoubleBuffer() { }
// The writer thread using this class must call
// start_writing() at the start of its iteration
// before doing anything else to get the pointer
// to the current write buffer.
T * start_writing(void) {
std::lock_guard<std::mutex> lock(m_mutex);
m_write_busy = true;
m_write_idx = 1 - m_read_idx;
return &m_buf[m_write_idx];
}
// The writer thread must call end_writing()
// as the last thing it does
// to release the write busy flag.
void end_writing(void) {
std::lock_guard<std::mutex> lock(m_mutex);
m_write_busy = false;
}
// The reader thread must call start_reading()
// at the start of its iteration to get the pointer
// to the current read buffer.
// If the write thread is not active at this time,
// the read buffer pointer will be set to the
// (previous) write buffer - so the reader gets the latest data.
// If the write buffer is busy, the read pointer is not changed.
// In this case the read buffer may contain stale data,
// it is up to the user to deal with this case.
T * start_reading(void) {
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_write_busy) {
m_read_idx = m_write_idx;
}
return &m_buf[m_read_idx];
}
// The reader thread must call end_reading()
// at the end of its iteration.
void end_reading(void) {
std::lock_guard<std::mutex> lock(m_mutex);
m_read_idx = m_write_idx;
}
private:
T m_buf[2];
bool m_write_busy;
unsigned int m_read_idx, m_write_idx;
std::mutex m_mutex;
};
To avoid stale data in the reader thread the payload structure is versioned. To facilitate bidirectional data transfer between the threads, two instances of the above monstrosity are used, in opposite directions.
Questions:
Very interesting problem! Way trickier than I first thought :-) I like lock-free solutions, so I've tried to work one out below.
There are many ways to think about this system. You can model it as a fixed-size circular buffer/queue (with two entries), but then you lose the ability to update the next available value for consumption, since you don't know if the consumer has started to read the most recently published value or is still (potentially) reading the previous one. So extra state is needed beyond that of a standard ring buffer in order to reach a more optimal solution.
First note that there is always a cell that the producer can safely write to at any given point in time; if one cell is being read by the consumer, the other can be written to. Let's call the cell that can be safely written to the "active" cell (the cell that can be potentially read from is whatever cell isn't the active one). The active cell can only be switched if the other cell is not currently being read from.
Unlike the active cell, which can always be written to, the non-active cell can only be read from if it contains a value; once that value is consumed, it's gone. (This means that livelock is avoided in the case of an aggressive producer; at some point, the consumer will have emptied a cell and will stop touching the cells. Once that happens, the producer can definitely publish a value, whereas before that point, it can only publish a value (change the active cell) if the consumer is not in the middle of a read.)
If there is a value that's ready to be consumed, only the consumer can change that fact (for the non-active cell, anyway); subsequent productions may change which cell is active and the published value, but a value will always be ready to be read until it's consumed.
Once the producer is done writing to the active cell, it can "publish" this value by changing which cell is the active one (swapping the index), provided the consumer is not in the middle of reading the other cell. If the consumer is in the middle of reading the other cell, the swap cannot occur, but in that case the consumer can swap after it's done reading the value, provided the producer is not in the middle of a write (and if it is, the producer will swap once it's done). In fact, in general the consumer can always swap after it's done reading (if it's the only one accessing the system) because spurious swaps by the consumer are benign: if there is something in the other cell, then swapping will cause that to be read next, and if there isn't, swapping affects nothing.
So, we need a shared variable to track what the active cell is, and we also need a way for both the producer and consumer to indicate if they're in the middle of an operation. We can store these three pieces of state into one atomic variable in order to be able to affect them all at once (atomically). We also need a way for the consumer to check if there's anything in the non-active cell in the first place, and for both threads to modify that state as appropriate. I tried a few other approaches, but in the end the easiest was just to include this information in the other atomic variable too. This makes things much simpler to reason about, since all state changes in the system are atomic this way.
I've come up with a wait-free implementation (lock-free, and all operations complete in a bounded number of instructions).
Code time!
#include <atomic>
#include <cstdint>
template <typename T>
class ProducerConsumerDoubleBuffer {
public:
ProducerConsumerDoubleBuffer() : m_state(0) { }
~ProducerConsumerDoubleBuffer() { }
// Never returns nullptr
T* start_writing() {
// Increment active users; once we do this, no one
// can swap the active cell on us until we're done
auto state = m_state.fetch_add(0x2, std::memory_order_relaxed);
return &m_buf[state & 1];
}
void end_writing() {
// We want to swap the active cell, but only if we were the last
// ones concurrently accessing the data (otherwise the consumer
// will do it for us when *it's* done accessing the data)
auto state = m_state.load(std::memory_order_relaxed);
std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));
state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;
if ((state & 0x6) == 0) {
// The consumer wasn't in the middle of a read, we should
// swap (unless the consumer has since started a read or
// already swapped or read a value and is about to swap).
// If we swap, we also want to clear the full flag on what
// will become the active cell, otherwise the consumer could
// eventually read two values out of order (it reads a new
// value, then swaps and reads the old value while the
// producer is idle).
m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);
}
}
// Returns nullptr if there appears to be no more data to read yet
T* start_reading() {
m_readState = m_state.load(std::memory_order_relaxed);
if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
// Nothing to read here!
return nullptr;
}
// At this point, there is guaranteed to be something to
// read, because the full flag is never turned off by the
// producer thread once it's on; the only thing that could
// happen is that the active cell changes, but that can
// only happen after the producer wrote a value into it,
// in which case there's still a value to read, just in a
// different cell.
m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;
// Now that we've incremented the user count, nobody can swap until
// we decrement it
return &m_buf[(m_readState & 1) ^ 1];
}
void end_reading() {
if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
// There was nothing to read; shame to repeat this
// check, but if these functions are inlined it might
// not matter. Otherwise the API could be changed.
// Or just don't call this method if start_reading()
// returns nullptr -- then you could also get rid
// of m_readState.
return;
}
// Alright, at this point the active cell cannot change on
// us, but the active cell's flag could change and the user
// count could change. We want to release our user count
// and remove the flag on the value we read.
auto state = m_state.load(std::memory_order_relaxed);
std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;
state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub;
if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) == 1) {
// Oi, we were the last ones accessing the data when we released our cell.
// That means we should swap, but only if the producer isn't in the middle
// of producing something, and hasn't already swapped, and hasn't already
// set the flag we just reset (which would mean they swapped an even number
// of times). Note that we don't bother swapping if there's nothing to read
// in the other cell.
m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);
}
}
private:
T m_buf[2];
// The bottom (lowest) bit will be the active cell (the one for writing).
// The active cell can only be switched if there's at most one concurrent
// user. The next two bits of state will be the number of concurrent users.
// The fourth bit indicates if there's a value available for reading
// in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].
std::atomic<std::uint32_t> m_state;
std::uint32_t m_readState;
};
Note that the semantics are such that the consumer can never read a given value twice, and a value it does read is always newer than the last value it read. It's also fairly efficient in memory usage (two buffers, like your original solution). I avoided CAS loops because they're generally less efficient than a single atomic operation under contention.
If you decide use the above code, I suggest you write some comprehensive (threaded) unit tests for it first. And proper benchmarks. I did test it, but only just barely. Let me know if you find any bugs :-)
My unit test:
ProducerConsumerDoubleBuffer<int> buf;
std::thread producer([&]() {
for (int i = 0; i != 500000; ++i) {
int* item = buf.start_writing();
if (item != nullptr) { // Always true
*item = i;
}
buf.end_writing();
}
});
std::thread consumer([&]() {
int prev = -1;
for (int i = 0; i != 500000; ++i) {
int* item = buf.start_reading();
if (item != nullptr) {
assert(*item > prev);
prev = *item;
}
buf.end_reading();
}
});
producer.join();
consumer.join();
As for your original implementation, I only looked at it cursorily (it's much more fun to design new stuff, heh), but david.pfx's answer seems to address that part of your question.