I am working on SpringWebflux application and trying read a JSON file that is stored on the classpath. But I want to do it in a non-blocking way and having a problem.
Whatever I could come up with is doing it like this:
@Override
public Flux<SomePojo> getData() {
Flux<DataBuffer> data = DataBufferUtils.readInputStream(() -> {
return new ClassPathResource("config/someData.json").getInputStream();
}, new DefaultDataBufferFactory(), 4096);
return jackson2JsonDecoder
.decode(data, ResolvableType.forClass(SomePojo.class), MimeType.valueOf(MediaType.APPLICATION_JSON_VALUE), null)
.map(SomePojo.class::cast);
}
jackson2JsonDecoder is created as bean with default ObjectMapper that comes from Spring.
@Bean
public Jackson2JsonDecoder jackson2JsonDecoder(ObjectMapper objectMapper) {
return new Jackson2JsonDecoder(objectMapper);
}
However that gives me an error and I can't figure out why:
org.springframework.core.codec.DecodingException: JSON decoding error: Already closed, can not feed more input
at org.springframework.http.codec.json.Jackson2Tokenizer.tokenize(Jackson2Tokenizer.java:113) ~[spring-web-6.1.3.jar:6.1.3]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
Any idea?