Spring Reactive implementation calling db multiple times


I am new to reactive programming and am trying to accomplish the task below efficiently. I have a table containing events for each user. For a given user, I am trying to get the latest row for each event name, filtered to the latest category.

Table structure:
user id, category, event name, details, insert timestamp, event text (payload of the event)

Approach in reactive:

// call 1 to Cassandra
Mono<Event> latestCategory = repository.findByUserId(userId)
        .sort(Comparator.comparing(Event::insertTimestamp).reversed()) // newest row first
        .next();

// call 2 to Cassandra
Flux<Event> fluxEvents = repository.findByUserId(userId)
        .groupBy(Event::name) // grouping by event name
        .flatMap(grp -> grp
                .sort(Comparator.comparing(Event::insertTimestamp).reversed()) // sorting each event group, newest first
                .next() // picking the latest row for each event
                .zipWith(latestCategory)
                .filter(t -> t.getT1().category().equals(t.getT2().category())) // keeping only the latest category
                .map(Tuple2::getT1)); // latest event row for the latest category

Functionally, everything works fine, but the problem I see is that a DB call happens for each row in the table. In imperative programming, I could have achieved this with one DB call and then applied the logic above. How can I do the same in the reactive world?

Answer by Michael Berry:

The "lightest-touch" approach to what you have would be to just do:

Mono<Event> latestCategory = repository.findByUserId(userId).sort().next().cache();

...which means that latestCategory is only ever fetched once, then cached for all further subscriptions.
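To see why the uncached version hits the database repeatedly, here is a small self-contained sketch (it only needs reactor-core on the classpath). It assumes a stand-in for the repository call: a hypothetical `Mono.fromCallable` that increments a counter each time it runs, and three hard-coded event names standing in for the groups produced by `groupBy`. A cold `Mono` re-runs its source for every subscriber, and `zipWith` inside `flatMap` subscribes once per group, so the "query" runs once per group unless `cache()` is applied.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CacheDemo {

    // Returns how many times the stand-in "query" behind latestCategory ran
    // when it is zipped into each of three groups.
    public static int queryCount(boolean useCache) {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical stand-in for repository.findByUserId(userId)...next():
        // a cold Mono whose callable runs once per subscription.
        Mono<String> latestCategory = Mono.fromCallable(() -> {
            calls.incrementAndGet();
            return "categoryA";
        });
        if (useCache) {
            // First subscriber triggers the source; later subscribers get the cached value.
            latestCategory = latestCategory.cache();
        }
        Mono<String> finalLatest = latestCategory;
        Flux.just("login", "logout", "purchase") // stand-ins for the event-name groups
                .flatMap(name -> Mono.just(name).zipWith(finalLatest))
                .collectList()
                .block();
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("without cache: " + queryCount(false)); // 3
        System.out.println("with cache:    " + queryCount(true));  // 1
    }
}
```

Note that even with `cache()`, the question's code still issues two queries (one for the latest category, one for the grouped events); caching only stops the first query from being repeated per group.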

That's probably not the optimal solution though.

In this case, as it currently stands, you're doing the sorting in the Flux itself. That's not usually wise, but if you absolutely need to do that, then you may as well just do:

repository.findByUserId(userId).sort().collectList().map(eventList -> {
    //Deal imperatively with a List<Event>
});

...and then have access to the list, from a single database call, which you can access randomly as necessary. You're actually gaining, not losing, performance this way, since there's just one database call; and because a sorted Flux can never emit anything until the entire publisher completes, there's no material difference between that and simply collecting to a list.
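As a sketch of what the "deal imperatively with a List<Event>" step could contain, the method below applies the question's logic in plain Java. The `Event` record, its fields (`name`, `category`, `insertTimestamp`) and the names `LatestEvents` / `latestPerNameForLatestCategory` are hypothetical; the real entity also carries details and the payload. Sorting newest-first means the first row seen for each name is that name's latest row, which is then kept only if its category matches the newest row overall, mirroring the question's "latest per event, then filter by latest category" order.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LatestEvents {

    // Hypothetical shape of a row (details and payload omitted).
    public record Event(String name, String category, Instant insertTimestamp) {}

    // Latest row per event name, kept only if its category equals the
    // category of the newest row overall.
    public static List<Event> latestPerNameForLatestCategory(List<Event> events) {
        if (events.isEmpty()) {
            return List.of();
        }
        // Newest first, so the first row seen for each name is its latest one.
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparing(Event::insertTimestamp).reversed());
        String latestCategory = sorted.get(0).category();

        // Remember only the first (latest) row per name, preserving newest-first order.
        Map<String, Event> latestByName = new LinkedHashMap<>();
        for (Event e : sorted) {
            latestByName.putIfAbsent(e.name(), e);
        }

        List<Event> result = new ArrayList<>();
        for (Event e : latestByName.values()) {
            if (e.category().equals(latestCategory)) {
                result.add(e);
            }
        }
        return result;
    }
}
```

Plugged into the pipeline, this would look something like `repository.findByUserId(userId).collectList().map(LatestEvents::latestPerNameForLatestCategory)`, giving a `Mono<List<Event>>` from one query; since the method sorts the list itself, the `.sort()` on the Flux is no longer needed.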

A much better way, however, would be to get Cassandra to do the underlying sorting, then use switchOnFirst() to zip the first element (your latest category) with every other element that follows:

repository.findByUserId(userId).switchOnFirst((signal, flux) ->
    flux.map(val -> Tuples.of(signal.get(), val)) //(In real-world use, check signal.hasValue() first, since the source may be empty)
)
//...etc

This means that:

  • You only need a single database query;
  • You can reactively deal with each value as it appears, without having to wait for the entire stream to end;
  • No publishers need to be cached, which is best avoided where possible.
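A fuller sketch of the switchOnFirst() approach might look like the following. It assumes the source Flux already arrives ordered by insert timestamp, newest first (for example, if the insert timestamp were a clustering column with descending clustering order; that schema is an assumption, not something the question states). The `Event` record and the class/method names are hypothetical. Instead of mapping to tuples as in the snippet above, this variant filters directly: the first signal supplies the latest category, and each later row is emitted only if it is the first row seen for its name and matches that category.

```java
import java.util.HashSet;
import java.util.Set;
import reactor.core.publisher.Flux;

public class SwitchOnFirstSketch {

    // Hypothetical row shape; timestamp as epoch millis for brevity.
    public record Event(String name, String category, long insertTimestamp) {}

    // Assumes `newestFirst` is already sorted by insert timestamp, descending.
    public static Flux<Event> latestPerNameForLatestCategory(Flux<Event> newestFirst) {
        return newestFirst.switchOnFirst((signal, flux) -> {
            if (!signal.hasValue()) {
                // Empty (or failed) source: pass it through unchanged.
                return flux;
            }
            // The newest row overall determines the latest category.
            String latestCategory = signal.get().category();
            // Per-subscription state: names whose latest row has already been seen.
            Set<String> seenNames = new HashSet<>();
            // `flux` still includes the first element. add() runs first, so every
            // name is marked as seen on its first (latest) row, matching or not.
            return flux.filter(e -> seenNames.add(e.name())
                    && e.category().equals(latestCategory));
        });
    }
}
```

Rows are processed one at a time as they arrive, so results can be emitted before the query finishes, and only one query is issued per subscription.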