Vert.x HTTP WebClient request hangs even though worker threads are available


I have a program that deploys a worker verticle (Vert.x version 3.6.3). From this worker verticle, I make an HTTP request using the Vert.x WebClient library. My Vert.x worker pool size is 20 and the event loop pool size is 10. Right after the HTTP request is made (after the send() call), I block the worker thread that made the request using a CompletableFuture. When I block the worker thread, the HTTP request never gets a response and always times out. It responds when the worker thread is not blocked. I thought that if I blocked the worker thread, other worker threads from the pool would be available to serve the HTTP request. What am I doing wrong here? Also, I have enabled network activity logging, but I don't see any network logs being printed.

The following is the program I tried. I am running a sample HTTP server on localhost at port 8080, which responds fine.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;

public class VertxTestMain extends AbstractVerticle {

    private static int workerPoolSize = 20;
    private static int eventLoopPoolSize = 10;

    @Override
    public void start() {
        vertx.eventBus().consumer("vertx.address", event -> {
            CompletableFuture<String> future = new CompletableFuture<>();
            doAHttpRequest(vertx, future);
            try {
                // Uncommenting the next line blocks this worker thread and reproduces the hang.
                //future.get(20, TimeUnit.SECONDS);
            } catch (Exception e) {
                System.out.println(Thread.currentThread().getName()+ " ----- HTTP request never responded");
                e.printStackTrace();
            }
        });
    }

    private void doAHttpRequest(Vertx vertx, CompletableFuture<String> future)  {

        //System.setProperty("java.util.logging.config.file", "/opt/maglev/persisted/data/vertx-default-jul-logging.properties");

        WebClientOptions options = new WebClientOptions();
        options.setLogActivity(true);
        WebClient webClient = WebClient.create(vertx, options );

        int port = 8080;
        String host = "localhost";
        String url = "/";

        // url already starts with "/", so it is appended directly to avoid printing "//".
        System.out.println(Thread.currentThread().getName()+ " ----- Sending http://" + host + ":" + port + url);

        // Send a GET request
        webClient
        .get(port, host, url)
        .timeout(10000)
        .send(ar -> {
            if (ar.succeeded()) {
                HttpResponse<Buffer> response = ar.result();
                System.out.println(Thread.currentThread().getName()+ " ----- Received response.  " + response.bodyAsString());
                System.out.println(Thread.currentThread().getName()+ " ----- Received response with status code.  " + response.statusCode());
                future.complete("success");
            } else {
                System.out.println(Thread.currentThread().getName()+ " ----- Something went wrong. " + ar.cause().getMessage());
                future.completeExceptionally(ar.cause());
            }
        });
    }

    public static void main(String[] args) {

        DeploymentOptions deploymentOptions = new DeploymentOptions().setWorker(true).setInstances(2);
        VertxOptions vertxOptions = new VertxOptions();
        vertxOptions.setWorkerPoolSize(workerPoolSize);
        vertxOptions.setEventLoopPoolSize(eventLoopPoolSize);
        Vertx vertx = Vertx.vertx(vertxOptions);

        vertx.deployVerticle(VertxTestMain.class.getName(), deploymentOptions, event -> {
            if(event.succeeded()) {
                System.out.println(Thread.currentThread().getName()+ " ----- VertxTestMain verticle is deployed");
                vertx.eventBus().send("vertx.address", "send");
            }
            else {
                System.out.println(Thread.currentThread().getName()+ " ----- VertxTestMain verticle deployment failed. " + event.cause());
            }
        });
    }
}
There is 1 best solution below

Answered by Alexey Soshin

By blocking the worker thread, you never give your HTTP request a chance to start.

Return the future from your method instead of passing it in as a parameter:

private CompletableFuture<String> doAHttpRequest(Vertx vertx)  {
    CompletableFuture<String> future = new CompletableFuture<>();
    WebClientOptions options = new WebClientOptions();
    options.setLogActivity(true);
    WebClient webClient = WebClient.create(vertx, options );

    int port = 8080;
    String host = "localhost";
    String url = "/";

    // url already starts with "/", so it is appended directly to avoid printing "//".
    System.out.println(Thread.currentThread().getName()+ " ----- Sending http://" + host + ":" + port + url);

    // Send a GET request 
    webClient
      .get(port, host, url)
      .timeout(10000)
      .send(ar -> {
          if (ar.succeeded()) {
              HttpResponse<Buffer> response = ar.result();
              System.out.println(Thread.currentThread().getName()+ " ----- Received response.  " + response.bodyAsString());
              System.out.println(Thread.currentThread().getName()+ " ----- Received response with status code.  " + response.statusCode());
              future.complete("success");
          } else {
              System.out.println(Thread.currentThread().getName()+ " ----- Something went wrong. " + ar.cause().getMessage());
              future.completeExceptionally(ar.cause());
          }
      });
       
    return future;
}
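
To see why blocking on the future stalls the request, here is a plain-Java sketch with no Vert.x dependency. It rests on an assumption not spelled out above: that Vert.x runs a worker verticle's handlers one at a time, and delivers the WebClient callback on that same verticle context rather than on any free worker thread. The single-thread executor is a hypothetical stand-in for that context, and the class and method names are illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SerializedContextDemo {

    // Blocks inside a task while the "response callback" is queued behind it
    // on the same serialized context. Returns true if the wait timed out.
    static boolean blockingVariantTimesOut() throws Exception {
        ExecutorService context = Executors.newSingleThreadExecutor();
        try {
            return context.submit(() -> {
                CompletableFuture<String> future = new CompletableFuture<>();
                // Stand-in for the response callback: it can only run after
                // the current task returns.
                context.execute(() -> future.complete("success"));
                try {
                    future.get(200, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    return true;
                }
            }).get();
        } finally {
            context.shutdownNow();
        }
    }

    // Registers a continuation instead of blocking, so the task returns and
    // the queued callback gets its turn on the context.
    static String nonBlockingVariant() throws Exception {
        ExecutorService context = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> result = new CompletableFuture<>();
            context.execute(() -> {
                CompletableFuture<String> future = new CompletableFuture<>();
                context.execute(() -> future.complete("success"));
                future.whenComplete((value, error) -> {
                    if (error != null) {
                        result.completeExceptionally(error);
                    } else {
                        result.complete(value);
                    }
                });
            });
            return result.get(2, TimeUnit.SECONDS);
        } finally {
            context.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("blocking variant timed out: " + blockingVariantTimesOut());
        System.out.println("non-blocking variant result: " + nonBlockingVariant());
    }
}
```

If the assumption holds, the same reasoning applies to the verticle: consume the returned future with a continuation such as whenComplete rather than calling get() inside the event bus handler.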

You can also reuse the WebClient; there's no need to create a new one for every request.
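
A minimal sketch of what reusing the client might look like, assuming the same Vert.x 3.x WebClient calls used in the question. The class name ReusedClientVerticle is hypothetical; the client is created once in start() and shared by every message the consumer handles, and the handler returns without blocking.

```java
import io.vertx.core.AbstractVerticle;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;

// Hypothetical verticle name, used only for illustration.
public class ReusedClientVerticle extends AbstractVerticle {

    // Created once per verticle instance and reused for all requests.
    private WebClient webClient;

    @Override
    public void start() {
        WebClientOptions options = new WebClientOptions();
        options.setLogActivity(true);
        webClient = WebClient.create(vertx, options);

        vertx.eventBus().consumer("vertx.address", message ->
            webClient
                .get(8080, "localhost", "/")
                .timeout(10000)
                .send(ar -> {
                    // The handler does not block; the result is processed
                    // here when the response (or failure) arrives.
                    if (ar.succeeded()) {
                        System.out.println("Status code: " + ar.result().statusCode());
                    } else {
                        System.out.println("Request failed: " + ar.cause().getMessage());
                    }
                }));
    }
}
```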

Also, take a look at the Promise API that Vert.x provides, as it may be better suited to your use case:

https://vertx.io/docs/apidocs/io/vertx/core/Promise.html