How to implement Actor.Ask to control the flow of data at a certain rate


I used the method below, but it consumes a large amount of CPU, so I want to use Actor.Ask instead. Can anyone help me with how to use Actor.Ask?

KafkaConsumer.PlainSource(consumerSettings, subscription)
    .RunForeach(result =>
    {
        _ActorRef.Tell(result.Message.Value);
    }, materializer);

There are 2 best solutions below

Aaronontheweb On

You should refactor this to use a SelectAsync stage in Akka.Streams:

KafkaConsumer.PlainSource(consumerSettings, subscription)
    // Ask returns a Task<TResponse>; the lambda must return it so the stage can await the reply
    .SelectAsync(10, result =>
        _ActorRef.Ask<TResponse>(result.Message.Value, TimeSpan.FromSeconds(3)))
    .RunForeach(response => // response is now of type TResponse
    {
        // do something with the TResponse
    }, materializer);

This gives you backpressure support and allows at most 10 concurrent Task<TResponse>s to be outstanding at any given time. SelectAsync preserves invocation order, so the results of those tasks are delivered downstream in the same order in which they were invoked. If you don't care about ordering, use SelectAsyncUnordered instead for additional throughput.
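For the Ask to complete, the target actor has to reply to the sender; if it never replies, each Ask fails after its timeout. Here is a minimal sketch of such an actor, assuming the Kafka message value is a string. The `Ack` message (standing in for TResponse), the `MessageProcessor` actor, and the system and actor names are all hypothetical and not taken from the question.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;

// Hypothetical acknowledgement message; stands in for TResponse.
public sealed class Ack
{
    public static readonly Ack Instance = new Ack();
    private Ack() { }
}

// Hypothetical target actor, assuming the message value is a string.
public sealed class MessageProcessor : ReceiveActor
{
    public MessageProcessor()
    {
        Receive<string>(value =>
        {
            // ... process the value here ...

            // Reply to the sender so the pending Ask task completes.
            Sender.Tell(Ack.Instance);
        });
    }
}

public static class Program
{
    public static async Task Main()
    {
        var system = ActorSystem.Create("ask-demo");
        var actor = system.ActorOf(Props.Create(() => new MessageProcessor()), "processor");

        // Same call shape as inside SelectAsync: the task completes when the actor replies.
        var reply = await actor.Ask<Ack>("hello", TimeSpan.FromSeconds(3));
        Console.WriteLine($"Received reply of type {reply.GetType().Name}");

        await system.Terminate();
    }
}
```

Because the reply is only sent once processing has finished, the stream cannot run ahead of the actor by more than the parallelism passed to SelectAsync.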

Aaronontheweb On

My previous answer, having the target actor backpressure via replying to the stream, is the most robust way of doing this. But since the OP indicated that their actor doesn't send back any kind of reply today, here's another way of doing it.

KafkaConsumer.PlainSource(consumerSettings, subscription)
    .Throttle(100, TimeSpan.FromSeconds(1), 100, ThrottleMode.Shaping)
    .RunForeach(result =>
    {
        _ActorRef.Tell(result.Message.Value);
    }, materializer);

That enforces a maximum output rate of 100 messages per second. Once that threshold is reached, the stage backpressures Kafka in order to keep memory and CPU consumption low.
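To see the throttling behaviour without a Kafka broker, the same Throttle stage can be tried on an in-memory source. This is only a sketch under stated assumptions: the source of 300 integers, the system name, and the stopwatch timing are illustrative and not part of the original answer.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

public static class ThrottleDemo
{
    public static async Task Main()
    {
        var system = ActorSystem.Create("throttle-demo");
        var materializer = system.Materializer();

        var stopwatch = Stopwatch.StartNew();
        var count = 0;

        // In-memory stand-in for the Kafka source: 300 elements.
        await Source.From(Enumerable.Range(1, 300))
            // Same settings as above: 100 elements per second, burst of up to 100.
            .Throttle(100, TimeSpan.FromSeconds(1), 100, ThrottleMode.Shaping)
            .RunForeach(_ => count++, materializer);

        stopwatch.Stop();
        Console.WriteLine($"Processed {count} elements in {stopwatch.Elapsed.TotalSeconds:F1} s");

        await system.Terminate();
    }
}
```

With these settings the run should take on the order of a couple of seconds rather than finishing immediately, since the stage only lets elements through as the rate limit allows.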