I have N worker threads that do a calculation that one other consumer thread is waiting for. I am doing this with a condition_variable (CV) and an atomic counter that starts at N. Each worker decrements it, and the one that brings it to 0 signals the consumer using the CV. According to multiple sources (example), the "shared variable" (the counter, in my case) needs to be modified while holding the mutex. The CV can then be signalled after the mutex has been released.
I could do this, but it would mean that every worker thread has to acquire the mutex, which may cause undesired contention. I'd like to do the decrement without the mutex (since the counter is atomic, that should not be a data race) and only acquire the mutex in the last worker thread, the one that actually sends the signal.
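For comparison, here is a minimal sketch of the pattern those sources describe, assuming C++11 or later (`LockedCounterSync` and `squareAllLocked` are hypothetical names): every worker decrements the counter while holding the mutex, and the worker that brings it to zero calls `notify_one()` after unlocking. Because the mutex already guards the counter, it can be a plain `int`. The cost is that all N workers briefly contend for the same mutex.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: counter guarded by the mutex, as the "textbook" pattern requires.
struct LockedCounterSync
{
    std::mutex mutex;
    std::condition_variable cv;
    int remainingWorkers = 0; // only read or written while holding mutex
};

std::vector<int> squareAllLocked(const std::vector<int>& input)
{
    const int n = static_cast<int>(input.size());
    std::vector<int> result(input.size(), 0);
    LockedCounterSync state;
    state.remainingWorkers = n;

    std::vector<std::thread> workers;
    for (int i = 0; i < n; ++i)
    {
        workers.emplace_back([&, i] {
            result[i] = input[i] * input[i]; // each worker writes its own slot
            bool last = false;
            {
                // Every worker takes the mutex here: this is the contention point.
                std::lock_guard<std::mutex> lock(state.mutex);
                last = (--state.remainingWorkers == 0);
            }
            if (last)
                state.cv.notify_one(); // notify after the mutex is released
        });
    }

    {
        std::unique_lock<std::mutex> lock(state.mutex);
        // The predicate overload re-checks after every wakeup (spurious or not).
        state.cv.wait(lock, [&] { return state.remainingWorkers == 0; });
        assert(state.remainingWorkers == 0);
    }

    for (std::thread& t : workers)
        t.join();
    return result;
}
```

Calling `squareAllLocked({0, 1, 2, 3})` returns `{0, 1, 4, 9}`.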
Working example:
```cpp
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

constexpr int N_WORKERS = 10;

struct WorkData
{
    int input[N_WORKERS];
    int result[N_WORKERS];
    std::atomic<int> remainingWorkers;
    std::condition_variable cv;
    std::mutex mutex;
};

void workerFunc(WorkData* wd, int index)
{
    // Do some calculation here
    wd->result[index] = wd->input[index] * wd->input[index];
    //-------------------------

    // Decrement without the mutex; only the worker that reaches 0 goes on
    if (--wd->remainingWorkers == 0)
    {
        // Briefly take the mutex so the notification is not sent while the
        // consumer is between checking the counter and calling cv.wait()
        wd->mutex.lock();
        wd->mutex.unlock();
        wd->cv.notify_one();
    }
}

int main()
{
    WorkData wd;
    wd.remainingWorkers.store(N_WORKERS);
    std::thread workerThreads[N_WORKERS];
    for (int i = 0; i < N_WORKERS; i++)
    {
        wd.input[i] = i;
        wd.result[i] = 0;
        workerThreads[i] = std::thread(workerFunc, &wd, i);
    }

    // Wait for the worker threads to finish unless they already have
    if (wd.remainingWorkers.load() > 0)
    {
        std::unique_lock<std::mutex> lock(wd.mutex);
        while (wd.remainingWorkers.load() > 0)
            wd.cv.wait(lock);
    }

    // Consume result of calculations
    for (int i = 0; i < N_WORKERS; i++)
        std::cout << wd.input[i] << "^2 = " << wd.result[i] << std::endl;

    for (std::thread& t : workerThreads)
        t.join();
    return 0;
}
```
I know I still need to acquire the mutex in the signalling thread to ensure that the signal isn't sent between the point where the waiting thread checks the atomic and the point where it enters the condition-variable wait, but I see no reason why the worker thread can't modify the counter before acquiring the mutex.
Is this safe, or is there a race condition here that I have missed?
There is a race condition, as the notification can be raised between `wd.remainingWorkers.load() > 0` and `wd.cv.wait(lock);`. This would lead to `wait` never completing, because the notification would be missed. See https://en.cppreference.com/w/cpp/thread/condition_variable, which states that even if the shared variable is atomic, it must be modified while owning the mutex to correctly publish the modification to the waiting thread.

You can remove the mutex contention by separating the remaining-worker count from the fact that the work is complete. That way, only the worker that completes the last calculation needs to lock the mutex and raise the notification.
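A minimal sketch of that idea, assuming C++11 or later. The names `CompletionSync`, `squareAllFlagged` and the `done` flag are hypothetical. Workers decrement the atomic counter without locking; only the worker whose `fetch_sub(1)` returns 1 (so the counter is now 0) locks the mutex, sets `done`, unlocks and notifies. The consumer waits on `done` under the mutex, so the predicate it checks is only ever modified while the mutex is held.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: lock-free counter plus a mutex-guarded completion flag.
struct CompletionSync
{
    std::atomic<int> remainingWorkers{0}; // decremented without the mutex
    std::mutex mutex;
    std::condition_variable cv;
    bool done = false;                    // guarded by mutex; the CV predicate
};

std::vector<int> squareAllFlagged(const std::vector<int>& input)
{
    const int n = static_cast<int>(input.size());
    std::vector<int> result(input.size(), 0);
    CompletionSync state;
    state.remainingWorkers.store(n);
    state.done = (n == 0); // with no workers there is nothing to wait for

    std::vector<std::thread> workers;
    for (int i = 0; i < n; ++i)
    {
        workers.emplace_back([&, i] {
            result[i] = input[i] * input[i];
            // fetch_sub returns the previous value: 1 means this was the last worker.
            if (state.remainingWorkers.fetch_sub(1) == 1)
            {
                {
                    std::lock_guard<std::mutex> lock(state.mutex);
                    state.done = true; // predicate changed while owning the mutex
                }
                state.cv.notify_one(); // notify after unlocking
            }
        });
    }

    {
        std::unique_lock<std::mutex> lock(state.mutex);
        state.cv.wait(lock, [&] { return state.done; });
        assert(state.remainingWorkers.load() == 0);
    }

    for (std::thread& t : workers)
        t.join();
    return result;
}
```

In this design the mutex protects only `done`, so at most one worker (plus the consumer) ever takes it, while the counter itself stays lock-free. Calling `squareAllFlagged({0, 1, 2, 3})` returns `{0, 1, 4, 9}`.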