How to customize the Shutdown policy of Java 21's Structured Concurrency API?


My use case: I need to retrieve several error-status flags for a device from multiple data sources. If any of the statuses is true, I want to terminate all remaining tasks immediately and return that status to the main task. How can I achieve this? Here is my query logic:

try (var scope = new StructuredTaskScope<>()) {
    scope.fork(() -> posRepo.findInspectionByUnitId(unitId));
    scope.fork(() -> posRepo.findEventOfFireByUnitId(unitId));
    scope.fork(() -> faultRepo.findGeneralErrorByUnitId(unitId));
    scope.fork(() -> ardInfRepo.findArdModeByUnitId(unitId));
    scope.fork(() -> ardInfRepo.findArdPowerStatusByUnitId(unitId));
    scope.fork(() -> ardInfRepo.findArdModeByUnitId(unitId));
    // Once a task returns true, cancel all remaining tasks and return "true" to the main task.
    scope.join();
} catch (InterruptedException e) {
    throw new RuntimeException(e);
}

public interface ElePosRepo extends CrudRepository<PositionInformation, Integer> {

    Optional<Boolean> findInspectionByUnitId(Integer unitId);
    Optional<Boolean> findEventOfFireByUnitId(Integer unitId);
}

I expect that when a subtask returns true, all other subtasks in the same scope are terminated immediately. If every subtask returns false or an empty Optional, the main task should return false once all subtasks have completed.


There is 1 best solution below

igor.zh On
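import java.util.Optional;
import java.util.concurrent.StructuredTaskScope;

// Shuts the scope down as soon as any subtask completes with Optional.of(true).
class ShutdownOnFirstTrue extends StructuredTaskScope<Optional<Boolean>> {

    // Written by subtask threads in handleComplete, read by the owner after join().
    private volatile boolean succeeded;

    @Override
    protected void handleComplete(Subtask<? extends Optional<Boolean>> subtask) {
        // State is nested in Subtask, so it is qualified here to resolve without an extra import.
        if (subtask.state() == Subtask.State.SUCCESS) {
            subtask.get().filter(Boolean::booleanValue).ifPresent(b -> {
                succeeded = true;
                shutdown(); // cancels the remaining subtasks
            });
        }
    }

    public boolean isSucceeded() {
        return succeeded;
    }
}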

isSucceeded should be called only after join completes:

try (var scope = new ShutdownOnFirstTrue()) {
    scope.fork...
    scope.fork...
    scope.fork...
    scope.join();
    boolean succeeded = scope.isSucceeded();
}
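To make the flow above concrete, here is a minimal runnable sketch that wires the same policy to stand-in tasks. The FirstTrueDemo class, the slowCheck helper, and the delays are hypothetical and only replace the repository calls from the question. It assumes Java 21 with --enable-preview, since StructuredTaskScope is a preview API there and its shape changed in later releases.

```java
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.StructuredTaskScope;

public class FirstTrueDemo {

    // Same policy as ShutdownOnFirstTrue, repeated so this sketch is self-contained.
    static class ShutdownOnFirstTrue extends StructuredTaskScope<Optional<Boolean>> {
        private volatile boolean succeeded;

        @Override
        protected void handleComplete(Subtask<? extends Optional<Boolean>> subtask) {
            if (subtask.state() == Subtask.State.SUCCESS && subtask.get().orElse(false)) {
                succeeded = true;
                shutdown(); // interrupts the subtasks that are still running
            }
        }

        boolean isSucceeded() {
            return succeeded;
        }
    }

    // Hypothetical stand-in for a repository query: sleeps, then returns the given
    // result (null becomes an empty Optional).
    static Callable<Optional<Boolean>> slowCheck(long millis, Boolean result) {
        return () -> {
            Thread.sleep(Duration.ofMillis(millis));
            return Optional.ofNullable(result);
        };
    }

    // Forks every check, waits for completion or shutdown, and reports whether any was true.
    static boolean anyTrue(List<Callable<Optional<Boolean>>> checks) throws InterruptedException {
        try (var scope = new ShutdownOnFirstTrue()) {
            for (Callable<Optional<Boolean>> check : checks) {
                scope.fork(check);
            }
            scope.join();
            return scope.isSucceeded();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // The fast "true" should cut the slow check short instead of waiting 5 seconds.
        System.out.println(anyTrue(List.of(slowCheck(50, true), slowCheck(5_000, false))));
        // No check reports true, so all of them run to completion.
        System.out.println(anyTrue(List.of(slowCheck(10, false), slowCheck(10, null))));
    }
}
```

Keeping the fork/join/read sequence inside one method such as anyTrue makes it harder to read isSucceeded before join has returned.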

Rather than relying on this ad hoc solution to equally ad hoc requirements (for example, the question does not say what should happen when a subtask throws an exception), I'd suggest learning the technique of overriding handleComplete. It is the recommended way to define custom behavior, it is what StructuredTaskScope.ShutdownOnFailure and StructuredTaskScope.ShutdownOnSuccess themselves use, and @Holger gave another excellent example in his answer.

Some technical details:

  • The class cannot be reused for another round of forks after join. To allow that, either add a (re)setter for the succeeded field or override both join methods as @Holger did. The second way is more elegant but somewhat more error-prone, since it could silently break if a future version adds a third join method. (I would like to see joinImpl made protected in a future version, or some kind of joinCompleted hook added to the base class.)
  • shutdown is not guarded against concurrent invocations because, I believe, implShutdown already takes care of that; for the same reason I find the guard in ShutdownOnSuccess.handleComplete excessive.
  • No special action is taken when a subtask throws an exception; it is silently swallowed. If exceptions are expected, check in the overridden handleComplete whether the subtask's state is Subtask.State.FAILED and act accordingly. A good example of such handling is the stock ShutdownOnFailure.handleComplete; ShutdownOnFailure is a final class, though, so it cannot be reused for your purpose. Once again, because the requirements are not clearly specified, the ShutdownOnFirstTrue above is only a hint, not a fully working tool.
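As an illustration of the last point, here is one possible sketch of a scope that also looks at Subtask.State.FAILED. The class name FirstTrueRecordingFailure and the firstFailure accessor are hypothetical, and the policy is an assumption, since the question does not define one: a failure is remembered but does not cancel the sibling subtasks. It targets the Java 21 preview API (--enable-preview).

```java
import java.util.Optional;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical variant of ShutdownOnFirstTrue that keeps the first subtask exception.
class FirstTrueRecordingFailure extends StructuredTaskScope<Optional<Boolean>> {

    private volatile boolean succeeded;
    private final AtomicReference<Throwable> firstFailure = new AtomicReference<>();

    @Override
    protected void handleComplete(Subtask<? extends Optional<Boolean>> subtask) {
        switch (subtask.state()) {
            case SUCCESS -> {
                if (subtask.get().orElse(false)) {
                    succeeded = true;
                    shutdown();
                }
            }
            // Assumed policy: remember only the first failure and let the siblings continue.
            case FAILED -> firstFailure.compareAndSet(null, subtask.exception());
            default -> { }
        }
    }

    // Both accessors are meant to be called after join() has returned.
    public boolean isSucceeded() {
        return succeeded;
    }

    public Optional<Throwable> firstFailure() {
        return Optional.ofNullable(firstFailure.get());
    }
}
```

After join, the caller can decide what a recorded failure means, for example rethrowing it only when isSucceeded is false. As far as I can tell from the Java 21 Javadoc, handleComplete is not invoked for subtasks that complete after the scope is shut down, so the interruption of cancelled siblings should not show up as a recorded failure.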