Java - Execute multiple Runnables in parallel and trigger to timeout some of long running tasks?

782 Views Asked by At

I've a list of Runnable tasks (e.g. it contains 100 tasks, and each task takes randomly 1 - 10 seconds). The tasks must run in parallel and from the thread pool provided by ExecutorService (e.g. my system has 4 CPUs, then it should run 4 tasks at the same time).

The question is: I wanted to know which tasks took longer than 5 seconds to run from the list of 100 tasks and they should be terminated (with logs of task ids) after 5 seconds to have places for other tasks.

I've looked at Future with executorService.submit(Runnable task) but the Future.get() method will block the main thread and it is not what I wanted. Any suggestions would be great.

public class TestExecutorService {

    private static final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 10);

    public static void main(String[] args) throws InterruptedException {
        List<Callable<Object>> tasks = new ArrayList<>();

        for (int i = 0; i < 100; i++) {
            int finalI = i;
            int min = 1;
            int max = 8;

            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        int sleepTime = min + (int)(Math.random() * ((max - min) + 1));
                        System.out.println("## Thread: " + finalI + " will sleep: " + sleepTime + " seconds.");
                        Thread.sleep(sleepTime * 1000);
                        System.out.println("## Thread: " + finalI + " finished after: " + sleepTime + " seconds");
                    } catch (InterruptedException e) {
                        System.out.println("Thread is cancelled!");
                    }
                }
            };

            tasks.add(Executors.callable(runnable));
        }

        // How to make a Runnable task timeout after 5 seconds when running other tasks in parallel
        // instead of total time for 100 tasks in 5 seconds?
        executorService.invokeAll(tasks, 5, TimeUnit.SECONDS);

        executorService.shutdown();
    }
}
2

There are 2 best solutions below

0
matt On BEST ANSWER

Lets say you have some list of tasks, and an Executor to

List<Runnable> tasks = ...;
ExecutorService executor = ...;

Now you want to perform each task, get the time it takes, and cancel the task if it takes too long. I would suggest scheduling a time out action.

ScheduledExecutorService timeoutService = Executors.newSingleThreadScheduledExecutor();

Now when you're submitting your tasks.

List<Future<Long>> results = new ArrayList<>();
for(int i = 0; i<tasks.size(); i++){
    Runnable task = tasks.get(i);
    Future<Long> future = executor.submit( () ->{
        long start = System.currentTimeMillis();
        task.run();
        return System.currentTimeMillis() - start;
    });
    Future<?> timeout = timeoutService.schedule( ()->{
        if(!future.isDone()){
            future.cancel(true);
        }
    }, 5, TimeUnit.SECONDS);
    results.add(future);
}

Now you can just go through results and call get when all of the tasks have finished, either exceptionally or normally, you will finish going through the results list. This assumes your tasks can be cancelled or interrupted If they cannot, then you can use the timeout futures.

4
Bằng Rikimaru On

The idea to solve my problem is to use another ExecutorService single-thread which will invoke in each Runnable task. It has its own timeout so it will not interfere with the thread pools with other tasks.

Here is the full code:

package com.company;

import java.sql.Time;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class TestThreads {

    private static final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

    public static void main(String[] args) throws InterruptedException {


        List<Callable<Object>> tasks = new ArrayList<>();
        List<Runnable> runnables = new ArrayList<>();

        for (int i = 0; i < 100; i++) {
            int finalI = i;
            int min = 1;
            int max = 20;

            Runnable runnable = new Runnable() {
                @Override
                public void run() {

                    String threadName = "### Thread: " + finalI;

                    long maxTime = 5000;

                    ExecutorService executorServiceTmp = Executors.newSingleThreadExecutor();
                    Callable<Object> callable = () -> {
                        int sleepTime = min + (int) (Math.random() * ((max - min) + 1));
                        System.out.println("## Thread: " + finalI + " will sleep: " + sleepTime + " seconds.");
                        Thread.sleep(sleepTime * 1000);
                        System.out.println("## Thread: " + finalI + " finished after: " + sleepTime + " seconds");
                        return null;
                    };

                    long startTime = System.currentTimeMillis();

                    try {
                        executorServiceTmp.invokeAll(Arrays.asList(callable), maxTime, TimeUnit.MILLISECONDS);
                    } catch (Exception e) {
                        System.out.println("Thread: " + threadName + " is cancelled.");
                    } finally {
                        executorServiceTmp.shutdown();
                    }

                    long endTime = System.currentTimeMillis();
                    long totalTime = endTime - startTime;

                    if (totalTime >= maxTime) {
                        System.out.println("(!) Thread: " + threadName + " is cancelled after " + maxTime + " ms");
                    }

                }
            };

            tasks.add(Executors.callable(runnable));
            runnables.add(runnable);
        }

        executorService.invokeAll(tasks);

        System.out.println("### All threads fininshed");

        executorService.shutdown();

    }
}