Flexible CountDownLatch can't use Phaser because of limit


I'm receiving a big file with N entries. For each entry I'm creating a new thread, and I need to wait for all N threads to terminate.

At the beginning I was using Phaser, but its implementation is limited to 65,535 parties, so it blows up because N can be around 100K.

Then I tried CountDownLatch. It works fine: a very simple concept with a very simple implementation. But it needs the count up front, and I don't know N in advance.

Phaser would be my solution, but it has that limit.

Any ideas?

This post is related: Flexible CountDownLatch?


3 Answers

Zoltan (Best Answer)

Sounds like the problem you are trying to solve is processing a large number of tasks as fast as possible and waiting for the processing to finish.

The problem with processing a large number of tasks simultaneously is context switching: above a certain (hardware-dependent) number of concurrent threads, the overhead essentially cripples your machine and slows the processing down. This means you need an upper limit on the number of worker threads running concurrently.

Phaser and CountDownLatch are both synchronisation aids: their purpose is to let threads wait for one another, not to manage how the parallel work itself is scheduled and executed.

I would use an ExecutorService in this case. It supports submitting tasks in many forms, including Runnable and Callable.

You can easily create an ExecutorService using the Executors class. I'd recommend a fixed-size thread pool with 20-100 threads, depending on how CPU-intensive your tasks are: the more computation a task requires, the fewer threads can run in parallel without serious performance degradation.

There are multiple ways to wait for all the tasks to finish:

  • Collect all the Future instances returned by the submit method and simply call get on all of them. This guarantees that every task has completed by the time your loop finishes (see the sketch after the snippet below).
  • Shut down the executor service and wait for all the submitted tasks to finish. The downsides are that you have to specify a maximum amount of time to wait and that you don't always want to shut the executor down: it depends on whether you're writing a single-shot application or a server that keeps running afterwards. For a server you'll definitely have to go with the previous approach.

Finally, here is a code snippet illustrating all this:

List<TaskFromFile> tasks = loadFileAndCreateTasks();
ExecutorService executor = Executors.newFixedThreadPool(50);

for (TaskFromFile task : tasks) {
    // createRunnable is not necessary if your task already implements Runnable
    executor.submit(createRunnable(task));
}

// assuming a single-shot batch job: stop accepting new tasks, then wait
executor.shutdown();
// awaitTermination throws InterruptedException, so declare or handle it
executor.awaitTermination(MAX_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS);
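
For the first approach, a minimal sketch could look like the following. FileEntry, readEntries() and process() are placeholder names used only for illustration, not part of the question or of any library:

// Future-collecting variant: keep every Future returned by submit()
// and block on each one; the executor can stay alive afterwards.
ExecutorService executor = Executors.newFixedThreadPool(50);
List<Future<?>> futures = new ArrayList<>();

for (FileEntry entry : readEntries()) {
    futures.add(executor.submit(() -> process(entry)));
}

for (Future<?> future : futures) {
    // get() blocks until that task has finished; a task failure is
    // rethrown here wrapped in an ExecutionException
    future.get();
}
// no shutdown needed just to wait; the pool can accept more work later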
Zbynek Vyskovsky - kvr000

With AtomicInteger you can easily achieve the same thing. Initialize it with 1 and increment it for every new task. Have both the workers and the producer decrement it when they are done and check the result; whoever brings it to zero runs your finishing Runnable.
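
A rough sketch of that idea follows; pending, onAllDone, FileEntry, readEntries() and process() are hypothetical names used only for illustration:

// Counting approach with AtomicInteger. The initial 1 stands for the
// producer itself, so the count cannot reach zero before submission ends.
AtomicInteger pending = new AtomicInteger(1);
Runnable onAllDone = () -> System.out.println("all entries processed");
ExecutorService executor = Executors.newFixedThreadPool(50);

for (FileEntry entry : readEntries()) {
    pending.incrementAndGet();                 // register the new task
    executor.submit(() -> {
        try {
            process(entry);
        } finally {
            if (pending.decrementAndGet() == 0) {
                onAllDone.run();               // last one to finish triggers the finishing step
            }
        }
    });
}

// producer has submitted everything: release its own count
if (pending.decrementAndGet() == 0) {
    onAllDone.run();
}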

Matej Tymes

ReusableCountLatch is a CountDownLatch alternative that allows increments as well.

The usage is as follows:

ReusableCountLatch latch = new ReusableCountLatch(); // creates latch with initial count 0
ReusableCountLatch latch = new ReusableCountLatch(10); // creates latch with initial count 10

latch.increment(); // increments counter

latch.decrement(); // decrements counter

latch.waitTillZero(); // blocks until the count falls to zero

boolean succeeded = latch.waitTillZero(200, MILLISECONDS); // waits up to 200 ms for the count to fall to zero (MILLISECONDS is a static import of TimeUnit.MILLISECONDS)

int count = latch.getCount(); // gets the current count

To use it, just add this Gradle/Maven dependency to your project: 'com.github.matejtymes:javafixes:1.3.1'
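
Combined with a thread pool, the pattern from the question could then look roughly like this sketch. FileEntry, readEntries() and process() are placeholders, and waitTillZero() may require declaring or handling InterruptedException:

ReusableCountLatch latch = new ReusableCountLatch();
ExecutorService executor = Executors.newFixedThreadPool(50);

for (FileEntry entry : readEntries()) {
    latch.increment();                 // register the entry before submitting it
    executor.submit(() -> {
        try {
            process(entry);
        } finally {
            latch.decrement();         // always count down, even if processing fails
        }
    });
}

latch.waitTillZero();                  // blocks until every entry has been processed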

More details can be found here: https://github.com/MatejTymes/JavaFixes