I can't understand how the items produced by a Multi are handled. As I understand Quarkus, items should ideally be processed asynchronously, so in some cases their order should not be preserved.
I wrote a small program to test my understanding of Quarkus:
@Path("/test")
@Slf4j
public class Test {
    // Returns immediately for "fast" items, otherwise blocks the calling thread for 10 s.
    public void wait(boolean type) {
        if (type)
            return;
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    // Emits the indices 0..type.length-1, then completes.
    public Multi<Integer> generateStream(boolean[] type) {
        return Multi.createFrom()
                .emitter(emitter -> {
                    for (int i = 0; i < type.length; i++) {
                        emitter.emit(i);
                    }
                    emitter.complete();
                }, BackPressureStrategy.IGNORE);
    }

    public String intensiveComputation(Integer data, boolean[] type) {
        boolean isFast = type[data];
        wait(isFast);
        String ret = "";
        if (isFast) {
            ret = data + " FAST -> DONE";
        } else {
            ret = data + " SLOW -> DONE";
        }
        return ret + " on " + Thread.currentThread().getName() + "\n";
    }

    public Multi<String> quarkusTest(boolean[] type) {
        return generateStream(type)
                .onItem().transform(item -> this.intensiveComputation(item, type))
                .onItem().invoke(item -> log.info(item));
    }

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public Multi<String> test() {
        boolean[] type = new boolean[5];
        type[0] = false; // SLOW
        type[1] = false; // SLOW
        type[2] = false; // SLOW
        type[3] = true;  // FAST
        type[4] = false; // SLOW
        return quarkusTest(type);
    }
}
When I call curl "http://localhost:8080/test", I get the following output:
[0 SLOW -> DONE on vert.x-eventloop-thread-0
,1 SLOW -> DONE on vert.x-eventloop-thread-0
,2 SLOW -> DONE on vert.x-eventloop-thread-0
,3 FAST -> DONE on vert.x-eventloop-thread-0
,4 SLOW -> DONE on vert.x-eventloop-thread-0
]
However, I would have expected something like this:
[3 FAST -> DONE on vert.x-eventloop-thread-3
,0 SLOW -> DONE on vert.x-eventloop-thread-0
,1 SLOW -> DONE on vert.x-eventloop-thread-1
,2 SLOW -> DONE on vert.x-eventloop-thread-2
,4 SLOW -> DONE on vert.x-eventloop-thread-3
]
I have also tried .runSubscriptionOn(executor). With it I do see other threads being used, but the result is still not what I expected.
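transform() is synchronous: it is called for one item at a time, in order, on the thread that emits the item. Here that thread is the event loop, so each 10-second sleep blocks it and the five items finish in sequence on vert.x-eventloop-thread-0.
You should look into transformToUniAndMerge() instead. It maps each item to a Uni and emits the results as they complete, so the order is not preserved. For the items to actually run concurrently, each Uni still has to do its work off the calling thread, for example with runSubscriptionOn() on a worker pool.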
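As a rough illustration of the transformToUniAndMerge() suggestion above, here is a minimal standalone sketch. The class name MergeSketch and the helpers compute and process are hypothetical, the sleep is shortened to 500 ms, and it assumes plain Mutiny on the classpath with its default worker pool (inside Quarkus the same pipeline could be returned from the endpoint instead of being awaited in main):

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;

import java.util.List;

public class MergeSketch {

    // Hypothetical stand-in for intensiveComputation: slow items block, fast ones return at once.
    static String compute(int item, boolean[] fast) {
        if (!fast[item]) {
            try {
                Thread.sleep(500); // shortened from the question's 10 s
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return item + (fast[item] ? " FAST" : " SLOW");
    }

    static Multi<String> process(boolean[] fast) {
        return Multi.createFrom().range(0, fast.length)
                // Each item becomes its own Uni; results are merged as they complete.
                .onItem().transformToUniAndMerge(item ->
                        Uni.createFrom().item(() -> compute(item, fast))
                                // Run the blocking work on a worker thread, not the caller.
                                .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()));
    }

    public static void main(String[] args) {
        boolean[] fast = {false, false, false, true, false};
        // Blocking here is only for the demo; never block like this on an event loop.
        List<String> results = process(fast).collect().asList().await().indefinitely();
        System.out.println(results);
    }
}
```

With enough worker threads available, the fast item will usually appear before the slow ones, but the exact order depends on scheduling. If the original order must be kept while the work still runs off the event loop, transformToUniAndConcatenate() is the ordered counterpart, at the cost of processing one item at a time.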