Mutlithreading: synchronize threads to perform several steps race condition

183 Views Asked by At

I want to create 15 threads and have them performed 4 successive steps (that I call Init, Process, Terminate and WriteOutputs).
For each step I want all threads to finish it before passing to the following step.

I am trying to implement it (cf code below) using a std::condition_variable and calling the wait() and notify_all() methods but somehow I do not manage to do it and even worse I have a race condition when counting the number of operations done (which should be 15*4 = 60) I sometimes have some prints that are indeed not printed and the m_counter in my class at the end is less than 60 which should not be the case

I use two std::mutex objects: one for printing messages and another one for the step synchronization

Could someone explain to me the problem?
What would be a solution ?
Many thanks in advance

#include<iostream>
#include<thread>
#include<mutex>
#include<condition_variable> 
#include<vector>
#include<functional>

class MTHandler
{ 
  public:
    MTHandler(){
      // 15 threads 
      std::function<void(int)> funcThread = std::bind(&MTHandler::ThreadFunction, this, std::placeholders::_1);
      for (int i=0; i<15; i++){
        m_vectThreads.push_back(std::thread(funcThread,i));
      }
      for (std::thread & th : m_vectThreads) {
        th.join();
      }

      std::cout << "m_counter = " << m_counter << std::endl; 
    }
  
  private: 
     enum class ManagerStep{
      Init, 
      Process,
      Terminate,
      WriteOutputs,
    };

    std::vector<ManagerStep> m_vectSteps = {
      ManagerStep::Init, 
      ManagerStep::Process,
      ManagerStep::Terminate,
      ManagerStep::WriteOutputs
    };

    unsigned int m_iCurrentStep = 0 ; 

    unsigned int m_counter = 0;

    std::mutex m_mutex;
    std::mutex m_mutexStep;
    std::condition_variable m_condVar;

    bool m_finishedAllSteps    = false;
    unsigned int m_nThreadsFinishedStep = 0;
    std::vector<std::thread> m_vectThreads = {};

    void ThreadFunction (int id) {
      while(!m_finishedAllSteps){
        m_mutex.lock();
        m_counter+=1;
        m_mutex.unlock();
        switch (m_vectSteps[m_iCurrentStep])
        {
          case ManagerStep::Init:{
            m_mutex.lock();
            std::cout << "thread " << id  << " --> Init step" << "\n";
            m_mutex.unlock();
            break;
          }
          case ManagerStep::Process:{
            m_mutex.lock();
            std::cout << "thread " << id  << " --> Process step" << "\n";
            m_mutex.unlock();
            break;
          }
          case ManagerStep::Terminate:{
            m_mutex.lock();
            std::cout << "thread " << id  << " --> Terminate step" << "\n";
            m_mutex.unlock();
            break;
          }
          case ManagerStep::WriteOutputs:{
            m_mutex.lock();
            std::cout << "thread " << id  << " --> WriteOutputs step" << "\n";
            m_mutex.unlock();
            break;
          }
          default:
          {
            break;
          }
        }

        unsigned int iCurrentStep = m_iCurrentStep; 
        bool isCurrentStepFinished = getIsFinishedStatus();
        if (!isCurrentStepFinished){
          // wait for other threads to finish current step
          std::unique_lock<std::mutex> lck(m_mutexStep);
          m_condVar.wait(lck, [iCurrentStep,this]{return iCurrentStep != m_iCurrentStep;});
        }
      }
    }

    bool getIsFinishedStatus(){
      m_mutexStep.lock();
      bool isCurrentStepFinished = false;
      m_nThreadsFinishedStep +=1;
      if (m_nThreadsFinishedStep == m_vectThreads.size()){
        // all threads have completed the current step 
        // pass to the next step
        m_iCurrentStep += 1;
        m_nThreadsFinishedStep = 0;
        m_finishedAllSteps = (m_iCurrentStep == m_vectSteps.size());
        isCurrentStepFinished = true;
      }
      if (isCurrentStepFinished){m_condVar.notify_all();}
      m_mutexStep.unlock();
      return isCurrentStepFinished;
    }
};

int main ()
{
  
  MTHandler mt;

  return 0;
}
0

There are 0 best solutions below