Lagom topic subscriber - how to retry in Future Exception?

150 Views Asked by At

I have a topic subscriber in lagom as bellow

fooService.fooTopic().subscribe
  .atLeastOnce(
    Flow[fooMsg].map {
      case fooMsg(_) =>
        foo()
      case a =>
        println(a)
    }.async.map{ _ =>
      Done
    }
  )

to subscribe this topic, im using atLeastOnce as the method, so that if theres any exception, I want the flow to be restarted/retried. when im throwing a normal exception, it could keep retrying normally

  private def foo() = {
    throw new RuntimeException("testing error")
  }

but when the exception happens in the future, no matter how I tried it, the Flow wont restart. heres one of my tries to handle the exception in the future

  private def foo() = {
    val test: Future[Int] = Future(throw new RuntimeException("asd"))
    val result = for {
      y1 <- test
    } yield (y1)

    result.onComplete{
      case Success(value) => println("SUCCESS")
      case Failure(exception) => println(exception.getMessage)
                                 throw exception
    }
  }
  private def foo() = {
    val test: Future[Int] = Future(throw new RuntimeException("asd"))

    test.onComplete{
      case Success(value) => println("SUCCESS")
      case Failure(exception) => println(exception.getMessage)
                                 throw exception
    }
  }

it will show an exception, but the Flow wont restart it automatically. How should I handle/throw the exception in the Future?

1

There are 1 best solutions below

3
Vladislav Kievski On

I think you do not need to restart full flow if you have failed only one future. I suggest retrying only Future. For instance, you can write code like this that will retry your call, replace Future.successful(10) on the call of your method:

        val test: Future[Int] = Future(throw new RuntimeException("asd")).recoverWith {
          case NonFatal(e) =>
            Future.successful(10)
        }

        val result = for {
          y1 <- test
        } yield (y1)

Also, you can write code as you want, it will fail and retry, but you need to return result of your Future:

  kafka.topic1.subscribe.atLeastOnce(Flow[String]
    .mapAsync(1) {
      case envelope: String =>

        val test: Future[String] = Future(throw new RuntimeException("asd"))
      /*.recoverWith {
          case NonFatal(e) =>
            Future.successful("10")
        }*/

        val result = for {
          y1 <- test
        } yield (y1)

        println(s"code block $envelope")
       result.onComplete{
          case Success(value) => println(s"Message from topic: $envelope $result")
          case Failure(exception) => println(exception.getMessage)
            throw exception
        }
      result.map(_ => Done)
    }
)