Failure to restart Akka Source that is created from an HTTP response


I have created a WS client on Play Framework 2.6 which connects to a REST API. This REST API continuously sends a stream of data chunks. Occasionally the REST API server goes down for a few seconds and is then restarted. I have written the client to restart the stream, but it does not seem to be able to do so.

    def employesCompany() = Action.async { implicit request: Request[AnyContent] =>
      val client = ws.underlying

      val wsrequest: WSRequest = ws.url(playConfiguration.get[String]("employes.service.url"))

      val futureResponse: Future[WSResponse] = wsrequest.stream()

      futureResponse.flatMap { response =>
        val source: Source[ByteString, Any] = response.bodyAsSource
        val publisher = source.toMat(Sink.asPublisher(true))(Keep.right).run()
        val flow = Flow[ByteString].map(res => res.utf8String)
        val ns = Source.fromPublisher(publisher).via(flow)
        val restartSource = RestartSource.withBackoff(Duration.apply(1, "sec"), Duration.apply(3, "sec"), 0.2) { () =>
          ns.map { elem =>
            println(elem)
            elem
          }
        }

        Future.successful(Ok.chunked(restartSource via EventSource.flow).as(ContentTypes.EVENT_STREAM))
      }
    }

I am getting the following error:

[ERROR] [12/20/2020 18:35:27.585] [play-dev-mode-akka.actor.default-dispatcher-8] [RestartWithBackoffSource(akka://play-dev-mode)] Restarting graph due to failure
java.io.IOException: An existing connection was forcibly closed by the remote host
        at sun.nio.ch.SocketDispatcher.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at play.shaded.ahc.io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:368)
        at play.shaded.ahc.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:891)
        at play.shaded.ahc.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:277)
        at play.shaded.ahc.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
        at play.shaded.ahc.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
        at play.shaded.ahc.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
        at play.shaded.ahc.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
        at play.shaded.ahc.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
        at play.shaded.ahc.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        at play.shaded.ahc.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)

Answer by Matthias Berndt:

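The problem is that the request is performed as soon as you call wsrequest.stream(), whereas you want it to be performed later, each time the Source is materialized. RestartSource invokes its factory on every restart, but your factory returns the same ns, which is bound to the single response whose connection has already failed. You therefore need to delay the creation of the Future: don't call wsrequest.stream() yourself, but have Akka Streams do it.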
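To make the eager versus deferred distinction concrete, here is a minimal sketch that uses only scala.concurrent. The `fetch()` method and the `requests` counter are hypothetical stand-ins for `wsrequest.stream()` and the number of connections opened; nothing here is Play or Akka API.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object EagerVsDeferred {
  // Counts how many times the (hypothetical) request has been issued.
  val requests = new AtomicInteger(0)

  // Hypothetical stand-in for wsrequest.stream(): every call "opens a connection".
  def fetch(): Future[Int] = Future(requests.incrementAndGet())

  def main(args: Array[String]): Unit = {
    // Eager: the request is issued exactly once, right here.
    // Reusing the resulting Future never issues another request.
    val eager: Future[Int] = fetch()
    val eagerResults = Seq(eager, eager, eager).map(Await.result(_, 1.second))

    // Deferred: a factory that issues a new request every time it is invoked,
    // which is what a restart needs.
    val deferred: () => Future[Int] = () => fetch()
    val deferredResults = Seq.fill(3)(Await.result(deferred(), 1.second))

    println(s"eager: $eagerResults, deferred: $deferredResults, requests: ${requests.get}")
  }
}
```

The eager Future yields the same value three times, while the factory issues three further requests, for four in total. The code in the question is in the eager situation: the restart factory keeps handing back a source tied to the one request that was already made.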

I haven't worked with Akka Streams in a while, but something like this should work:

    // Defers the request until the source is materialized (Source.lazyFuture requires Akka 2.6+)
    val responseStream = Source.lazyFuture(() => wsrequest.stream())

    responseStream.flatMapConcat { response =>
      // the rest like before
    }
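Putting this together, the whole action might look roughly like the sketch below. It is an untested outline rather than a definitive implementation. The controller class name `EmployeesController` and its injected constructor are assumptions, since the question does not show them. It also uses `Source.fromFutureSource` in place of `Source.lazyFuture`: inside the `RestartSource` factory the request is already deferred, and `fromFutureSource` is available in the Akka 2.5 line that Play 2.6 ships with (in Akka 2.6 it is deprecated in favour of `Source.futureSource`).

```scala
import javax.inject.Inject
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

import akka.stream.scaladsl.{RestartSource, Source}
import play.api.Configuration
import play.api.http.ContentTypes
import play.api.libs.EventSource
import play.api.libs.ws.WSClient
import play.api.mvc.{AbstractController, ControllerComponents}

// Hypothetical controller; the class name and constructor are assumptions.
class EmployeesController @Inject()(
    cc: ControllerComponents,
    ws: WSClient,
    playConfiguration: Configuration
)(implicit ec: ExecutionContext) extends AbstractController(cc) {

  def employesCompany() = Action {
    // Building the request does not send anything yet.
    val wsrequest = ws.url(playConfiguration.get[String]("employes.service.url"))

    // The factory runs on the first start and again on every restart, so each
    // attempt issues a new HTTP request instead of reusing the failed response.
    val restartSource = RestartSource.withBackoff(1.second, 3.seconds, 0.2) { () =>
      Source
        .fromFutureSource(wsrequest.stream().map(_.bodyAsSource))
        .map(_.utf8String)
    }

    Ok.chunked(restartSource via EventSource.flow).as(ContentTypes.EVENT_STREAM)
  }
}
```

Because the result no longer depends on a Future, a plain `Action` is enough here and `Action.async` is not needed. The intermediate `Sink.asPublisher` / `Source.fromPublisher` step from the question is dropped as well, since the response body is consumed directly inside the restartable source.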