Configuring Apache Spark's MemoryStream to simulate Kafka stream


I was asked to look into using Apache Spark's MemoryStream to simulate a Kafka stream in a Java Spring Boot service. Documentation and community coverage of this topic are sparse, so I am seeking help.

This is my implementation code.

final DataStreamReader kafkaDataStreamReader = kafkaLoader.getStream(sparkSession, options);

final Dataset<Row> df = kafkaDataStreamReader.load();

return df.writeStream().foreachBatch((batch, batchId) -> {
    // Process a batch of data received from Kafka
    updateData(name, customizerFunction, avroSchema, batch);
}).start();

  • KafkaLoader is a class that configures the Kafka stream differently depending on the active Profile (it/prod). It returns a DataStreamReader, which might be the reason I'm struggling to create a MemoryStream.
  • Next, in the writeStream I write to my destinations.
@Slf4j
@Service
@Profile("it")
public class ItKafkaLoader extends KafkaLoader {
  @Autowired
  SparkSession sparkSession;

  @SneakyThrows
  @Override
  public DataStreamReader getStream(SparkSession sparkSession, Map<String,Object> options) {
    options = Map.of();
    MemoryStream<String> stream = null;
    try {
      stream = new MemoryStream<>(1, sparkSession.sqlContext(), null, Encoders.STRING());
      String jsonString = "{..}";

      Seq<String> seq = JavaConverters
        .collectionAsScalaIterableConverter(List.of(jsonString))
        .asScala()
        .toSeq();

      Offset currentOffset = stream.addData(seq);
      stream.commit(currentOffset);
    } catch (Exception e){
      log.warn("Error creating MemoryStream: ", e);
      return new DataStreamReader(sparkSession);
    }
    Dataset<Row> data = stream.toDF();
    log.debug("Stream enabled [t/f]: {}", data.isStreaming());
    return data
      .sqlContext()
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", "test-servers")
      .option("subscribe", "test-data");
  }
}

ItKafkaLoader is used when I'm running integration tests (hence ActiveProfiles is set to it), and it is where I'm struggling to create a MemoryStream. Because my implementation code expects a returned object of type DataStreamReader, I believe I need to call readStream(), since that returns a DataStreamReader. However, when I just call readStream() with no format, Spark throws an exception about my path not being defined.
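One refactor I'm considering (just a sketch, not tested end to end): since MemoryStream.toDF() already returns a streaming Dataset, getStream could return Dataset<Row> instead of DataStreamReader, so the it profile never has to detour back through readStream(). ProdKafkaLoader and the option keys here are hypothetical stand-ins for my real prod loader:

```java
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.MemoryStream;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;

import scala.Option;
import scala.collection.JavaConverters;

// Sketch: KafkaLoader returns an already-loaded streaming Dataset<Row>
// instead of a DataStreamReader, so both profiles hand back the same type.
abstract class KafkaLoader {
  public abstract Dataset<Row> getStream(SparkSession spark, Map<String, Object> options);
}

@Service
@Profile("prod")
class ProdKafkaLoader extends KafkaLoader { // hypothetical prod counterpart
  @Override
  public Dataset<Row> getStream(SparkSession spark, Map<String, Object> options) {
    return spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", (String) options.get("bootstrap.servers"))
        .option("subscribe", (String) options.get("topic"))
        .load();
  }
}

@Service
@Profile("it")
class ItKafkaLoader extends KafkaLoader {
  @Override
  public Dataset<Row> getStream(SparkSession spark, Map<String, Object> options) {
    MemoryStream<String> stream =
        new MemoryStream<>(1, spark.sqlContext(), Option.empty(), Encoders.STRING());
    stream.addData(JavaConverters.asScalaBuffer(List.of("{..}")));
    // toDF() is already a streaming Dataset -- no readStream()/format("kafka") round-trip.
    return stream.toDF();
  }
}
```

With that shape, the implementation code would call Dataset<Row> df = kafkaLoader.getStream(sparkSession, options); directly and drop the load() call.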

java.lang.IllegalArgumentException: 'path' is not specified
    at org.apache.spark.sql.errors.QueryExecutionErrors$.dataPathNotSpecifiedError

When searching for this error I see that I need to set my format to kafka. After doing that, Spark expects a topic and then a broker. I was hoping that, since I'm using a MemoryStream, Spark would recognize this as a dummy Kafka cluster and topic and go about kicking off my simulated Kafka stream through the MemoryStream. That doesn't happen, and when I run my integration test I get these errors.

- Query [id = 4ebacd71-d..., runId = 1a2c4...] terminated with error
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
- Invalid url in bootstrap.servers: test-servers

Ideally, I would like to figure out how to fix getStream() in ItKafkaLoader. However, I have a feeling that I don't understand what MemoryStream is really for and might need to do something different.
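For reference, the usual test pattern I've seen (assuming I've understood MemoryStream correctly; the println stands in for my real updateData handler) starts a query on stream.toDF() directly, then drives it with addData() plus processAllAvailable():

```java
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.MemoryStream;
import org.apache.spark.sql.streaming.StreamingQuery;

import scala.Option;
import scala.collection.JavaConverters;

public class MemoryStreamHarness {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .master("local[1]")
        .appName("memory-stream-it")
        .getOrCreate();

    MemoryStream<String> input =
        new MemoryStream<>(1, spark.sqlContext(), Option.empty(), Encoders.STRING());

    Dataset<Row> df = input.toDF(); // isStreaming() is already true here

    StreamingQuery query = df.writeStream()
        .foreachBatch((batch, batchId) ->
            // stand-in for updateData(name, customizerFunction, avroSchema, batch)
            System.out.println("batch " + batchId + ": " + batch.count() + " rows"))
        .start();

    input.addData(JavaConverters.asScalaBuffer(List.of("{..}")));
    query.processAllAvailable(); // block until the micro-batch is handled
    query.stop();
    spark.stop();
  }
}
```

No Kafka source, brokers, or topics are involved at all; the MemoryStream itself is the source.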

Update: I have seen that in newer versions of Spark you can just set the format to memory; however, it appears that my Spark version (v2.12) does not support that, and I do not have the green light to upgrade my Spark version.
