I a new to reactive and trying to accomplish the below task in efficient way. I have a table having events for each user. I am trying to get latest row of each event name for a given user filtered with latest category.
table structure
user id , category , event name, details , insert timestamp, event text( payload of event)
Approach in reactive // call 1 to Cassandra
Mono<Event> latestCategory = repository.findByUserId(userId).sort().next(); // sort is by insert
timestamp;
//call 2 to Cassandra
Flux<Event> fluxEvents = repository.findByUserId(userId)
.groupBy(Event::name) //grouping by event name
.flatmap(grp -> {
grp.sort() // sorting for each event
grp.next().zipWith(latestCategory) // picking latest row for each event
.filter(eventWithLatestCategory ->
eventWithLatestCategory.getT1().category.equals(eventWithLatestCategory.getT2().category) //filtering by each category
.map(Tuple2::getT1)// picking latest event row for latest category
};
functionally every thing works fine but the problem i see DB call happening for each row in the table. In imperative programming, I could have just achieved it with one db call and then apply above logic. How can I do the same in reactive world?
enter code here
The "lightest-touch" approach to what you have would be to just do:
...which means that
latestCategoryis only ever fetched once, then cached for all further subscriptions.That's probably not the optimal solution though.
In this case, as it currently stands, you're doing the sorting in the
Fluxitself. That's not usually wise, but if you absolutely need to do that then you may as well just do:...and then have access to the list, from a single database call, which you can query randomly as necessary. You're actually gaining, not losing performance this way, since there's just one database call - and as a sorted
Fluxcan never output anything until the entire publisher completes, there's no material difference between that and simply collecting to a list.The vastly better way however would be to get cassandra to do the underlying sorting, then use
switchOnFirst()to zip the first element (your latest category) with every other that occurs:This means that: