Check if ConcurrentHashMap.forEach(PARALLELISM_THR, (k, v)->) has completed


Is there a way to know when a parallel forEach on a ConcurrentHashMap has finished?

Here is an example of what I want to do:

import java.util.concurrent.ConcurrentHashMap;
import java.util.Random;
public class Main
{
    public static void main(String[] args) {
     
        System.out.println("Hello World");
        
        var mainMap = new ConcurrentHashMap<Integer, String>();
        
         Random rnd = new Random();
        
        //I don't know from the beginning how many objects
        for(int i=0; i<rnd.nextInt(100); i++){
            mainMap.put(i,"elem "+i);
        }
        
        
        
        mainMap.forEach(1, (k, v) -> {

            // this models a sub-task my application executes asynchronously
            Thread t = new Thread(() -> {
                System.out.println("Object " + k + " started working <" + v + ">");

                try {
                    Thread.sleep(k * 500);
                } catch (InterruptedException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }

                System.out.println("Object " + k + " done after " + (k * 500) + " ms");
            });

            t.start();
        });

        // I want to print this only after all jobs are finished,
        // but I still don't want to block the main (GUI) thread
        System.out.println("All job done !");

        return;
    }
}

Right now the output looks like this:

Hello World
All job done !
Object 0 started working <elem 0>
Object 2 started working <elem 2>
Object 4 started working <elem 4>
Object 3 started working <elem 3>
Object 1 started working <elem 1>
Object 0 done after 0 ms
Object 1 done after 500 ms
Object 2 done after 1000 ms
Object 3 done after 1500 ms

But I expect something like this:

Hello World
Object 0 started working <elem 0>
Object 2 started working <elem 2>
Object 4 started working <elem 4>
Object 3 started working <elem 3>
Object 1 started working <elem 1>
Object 0 done after 0 ms
Object 4 done after 2000 ms
Object 1 done after 500 ms
Object 2 done after 1000 ms
Object 3 done after 1500 ms
All job done !

tgdavies On

A simple way to do that is to collect all the threads into a list, then join each of them (in a separate thread, so you don't block the main thread):

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;

public class X {
    public static void main(String[] args) throws InterruptedException {

        System.out.println("Hello World");

        var mainMap = new ConcurrentHashMap<Integer, String>();

        Random rnd = new Random();

        //I don't know from the beginning how many objects
        for (int i = 0; i < rnd.nextInt(100); i++) {
            mainMap.put(i, "elem " + i);
        }

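        // forEach(1, ...) may run the lambda on several threads at once,
        // so the list must be thread-safe (a plain ArrayList is not)
        List<Thread> threads = java.util.Collections.synchronizedList(new ArrayList<>());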

        mainMap.forEach(1, (k, v) -> {

            // this models a sub-task my application executes asynchronously
            Thread t = new Thread(() -> {
                System.out.println("Object " + k + " started working <" + v + ">");

                try {
                    Thread.sleep(k * 500);
                } catch (InterruptedException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }


                System.out.println("Object " + k + " done after " + (k * 500) + " ms");
            });
            threads.add(t);
            t.start();
        });
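        // Wait on a separate thread so the main thread is not blocked
        var waiter = Executors.newSingleThreadExecutor();
        waiter.submit(() -> {
            for (Thread t : threads) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            System.out.println("All job done !");
        });
        // Let the waiter thread exit after its task, so the JVM can terminate
        waiter.shutdown();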
        System.out.println("Main thread not blocked");
    }
}

But I don't think your strategy of an unlimited number of threads is good -- usually there is an optimal degree of parallelism, depending on the number of processors and the amount of IO each thread performs.

I think it's better to use a thread pool:

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class X {
    public static void main(String[] args) throws InterruptedException {

        System.out.println("Hello World");

        var mainMap = new ConcurrentHashMap<Integer, String>();

        Random rnd = new Random();

        //I don't know from the beginning how many objects
        for (int i = 0; i < rnd.nextInt(100); i++) {
            mainMap.put(i, "elem " + i);
        }

        ExecutorService executor = Executors.newFixedThreadPool(10);
        mainMap.forEach(1, (k, v) -> {
            executor.execute(() -> {
                System.out.println("Object " + k + " started working <" + v + ">");
                try {
                    Thread.sleep(k * 500);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                System.out.println("Object " + k + " done after " + (k * 500) + " ms");
            });
        });
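        // Wait on a separate thread so the main thread is not blocked
        var waiter = Executors.newSingleThreadExecutor();
        waiter.submit(() -> {
            try {
                executor.shutdown();
                // awaitTermination returns false on timeout, so keep waiting
                // until the pool has really terminated
                while (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                    System.out.println("Still waiting for jobs ...");
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("All job done !");
        });
        // Let the waiter thread exit after its task, so the JVM can terminate
        waiter.shutdown();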
        System.out.println("Main thread not blocked");
    }
}
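
As a variation on the thread pool idea, here is a minimal sketch that avoids the extra waiter thread by wrapping each job in a CompletableFuture and combining them with CompletableFuture.allOf. The class name PoolCompletionSketch, the helper runAll, the finished counter and the shortened sleep times are assumptions made for illustration; they are not part of the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class PoolCompletionSketch {

    // Hypothetical helper: runs one job per entry on the given pool and returns
    // a future that completes once every job has finished.
    static CompletableFuture<Void> runAll(Map<Integer, String> map,
                                          ExecutorService pool,
                                          AtomicInteger finished) {
        List<CompletableFuture<Void>> jobs = new ArrayList<>();
        // Plain sequential iteration: submitting is cheap, the pool provides the parallelism.
        map.forEach((k, v) -> jobs.add(CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(k * 10L); // stand-in for the real work (shortened delay)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finished.incrementAndGet();
        }, pool)));
        return CompletableFuture.allOf(jobs.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        var mainMap = new ConcurrentHashMap<Integer, String>();
        for (int i = 0; i < 20; i++) {
            mainMap.put(i, "elem " + i);
        }
        ExecutorService pool = Executors.newFixedThreadPool(10);
        AtomicInteger finished = new AtomicInteger();

        // The callback runs after the last job; no dedicated waiter thread is needed.
        CompletableFuture<Void> report = runAll(mainMap, pool, finished)
                .thenRun(() -> System.out.println("All job done ! (" + finished.get() + " jobs)"));
        System.out.println("Main thread not blocked");

        report.join();   // only so this demo can shut the pool down; a GUI would not block here
        pool.shutdown();
    }
}
```

In a GUI application you would drop the final join() and shut the pool down when the application exits instead.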
Holger On

It seems you are completely missing the point of the API. When you call mainMap.forEach(1, …), the action is already executed in parallel, and there's no sense in creating new threads, especially not one for each element. You are subverting the entire work that has been done in the forEach implementation (the use of thread pools) and creating the very problem you are now trying to solve.

The forEach method does run the actions in parallel and when it returns, the operation has completed.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

public class Main {
  public static void main(String[] args) {
    System.out.println("Hello World");
    var mainMap = new ConcurrentHashMap<Integer, String>();
    var rnd = ThreadLocalRandom.current();

    // I don't know from the beginning how many objects
    for(int i = 0; i < rnd.nextInt(100); i++) {
      mainMap.put(i, "elem " + i);
    }

    mainMap.forEach(1, (k, v) -> {
      System.out.println("Object " + k
          + " started working <" + v + "> " + Thread.currentThread());
      try {
        Thread.sleep(k * 500);
      }
      catch(InterruptedException e1) {
        e1.printStackTrace();
      }
      System.out.println("Object " + k + " done after " + (k * 500) + " ms");
    });

    System.out.println("All job done !");
  }
}

Example output:

Hello World
Object 0 started working <elem 0> Thread[#1,main,5,main]
Object 0 done after 0 ms
Object 1 started working <elem 1> Thread[#1,main,5,main]
Object 2 started working <elem 2> Thread[#22,ForkJoinPool.commonPool-worker-1,5,main]
Object 4 started working <elem 4> Thread[#23,ForkJoinPool.commonPool-worker-2,5,main]
Object 1 done after 500 ms
Object 2 done after 1000 ms
Object 3 started working <elem 3> Thread[#22,ForkJoinPool.commonPool-worker-1,5,main]
Object 4 done after 2000 ms
Object 5 started working <elem 5> Thread[#23,ForkJoinPool.commonPool-worker-2,5,main]
Object 3 done after 1500 ms
Object 5 done after 2500 ms
All job done !

The result differs from your expectation slightly, as the waiting times for the first keys are too small.
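
To see what the first argument (the parallelism threshold) does, here is a small sketch that records which threads execute the action. The class name ThresholdDemo and the helper threadsUsed are made up for this illustration. With a threshold of 1 the set of thread names depends on the machine and on the common pool, so no particular output should be expected for that case; with Long.MAX_VALUE the documentation states that all parallelism is suppressed, so only the calling thread shows up.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ThresholdDemo {

    // Hypothetical helper: records the names of the threads that executed the action.
    static Set<String> threadsUsed(ConcurrentHashMap<Integer, String> map, long threshold) {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        map.forEach(threshold, (k, v) -> names.add(Thread.currentThread().getName()));
        // forEach has returned, so every action has already run at this point.
        return names;
    }

    public static void main(String[] args) {
        var mainMap = new ConcurrentHashMap<Integer, String>();
        for (int i = 0; i < 1_000; i++) {
            mainMap.put(i, "elem " + i);
        }
        // Threshold 1: as much parallelism as the common pool offers.
        System.out.println("threshold 1:              " + threadsUsed(mainMap, 1));
        // Long.MAX_VALUE: no parallelism, everything runs on the calling thread.
        System.out.println("threshold Long.MAX_VALUE: " + threadsUsed(mainMap, Long.MAX_VALUE));
    }
}
```

Either way, the helper only returns after forEach has returned, which is why no extra completion check is needed.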


If you want to avoid blocking the initiator thread, e.g. because it’s the event dispatch thread, submit the entire operation to a background thread:

CompletableFuture<Void> operation = CompletableFuture.runAsync(() -> {
  mainMap.forEach(1, (k, v) -> {
    System.out.println("Object " + k
        + " started working <" + v + "> " + Thread.currentThread());
    try {
      Thread.sleep(k * 500);
    }
    catch(InterruptedException e1) {
      e1.printStackTrace();
    }
    System.out.println("Object " + k + " done after " + (k * 500) + " ms");
  });
});

Then, you can poll the operation status, e.g.

System.out.println(operation.isDone()? "done.": "still running");

but also chain dependent operations like

operation.thenAccept(_void -> System.out.println("All job done !"));

or, more interesting for your actual use case,

operation.thenAcceptAsync(_void -> {
    /* update UI components */
  }, EventQueue::invokeLater);

to update the UI components on the event dispatch thread after completion, without blocking that thread while the operation is running.
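
For completeness, the fragments above can be put together into a runnable sketch along these lines. The class name BackgroundForEach and the helper processAsync are hypothetical, the sleep times are shortened, and supplyAsync is used instead of runAsync only so the future can carry the number of processed entries; treat it as an illustration under those assumptions, not as the one right way to do it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class BackgroundForEach {

    // Hypothetical helper: starts the parallel forEach on a background thread
    // and returns immediately with a future describing the whole operation.
    static CompletableFuture<Integer> processAsync(ConcurrentHashMap<Integer, String> map) {
        return CompletableFuture.supplyAsync(() -> {
            AtomicInteger processed = new AtomicInteger();
            map.forEach(1, (k, v) -> {
                try {
                    Thread.sleep(k * 5L); // stand-in for the real work (shortened delay)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                processed.incrementAndGet();
            });
            // forEach has returned here, so every entry has been processed.
            return processed.get();
        });
    }

    public static void main(String[] args) {
        var mainMap = new ConcurrentHashMap<Integer, String>();
        for (int i = 0; i < 20; i++) {
            mainMap.put(i, "elem " + i);
        }

        CompletableFuture<Integer> operation = processAsync(mainMap);

        // The initiating thread is free; it can poll the status ...
        System.out.println(operation.isDone() ? "done." : "still running");

        // ... or register a follow-up action that runs after completion.
        CompletableFuture<Void> report =
                operation.thenAccept(n -> System.out.println("All job done ! (" + n + " entries)"));

        report.join(); // only so the demo does not exit early; a GUI thread would not block here
    }
}
```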