Java Thread pool with always one thread running per customer


Use case: I have 50+ customers, and each customer's activity is recorded in a separate Excel file. A new file per customer is added to a shared file system location every 5 mins.

On application startup, I start a thread that queries the customer records and starts one thread per customer. Customer details are stored in a database table. I use an executor service thread pool and submit one task per customer.

Each customer thread reads the Excel file and processes its rows concurrently, spawning one thread per row. The customer thread waits for all of its child threads to complete before exiting.

I want to achieve the following in the customer executor service thread pool:

  1. Schedule a thread for each customer, which spawns a thread for each row in the Excel file and waits for all child threads to complete.
  2. Only one thread per customer must be running at any point in time.
  3. Each run can take a different amount of time, depending on the number of entries in the Excel file.

The main challenge I am facing is how to ensure only one customer thread per customer is running.

Thanks


There are 2 best solutions below

Ahmed Osama On

You can use the ForkJoinPool API, which is optimized for recursive (divide-and-conquer) operations. It lets a main task that deals with a customer recursively fork multiple subtasks and wait for them to finish.

With this approach, you can meet requirement 2 by synchronizing the creation of the task for each customer.
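
As a rough illustration of the fork/join idea, the sketch below has one parent task per customer file fork one child task per row and wait for all of them. The class name CustomerFileAction, the use of String as a stand-in for a spreadsheet row, and the counter are hypothetical and only for demonstration. Note that ForkJoinPool is designed mainly for CPU-bound work, and this sketch does not by itself prevent two tasks for the same customer from running at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a parent task per customer file forks a child task per row.
class CustomerFileAction extends RecursiveAction {
    private final List<String> rows;        // stand-in for parsed spreadsheet rows
    private final AtomicInteger processed;  // counts handled rows, for demonstration only

    CustomerFileAction(List<String> rows, AtomicInteger processed) {
        this.rows = rows;
        this.processed = processed;
    }

    @Override
    protected void compute() {
        List<RecursiveAction> children = new ArrayList<>();
        for (String row : rows) {
            children.add(new RecursiveAction() {
                @Override
                protected void compute() {
                    processed.incrementAndGet(); // placeholder for real row processing
                }
            });
        }
        // Runs all children and returns only after every one of them has completed.
        invokeAll(children);
    }

    // Processes one file on the common pool and returns the number of rows handled.
    static int processFile(List<String> rows) {
        AtomicInteger processed = new AtomicInteger();
        ForkJoinPool.commonPool().invoke(new CustomerFileAction(rows, processed));
        return processed.get();
    }
}
```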

Basil Bourque On

"A new file per customer is added to a shared file system location every 5 mins."

Run a scheduled executor service to process inputs every five minutes.

ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor() ;
Runnable checkForMoreCustomerfilesTask = … ;
ses.scheduleAtFixedRate(
    checkForMoreCustomerfilesTask, 
    0, 
    5, 
    TimeUnit.MINUTES 
);

Be sure to always shut down your executor service gracefully before your app ends. Otherwise, the backing pool of threads may run indefinitely, even past the end of your app, like a zombie. See the shutdownAndAwaitTermination boilerplate code given in the Javadoc for ExecutorService.
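
For reference, here is a minimal sketch in the spirit of that Javadoc boilerplate. It is not a verbatim copy; the class name ExecutorShutdown, the timeout parameter, and the boolean return value are my own assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class ExecutorShutdown {
    // Returns true if the pool terminated within the allotted time.
    static boolean shutdownAndAwaitTermination(ExecutorService pool, long timeoutSeconds) {
        pool.shutdown(); // stop accepting new tasks; already-submitted tasks keep running
        try {
            if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // interrupt tasks that are still running
                return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

You would call this once for the scheduled executor service, for example from a shutdown hook or at the end of main.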

"ensure only one customer thread per customer is running"

You have only fifty customers. So you can have fifty executor service objects running at a time. Depending on the workload, this may be a reasonable burden, especially if not all customers are always being processed and/or if you have a multi-core machine.

Make each executor service single-threaded. With a single thread, you can queue up multiple tasks per customer but ensure that only one task per customer is ever executing.

Map< UUID , ExecutorService > executorPerCustomerMap = new HashMap<>() ;
for ( UUID customerId : customerIds ) {
    executorPerCustomerMap.put( customerId , Executors.newSingleThreadExecutor() ) ;
}

Again… be sure to always shut down your executor services gracefully before your app ends.

When you implement that Runnable checkForMoreCustomerfilesTask shown above, retrieve the appropriate customer-focused executor service from that map, and submit a Runnable to process that file.

// Found file for a particular customer.
ExecutorService customerExecutorService = executorPerCustomerMap.get( customerId ) ;
customerExecutorService.submit( new CustomerFileProcessorTask( file ) ) ;
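
To see that guarantee in action, the following self-contained sketch queues several simulated file tasks per customer and records the highest number of tasks that ever ran at once for any single customer. Everything here (the class name PerCustomerExecutors, the sleep standing in for file processing, the counters) is hypothetical scaffolding, not part of your application.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PerCustomerExecutors {
    // Queues `filesPerCustomer` simulated tasks per customer and returns the highest
    // number of tasks observed running at the same time for any one customer.
    static int maxConcurrentPerCustomer(int customers, int filesPerCustomer) {
        Map<UUID, ExecutorService> executorPerCustomerMap = new HashMap<>();
        Map<UUID, AtomicInteger> runningPerCustomer = new HashMap<>();
        for (int i = 0; i < customers; i++) {
            UUID customerId = UUID.randomUUID();
            executorPerCustomerMap.put(customerId, Executors.newSingleThreadExecutor());
            runningPerCustomer.put(customerId, new AtomicInteger());
        }
        AtomicInteger maxSeen = new AtomicInteger();
        for (int f = 0; f < filesPerCustomer; f++) {
            for (UUID customerId : executorPerCustomerMap.keySet()) {
                executorPerCustomerMap.get(customerId).submit(() -> {
                    int now = runningPerCustomer.get(customerId).incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(5); // stand-in for processing one file
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    runningPerCustomer.get(customerId).decrementAndGet();
                });
            }
        }
        // Graceful shutdown: let queued tasks finish, then wait for each executor.
        for (ExecutorService es : executorPerCustomerMap.values()) {
            es.shutdown();
        }
        for (ExecutorService es : executorPerCustomerMap.values()) {
            try {
                es.awaitTermination(1, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return maxSeen.get();
    }
}
```

Different customers still run in parallel with one another; only tasks belonging to the same customer are serialized, because each customer's tasks share one single-threaded executor.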

You said each customer file being processed has a bunch of rows that can be processed concurrently. I suggest you learn about virtual threads in Java 21+. You can reasonably run thousands, even millions, of virtual threads at the same time because they are so lightweight. Read Java JEP 444. Watch the excellent talks on YouTube.com etc. by Alan Bateman, Ron Pressler, and José Paumard.

If your spreadsheet row processing work involves blocking (basically any I/O such as network calls, file access, logging, database calls, etc.), then you should use virtual threads. One catch with virtual threads: in Java 21, a virtual thread that blocks inside synchronized code pins its carrier platform thread. So if your task code has any long-running code marked with synchronized, consider replacing synchronized with a ReentrantLock to gain optimal performance.
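
As a minimal sketch of that replacement, here is a counter guarded by a ReentrantLock where you might otherwise have written a synchronized method. The RowCounter class and its methods are hypothetical. The demo helper uses ordinary platform threads so that it runs on older Java versions too; the locking pattern is the same with virtual threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class RowCounter {
    private final ReentrantLock lock = new ReentrantLock();
    private long rowsWritten;

    // Equivalent to a synchronized method, but using an explicit lock.
    void recordRow() {
        lock.lock();
        try {
            rowsWritten++; // guarded section; real code might do a blocking write here
        } finally {
            lock.unlock(); // always release, even if the guarded code throws
        }
    }

    long getRowsWritten() {
        lock.lock();
        try {
            return rowsWritten;
        } finally {
            lock.unlock();
        }
    }

    // Demo helper: several threads record rows concurrently; returns the final count.
    static long countConcurrently(int threads, int rowsPerThread) {
        RowCounter counter = new RowCounter();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < rowsPerThread; i++) {
                    counter.recordRow();
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.getRowsWritten();
    }
}
```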

In modern Java (Java 19 and later), an executor service is AutoCloseable. So you can conveniently and safely use try-with-resources syntax to automatically close after all submitted tasks are done.

try (
    ExecutorService rowsProcessingExecutorService = Executors.newVirtualThreadPerTaskExecutor() ;
) {
    for( Row row : rows ) {
        rowsProcessingExecutorService.submit( new RowProcessingTask( row ) ) ;
    }
}
// Code flow blocks here until all submitted tasks have finished or failed.

Lastly, consider dropping the middle part, the map of executor services assigned per customer ID. Given that you can perform bursts of tasks with virtual threads, you may not really need to run customer files concurrently. You may want to process each customer file found successively, one file at a time, but with all the rows in that file being processed concurrently via virtual threads. In this approach, you would feed each file as a task to a single-threaded executor service to run only one file at a time, thereby never having two of the same customer's files being processed at a time.
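
Here is a rough sketch of that simplified design. The class name SequentialFileProcessor, the use of List<String> as a stand-in for a file's rows, and the counters are all hypothetical. So that the sketch also compiles on Java versions before 21, the row-level executor is passed in as a factory and closed with shutdown() after invokeAll rather than with try-with-resources.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class SequentialFileProcessor {
    // Processes files one at a time; rows inside each file run concurrently.
    // Returns the highest number of files observed in progress at once.
    static int process(List<List<String>> files, Supplier<ExecutorService> rowExecutorFactory) {
        ExecutorService fileExecutor = Executors.newSingleThreadExecutor();
        AtomicInteger filesInProgress = new AtomicInteger();
        AtomicInteger maxFilesInProgress = new AtomicInteger();
        for (List<String> rows : files) {
            fileExecutor.submit(() -> {
                maxFilesInProgress.accumulateAndGet(filesInProgress.incrementAndGet(), Math::max);
                ExecutorService rowExecutor = rowExecutorFactory.get();
                try {
                    List<Callable<Void>> rowTasks = new ArrayList<>();
                    for (String row : rows) {
                        rowTasks.add(() -> {
                            row.length(); // placeholder for real row processing
                            return null;
                        });
                    }
                    rowExecutor.invokeAll(rowTasks); // blocks until every row task is done
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    rowExecutor.shutdown();
                    filesInProgress.decrementAndGet();
                }
            });
        }
        fileExecutor.shutdown(); // queued files still run; no new ones are accepted
        try {
            fileExecutor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxFilesInProgress.get();
    }
}
```

On Java 21+, you could call SequentialFileProcessor.process( files , Executors::newVirtualThreadPerTaskExecutor ) to get one virtual thread per row, as suggested above. On older versions, something like Executors::newCachedThreadPool would do for trying it out.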