How do you capture the error of the RestartSource after the max amount of restarts? I would like to do something after the source has failed the max amount of times. I can see the source restarting in the logs. I have tried adding a withAttributes but it is never invoked.
return RestartSource.onFailuresWithBackoff(restartSettings, () -> Consumer
.committableSource(getConsumerSettings(), topics)
.log("error on receiver topic")
.mapMaterializedValue(ctrl -> {
control = ctrl;
return NotUsed.getInstance();
})
.withAttributes(ActorAttributes.withSupervisionStrategy(e -> {
log.error("Stream has failed", e);
return (Supervision.Directive) Supervision.stop();
})));
Any advice would be appreciated.
Ok, I got this working. It seems like the attributes are never invoked in this case but this causes the entire stream to fail so the result will be on the attached flow and sink completion. In my case I had something like:
Then we can get the failure on the streamCompletion like follows: