How to block() Reactor Http thread while calling third party api using Spring boot webclient?

35 Views Asked by At

I am writing a generic method to call third party API from my micro-service.

private <R,P> R processRequest(String url, Class<R> type, HttpEntity<P> requestEntity, HttpMethod method) {

    HttpHeaders httpHeaders = requestEntity.getHeaders();

    Mono<R> result =  getResult(method, url, httpHeaders, type);
    R  responseBody =  result.block();

    return responseBody;
}

private <R> Mono<R> getResult(HttpMethod method, String url, HttpHeaders httpHeaders, Class<R> type) {
    return webClient.method(method)
            .uri(url)
            .accept(MediaType.ALL)
            .contentType(MediaType.APPLICATION_JSON)
            .headers(headers -> headers.putAll(httpHeaders))
            .retrieve()
            .onStatus(HttpStatusCode::is4xxClientError,
                    clientResponse -> Mono.error(new RuntimeException("Client error")))
            .onStatus(HttpStatusCode::is5xxServerError,
                    clientResponse -> Mono.error(new RuntimeException("Server error")))
            .bodyToMono(type);
}

But, when I am calling processRequest() method, I am getting the below error.

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3

Can anyone please help to solve this issue.

Tech stack : Java : 17 spring-boot-starter-parent : 3.1.1 spring-boot-starter-webflux : 3.1.1

P.S. I need a blocking call and I am aware that it is not good to use blocking operations in reactive code

1

There are 1 best solutions below

0
Peng On BEST ANSWER

Reactor is a non-blocking lib ,so it is not correct usage that block it event in spring http thread pool (PS but it is general that there may be some occasional required , reactor api is very useful);

In a nutshell , you can build a new bussiness process thread pool to submit non-blocking api request ; here is some code snippet for reference.


@Component
public class MyApiClient {

    private final WebClient webClient;

    public MyApiClient(WebClient.Builder webClientBuilder) {
        this.webClient = webClientBuilder.build();
    }

    public <R, P> R processRequest(String url, Class<R> type, HttpEntity<P> requestEntity, HttpMethod method) {
        HttpHeaders httpHeaders = requestEntity.getHeaders();
        Mono<R> result = getResult(method, url, httpHeaders, type);
        return blockMono(result);
    }

    private <R> Mono<R> getResult(HttpMethod method, String url, HttpHeaders httpHeaders, Class<R> type) {
        return webClient.method(method)
                .uri(url)
                .accept(MediaType.ALL)
                .contentType(MediaType.APPLICATION_JSON)
                .headers(headers -> headers.putAll(httpHeaders))
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError,
                        clientResponse -> Mono.error(new RuntimeException("Client error")))
                .onStatus(HttpStatus::is5xxServerError,
                        clientResponse -> Mono.error(new RuntimeException("Server error")))
                .bodyToMono(type);
    }

    private <R> R blockMono(Mono<R> mono) {
// just init a thread pool , any way you favorite is ok 
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return executor.submit((Callable<R>) mono::block).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("Error occurred while blocking Mono", e);
        } finally {
            executor.shutdown();
        }
    }
}

you are in spring environment so it is necessary that init your bussiness process thread pool as a bean wired into spring like following .


   @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("Bussiness-Pool-");
        executor.initialize();
        return executor;
    }

annotation bean like following or build default construction method to autowired

  @Resource
    ThreadPoolTaskExecutor executor;