barrier and wait using atomic_bool and atomic_int

71 Views Asked by At

I have a method to wait for all of the threads to arrive after which some action is taken.

What is wrong with the implementation. I see kind of deadlock where all 4 threads are waiting at waitForAllThreads, atomic_flag_guard is true but atomic_counter is only 2.

#include <iostream>
#include <pthread.h>
#include <stdio.h>
#include <atomic>
#include <thread>
#define TOTAL_THREADS 16
using namespace std;
volatile atomic_bool atomic_flag_guard = 0; // 0th thread decide if need to lock if certain %age of processing is done.
volatile atomic_int atomic_counter = 0; // to track number of thread arrived.
volatile atomic_int all_processed_object_count = 0; // number of object processed.
volatile atomic_int total_number_thread = TOTAL_THREADS ;
struct thread_local_data {
  int t_index = 0;
  int total_thread = 0;
};

void waitForAllThread(int thread_index, int &stop_index, size_t interval)
{
  if(thread_index == 0 && all_processed_object_count >= stop_index) {
    atomic_flag_guard.store(true);
    printf("guard up\n");
    while(atomic_counter != total_number_thread - 1);
    // do the task
    atomic_counter.store(0);
    stop_index += interval;
    atomic_flag_guard.store(false);
  } else if( thread_index != 0 && atomic_flag_guard.load()) {
    ++atomic_counter;
    printf("arrived %d total arrived %d\n", thread_index, atomic_counter.load());
    while(atomic_flag_guard.load());
  }
}
void* thread_processing(void *thread_data)
{
    thread_local_data *tld = static_cast<thread_local_data*>(thread_data);
    int stop_index = 100;
    int interval = stop_index;
    for(int i = 0 ; i < 1000; ++i)
    {
        std::this_thread::sleep_for(2ms);
        ++all_processed_object_count;
        waitForAllThread(tld->t_index, stop_index, interval);
    }
    printf("Exit thread %d\n", tld->t_index);
    --total_number_thread;
    return nullptr;
}
int main() {
  thread_local_data tld[TOTAL_THREADS ];
  for(int i =0; i < TOTAL_THREADS ; ++i){tld[i].total_thread = TOTAL_THREADS ; tld[i].t_index = i;}

    pthread_t threads[TOTAL_THREADS ];

    for (int i = 0; i < TOTAL_THREADS ; i++)
    {
      auto *obj = &tld[i];
      void *userData = static_cast<void*>(obj);
      pthread_create(threads + i, NULL, thread_processing, userData);
    }
    for (int i = 0; i < TOTAL_THREADS ; i++)
    {
      pthread_join(threads[i], NULL);
    }
    return 0;
}

compiled using /pkgs/gccv9.3.0p4/bin/g++ thread_barrier_impl.cpp -std=gnu++17 -lpthread And it hangs in few trials.

I see program is hanging/dead lock?. Hooked up it in gdb and observed that all 16 threads are waiting in waitForAllThreads, atomic_flag_guard is true but atomic_counter is only 2(should have been 15).

1

There are 1 best solutions below

0
Pepijn Kramer On

Something like this :

#include <condition_variable>
#include <mutex>
#include <iostream>

class latch_t
{
public:
    latch_t(std::size_t count) :
        m_count{ count }
    {
    }

    ~latch_t()
    {
    }

    void synchronize()
    {
        std::unique_lock<std::mutex> lock(m_mtx);
        if (m_count > 0)
        {
            --m_count;
            m_cv.notify_all();
        }

        m_cv.wait(lock, [this] { return m_count == 0; });
    }

private:
    std::mutex m_mtx;
    std::condition_variable m_cv;
    std::size_t m_count;
};

int main()
{
    {
        latch_t latch{ 3ul };
        std::jthread t1{[&latch]
        {
            std::cout << "1\n";
            latch.synchronize();
            std::cout << "1\n";
        }};

        std::jthread t2{[&latch]
        {
            std::cout << "2\n";
            latch.synchronize();
            std::cout << "2\n";
        }};

        std::jthread t3{[&latch]
        {
            std::cout << "3\n";
            latch.synchronize();
            std::cout << "3\n";
        }};
    }
    std::cout << std::flush;

    return 0;
}