Making Blocking (Sync) API call

139 Views Asked by At

My project is completely a Synchronized Web Application and i'm try to use netty server to save resource since it is using event loop for each request.

Now my requirement is to make a sync(blocking) external api call and for that I'm using WebClient with .block() at the end. But i'm getting below error

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-6

Here is my code


Mono<Map<String,Object>> resMono=wc.post().body(Mono.just(bodyStr), String.class)
  .retrieve().bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {})
  .subscribeOn(Schedulers.boundedElastic())
  .publishOn(Schedulers.boundedElastic())
//.subscribeOn(Schedulers.fromExecutor(webClientExecutor))
  .timeout(Duration.ofSeconds(10)).log("info");


 resMono.share().block();
// resMono.block();

I've tried with exchangeToMono and SubscribeOn, publishOn with Schedulars.boundedElastic() or custom executors. But nothing helps.

With resMono.share().block() works untill specific threshold and later when concurrent hits reach like 5 to 10, requests start hung (I assume, event loop is full at this point).

I aware that same WebClient code with .block() at end will work fine with Tomcat since it is sync server. But i'm tryng to do blocking call in netty application with WebClient.

Thanks in advance.

1

There are 1 best solutions below

11
Jagan On

With the help of @igor Artamonov & inspiration from SO Answer

Here is the complete solution:

1: Create NioEventLoopGroup with custom Executor (with no. of threads, name, Blocking Queue)

{
    Integer THREADS = 10; 
    Executor EXECUTOR = new ThreadPoolExecutor(THREADS, THREADS, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<Runnable>(10), new CustomizableThreadFactory("ThreadNamePrefix-"));
  NioEventLoopGroup RESOURCE = new NioEventLoopGroup(THREADS, EXECUTOR);
}

2: Create Registory factory

public ReactorResourceFactory getReactorResourceFactory() {
        ReactorResourceFactory rf = new ReactorResourceFactory();
        rf.setLoopResources(new LoopResources() {
            @Override
            public EventLoopGroup onServer(boolean b) {
                return RESOURCE;
            }
        });
        rf.setConnectionProvider(ConnectionProvider.create("Custom-WebClient-Name"));
        return rf;
    }

3: Create ReactorHttpConnector (created with SSL Context to allow insecure sites)

private ClientHttpConnector getCustomReactorHttpConnector() {
    try {
        SslContext sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
                .build();
        return new ReactorClientHttpConnector(getReactorResourceFactory(),
                httpClient -> httpClient.secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)));
    } catch (Exception e) {

    }
}

4: Build WebClient ( Custom Exchange strategies to set max in-memory size)

WebClient wc = WebClient.builder().clientConnector(getCustomReactorHttpConnector())
                    .baseUrl(endpoint.getHost() + endpoint.getUrl()).exchangeStrategies(getExchangeStrategies()).build();

5: Call API with block

Map<String,Object> resMap= wc.post().body(Mono.just(bodyStr), String.class).retrieve().bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {}).share().block();