How to get the results of execution from ThreadPoolTaskScheduler

In my application I retrieve some metadata from the database, build from it an instance of the MyJob class, which implements the Runnable interface, and submit it to a ThreadPoolTaskScheduler. Here is the code that does it.

@Scheduled(fixedRateString = "${check.rate.minutes}", 
           timeUnit = TimeUnit.MINUTES)
public void checkForJobs() {
    LOG.info("Executing check at {}", Instant.now().toString());
    int avail = threadPool.getPoolSize() - threadPool.getActiveCount();
    // will retrieve jobs metadata to fill vacancies
    List<MetaJob> availableJobs = this.repository.getAvailableJobs(avail);
    if(!(availableJobs == null || availableJobs.isEmpty())) {
        LOG.info("Retrieved {} jobs for scheduling", availableJobs.size());
        for (MetaJob job : availableJobs) {
            scheduleJob(job);
        }
    } else {
        LOG.info("No available jobs at this moment");
    }
}

private void scheduleJob(MetaJob job) {
    JobSchedule schedule = job.getSchedule();
    JobRunner jobRunner = getExecutor(job); // Creating Runnable out of metadata
    if(schedule == null) {
        threadPool.schedule(jobRunner, Instant.now());
    } else {
        Trigger trigger;
        if(schedule.getType() == SchedType.CRON) {
            trigger = new CronTrigger(schedule.getExpression());
        } else {
            trigger = new PeriodicTrigger(Duration.parse(schedule.getExpression()));
        }
        threadPool.schedule(jobRunner, trigger);
    }
}

My job executes as expected, but I need to process the result of its completion or failure. For example, if the number of executions exceeds the limit specified in the metadata, I need to cancel subsequent executions. I also need to update the database with some results stored in the MyJob instance. If the job fails, I need to analyze the cause of the failure: if it was due to a temporary outage, I will let the job run again; otherwise I need to cancel it.

So, I saw that ThreadPoolTaskScheduler has this protected method

afterExecute(Runnable task, @Nullable Throwable ex) 

and thought it could solve my problem. I wrote a class MyJobScheduler extending ThreadPoolTaskScheduler and overrode that method with the following code:

@Override
protected void afterExecute(Runnable task, Throwable ex) {
    super.afterExecute(task, ex); 
    if(ex == null) {
       MyJobRunner runner = (MyJobRunner)task;
       MyJob job = runner.getJob();
       // whatever I need to do with it
    } else {
      // analyze the exception
    }
}

I placed a breakpoint inside this method and found two problems. First, it is called twice: once before and once after the job executes. Second, I got a ClassCastException, because the Runnable this method receives as a parameter is not the task I submitted but an instance of java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask. I tried to keep my own mapping between MyJob and the ScheduledFuture instance returned when I submit the job to the scheduler, but found that the parameter received by afterExecute has nothing in common with that ScheduledFuture instance.
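
The wrapping described above can be reproduced without Spring. The following is a minimal sketch, assuming a plain java.util.concurrent.ScheduledThreadPoolExecutor (the executor type named in the ClassCastException); MyJobRunner here is a hypothetical stand-in for the real job class, and observedTaskType is a helper invented for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class AfterExecuteDemo {

    /** Hypothetical stand-in for the job class in the question. */
    static class MyJobRunner implements Runnable {
        @Override
        public void run() { /* job body */ }
    }

    /** Runs one job and returns the class name of the Runnable seen by afterExecute. */
    static String observedTaskType() throws InterruptedException {
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1) {
            @Override
            protected void afterExecute(Runnable task, Throwable ex) {
                super.afterExecute(task, ex);
                // 'task' is the executor's internal future wrapper, not MyJobRunner.
                // For future-wrapped tasks 'ex' is null even when the job threw;
                // the exception is kept inside the future instead.
                seen.set(task.getClass().getName());
                done.countDown();
            }
        };
        try {
            pool.schedule(new MyJobRunner(), 0, TimeUnit.MILLISECONDS);
            done.await(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdownNow();
        }
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(observedTaskType());
    }
}
```

Because the submitted Runnable is hidden inside that wrapper, a cast to the job class inside afterExecute cannot succeed, which matches the ClassCastException above.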

My application can have dozens of jobs processed simultaneously and I do need to find which one has finished and how.

My question is: is it possible to find out from the afterExecute method which task has completed? If so, how? If not, what other way is there to monitor the completion of scheduled jobs? Any help would be greatly appreciated.

Answer by Easterwood:

You need to store all ScheduledFuture instances returned from threadPool.schedule(jobRunner, Instant.now()); in an array, list, or similar collection and periodically check whether they have finished, e.g. with another job and a separate thread pool.

The type ScheduledFuture<?> wraps your JobRunner instance and provides methods to check the state of the job, e.g. cancelled or done; see https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledFuture.html
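
The bookkeeping described above could look roughly like the following. This is only a sketch: FutureRegistry, the String job id, and the outcome strings are assumptions made for the illustration, not part of Spring or of the original code. It relies only on the standard Future methods isDone() and get().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureRegistry {

    // Job id -> future returned by schedule(...); using a String id is an assumption.
    private final Map<String, Future<?>> futures = new ConcurrentHashMap<>();

    /** Remembers the future under the id of the job it belongs to. */
    public void register(String jobId, Future<?> future) {
        futures.put(jobId, future);
    }

    /** Removes finished entries and returns a one-line outcome for each of them. */
    public List<String> collectFinished() throws InterruptedException {
        List<String> outcomes = new ArrayList<>();
        for (Map.Entry<String, Future<?>> entry : futures.entrySet()) {
            Future<?> future = entry.getValue();
            if (!future.isDone()) {
                continue; // still pending or running
            }
            futures.remove(entry.getKey(), future);
            try {
                future.get(); // does not block, because isDone() is already true
                outcomes.add(entry.getKey() + ": completed");
            } catch (CancellationException ex) {
                outcomes.add(entry.getKey() + ": cancelled");
            } catch (ExecutionException ex) {
                // getCause() is the exception thrown by the job itself
                outcomes.add(entry.getKey() + ": failed with " + ex.getCause());
            }
        }
        return outcomes;
    }
}
```

A periodic checker job would call register(...) right after each schedule(...) call and collectFinished() on every tick. Treat this as a fit for one-shot jobs: the future of a recurring schedule typically stays not-done until it is cancelled.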

Maybe you can use the following code with an overridden schedule(....) method returning ScheduledFuture<JobRunner>.

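// Assumes an overridden schedule(...) returning ScheduledFuture<JobRunner>; the stock
// method returns ScheduledFuture<?>, and get() yields null for a plain Runnable.
ScheduledFuture<JobRunner> future = threadPool.schedule(jobRunner, Instant.now());
if (future.isDone()) {
  // get() declares InterruptedException and ExecutionException
  JobRunner finished = future.get();
}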
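
An alternative to polling futures, offered only as a sketch, is to wrap each job in a small decorator that reports its own outcome. ReportingRunnable and its onFinish callback are hypothetical names introduced here; they are not part of Spring. Since schedule(...) accepts any Runnable, the wrapper can be submitted in place of the job.

```java
import java.util.function.BiConsumer;

/** Runs a job and then reports the job and its failure (or null) to a callback. */
public class ReportingRunnable<T extends Runnable> implements Runnable {

    private final T job;
    private final BiConsumer<T, Throwable> onFinish;

    public ReportingRunnable(T job, BiConsumer<T, Throwable> onFinish) {
        this.job = job;
        this.onFinish = onFinish;
    }

    @Override
    public void run() {
        Throwable failure = null;
        try {
            job.run();
        } catch (RuntimeException | Error t) {
            failure = t;
            throw t; // rethrown so the scheduler's own error handling still applies
        } finally {
            // Called on the pool thread after every execution; 'failure' is null on success.
            onFinish.accept(job, failure);
        }
    }
}
```

Usage might look like threadPool.schedule(new ReportingRunnable<>(jobRunner, (job, ex) -> { /* update database, inspect ex */ }), trigger). Keeping the ScheduledFuture returned by that call next to the job lets the callback invoke cancel(false) once the execution limit is reached or the failure is not a temporary one.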