sending message to RabbitMQ using spring integration stops processing items when the app is under load

114 Views Asked by At

I am using projectreactor Sinks with backPressureBuffer support to handle incoming requests in my spring boot application. In my FooService.kt

private val requestProcessor = Sinks.many().multicast().onBackpressureBuffer<FooRequest>(100)

An EmitFailureHandler as follows

  val emitFailureHandler = EmitFailureHandler { _: SignalType?, emitResult: EmitResult ->
    (emitResult == EmitResult.FAIL_NON_SERIALIZED || emitResult == EmitResult.FAIL_OVERFLOW)
  }

I have defined an init method inside the class as follows:

  init {
    requestProcessor.asFlux()
      .flatMap { request ->
        try {
          logger.debug("handleByRabbitMq : {}", request)
          footGateway.handleByRabbitMq(request)
        } catch (e: Exception) {
          logger.error("unknown error when processing foo request", e)
          Mono.empty()
        }
      }
      .subscribe()
  }


  fun handleRequest(fooRequest: FooRequest): Mono<FooResponse> {
    val (_, id) = fooRequest
    val responseSink = Sinks.one<FooResponse>()
    responseListeners[id] = responseSink
    requestProcessor.emitNext(fooRequest, emitFailureHandler)
    return responseSink.asMono()
      .timeout(Duration.ofSeconds(10))
      .onErrorResume { Mono.just(FooResponse(REQUEST_SUBMITTED)) }
      .doOnTerminate { responseListeners.remove(id) }
  }

FooController.kt

  @GetMapping("/foo/request")
  @Timed(value = "foo.request")
  fun fooRequest(fooRequest: FooRequest): Mono<String> {
    return performAction(fooRequest)
      .flatMap { fooResponse ->
        Mono.fromCallable {
          "FooResponse: $fooResponse"
        }
      }
      .doOnNext { logger.debug("response {}", it) }
  }

  fun performAction(fooRequest: FooRequest): Mono<FooResponse> {
    return Mono.just(fooRequest))
      .flatMap { request ->
        fooService.handleRequest(request)
      }
      .doOnNext { response ->
        //some operations here
        logger.debug(response)
      }.subscribeOn(Schedulers.boundedElastic())
  }

Everything works great. The requests are emitted and processed normally initially.

Problem: When the app goes under a load of requests or when I do the load tesing using frameworks like k6, the requestProcessor stops emitting the items. In other words, the init block stops executing despite the requestProcessor.emitNext(fooRequest, emitFailureHandler) getting called.

Initially, the app threw FAIL_NON_SERIALIZED and FAIL_OVERFLOW error. For this, I added the above emitFailureHandler. It helped but postponed the issue for a few hours.

  1. The App still responds though the success rate is 96%. This is still good.
  2. The second time execution: The success rate jumps down to 88%. After execution of this, the app throws 503 http reponse and no longer accepts requests.

3.logs.

enter image description here

enter image description here

enter image description here

New findings: There seems to be some issues with the RabbitMQ [fooGateway.handleByRabbitMq(request)]. I have been digging around it but failed to see any errors. With removing the rabbitMQ in the init block, the app becomes responsive. Yet, I need to know the cause?

1

There are 1 best solutions below

0
Niamatullah Bakhshi On

My initial assumption was that the issue must be with the project reactor. However, the issue seems to be with spring integration.

The flow of requests in my application is as follows:

FooController (1) ➝ FooService (2) ➝ Spring Integration (3) ➝ RabbitMQ (4)

As explained in the question, this flow works fine for sometimes. However, after processing few requests i.e. 300 requests, the app hangs and fails to accept more requests i.e. returning 503 http reponse.

After a through research, it seems the requests stop in step (3) which is delegating the tasks to spring integration for some unknown reasons.

Solution:

FooController (1) ➝ FooService (2) ➝ RabbitMQ (3)

Note: delegating the tasks to Spring Integration is convenient and recommended. So, the more ideal scenario would be as follow should the root cause get resolved with spring integration (4).

FooController (1) ➝ Spring Integration(2) ➝ FooService (3) ➝ Spring Integration(4) ➝ RabbitMQ (5)