I am learning concurrency through the CompletableFuture API. Let's say I have two tasks: one takes 250ms and another takes 2500ms. In the following code:
Supplier<List<Long>> supplyIds = () -> {
sleep(200);
return(Arrays.asList(1L, 2L, 3L));
};
Function<List<Long>, CompletableFuture<List<User>>> fetchUsers1 = idList -> {
sleep(250);
System.out.println("User2"+ Thread.currentThread().getName());
Supplier<List<User>> userSupplier = () -> idList.stream().map(User::new).collect(Collectors.toList());
return(CompletableFuture.supplyAsync(userSupplier));
};
Function<List<Long>, CompletableFuture<List<User>>> fetchUsers2 = idList -> {
sleep(2500);
System.out.println("User2"+ Thread.currentThread().getName());
Supplier<List<User>> userSupplier = () -> idList.stream().map(User::new).collect(Collectors.toList());
return(CompletableFuture.supplyAsync(userSupplier));
};
Consumer<List<User>> displayer = users -> {
users.forEach(System.out::println);
};
CompletableFuture<List<Long>> completableFuture = CompletableFuture.supplyAsync(supplyIds);
CompletableFuture<List<User>> users1 = completableFuture.thenCompose(fetchUsers1);
CompletableFuture<List<User>> users2 = completableFuture.thenCompose(fetchUsers2);
users1.thenRun(()-> System.out.println("User 1"));
users2.thenRun(()-> System.out.println("User 2"));
users1.acceptEither(users2, displayer);
sleep(6000);
I get the following result:
User2ForkJoinPool.commonPool-worker-1
User 2
1
2
3
User2ForkJoinPool.commonPool-worker-1
User 1
I understand that the code is running synchronously since the same common fork join pool thread is being used and we don't specify the thread. I am confused as to why the fetchUsers2task is being executed first and then fetchUsers1task (this seems to be consistent with each run). I assumed that since thenCompose is called on fetchUsers1 first in the code, it would be 'queued up' first.
Nothing in the documentation says that the order of invocation matters for
thenCompose.Since you define two independent stages, both only depending on
completableFuture, there is no defined order betweenusers1anduser2and the resulting order is just implementation dependent.You may reproducibly get a particular order in one environment, but a different order in a different environment. Even in your environment, it’s possible to get a different order in some runs. If the initiating thread loses the CPU after calling
supplyAsync(supplyIds)for 200 milliseconds, it might execute the action specified withthenCompose(fetchUsers1)immediately at the invocation, before the invocation ofthenCompose(fetchUsers2).When the order between two actions matters, you must model a dependency between them.
Note that likewise the code
defines entirely independent actions. Since
acceptEitheris applied tousers1andusers2, rather than the completion stages returned by thethenRuncalls, it is not dependent on the completion of the print statements. These three actions may be performed in any order.