Why does blocking inside thenApplyAsync work but not inside thenApply?

We saw some interesting behavior in our application. The following Spock spec captures the behavior. I am trying to understand why the second test passes but the first one throws a TimeoutException.

Summary: A mock server exposes a mock endpoint that responds with a success after a 10 ms delay. We use AsyncHttpClient to make a non-blocking call to this endpoint, and chain onto it a second call to the same endpoint that blocks on its result. The first call always succeeds. The second call fails with a timeout when it is chained with thenApply, but succeeds when it is chained with thenApplyAsync. In both cases the mock server appears to respond within 10 ms.
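To make the difference between the two methods concrete, here is a minimal Java sketch, independent of AsyncHttpClient, that reports which thread runs the chained function. The class name, the helper method and the thread name "io-thread" are hypothetical, and the plain thread only stands in for whatever thread completes the future in the real client. It relies on documented CompletableFuture behavior: a function registered with thenApply before the future completes is run by the thread that completes it, while thenApplyAsync without an executor hands the function to the default asynchronous pool.

```java
import java.util.concurrent.CompletableFuture;

class CallbackThreadDemo {

    // Returns the name of the thread that executed the dependent stage.
    static String callbackThread(boolean async) {
        CompletableFuture<String> source = new CompletableFuture<>();

        // Register the dependent stage BEFORE the source completes, as the spec does
        // when it chains onto a request that is still in flight.
        CompletableFuture<String> dependent = async
                ? source.thenApplyAsync(v -> Thread.currentThread().getName())
                : source.thenApply(v -> Thread.currentThread().getName());

        // "io-thread" is a stand-in (assumption) for the thread that completes the future.
        new Thread(() -> source.complete("done"), "io-thread").start();

        // Wait for the dependent stage and return the thread name it recorded.
        return dependent.join();
    }

    public static void main(String[] args) {
        // Runs on the completing thread, i.e. "io-thread".
        System.out.println("thenApply ran on:      " + callbackThread(false));
        // Runs on a different thread, typically a ForkJoinPool.commonPool worker.
        System.out.println("thenApplyAsync ran on: " + callbackThread(true));
    }
}
```

If AsyncHttpClient completes its future on one of its own I/O threads (an assumption I have not verified for this version), then the thenApply variant would run callExternalBlocking() on that thread and block it in join(), whereas the thenApplyAsync variant would block a pool thread instead.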

Dependencies:


    implementation 'com.google.guava:guava:29.0-jre'
    implementation 'org.asynchttpclient:async-http-client:2.12.1'

    // Use the latest Groovy version for Spock testing
    testImplementation 'org.codehaus.groovy:groovy-all:2.5.11'

    // Use the awesome Spock testing and specification framework even with Java
    testImplementation 'org.spockframework:spock-core:1.3-groovy-2.5'
    testImplementation 'org.objenesis:objenesis:1.4'
    testImplementation "cglib:cglib:2.2"
    testImplementation 'junit:junit:4.13'
    testImplementation 'org.mock-server:mockserver-netty:5.11.1'

Spock Spec:


package com.switchcase.asyncthroughput

import com.google.common.base.Charsets
import org.asynchttpclient.DefaultAsyncHttpClient
import org.asynchttpclient.RequestBuilder
import org.mockserver.integration.ClientAndServer
import org.mockserver.model.HttpResponse
import spock.lang.Shared
import spock.lang.Specification

import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

import static org.mockserver.integration.ClientAndServer.startClientAndServer
import static org.mockserver.model.HttpRequest.request

class CompletableFutureThreadsTest extends Specification {

    @Shared
    ClientAndServer mockServer

    def asyncHttpClient = new DefaultAsyncHttpClient();

    def setupSpec() {
        mockServer = startClientAndServer(9192);
        // Create a mock endpoint that responds with "done" after a 10 ms delay.
        mockServer.when(request()
                .withMethod("POST")
                .withPath("/validate"))
                .respond(HttpResponse.response().withBody("done")
                        .withStatusCode(200)
                        .withDelay(TimeUnit.MILLISECONDS, 10));
    }

    def "Calls external using AHC with a blocking call with 1sec timeout results in TimeoutException."() {
        when:
        callExternal().thenApply({ resp -> callExternalBlocking() }).join()

        then:
        def exception = thrown(CompletionException)
        exception instanceof CompletionException
        exception.getCause() instanceof TimeoutException
        exception.printStackTrace()
    }

    def "Calls external using AHC with a blocking call on ForkJoinPool with 1sec timeout results in success."() {
        when:
        def value = callExternal().thenApplyAsync({ resp -> callExternalBlocking() }).join()

        then:
        value == "done"
    }

    def cleanupSpec() {
        mockServer.stop(true)
    }

    private CompletableFuture<String> callExternal(def timeout = 1000) {
        RequestBuilder requestBuilder = RequestBuilder.newInstance();
        requestBuilder.setMethod("POST").setUrl("http://localhost:9192/validate").setRequestTimeout(timeout)
        def cf = asyncHttpClient.executeRequest(requestBuilder).toCompletableFuture()
        return cf.thenApply({ response ->
            println("CallExternal Succeeded.")
            return response.getResponseBody(Charsets.UTF_8)
        })
    }

    private String callExternalBlocking(def timeout = 1000) {
        RequestBuilder requestBuilder = RequestBuilder.newInstance();
        requestBuilder.setMethod("POST").setUrl("http://localhost:9192/validate").setRequestTimeout(timeout)
        def cf = asyncHttpClient.executeRequest(requestBuilder).toCompletableFuture()
        return cf.thenApply({ response ->
            println("CallExternalBlocking Succeeded.")
            return response.getResponseBody(Charsets.UTF_8)
        }).join()
    }
}

EDIT:

Debug log and stack trace for timeout: (The timeout happens on the remote call in callExternalBlocking)

17:37:38.885 [AsyncHttpClient-timer-2-1] DEBUG org.asynchttpclient.netty.timeout.TimeoutTimerTask - Request timeout to localhost/127.0.0.1:9192 after 1000 ms for NettyResponseFuture{currentRetry=0,
    isDone=0,
    isCancelled=0,
    asyncHandler=org.asynchttpclient.AsyncCompletionHandlerBase@478251c9,
    nettyRequest=org.asynchttpclient.netty.request.NettyRequest@4945b749,
    future=java.util.concurrent.CompletableFuture@4d7a3ab9[Not completed, 1 dependents],
    uri=http://localhost:9192/validate,
    keepAlive=true,
    redirectCount=0,
    timeoutsHolder=org.asynchttpclient.netty.timeout.TimeoutsHolder@878bd72,
    inAuth=0,
    touch=1622248657866} after 1019 ms
17:37:38.886 [AsyncHttpClient-timer-2-1] DEBUG org.asynchttpclient.netty.channel.ChannelManager - Closing Channel [id: 0x5485056c, L:/127.0.0.1:58076 - R:localhost/127.0.0.1:9192] 
17:37:38.886 [AsyncHttpClient-timer-2-1] DEBUG org.asynchttpclient.netty.request.NettyRequestSender - Aborting Future NettyResponseFuture{currentRetry=0,
    isDone=0,
    isCancelled=0,
    asyncHandler=org.asynchttpclient.AsyncCompletionHandlerBase@478251c9,
    nettyRequest=org.asynchttpclient.netty.request.NettyRequest@4945b749,
    future=java.util.concurrent.CompletableFuture@4d7a3ab9[Not completed, 1 dependents],
    uri=http://localhost:9192/validate,
    keepAlive=true,
    redirectCount=0,
    timeoutsHolder=org.asynchttpclient.netty.timeout.TimeoutsHolder@878bd72,
    inAuth=0,
    touch=1622248657866}

java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Request timeout to localhost/127.0.0.1:9192 after 1000 ms
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.asynchttpclient.netty.NettyResponseFuture.abort(NettyResponseFuture.java:273)
    at org.asynchttpclient.netty.request.NettyRequestSender.abort(NettyRequestSender.java:473)
    at org.asynchttpclient.netty.timeout.TimeoutTimerTask.expire(TimeoutTimerTask.java:43)
    at org.asynchttpclient.netty.timeout.RequestTimeoutTimerTask.run(RequestTimeoutTimerTask.java:50)
    at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
    at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
    at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Request timeout to localhost/127.0.0.1:9192 after 1000 ms
    ... 7 more