Optimizing CPU Utilization and Throughput in Akka / Pekko Streams on HTTP


Consider the following code:

def getFlow()(implicit appConfig: AppConfig, actorSystem: ActorSystem): Flow[BrandSafetyServiceRequest, HttpResponse, NotUsed] = {

    implicit val brandSafetyServiceRequestCodec: JsonValueCodec[BrandSafetyServiceRequest] = JsonCodecMaker.make
    implicit val dispatcher: ExecutionContextExecutor = actorSystem.dispatcher

    val parallelism = Runtime.getRuntime.availableProcessors()
    val flow = Flow[BrandSafetyServiceRequest].mapAsyncUnordered(parallelism) { request =>
      Http().singleRequest(
        Post(appConfig.brandSafetyPhoenixServiceEndpoint).withEntity(HttpEntity(MediaTypes.`application/json`, writeToString(request)))
      )
    }

    val backpressure = RetryFlow.withBackoff(minBackoff = 15.seconds, maxBackoff = 30.minutes, randomFactor = 0d, maxRetries = 5, flow)(
      decideRetry = {
        case (request, HttpResponse(statusCode, _, _, _)) =>
          statusCode match {
            case StatusCodes.OK | StatusCodes.BadRequest => None
            case _ => Some(request)
          }
      }
    )

    Flow[BrandSafetyServiceRequest].via(backpressure)
  }

I want to optimize CPU utilization and maximize throughput. Currently, I'm not overriding the default dispatcher or any HTTP Akka configurations. I'm using a parallelism of Runtime.getRuntime.availableProcessors().

The source is finite, and the sink is doing nothing. Currently, I'm utilizing about 1/8 of the machine CPU.

The questions are:

  1. Is there an option to not use a low-level API with parallelism but only use Akka configurations? Is this best practice?
  2. If not, what parallelism should I use? The app is only running this flow and is not CPU-intensive at all; it just waits for responses and forwards the requests coming from the Slick source ("databricks").
  3. Which Akka configurations should I use? Maybe increase the host connections? Use another dispatcher?

If you could update the code, it would be great.
Thanks!

Note: appConfig.brandSafetyPhoenixServiceEndpoint is a Kubernetes DNS name with a path. I'm waiting for the HTTP responses so that the stream back-pressures when the backend is not available.

1 answer:
As Gael noted, your stream is I/O bound (both in terms of getting data from the Slick source and in terms of sending the HTTP requests and waiting for the responses).

The advice to bound parallelism to available processors (or really 1 or 2 less than that) is only applicable to CPU-bound tasks; this would appear to be an application that admits much greater parallelism than that.

As for how much greater, one approach is to introduce a parallelism factor by which to multiply the available processors. Benchmark iteratively: if a run shows CPU utilization of 1/n, multiply the previous factor by n. Keep doing this until increased parallelism yields markedly diminished returns in CPU utilization, which indicates that something else has become the bottleneck (stream-based telemetry, e.g. my employer's offering, might be helpful in finding it). A reasonable heuristic for the point of diminishing returns: increasing parallelism by a factor of x improves throughput by a factor of less than sqrt(x).
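A sketch of that tuning knob applied to the flow from the question (the starting factor of 8 is an assumption to be tuned by benchmarking, not a recommendation):

```scala
// Hypothetical tuning knob: multiply available processors by a factor and
// adjust it between benchmark runs. For an I/O-bound flow like this one,
// factors well above 1 are usually safe starting points.
val parallelismFactor = 8 // assumption: tune iteratively as described above
val parallelism = Runtime.getRuntime.availableProcessors() * parallelismFactor

val flow = Flow[BrandSafetyServiceRequest].mapAsyncUnordered(parallelism) { request =>
  Http().singleRequest(
    Post(appConfig.brandSafetyPhoenixServiceEndpoint)
      .withEntity(HttpEntity(MediaTypes.`application/json`, writeToString(request)))
  )
}
```

Note that Akka HTTP's host connection pool caps concurrent connections per host (`akka.http.host-connection-pool.max-connections`, default 4) and queued requests (`akka.http.host-connection-pool.max-open-requests`, default 32); with a single Kubernetes endpoint, raising mapAsyncUnordered parallelism without also raising these settings will just queue requests in the pool, so bump them to at least the chosen parallelism. Also make sure each HttpResponse entity is consumed or discarded (e.g. `response.discardEntityBytes()`), otherwise pool connections stall regardless of parallelism.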

As a side note, I hope you're aware of the semantics of RetryFlow around failure (especially when it wraps flows that buffer, as the mapAsync family of operators does): elements that have already been handed to the wrapped flow will not be retried if the wrapped flow itself fails. In other words, it is very likely that your stream will process elements from the Slick source at-most-once.
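As a related sketch (not part of the original answer): one way to keep a failed request future, e.g. a connection error while the backend pod is down, from failing the wrapped flow is to recover it into a synthetic retryable response, which the existing decideRetry already maps to Some(request):

```scala
import scala.util.control.NonFatal

// Recover transport-level failures (connection refused, timeouts, ...) into a
// synthetic 503, so RetryFlow's decideRetry treats them as retryable instead
// of the failure tearing down the stream and dropping buffered elements.
val flow = Flow[BrandSafetyServiceRequest].mapAsyncUnordered(parallelism) { request =>
  Http()
    .singleRequest(
      Post(appConfig.brandSafetyPhoenixServiceEndpoint)
        .withEntity(HttpEntity(MediaTypes.`application/json`, writeToString(request)))
    )
    .recover { case NonFatal(_) =>
      HttpResponse(status = StatusCodes.ServiceUnavailable)
    }
}
```

This trades stream failure for repeated retries with backoff, which matches the stated goal of back-pressuring while the backend is unavailable.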