I am using projectreactor Sinks with backPressureBuffer support to handle incoming requests in my spring boot application.
In my FooService.kt
private val requestProcessor = Sinks.many().multicast().onBackpressureBuffer<FooRequest>(100)
An EmitFailureHandler as follows
val emitFailureHandler = EmitFailureHandler { _: SignalType?, emitResult: EmitResult ->
(emitResult == EmitResult.FAIL_NON_SERIALIZED || emitResult == EmitResult.FAIL_OVERFLOW)
}
I have defined an init method inside the class as follows:
init {
requestProcessor.asFlux()
.flatMap { request ->
try {
logger.debug("handleByRabbitMq : {}", request)
footGateway.handleByRabbitMq(request)
} catch (e: Exception) {
logger.error("unknown error when processing foo request", e)
Mono.empty()
}
}
.subscribe()
}
fun handleRequest(fooRequest: FooRequest): Mono<FooResponse> {
val (_, id) = fooRequest
val responseSink = Sinks.one<FooResponse>()
responseListeners[id] = responseSink
requestProcessor.emitNext(fooRequest, emitFailureHandler)
return responseSink.asMono()
.timeout(Duration.ofSeconds(10))
.onErrorResume { Mono.just(FooResponse(REQUEST_SUBMITTED)) }
.doOnTerminate { responseListeners.remove(id) }
}
FooController.kt
@GetMapping("/foo/request")
@Timed(value = "foo.request")
fun fooRequest(fooRequest: FooRequest): Mono<String> {
return performAction(fooRequest)
.flatMap { fooResponse ->
Mono.fromCallable {
"FooResponse: $fooResponse"
}
}
.doOnNext { logger.debug("response {}", it) }
}
fun performAction(fooRequest: FooRequest): Mono<FooResponse> {
return Mono.just(fooRequest))
.flatMap { request ->
fooService.handleRequest(request)
}
.doOnNext { response ->
//some operations here
logger.debug(response)
}.subscribeOn(Schedulers.boundedElastic())
}
Everything works great. The requests are emitted and processed normally initially.
Problem: When the app goes under a load of requests or when I do the load tesing using frameworks like k6, the requestProcessor stops emitting the items. In other words, the init block stops executing despite the requestProcessor.emitNext(fooRequest, emitFailureHandler) getting called.
Initially, the app threw FAIL_NON_SERIALIZED and FAIL_OVERFLOW error. For this, I added the above emitFailureHandler. It helped but postponed the issue for a few hours.
- The App still responds though the success rate is 96%. This is still good.
- The second time execution: The success rate jumps down to 88%.
After execution of this, the app throws
503 http reponseand no longer accepts requests.
3.logs.
New findings:
There seems to be some issues with the RabbitMQ [fooGateway.handleByRabbitMq(request)]. I have been digging around it but failed to see any errors. With removing the rabbitMQ in the init block, the app becomes responsive. Yet, I need to know the cause?



My initial assumption was that the issue must be with the project reactor. However, the issue seems to be with
spring integration.The flow of requests in my application is as follows:
As explained in the question, this flow works fine for sometimes. However, after processing few requests i.e. 300 requests, the app hangs and fails to accept more requests i.e. returning
503 http reponse.After a through research, it seems the requests stop in step
(3)which is delegating the tasks to spring integration for some unknown reasons.Solution:
Note: delegating the tasks to Spring Integration is convenient and recommended. So, the more ideal scenario would be as follow should the root cause get resolved with
spring integration (4).