I have overridden the execute method for java.util.concurrent.Executor in ThreadPoolExecutor implementation. The new implementation just decorates the runnable and then calls the original execute. The issue I'm having is that if I have two such executors, then following:
supplyAsync(() -> foo(), firstExecutor).thenApplyAsync(firstResult -> bar(), secondExecutor)
translates to two execute calls. Usually they are executed by main and firstExecutor, but sometimes it's main two times.
Does it depend on how long it takes to complete the Suppplier in supplyAsync?
Here's a minimal reproducible example (10k repeats, for me it fails about 3 times java.lang.AssertionError: Unexpected second decorator: main):
package com.foo;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
class DecorationTest {
record WhoCalled(String decorator, String runnable) {}
static class DecoratedExecutor extends ThreadPoolExecutor{
private final List<WhoCalled> callers;
public DecoratedExecutor(List<WhoCalled> callers, String threadName) {
super(1, 1, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), runnable -> new Thread(runnable, threadName));
this.callers = callers;
}
@Override
public void execute(final Runnable command) {
String decoratingThread = Thread.currentThread().getName();
Runnable decorated = () -> {
String runningThread = Thread.currentThread().getName();
callers.add(new WhoCalled(decoratingThread, runningThread));
command.run();
};
super.execute(decorated);
}
}
List<WhoCalled> callers;
ExecutorService firstExecutor;
ExecutorService secondExecutor;
@BeforeEach
void beforeEach() {
callers = new ArrayList<>();
firstExecutor = new DecoratedExecutor(callers, "firstExecutor");
secondExecutor = new DecoratedExecutor(callers, "secondExecutor");
}
@AfterEach
void afterEach() {
firstExecutor.shutdown();
secondExecutor.shutdown();
}
@RepeatedTest(10_000)
void testWhoCalled() throws Exception {
Integer result = CompletableFuture.supplyAsync(() -> 1, firstExecutor)
.thenApplyAsync(supplyResult -> supplyResult, secondExecutor)
.get();
assert result == 1;
WhoCalled firstCallers = callers.get(0);
assert firstCallers.decorator().equals("main");
assert firstCallers.runnable().equals("firstExecutor");
WhoCalled secondCallers = callers.get(1);
assert secondCallers.decorator().equals("firstExecutor") : "Unexpected second decorator: " + secondCallers.decorator;
assert secondCallers.runnable().equals("secondExecutor");
}
}
This depends on whether or not the
supplyAsyncpart has been completed before thethenApplyAsynccall.Explanation
Let's split the
CompletableFuturechain in thetestWhoCalledtest for explanation. Instead of:do:
The test still fails for me sometimes. There are no changes in logic, but it will allow me easier to explain.
When I create the
firstFutureusing thesupplyAsyncmethod, thefirstExecutorcan already execute the lamba's (Supplier's) body.In other words, computation can be already running. This is different from other frameworks. For example, in Project Reactor nothing happens until you
subscribe.So the computation can be already running after we return from the
supplyAsync. Let's understand what happens next. There are two possibilities:firstExecutorhas executed the() -> 1lambda. And theCompletableFuturehas the result() -> 1lambda is executing or will be executed later. We don't have result.Now we do:
Who will submit the second lambda into the
secondExecutor?It will do either
mainthread or a thread from thefirstExecutor:mainthread will submit the task into thesecondExecutor. Because no one can do it here except themainthread: the task is already done, thefirstExecutorcan not submit the task to thesecondExecutorfirstExecutorsubmits the task into thesecondExecutorExperiment: add sleep in between
If I add a
sleepin the test like:I have 9986 failures. The
sleepcall increases the probability that thefirstExecutorhas completed the task before we add a new callback inthenApplyAsync.See also "Asynchronous API with CompletableFuture: Performance Tips and Tricks" talk