TL;DR: I need to dynamically slow down a Flux that queries Cosmos DB so that the downstream can publish messages asynchronously to Azure Service Bus.
Full description: I have a Flux that queries Cosmos DB and returns lots of document results that I need to transform and push to Azure Service Bus.
Publishing unthrottled seems to overwhelm the Service Bus client library (even when publishing only ~1000 messages, each smaller than 1 KB): I start getting "java.lang.OutOfMemoryError: Java heap space" exceptions and cannot publish all of the messages:
    fluxToRunCosmosQueriesAndReturnLotsOfResults
        .filter(...)
        .map(toServiceBusMessage)
        //.delayElements(Duration.ofMillis(10))
        .map(msg ->
        {
            return messageSender.sendMessage(msg); // returns a Mono; the actual message is sent to the bus asynchronously
        })
        .flatMap(Flux::from)
        .blockLast();

    ServiceBusSenderAsyncClient messageSender = new ServiceBusClientBuilder()
        .connectionString(CONNECTION_STRING)
        .sender()
        .topicName(TOPIC_NAME)
        .buildAsyncClient();
Un-commenting the delay above throttles the rate enough that I can publish all messages without any exceptions. But such an arbitrary delay seems hacky; I want to dynamically slow or pause the upstream emitter based on how much the downstream Service Bus client can safely publish.
Using a Semaphore also works for me: I can hold back the upstream until a certain number of messages have been published to Service Bus (that is, until their sends have been completed asynchronously by messageSender):
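    import java.util.concurrent.Semaphore;
    import reactor.core.publisher.Mono;

    public class FluxCondition
    {
        private final Semaphore sem;

        public FluxCondition(int permits)
        {
            sem = new Semaphore(permits);
        }

        // Completes once a permit has been acquired; note that acquire() blocks the subscribing thread.
        public Mono<Object> fetchPermit()
        {
            return Mono.fromRunnable(() ->
            {
                try
                {
                    sem.acquire();
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
                }
            });
        }

        // Returns one permit; called when a send has finished (successfully or not).
        public void releasePermit()
        {
            sem.release();
        }
    }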

    FluxCondition semPermits = new FluxCondition(10);

    fluxToRunCosmosQueriesAndReturnLotsOfResults
        .filter(...)
        .map(this::toServiceBusMessage)
        .delayUntil(o -> semPermits.fetchPermit())
        .map(msg ->
        {
            return messageSender.sendMessage(msg)
                .doFinally(o -> semPermits.releasePermit());
        })
        .flatMap(Flux::from)
        .blockLast();
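
To see the gating idea on its own, without Reactor or Service Bus, the following is a minimal plain-Java sketch. Everything in it is hypothetical: `SemaphoreGateDemo`, `run`, and `fakeSend` are illustrative names, and `fakeSend` only simulates an asynchronous send that completes about 2 ms later. It shows that acquiring a permit before each send and releasing it on completion caps the number of pending sends at the permit count.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SemaphoreGateDemo {

    // Simulated async send (hypothetical stand-in for messageSender.sendMessage):
    // the returned future completes roughly 2 ms later on the timer thread.
    static CompletableFuture<Void> fakeSend(ScheduledExecutorService timer) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        timer.schedule(() -> { done.complete(null); }, 2, TimeUnit.MILLISECONDS);
        return done;
    }

    // Issues `total` simulated sends with at most `permits` pending at once
    // and returns the highest number of pending sends that was observed.
    static int run(int total, int permits) {
        Semaphore gate = new Semaphore(permits);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            for (int i = 0; i < total; i++) {
                // Blocks the producer once `permits` sends are pending.
                gate.acquireUninterruptibly();
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                fakeSend(timer).whenComplete((ok, err) -> {
                    inFlight.decrementAndGet();
                    gate.release(); // hand the permit back when the send finishes
                });
            }
            // Taking every permit back means all pending sends have completed.
            gate.acquireUninterruptibly(permits);
            return peak.get();
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak pending sends = " + run(200, 10));
    }
}
```

Calling `SemaphoreGateDemo.run(200, 10)` should never report a peak above 10. As in the Reactor version above, the producing thread is blocked while it waits for a permit, which is the part that feels least reactive.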
(Note: I cannot batch messages for efficiency, as suggested in the official docs: https://github.com/Azure/azure-sdk-for-java/blob/cc51514c7d14ff9805fe265cfc72e424c9c80576/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientJavaDocCodeSamples.java#L121 since my topic is partitioned with de-duplication enabled.)
My question is: is there a better, recommended (best-practice), Reactor-friendly way to adjust the rate of the publisher Flux here (the one that returns Cosmos DB query results) so that the downstream can safely publish messages to Service Bus without being overwhelmed? It would be great if the upstream emitter could dynamically adapt to the rate of ServiceBusSenderAsyncClient.
(azure-messaging-servicebus: 7.14.1, reactor-core: 3.4.27)
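
What "dynamically adapting" to the sender could mean can be sketched with the JDK's own Reactive Streams types (`java.util.concurrent.Flow`), again without Reactor or Service Bus. This is only an illustration under assumptions: `DemandDrivenSendDemo`, `run`, and `fakeSend` are hypothetical names, and `fakeSend` stands in for an asynchronous send. The subscriber requests `maxInFlight` items up front and one more each time a send completes, so upstream demand follows the completion rate instead of a fixed delay.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class DemandDrivenSendDemo {

    // Simulated async send (hypothetical): completes roughly 2 ms later.
    static CompletableFuture<Void> fakeSend(ScheduledExecutorService timer) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        timer.schedule(() -> { done.complete(null); }, 2, TimeUnit.MILLISECONDS);
        return done;
    }

    // Publishes `total` items and returns the peak number of pending sends.
    static int run(int total, int maxInFlight) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch allSent = new CountDownLatch(total);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        // Small buffer (8 items): submit() blocks the producer once it is full.
        SubmissionPublisher<String> source = new SubmissionPublisher<>(worker, 8);
        try {
            source.subscribe(new Flow.Subscriber<String>() {
                private volatile Flow.Subscription upstream;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    upstream = subscription;
                    subscription.request(maxInFlight); // initial demand = allowed pending sends
                }

                @Override
                public void onNext(String message) {
                    peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    fakeSend(timer).whenComplete((ok, err) -> {
                        inFlight.decrementAndGet();
                        upstream.request(1); // one send finished, so ask for one more item
                        allSent.countDown();
                    });
                }

                @Override
                public void onError(Throwable error) {
                    error.printStackTrace();
                }

                @Override
                public void onComplete() {
                    // Upstream is exhausted; pending sends are tracked by the latch.
                }
            });
            for (int i = 0; i < total; i++) {
                source.submit("message-" + i);
            }
            source.close();
            if (!allSent.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for sends");
            }
            return peak.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            timer.shutdown();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak pending sends = " + run(200, 10));
    }
}
```

Reactor's `flatMap(mapper, concurrency)` overload is documented to bound the number of concurrently subscribed inner publishers in a comparable request-driven way, so it may be worth comparing against the semaphore approach; whether it is enough for ServiceBusSenderAsyncClient is exactly what is being asked here.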
Exceptions when publishing to service bus client in an un-throttled manner:
    12:01:05.850 [reactor-executor-1] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
    java.lang.OutOfMemoryError: Java heap space
        at com.azure.messaging.servicebus.ServiceBusMessageBatch.<init>(ServiceBusMessageBatch.java:42)
        at com.azure.messaging.servicebus.ServiceBusSenderAsyncClient$AmqpMessageCollector.<init>(ServiceBusSenderAsyncClient.java:848)
        at com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.lambda$sendInternal$25(ServiceBusSenderAsyncClient.java:794)
        at com.azure.messaging.servicebus.ServiceBusSenderAsyncClient$$Lambda$1648/954915536.apply(Unknown Source)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:110)
        at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
        at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
        at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)
        at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
        at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
        at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onNext(FluxTimeout.java:180)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
        at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:172)
        at com.azure.core.amqp.implementation.ReactorSession.lambda$createProducer$16(ReactorSession.java:515)
        at com.azure.core.amqp.implementation.ReactorSession$$Lambda$1631/598519268.run(Unknown Source)
        at com.azure.core.amqp.implementation.handler.DispatchHandler.onTimerTask(DispatchHandler.java:32)
        at com.azure.core.amqp.implementation.ReactorDispatcher$WorkScheduler.run(ReactorDispatcher.java:207)
        at org.apache.qpid.proton.reactor.impl.SelectableImpl.readable(SelectableImpl.java:118)
    java.lang.OutOfMemoryError: Java heap space
    12:01:06.696 [reactor-executor-1] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
    java.lang.OutOfMemoryError: Java heap space