Reactive way to process a zipped CSV file with CSVReader


This is a follow-up question to this: How to convert a Flux to a Mono with multiple counters

I found out that the problem lies with the CSVReader in this context. For some reason, no usable Flux is created from it to be processed.

This doesn't work:

@Test
void testCsvReader() {
    StepVerifier.create(csvFlux()).verifyComplete();
}

@SneakyThrows
Mono<Void> csvFlux() {
    var failure = new AtomicInteger();
    var filtered = new ArrayList<String>();
    var is = new ByteArrayInputStream(createZipContent(CSV_FILE_NAME, TEST_CSV.getBytes(StandardCharsets.UTF_8)));
    var reader = new CSVReader(new InputStreamReader(new ZipInputStream(is)));
return Flux.fromIterable(reader)
        .flatMap(this::ean)
        .filter(ean -> filter(ean, filtered))
        .onErrorContinue((ex, integer) -> failure.getAndIncrement())
        .collectList()
        .flatMap(list -> print(list.size(), failure, filtered));
}

Mono<String> ean(final String[] record) {
    System.out.println("processing: " + Arrays.toString(record));
    var ean = record[1];
    if ("product1".equals(ean)) {
      return Mono.just(ean);
    }
    return Mono.error(RuntimeException::new);
}

boolean filter(final String ean, final List<String> filtered) {
    if ("product2".equals(ean)) {
      filtered.add(ean);
      return false;
    }
    return true;
}

Mono<Void> print(final int success, final AtomicInteger errorCount, final List<String> filtered) {
    System.out.println("success:" + success + "; failure:" + errorCount + "; filter: " + filtered);
    return Mono.empty();
}

But if I create the CSV like this, it works:

Mono<Void> run() {
    var csv = new ArrayList<String[]>();
    csv.add(new String[] {"1234", "product1", "100"});
    csv.add(new String[] {"1234", "product2", "200"});
    csv.add(new String[] {"1234", "product3", "300"});
    var failure = new AtomicInteger();
    var filtered = new ArrayList<String>();
    return Flux.fromIterable(csv)
        .flatMap(this::ean)
        .filter(ean -> filter(ean, filtered))
        .onErrorContinue((ex, integer) -> failure.getAndIncrement())
        .collectList()
        .flatMap(list -> print(list.size(), failure, filtered));
}

Could it be because we have CSV files within a ZIP file, and the CSVReader thus gets confused?

EDIT:
It is indeed because we pass a ZipInputStream to the CSVReader. If I pass in a plain InputStream over a CSV, it works. This raises the question: why does it work if I subscribe to the Flux directly, but not if I use collectList()?
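The ZipInputStream behaviour can be demonstrated in isolation. The sketch below (class and method names are mine, not from the question) builds a one-entry zip in memory and shows that reading before calling getNextEntry() reports end-of-stream immediately, which would make any wrapping reader see an empty file:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class ZipEntryDemo {

    // Build an in-memory zip containing a single CSV entry.
    static byte[] zipBytes() throws IOException {
        var bos = new ByteArrayOutputStream();
        try (var zos = new ZipOutputStream(bos)) {
            zos.putNextEntry(new ZipEntry("test.csv"));
            zos.write("1234,product1,100\n".getBytes());
            zos.closeEntry();
        }
        return bos.toByteArray();
    }

    // Without getNextEntry() the stream is positioned on no entry,
    // so the very first read() reports end-of-stream (-1).
    static int readWithoutEntry() throws IOException {
        try (var zis = new ZipInputStream(new ByteArrayInputStream(zipBytes()))) {
            return zis.read();
        }
    }

    // After getNextEntry() the same bytes become readable.
    static String readWithEntry() throws IOException {
        try (var zis = new ZipInputStream(new ByteArrayInputStream(zipBytes()))) {
            zis.getNextEntry();
            return new BufferedReader(new InputStreamReader(zis)).readLine();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("without getNextEntry: " + readWithoutEntry()); // -1
        System.out.println("with getNextEntry: " + readWithEntry());
    }
}
```

So the CSVReader in the failing test is not "confused" by the zip as such; it is handed a stream that is not yet positioned on any entry.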

1 Answer

Answer by Thomas:

I finally figured it out, with the help of some other SO posts, each bringing a piece of the puzzle:

@SneakyThrows
Mono<Void> run() {
  var failure = new AtomicInteger();
  var filtered = new ArrayList<String>();
  var is = getSequenceInputStream(createZipContent(TEST_FEED_CSV_FILE_NAME, TEST_CSV.getBytes(StandardCharsets.UTF_8)));
  return Flux.<CSVReader>generate(sink -> extractZipEntry(is, sink))
      .flatMap(Flux::fromIterable)
      .flatMap(this::ean)
      .filter(ean -> filter(ean, filtered))
      .onErrorContinue((ex, integer) -> failure.getAndIncrement())
      .collectList()
      .flatMap(list -> print(list.size(), failure, filtered));
}

void extractZipEntry(InputStream is, SynchronousSink<CSVReader> sink) {
  var zis = new ZipInputStream(is);
  try {
    // getNextEntry() positions the stream on the entry's data;
    // without it, reads return end-of-stream immediately.
    var zipEntry = zis.getNextEntry();
    if (zipEntry == null) {
      // No more entries in the archive: complete the Flux.
      sink.complete();
    } else {
      var reader = new CSVReader(new InputStreamReader(zis));
      // Skip the header row.
      reader.readNextSilently();
      sink.next(reader);
    }
  } catch (IOException ex) {
    sink.error(new RuntimeException(ex));
  }
}
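For reference, the createZipContent helper used throughout is not shown in the question. A minimal sketch of what it presumably does (this implementation is an assumption, not the asker's actual code) packs a single named entry into an in-memory zip:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public class ZipHelper {

    // Hypothetical reconstruction of the question's createZipContent:
    // writes one entry with the given name and content, returns the zip bytes.
    static byte[] createZipContent(String entryName, byte[] content) throws IOException {
        var bos = new ByteArrayOutputStream();
        try (var zos = new ZipOutputStream(bos)) {
            zos.putNextEntry(new ZipEntry(entryName));
            zos.write(content);
            zos.closeEntry();
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] zip = createZipContent("test.csv", "a,b,c\n".getBytes());
        // Zip files start with the local-file-header signature "PK".
        System.out.println("" + (char) zip[0] + (char) zip[1]);
    }
}
```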