The following API is used by multiple updater threads writing buckets into a buffer, while a single consumer thread processes them, until MAX_CAPACITY is reached.
#include <array>
#include <atomic>
#include <cstdint>

struct Bucket {
    std::atomic<int> lock{0};
    Data data;
};

std::atomic<int64_t> readIndex{0};
std::atomic<int64_t> writeIndex{0}; //expected to always be >= readIndex
std::array<Bucket, MAX_CAPACITY> buffer;
//Called from multiple updater threads
bool updateData(/* arguments */) {
    while (true) {
        int64_t myWriteIndex = writeIndex.load(std::memory_order_relaxed);
        if (myWriteIndex == MAX_CAPACITY) {
            return false; //Buffer full
        }
        Bucket& bucket = buffer[myWriteIndex];
        int expected = 0;
        if (bucket.lock.compare_exchange_strong(expected, 1, std::memory_order_relaxed)) { //Ensures processData() can read the bucket only after any writer that succeeded on the CAS (especially the one updating bucket.data) has released bucket.lock
            //Several threads can end up here
            if (writeIndex.compare_exchange_strong(myWriteIndex, myWriteIndex + 1, std::memory_order_relaxed)) {
                //Only 1 thread will be able to increment 'writeIndex'
                //update bucket.data
                bucket.lock.store(0, std::memory_order_release); //Sync1
                break;
            }
            bucket.lock.store(0, std::memory_order_release);
        }
    }
    return true;
}
//Called from just 1 consumer thread
bool processData() {
    while (true) {
        int64_t myReadIndex = readIndex.load(std::memory_order_relaxed);
        if (myReadIndex == writeIndex.load(std::memory_order_relaxed)) {
            return false; //No more buckets to process
        }
        ...
        Bucket& bucket = buffer[myReadIndex];
        if (bucket.lock.load(std::memory_order_acquire) != 0) { //Sync1; writers lock the bucket while trying to update it
            continue;
        }
        //process bucket.data, then destroy it
        readIndex.fetch_add(1, std::memory_order_relaxed);
        return true;
    }
}
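For context, this is roughly how I drive the API (a sketch only: the writer count, the consumer's stop condition and runExample itself are placeholders, not part of the real code):

//Rough usage sketch; thread count and shutdown handling are placeholders.
#include <thread>
#include <vector>

void runExample() {
    std::vector<std::thread> writers;
    for (int i = 0; i < 4; ++i) {                   //4 writers, arbitrary
        writers.emplace_back([] {
            while (updateData(/* arguments */)) {}  //publish until the buffer reports full
        });
    }
    std::thread consumer([] {
        //consume until the whole buffer has been processed (writers stop at MAX_CAPACITY)
        while (readIndex.load(std::memory_order_relaxed) < MAX_CAPACITY) {
            if (!processData()) {
                std::this_thread::yield();          //nothing published yet
            }
        }
    });
    for (auto& w : writers) w.join();
    consumer.join();
}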
In updateData(): do I really need acquire/release ordering for writeIndex? I do not believe so, because:
(1) bucket.lock provides acquire/release ordering between updateData() and processData(), which is enough to ensure that updates to Bucket::data done by the former are visible to the latter.
(2) bucket.lock.compare_exchange_strong(), even with std::memory_order_relaxed, ensures that all updater threads see the latest value of bucket.lock.
(3) In updateData(), I do not expect the compiler to reorder the conditionals on bucket.lock.compare_exchange_strong() and writeIndex.compare_exchange_strong(), as that would seem to break the "as-if" rule.
Please share your thoughts.
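For comparison, this is the stronger-ordering variant I would otherwise reach for, where the writeIndex operations themselves use acquire/release. The name updateDataConservative is just for this sketch, and I am not claiming these upgrades are necessary; that is exactly what I am asking.

//Sketch of the conservative variant (name is just for illustration).
//Only the writeIndex orderings differ from updateData() above; see the "<- changed" comments.
bool updateDataConservative(/* arguments */) {
    while (true) {
        int64_t myWriteIndex = writeIndex.load(std::memory_order_acquire); // <- changed from relaxed
        if (myWriteIndex == MAX_CAPACITY) {
            return false; //Buffer full
        }
        Bucket& bucket = buffer[myWriteIndex];
        int expected = 0;
        if (bucket.lock.compare_exchange_strong(expected, 1, std::memory_order_relaxed)) {
            if (writeIndex.compare_exchange_strong(myWriteIndex, myWriteIndex + 1,
                                                   std::memory_order_acq_rel)) { // <- changed from relaxed
                //update bucket.data
                bucket.lock.store(0, std::memory_order_release); //Sync1
                break;
            }
            bucket.lock.store(0, std::memory_order_release);
        }
    }
    return true;
}
//...and in processData() the corresponding load would become:
//if (myReadIndex == writeIndex.load(std::memory_order_acquire)) { ... }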
Follow-up: updateData() can be rewritten more concisely and assertively with the help of a scope-lock class:
struct ScopeLock {
    ScopeLock(std::atomic<int>& lock_) : lock(lock_) {}
    bool tryLock() {
        int expected = 0;
        locked = lock.compare_exchange_strong(expected, 1, std::memory_order_relaxed);
        return locked;
    }
    ~ScopeLock() {
        if (locked) {
            lock.store(0, std::memory_order_release); //Sync1
        }
    }
    bool locked{false};
    std::atomic<int>& lock;
};
bool updateData(/* arguments */) {
    while (true) {
        int64_t myWriteIndex = writeIndex.load(std::memory_order_relaxed);
        if (myWriteIndex == MAX_CAPACITY) {
            return false; //Buffer full
        }
        Bucket& bucket = buffer[myWriteIndex];
        ScopeLock sl(bucket.lock);
        if (sl.tryLock() &&
            writeIndex.compare_exchange_strong(myWriteIndex, myWriteIndex + 1, std::memory_order_relaxed)) { //Short-circuit evaluation guarantees the && operands are not reordered at compile time
            //update bucket.data
            break;
        }
    }
    return true;
}
So, regarding the scenario mentioned in one of my comments:
- the reader finds that readIndex == writeIndex == n, and starts looping.
- a writer locks Bucket::lock for bucket n (mo release) and succeeds in incrementing writeIndex.
- while the writer is modifying Bucket::data (i.e. the ScopeLock destructor has not run yet), the reader sees that writeIndex is now n + 1, so it goes on to load Bucket::lock (mo acquire).
Given the acquire/release ordering on Bucket::lock and the fact that the reader has already seen writeIndex incremented to n + 1: would the ISO C++ standard, or any cache-coherent architecture, allow the reader to see/load a value of 0 for the memory location corresponding to buffer[myReadIndex = n].lock?
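To make the interleaving concrete, here is the timeline I have in mind, written out as comments (T1 is one writer thread, T2 the single consumer, n the bucket in question):

//Timeline of the scenario above (T1 = writer, T2 = consumer, bucket index n):
// T1: bucket[n].lock  CAS 0 -> 1      (relaxed, succeeds)
// T1: writeIndex      CAS n -> n + 1  (relaxed, succeeds)
// T2: writeIndex.load(relaxed)        reads n + 1
// T2: bucket[n].lock.load(acquire)    reads ???  <- can this still observe 0?
// T1: ...updates bucket[n].data...
// T1: bucket[n].lock.store(0, release)           //Sync1 (ScopeLock destructor)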
PS: RMW handling of cache lines is discussed here: Synchronization of std::atomic::compare_exchange_weak
PPS: I'm writing this code for x86-64, but I'm still quite interested in understanding the minimal ordering that will ensure correctness.