Quarkus Multi<> multithreading

I can't understand how the items produced by a Multi are handled. As I understand Quarkus, they should (at best) be processed asynchronously, so in some cases their order should not be preserved.

I have created a small program to test my knowledge of Quarkus:

@Path("/test")
@Slf4j
public class Test {
    
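    // Simulates work: returns immediately for "fast" items, blocks 10 s for "slow" ones.
    public void wait(boolean type) {
        if (type)
            return;
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating the failure.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }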

    public Multi<Integer> generateStream(boolean[] type) {
        return Multi.createFrom()
                .emitter(emitter -> {
                    for(int i = 0; i < type.length; i++) {
                        emitter.emit(i);
                    }
                    emitter.complete();
                }, BackPressureStrategy.IGNORE);
    }

    public String intensiveComputation(Integer data, boolean[] type) {
        boolean isFast = type[data];
        wait(isFast);
        String ret = "";
        if (isFast) {
            ret = data + " FAST -> DONE";
        }
        else {
            ret = data + " SLOW -> DONE";
        }
        return ret + " on " + Thread.currentThread().getName() + "\n";
    }

    public Multi<String> quarkusTest(boolean[] type) {
        return generateStream(type)
                .onItem().transform(item -> this.intensiveComputation(item, type))
                .onItem().invoke(item -> log.info(item));
    }
    
    @GET()
    @Produces(MediaType.APPLICATION_JSON)
    public Multi<String> test() {
        boolean[] type = new boolean[5];
        type[0] = false;  // SLOW
        type[1] = false;  // SLOW
        type[2] = false;  // SLOW
        type[3] = true;   // FAST
        type[4] = false;  // SLOW
        return quarkusTest(type);
    }
}

When I call curl "http://localhost:8080/test", I get the following output:

[0 SLOW -> DONE on vert.x-eventloop-thread-0
,1 SLOW -> DONE on vert.x-eventloop-thread-0
,2 SLOW -> DONE on vert.x-eventloop-thread-0
,3 FAST -> DONE on vert.x-eventloop-thread-0
,4 SLOW -> DONE on vert.x-eventloop-thread-0
]

However, I would have expected something like this:

[3 FAST -> DONE on vert.x-eventloop-thread-3
,0 SLOW -> DONE on vert.x-eventloop-thread-0
,1 SLOW -> DONE on vert.x-eventloop-thread-1
,2 SLOW -> DONE on vert.x-eventloop-thread-2
,4 SLOW -> DONE on vert.x-eventloop-thread-3
]

I have also tried .runSubscriptionOn(executor). It does introduce some threading, but not the behavior I expected.

Answer by jponge:

transform() is synchronous: it handles items one at a time, in order, on the thread that emits them, so a slow computation blocks that thread (here, the Vert.x event loop).

You should look into transformToUniAndMerge() instead.
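
As a rough illustration of that suggestion, here is a minimal standalone sketch. It is not the asker's endpoint. MergeSketch, compute, run and the cached thread pool are hypothetical names and choices made for this example, and the slow path sleeps 500 ms rather than 10 s. Each item is wrapped in a Uni whose subscription runs on a worker thread, and transformToUniAndMerge() emits results as they complete rather than in source order.

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MergeSketch {

    // Stand-in for intensiveComputation(): "slow" items block for 500 ms.
    static String compute(int i, boolean[] fast) {
        if (!fast[i]) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return i + (fast[i] ? " FAST" : " SLOW") + " -> DONE on "
                + Thread.currentThread().getName();
    }

    static List<String> run(boolean[] fast) {
        // Assumption: a dedicated pool for the blocking work (not from the original post).
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            return Multi.createFrom().range(0, fast.length)
                    // One Uni per item; merging emits results in completion order.
                    .onItem().transformToUniAndMerge(i ->
                            Uni.createFrom().item(() -> compute(i, fast))
                                    // Run the blocking supplier on a pool thread.
                                    .runSubscriptionOn(pool))
                    .collect().asList()
                    .await().indefinitely();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        run(new boolean[] {false, false, false, true, false})
                .forEach(System.out::println);
    }
}
```

In the endpoint from the question, the same operator chain would presumably replace the transform() call in quarkusTest(), with Infrastructure.getDefaultWorkerPool() as one possible executor. If the source order must be kept while still working with Uni, transformToUniAndConcatenate() is the ordered counterpart.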