Most efficient way to signal consumer thread from multiple producer threads using condition variables

I have N worker threads that each do a calculation that one other consumer thread is waiting for. I am doing this with a condition_variable (CV) and an atomic counter that starts at N: each worker decrements it, and the one that reaches 0 signals the consumer using the CV. According to multiple sources (example), the "shared variable" (the counter in my case) needs to be modified while holding the mutex. You can then signal the CV after releasing the mutex.

I could do this, but it would mean every worker thread tries to acquire the mutex, which may cause undesired contention. I'd like to do the decrement without the mutex (since it's atomic, that should not be a race condition) and only acquire the mutex in the last worker thread, the one actually sending the signal.

Working example:
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

constexpr int N_WORKERS = 10;

struct WorkData
{
    int input[N_WORKERS];
    int result[N_WORKERS];
    std::atomic<int> remainingWorkers;
    std::condition_variable cv;
    std::mutex mutex;  
};

void workerFunc(WorkData* wd, int index)
{
    // Do some calculation here
    wd->result[index] = wd->input[index] * wd->input[index];
    //-------------------------
    if (--wd->remainingWorkers == 0)
    {
        wd->mutex.lock();
        wd->mutex.unlock();
        
        wd->cv.notify_one();
    }
}

int main()
{
    WorkData wd;
    wd.remainingWorkers.store(N_WORKERS);

    std::thread workerThreads[N_WORKERS];
    for (int i = 0; i < N_WORKERS; i++)
    {
        wd.input[i] = i;
        wd.result[i] = 0;
        workerThreads[i] = std::thread(workerFunc, &wd, i);
    }
    // Wait for the worker threads to finish unless they already have
    if (wd.remainingWorkers.load() > 0)
    {
        std::unique_lock<std::mutex> lock(wd.mutex);
        while (wd.remainingWorkers.load() > 0)
            wd.cv.wait(lock);
    }
    // Consume result of calculations
    for (int i = 0; i < N_WORKERS; i++)
        std::cout << wd.input[i] << "^2 = " << wd.result[i] << std::endl;
    
    for (std::thread& t : workerThreads)
        t.join();
    return 0;
}

I know I still need to acquire the mutex in the signalling thread in order to ensure the signal isn't sent between the point where the waiting thread checks the atomic and the point where it enters the condition variable wait, but I see no reason why the worker threads can't modify the counter before acquiring the mutex.

Is this safe, or is there a race condition here that I have missed?


Answer from Alan Birtles:

There is a race condition: the notification can be raised between the check wd.remainingWorkers.load() > 0 and the call to wd.cv.wait(lock). This would lead to the wait never completing, as the notification would be missed. See https://en.cppreference.com/w/cpp/thread/condition_variable which states:

Even if the shared variable is atomic, it must be modified while owning the mutex to correctly publish the modification to the waiting thread.

You can remove the mutex contention by separating the remaining-worker count from the fact that the work is complete. That way, only the worker that completes the last calculation needs to lock the mutex and raise the notification:

#include <atomic>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <thread>

constexpr int N_WORKERS = 10;

struct WorkData
{
    int input[N_WORKERS];
    int result[N_WORKERS];
    std::atomic<int> remainingWorkers;
    bool complete;
    std::condition_variable cv;
    std::mutex mutex;  
};

void workerFunc(WorkData* wd, int index)
{
    // Do some calculation here
    wd->result[index] = wd->input[index] * wd->input[index];
    //-------------------------
    if (--wd->remainingWorkers == 0)
    {
        wd->mutex.lock();
        wd->complete = true;
        wd->mutex.unlock();
        
        wd->cv.notify_one();
    }
}

int main()
{
    WorkData wd;
    wd.remainingWorkers.store(N_WORKERS);
    wd.complete = false;

    std::thread workerThreads[N_WORKERS];
    for (int i = 0; i < N_WORKERS; i++)
    {
        wd.input[i] = i;
        wd.result[i] = 0;
        workerThreads[i] = std::thread(workerFunc, &wd, i);
    }
    // Wait for the worker threads to finish unless they already have
    std::unique_lock<std::mutex> lock(wd.mutex);
    wd.cv.wait(lock, [&]{ return wd.complete; });
    // Consume result of calculations
    for (int i = 0; i < N_WORKERS; i++)
        std::cout << wd.input[i] << "^2 = " << wd.result[i] << std::endl;
    
    for (std::thread& t : workerThreads)
        t.join();
    return 0;
}
Answer from Solomon Slow:

Not an answer, just a comment with embedded code...
I said,

if each [producer thread] only keeps the mutex locked just long enough to decrement the counter, then the chance of any of them finding the mutex locked is extremely small.

You replied,

@SolomonSlow a small chance repeated enough times will happen. In the situation I am looking at, this pattern occurs several hundreds of times per second,...

So, I tried this on my 2020 Apple Mac mini (Apple clang version 14.0.3):

#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

using std::chrono::steady_clock;
using std::chrono::duration_cast;
using std::chrono::microseconds;

static std::mutex mx;
volatile long count_a = 1000000;
//static long count_b = 1000000;

static void make_a_thread() {
    std::thread t([](){
        mx.lock();
        std::cout << "ho!\n";
        mx.unlock();
    });
    t.join();
}

static void gopher_it() {
    while (count_a > 0) {
        count_a -= 1;
//      mx.lock();
//      count_b -= 1;
//      mx.unlock();
    }
}

int main(int argc, char *argv[]) {
    std::cout << "yo!\n";
    
    make_a_thread();

    auto t_begin = steady_clock::now();
    gopher_it();
    auto t_end = steady_clock::now();
    auto duration = duration_cast<microseconds>(t_end - t_begin);
    std::cout << "that took " << duration.count() << ".\n";
    return 0;
}

It printed,

g++ -std=c++17 -g -o bizzy bizzy.cpp && ./bizzy && ./bizzy && ./bizzy && ./bizzy && ./bizzy
yo!
ho!
that took 3032.
yo!
ho!
that took 3702.
yo!
ho!
that took 3669.
yo!
ho!
that took 3423.
yo!
ho!
that took 3193.

Then, I uncommented the four commented-out lines, and I ran it again:

g++ -std=c++17 -g -o wizzy wizzy.cpp && ./wizzy && ./wizzy && ./wizzy && ./wizzy && ./wizzy
yo!
ho!
that took 21606.
yo!
ho!
that took 17846.
yo!
ho!
that took 14029.
yo!
ho!
that took 11837.
yo!
ho!
that took 10497.

I calculate that my code, on average, took less than 12 nanoseconds to lock an uncontested lock, decrement a variable, and unlock the lock (the difference between the two average run times, about 11.8 milliseconds, divided by one million iterations).

You said, "...hundreds of times per second..." Let's make that, for the sake of argument, 600 times per second. That would be about 1.67 million nanoseconds per iteration. If any one thread holds the lock for 12 nanoseconds out of every 1.67 million, that's about 0.00072 percent of the time.

IMO, that's a pretty small fraction.