How to reliably pair a thread to an object instance in a concurrency::parallel_for?

I have a vector of M images that has to be processed in parallel by up to N threads, where N is a parameter set by the user.

I also have a vector of N Detector instances that take care of the processing, but each instance must run in its own thread (i.e., if two threads call detect() on the same instance before the previous call ended, bad things will happen).

Detector is a self-contained class (which I can modify if necessary). I call a single method, void Detector::detect(cv::Mat image), which alters the detector's internal state for the duration of the (lengthy) detection process, hence the need to prevent concurrent calls to detect() on the same instance.

I originally implemented this with OpenMP as:

#pragma omp parallel for num_threads(N)
for(int i=0; i<M; i++)
{
    detectors[omp_get_thread_num()].detect(images[i]);
}

However, since detection can throw exceptions, I thought of using PPL's parallel_for instead, which propagates exceptions thrown in worker threads to the calling thread.

The problem is, I can't find an equivalent of omp_get_thread_num that I can use to map a Detector to a specific thread:

concurrency::CurrentScheduler::Create(concurrency::SchedulerPolicy(2,
    concurrency::MinConcurrency, 1, concurrency::MaxConcurrency, N));
concurrency::parallel_for(0, M, [&](int i)
{
    detectors[?????].detect(images[i]);
});
concurrency::CurrentScheduler::Detach(); // clear scheduler

How can I ensure that one thread always uses the same instance from the detectors pool? Or, if this is the wrong approach, how can I map the execution of detect() over the pool of detectors I already have?

Answer by GPhilo:

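Following a suggestion from @NathanOliver, I ended up using a concurrent_queue as a pool of idle detectors. Each task pops a free detector, runs detect() on it, and pushes it back, so no instance is ever used by two threads at once: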

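#include <concrt.h>            // CurrentScheduler, SchedulerPolicy, wait
#include <concurrent_queue.h>  // concurrency::concurrent_queue
#include <ppl.h>               // concurrency::parallel_for
#include <memory>              // std::shared_ptr

using namespace concurrency;
// Cap the scheduler at N concurrent threads.
CurrentScheduler::Create(SchedulerPolicy(2,
    MinConcurrency, 1, MaxConcurrency, N));
// Pool of idle detectors: a task borrows one and returns it when done.
concurrent_queue<std::shared_ptr<Detector>> detectors_queue;
for(auto& det : obj->instances)
{
    detectors_queue.push(det);
}
parallel_for(0, M, [&](int i)
{
    std::shared_ptr<Detector> thread_det = nullptr;
    // Poll until a detector is free.
    while(!detectors_queue.try_pop(thread_det))
    {
        wait(100); // milliseconds
    }
    try
    {
        thread_det->detect(images[i]);
    }
    catch(...)
    {
        detectors_queue.push(thread_det); // return the detector even on failure
        throw; // parallel_for propagates this to the calling thread
    }
    detectors_queue.push(thread_det);
});
CurrentScheduler::Detach(); // detach the scheduler created above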
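
For readers without PPL, the same borrow-and-return idea can be sketched with only the C++ standard library. This is a minimal illustration, not the code from the answer: the Detector below is a hypothetical stand-in that takes an int instead of a cv::Mat, and DetectorPool, Lease and process_all are names made up for this example. The pool blocks on a condition variable instead of polling, and an RAII lease returns the detector even when detect() throws.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>

// Hypothetical stand-in for the Detector from the question.
struct Detector {
    int calls = 0; // mutable state that must not be touched concurrently
    void detect(int image) {
        ++calls;
        if (image < 0) throw std::runtime_error("bad image");
    }
};

// Blocking pool of idle detectors (illustrative, not a PPL type).
class DetectorPool {
public:
    explicit DetectorPool(std::vector<Detector>& detectors) {
        for (auto& d : detectors) idle_.push(&d);
    }

    // RAII handle: borrows a detector on construction, returns it on
    // destruction, including during stack unwinding after an exception.
    class Lease {
    public:
        explicit Lease(DetectorPool& pool) : pool_(pool), det_(pool.acquire()) {}
        ~Lease() { pool_.release(det_); }
        Lease(const Lease&) = delete;
        Lease& operator=(const Lease&) = delete;
        Detector* operator->() const { return det_; }
    private:
        DetectorPool& pool_;
        Detector* det_;
    };

private:
    Detector* acquire() {
        std::unique_lock<std::mutex> lock(mutex_);
        available_.wait(lock, [this] { return !idle_.empty(); });
        Detector* d = idle_.front();
        idle_.pop();
        return d;
    }
    void release(Detector* d) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            idle_.push(d);
        }
        available_.notify_one();
    }

    std::mutex mutex_;
    std::condition_variable available_;
    std::queue<Detector*> idle_;
};

// Processes every image on num_threads threads and returns how many
// detect() calls threw. Threads may outnumber detectors; extras wait.
int process_all(std::vector<Detector>& detectors,
                const std::vector<int>& images,
                std::size_t num_threads) {
    assert(!detectors.empty());
    DetectorPool pool(detectors);
    std::atomic<std::size_t> next{0};
    std::atomic<int> failures{0};

    auto worker = [&] {
        // Each thread claims the next unprocessed image index.
        for (std::size_t i = next++; i < images.size(); i = next++) {
            try {
                DetectorPool::Lease det(pool);
                det->detect(images[i]);
            } catch (const std::exception&) {
                ++failures; // the lease has already returned the detector
            }
        }
    };

    std::vector<std::thread> threads;
    for (std::size_t t = 0; t < num_threads; ++t) threads.emplace_back(worker);
    for (auto& t : threads) t.join();
    return failures.load();
}
```

Calling process_all(detectors, images, 4) with two detectors runs four threads that share the two instances. Here failures are only counted; to mirror parallel_for's behaviour, the first exception could instead be stored in a std::exception_ptr and rethrown after the joins.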