Minimal memory ordering to ensure functionality and prevent compiler re-ordering in a lock-free non-circular MPSC queue


The following API is used by multiple threads updating buckets in a buffer, while a single consumer processes them, until MAX_CAPACITY is reached.

    struct Bucket {
        std::atomic<int> lock{0};
        Data data;
    };

    std::atomic<int64_t> readIndex{0};
    std::atomic<int64_t> writeIndex{0}; //expected to always be >= readIndex
    std::array<Bucket, MAX_CAPACITY> buffer;

    //Called from multiple updater threads
    bool updateData(/* arguments */) {
        while (true) {
            int64_t myWriteIndex = writeIndex.load(std::memory_order_relaxed);
            if (myWriteIndex == MAX_CAPACITY) {
                return false; //Buffer full
            }
            Bucket& bucket = buffer[myWriteIndex];
            int expected = 0;
        if (bucket.lock.compare_exchange_strong(expected, 1, std::memory_order_relaxed)) { //Ensures processData can read the bucket only after the writer that succeeded on the CAS (and updated bucket.data) releases bucket.lock
                //Several threads can end up here
                if (writeIndex.compare_exchange_strong(myWriteIndex, myWriteIndex + 1, std::memory_order_relaxed)) {
                    //Only 1 thread will be able to increment 'writeIndex'
                    //update bucket.data 
                    bucket.lock.store(0, std::memory_order_release); //Sync1
                    break;
                }
                bucket.lock.store(0, std::memory_order_release);
            }
        }
        return true;
    }

    //Called from just 1 consumer thread
    bool processData() {
        while (true) {
            int64_t myReadIndex = readIndex.load(std::memory_order_relaxed);
            if (myReadIndex == writeIndex.load(std::memory_order_relaxed)) {
                return false; //No more buckets to process
            }
            ...
            Bucket& bucket = buffer[myReadIndex];
            if (bucket.lock.load(std::memory_order_acquire) != 0) { //Sync1; writers lock the buffer while trying to update it
                continue;
            }
            //process bucket.data, then destroy it
            readIndex.fetch_add(1, std::memory_order_relaxed);
            return true;
        }
    }

In updateData(): do I really need acquire/release ordering for writeIndex? I do not believe so because:

(1) bucket.lock enables acquire/release ordering between updateData() and processData(), which is enough to ensure updates to Bucket::data done by the former are visible to the latter.

(2) bucket.lock.compare_exchange_strong(), even with std::memory_order_relaxed, ensures all updater threads see the latest value of bucket.lock.

(3) In updateData(): I do not expect compiler re-ordering of the conditionals for bucket.lock.compare_exchange_strong() and writeIndex.compare_exchange_strong() as that would seem to break the "as-if" rule.

Please share your thoughts.

Follow up: updateData() can be rewritten more concisely, and more assertively, with the help of a scope-lock class

    struct ScopeLock {
        ScopeLock(std::atomic<int>& lock_) : lock(lock_) {}
        bool tryLock() {
            int expected = 0;
            locked = lock.compare_exchange_strong(expected, 1, std::memory_order_relaxed);
            return locked;
        }
        ~ScopeLock() {
            if (locked) {
                lock.store(0, std::memory_order_release); //Sync1
            }
        }
        bool locked = false;
        std::atomic<int>& lock;
    };

    bool updateData(/* arguments */) {
        while (true) {
            int64_t myWriteIndex = writeIndex.load(std::memory_order_relaxed); 
            if (myWriteIndex == MAX_CAPACITY) {
                return false; //Buffer full
            }
            Bucket& bucket = buffer[myWriteIndex];
            ScopeLock sl(bucket.lock);
            if (sl.tryLock() &&
                writeIndex.compare_exchange_strong(myWriteIndex, myWriteIndex + 1, std::memory_order_relaxed)) { //Short-circuit evaluation guarantees compile-time reordering will not happen between the && operands
                //update bucket.data
                break;
            }
        }
        return true;
    }

So regarding the scenario mentioned in one of my comments:

  • the reader finds that readIndex = writeIndex = n, and starts looping.
  • a writer locks Bucket::lock for bucket n (mo release) and succeeds incrementing writeIndex.
  • while the writer is modifying Bucket::data (i.e. ScopeLock destructor has not run yet), the reader sees that writeIndex is now = n+1, so it can load Bucket::lock (mo acquire).

Given the acquire/release ordering on Bucket::lock, and the fact that the reader already saw writeIndex incremented to n+1: would ISO C++, or any cache-coherent architecture, allow the reader to load a value of 0 from the memory location corresponding to buffer[myReadIndex = n].lock ?

PS RMW handling of cache lines discussed here: Synchronization of std::atomic::compare_exchange_weak

PPS I'm writing this code for x86-64, but I'm still quite interested in understanding what the minimal ordering is that will ensure correctness
