Akka Kafka restart on internal failure

32 Views Asked by At

I use the Alpakka Kafka connector for running a Consumer stream inside akka streams. The stream starts streams inside for each partition that is assigned to this consumer instance. That means if there is any error inside a partition stream, then I need to restart either the partition or everything.

The code is following:

Consumer.DrainingControl<Done> control =
Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(topic))
    .mapAsyncUnordered(
        maxPartitions,
        pair -> {
          Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed> source =
              pair.second();
          return source
              .via(businessThatMayCrash())
              .map(message -> message.committableOffset())
              .runWith(Committer.sink(committerSettings), system);
        })
    .toMat(Sink.ignore(), Consumer::createDrainingControl)
    .run(system);

See Separate streams per partition https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#source-per-partition

But how I can restart the streams? Please note: If there is any error on Kafka side, then it resumes automatically.

I tried to use the RestartSource for each partition but if the stream is failed then the Kafka consume does not work anymore. https://doc.akka.io/docs/akka/current/stream/operators/RestartSource/onFailuresWithBackoff.html Also control.streamCompletion() does not get completed.

There is also the option to add a watchTermination() for each partition that calls control.drainAndShutdown(..) but that seems quite complicated. https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/watchTermination.html

What is the best solution for that?

1

There are 1 best solutions below

1
Levi Ramsey On BEST ANSWER

The trick will be to keep the failure of the stream you're running inside mapAsyncUnordered from failing the mapAsyncUnordered.

The easiest way to do this is to construct a CompletableFuture which will succeed when the stream succeeds or fails.

// inside mapAsyncUnordered, apologies for any Java atrocities (Scala dev...)
pair -> {
  return
     pair.second()
        .via(businessLogicThatMayCrash())
        .map(message -> message.committableOffset())
        .runWith(Committer.sink(committerSettings), system)
        .handle((ignoredResult, ignoredFailure) -> akka.Done.done());
}

When the stream being run fails, a new source should be picked up by the committablePartitionedSource.