How to ensure that all the runnables get fired concurrently, using a ScheduledExecutorService in Java


I'm trying to implement a Java program that prepares a bunch of runnables for databases (there are hundreds of them, and I want to allocate a thread for each database) and runs each of those runnables using a ScheduledExecutorService, with an initial delay and a delay between executions. This is what I've managed so far:

// Method inside the enclosing class. getDbInfoMap, DbInfo, MigrationTaskManager,
// INITIAL_DELAY and DELAY are defined elsewhere (not shown).
public static void main(String[] args) throws Exception {

    // Get db name and info as key-value pairs
    Map<String, DbInfo> dbInfoMap = getDbInfoMap();

    // Initialize the scheduled executor service.
    final ScheduledExecutorService executorService = Executors
            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors());

    // For each database, get a list of runnable(s) and process accordingly.
    dbInfoMap.forEach((db, dbInfo) -> {
        // Get a list of runnable(s).
        List<Runnable> runnableList = new MigrationTaskManager(db, dbInfo).getRunnableTasksForDB();
        // Schedule all the runnable(s) using the scheduled executor service, with a delay
        runnableList.forEach(runnable -> executorService
                .scheduleWithFixedDelay(runnable, INITIAL_DELAY, DELAY, TimeUnit.MILLISECONDS));
    });
}

This sort of does what I intend, albeit with some caveats (it takes a really long time to start processing some databases if the list is too long). How can I implement it so that all the runnables for each database get fired at the same time? For instance, if I have 100 databases to read from, how can I ensure that 100 threads get fired, with a bunch of runnables in each thread, when the program starts? I'd really appreciate some help with this.

Answer by Basil Bourque

tl;dr

For maximum efficiency, pass a collection of your hundreds of tasks to an executor service that assigns one virtual thread per task (Java 21+).

ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
executorService.invokeAll ( collectionOfTasks );  // Tasks must be Callable objects. Blocks until all have completed.
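The question's tasks are Runnable objects, but invokeAll accepts a collection of Callable objects. Here is a minimal sketch, assuming Java 21+, that adapts each Runnable with Executors.callable and runs them all on virtual threads. The class name RunnablesViaInvokeAll, the method runAll, and the counter-incrementing dummy tasks are hypothetical stand-ins for the per-database work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class RunnablesViaInvokeAll {

    // Runs every Runnable on its own virtual thread and waits for all of them.
    // Returns the number of tasks that completed without throwing.
    static int runAll(List<Runnable> runnables) throws InterruptedException {
        // invokeAll accepts Callables, so adapt each Runnable with Executors.callable.
        List<Callable<Object>> callables = new ArrayList<>();
        for (Runnable r : runnables) {
            callables.add(Executors.callable(r));
        }
        int succeeded = 0;
        // Java 21+: one new virtual thread per submitted task.
        // try-with-resources calls close(), which waits for submitted tasks to finish.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // invokeAll blocks until every task has completed, normally or exceptionally.
            List<Future<Object>> futures = executor.invokeAll(callables);
            for (Future<Object> f : futures) {
                try {
                    f.get(); // Already done; rethrows a task's exception wrapped in ExecutionException.
                    succeeded++;
                } catch (ExecutionException e) {
                    System.err.println("Task failed: " + e.getCause());
                }
            }
        }
        return succeeded;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            tasks.add(counter::incrementAndGet); // Hypothetical stand-in for one database's work.
        }
        int ok = runAll(tasks);
        System.out.println(ok + " tasks succeeded, counter = " + counter.get());
    }
}
```

Because invokeAll waits for all tasks, this suits a one-shot batch. It does not repeat tasks the way scheduleWithFixedDelay does.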

Scheduling of a thread’s execution is out of your control

Your title says:

ensure that all the runnables get fired concurrently

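If you mean that you want all of the Runnable tasks to execute simultaneously, that cannot be guaranteed with Java threads. At any instant, no more threads can actually be running than there are CPU cores, and which threads run when is decided by a scheduler, not by your code.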
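The closest you can get is releasing all tasks at the same instant. A minimal sketch, assuming Java 21+, in which every task first waits on one shared CountDownLatch that the main thread opens after all tasks are submitted. The class name StartGate and the method runBehindGate are hypothetical. Even here, the tasks only become eligible to run together; the scheduler still decides their order.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class StartGate {

    // Submits taskCount tasks that all wait on one latch, then opens it.
    // Returns how many tasks ran their work after the gate opened.
    static int runBehindGate(int taskCount) throws InterruptedException {
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger ran = new AtomicInteger();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    // Blocks until the gate opens (or passes straight through if it is already open).
                    gate.await();
                    ran.incrementAndGet(); // Stand-in for the real work.
                    return null;           // Returning a value makes this a Callable, so await() may throw.
                });
            }
            // Release all waiting tasks at once. They become runnable together,
            // but which one actually runs first is still the scheduler's choice.
            gate.countDown();
        } // close() waits for every submitted task to finish.
        return ran.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runBehindGate(100) + " tasks ran after the gate opened");
    }
}
```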

Platform threads

In Java, you ask for a thread to be executed, but when execution begins, and how long it lasts, is up to the whims of the host OS’s thread scheduler. Traditional Java threads, now called “platform” threads, are each mapped to a host OS thread. (Virtual threads, introduced in Java 21, work quite differently from these platform threads.) A Java programmer has no control over when a thread runs.
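To make the platform/virtual distinction concrete, here is a small sketch using the Thread.Builder API introduced in Java 21 (Thread.ofPlatform and Thread.ofVirtual). The thread names and the helper whichAreVirtual are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class PlatformVersusVirtual {

    // Starts one thread of each kind and reports whether each ran as a virtual thread.
    static boolean[] whichAreVirtual() throws InterruptedException {
        AtomicBoolean platformWasVirtual = new AtomicBoolean();
        AtomicBoolean virtualWasVirtual = new AtomicBoolean();

        // A platform thread wraps an OS thread; the OS scheduler decides when it runs.
        Thread platform = Thread.ofPlatform().name("db-platform")
                .start(() -> platformWasVirtual.set(Thread.currentThread().isVirtual()));
        // A virtual thread is scheduled by the JVM onto a carrier platform thread.
        Thread virtual = Thread.ofVirtual().name("db-virtual")
                .start(() -> virtualWasVirtual.set(Thread.currentThread().isVirtual()));

        // start() only makes a thread eligible to run; join() waits until it has finished.
        platform.join();
        virtual.join();
        return new boolean[] { platformWasVirtual.get(), virtualWasVirtual.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] result = whichAreVirtual();
        System.out.println("platform isVirtual = " + result[0]); // false
        System.out.println("virtual isVirtual = " + result[1]);  // true
    }
}
```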

You can collect a bunch of tasks to be submitted together to an executor service: pass the collection of tasks to ExecutorService#invokeAll. Note that invokeAll takes Callable objects rather than Runnable objects, and that it blocks until every task has completed. But invokeAll does not mean the tasks will be executed together. They can be executed in any order, with each task’s execution beginning at any time and completing at any time. The invokeAll method is merely a convenience for you the programmer if you have chosen to instantiate all your tasks up front. You could just as well write a loop that submits each instantiated task object to the executor service, one task at a time.
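For comparison with invokeAll, here is a minimal sketch of the loop-of-submit approach, assuming Java 21+. Trivial squaring tasks stand in for database work; the class name SubmitLoop and the method sumOfSquares are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitLoop {

    // Submits tasks one at a time instead of calling invokeAll.
    // Returns the sum of all task results.
    static int sumOfSquares(int n) throws InterruptedException, ExecutionException {
        List<Future<Integer>> futures = new ArrayList<>();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 1; i <= n; i++) {
                final int value = i;
                // submit returns immediately; the task may start now, later, or after its siblings.
                futures.add(executor.submit(() -> value * value));
            }
            int sum = 0;
            for (Future<Integer> future : futures) {
                sum += future.get(); // get() blocks until that particular task is done.
            }
            return sum;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(10)); // prints 385
    }
}
```

Unlike invokeAll, this lets you decide per task when to wait on its Future, but the tasks' start order is equally out of your hands.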

Here is an example app to demonstrate passing a collection of Callable objects to the invokeAll method of an ExecutorService.

package work.basil.example;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.IntStream;

public class ManyDatabases
{
    public static void main ( String[] args )
    {
        ManyDatabases app = new ManyDatabases ( );
        app.demo ( );
    }

    private void demo ( )
    {
        System.out.println ( "INFO - Demo starting. " + Instant.now ( ) );
        // Dummy data: a handful of databases, each with a randomly chosen engine.
        List < DatabaseInfo > databaseInfos =
                IntStream
                        .rangeClosed ( 1 , 7 )
                        .mapToObj ( ( int index ) ->
                                new DatabaseInfo (
                                        ENGINE.values ( )[ ThreadLocalRandom.current ( ).nextInt ( ENGINE.values ( ).length ) ] ,
                                        index
                                )
                        )
                        .toList ( );
        System.out.println ( "databaseInfos = " + databaseInfos );

        // Make tasks, one per database.
        List < DbTask > tasks = databaseInfos.stream ( ).map ( DbTask :: new ).toList ( );

        // Execute tasks on platform threads. A cached thread pool creates threads as needed.
        // ExecutorService is AutoCloseable (Java 19+); close() waits for submitted tasks to finish.
        try (
                ExecutorService executorService = Executors.newCachedThreadPool ( )
        )
        {
            // invokeAll blocks until every task has completed.
            List < Future < Boolean > > futures = executorService.invokeAll ( tasks );
        }
        catch ( InterruptedException e ) { throw new RuntimeException ( e ); }
        System.out.println ( "INFO - Demo done. " + Instant.now ( ) );
    }
}

enum ENGINE { POSTGRES, MS_SQL_SERVER, MYSQL }

record DatabaseInfo( ENGINE engine , int address ) { }

class DbTask implements Callable < Boolean >
{
    final private DatabaseInfo databaseInfo;

    public DbTask ( final DatabaseInfo databaseInfo )
    {
        this.databaseInfo = databaseInfo;
    }

    @Override
    public Boolean call ( )
    {
        System.out.println ( Instant.now ( ) + " TASK Simulating update to database: " + this.databaseInfo );
        try
        {
            // Simulate real work being done, such as waiting on a database response.
            Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 400 , 800 ) ) );
        }
        catch ( InterruptedException e ) { throw new RuntimeException ( e ); }
        return true;
    }
}

When run:

INFO - Demo starting. 2023-09-01T01:54:11.199225Z
databaseInfos = [DatabaseInfo[engine=MS_SQL_SERVER, address=1], DatabaseInfo[engine=POSTGRES, address=2], DatabaseInfo[engine=POSTGRES, address=3], DatabaseInfo[engine=POSTGRES, address=4], DatabaseInfo[engine=POSTGRES, address=5], DatabaseInfo[engine=MYSQL, address=6], DatabaseInfo[engine=MYSQL, address=7]]
2023-09-01T01:54:11.229579Z TASK Simulating update to database: DatabaseInfo[engine=POSTGRES, address=5]
2023-09-01T01:54:11.229285Z TASK Simulating update to database: DatabaseInfo[engine=POSTGRES, address=2]
2023-09-01T01:54:11.229423Z TASK Simulating update to database: DatabaseInfo[engine=POSTGRES, address=4]
2023-09-01T01:54:11.229342Z TASK Simulating update to database: DatabaseInfo[engine=POSTGRES, address=3]
2023-09-01T01:54:11.229840Z TASK Simulating update to database: DatabaseInfo[engine=MYSQL, address=7]
2023-09-01T01:54:11.229797Z TASK Simulating update to database: DatabaseInfo[engine=MYSQL, address=6]
2023-09-01T01:54:11.229291Z TASK Simulating update to database: DatabaseInfo[engine=MS_SQL_SERVER, address=1]
INFO - Demo done. 2023-09-01T01:54:12.051448Z

Virtual threads

In Java 21+, we have virtual threads, an alternative to platform threads. Virtual threads are scheduled by the JVM rather than by the host OS. When a virtual thread performs most kinds of blocking operation, such as I/O or sleeping, the JVM unmounts it from the “real” platform thread (the carrier thread) on which it was executing. The virtual thread is parked until the blocking operation resolves. In the meantime, the JVM mounts another virtual thread onto that carrier thread. The result is much better utilization of your CPU cores, as the host OS threads rarely sit idle.

Database calls typically involve a lot of blocking, spending most of their time waiting for a response. You are making hundreds of such calls at a time. So this is an ideal situation for switching from platform threads to virtual threads.
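To illustrate why blocking work suits virtual threads, here is a sketch assuming Java 21+ that starts ten thousand virtual threads, each of which simply sleeps as a stand-in for waiting on a database. The class name and the numbers are arbitrary, and timing will vary by machine.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ManyBlockingVirtualThreads {

    // Starts taskCount virtual threads that each block (sleep) for the given duration.
    // Returns how many finished. Requires Java 21+.
    static int run(int taskCount, Duration blockFor) {
        AtomicInteger finished = new AtomicInteger();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    // While sleeping, the virtual thread is unmounted from its carrier
                    // platform thread, freeing that carrier to run other virtual threads.
                    Thread.sleep(blockFor);
                    finished.incrementAndGet();
                    return null;
                });
            }
        } // close() waits for all tasks to finish.
        return finished.get();
    }

    public static void main(String[] args) {
        Instant start = Instant.now();
        int done = run(10_000, Duration.ofMillis(500));
        Duration elapsed = Duration.between(start, Instant.now());
        // Typically close to half a second in total, far less than a small fixed-size
        // pool of platform threads would need to work through 10,000 sleeping tasks.
        System.out.println(done + " tasks finished in " + elapsed.toMillis() + " ms");
    }
}
```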

Switching to virtual threads is utterly simple: just swap out the Executors call.

ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();

And we can increase our example calls to a thousand, or even a million. Virtual threads are far more lightweight than platform threads, so millions of concurrent virtual threads are feasible.

.rangeClosed ( 1 , 1_000 )

And let's not dump the entire databaseInfos list of a thousand. Just print the first and last, now easier to do because List extends SequencedCollection (Java 21+), with its convenient getFirst and getLast methods. See JEP 431: Sequenced Collections.

System.out.println ( "databaseInfos = " + databaseInfos.getFirst () + " … " + databaseInfos.getLast () );

When run:

INFO - Demo starting. 2023-09-01T02:03:50.729476Z
databaseInfos = DatabaseInfo[engine=MYSQL, address=1] … DatabaseInfo[engine=POSTGRES, address=1000]
2023-09-01T02:03:50.781280Z TASK Simulating update to database: DatabaseInfo[engine=MS_SQL_SERVER, address=3]
2023-09-01T02:03:50.781686Z TASK Simulating update to database: DatabaseInfo[engine=MYSQL, address=8]
2023-09-01T02:03:50.781739Z TASK Simulating update to database: DatabaseInfo[engine=POSTGRES, address=7]
…
2023-09-01T02:03:50.807384Z TASK Simulating update to database: DatabaseInfo[engine=POSTGRES, address=1000]
2023-09-01T02:03:50.807377Z TASK Simulating update to database: DatabaseInfo[engine=MYSQL, address=997]
2023-09-01T02:03:50.806946Z TASK Simulating update to database: DatabaseInfo[engine=MS_SQL_SERVER, address=951]
INFO - Demo done. 2023-09-01T02:03:51.638269Z
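The question also asks for an initial delay and a delay between runs, which invokeAll does not provide. One possible approach, swapped in here and not something the answer above prescribes: give each Runnable its own virtual thread that loops, sleeping between runs. This mimics scheduleWithFixedDelay without making tasks wait for one of only availableProcessors() pool threads to free up. A sketch assuming Java 21+; the class name, startAll, and the dummy counter tasks are hypothetical.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedDelayOnVirtualThreads {

    // Gives every Runnable its own thread from the executor, repeating it with a fixed
    // delay until that thread is interrupted (for example via Future.cancel(true)).
    static List<Future<?>> startAll(ExecutorService executor, List<Runnable> tasks,
                                    Duration initialDelay, Duration delay) {
        List<Future<?>> handles = new ArrayList<>();
        for (Runnable task : tasks) {
            handles.add(executor.submit(() -> {
                try {
                    Thread.sleep(initialDelay);
                    while (!Thread.currentThread().isInterrupted()) {
                        // Like scheduleWithFixedDelay, an exception thrown here ends the repetitions.
                        task.run();
                        Thread.sleep(delay); // Delay measured from the end of the previous run.
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // Cancellation requested: let the thread end.
                }
            }));
        }
        return handles;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger runs = new AtomicInteger();
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            tasks.add(runs::incrementAndGet); // Hypothetical stand-in for per-database work.
        }
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<?>> handles = startAll(executor, tasks, Duration.ofMillis(10), Duration.ofMillis(50));
            Thread.sleep(300);                    // Let the tasks repeat a few times.
            handles.forEach(h -> h.cancel(true)); // Interrupt every loop.
        } // close() waits for the loops to exit.
        System.out.println("total runs: " + runs.get());
    }
}
```

Passing the ExecutorService in, rather than creating it inside startAll, leaves the caller in charge of when the repeating work stops.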