Spark Kinesis Streaming Checkpoint Recovery: RDD NullPointerException


When resuming a failed job from a checkpoint, the application logic is invoked correctly and the RDDs are reinstantiated; however, a call to RDD.map results in a NullPointerException.

lazy val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)

private def createStreamingContext: StreamingContext = {
  val ssc = new StreamingContext(spark.sparkContext, batchInterval)
  ssc.checkpoint(checkpointDir)
  consumeStreamingContext(ssc)
  ssc
}

def consumeStreamingContext(ssc: StreamingContext) = {
  // ... create DStreams
  val dstream = KinesisUtils.createStream(...)
  ...

  dstream.checkpoint(batchInterval)

  dstream
    .foreachRDD(process)
}

def process(events: RDD[Event]) = {
  if (!events.isEmpty()) {
    logger.info("Transforming events for processing")
    // The RDD seems to support some operations?
    logger.info(s"RDD LENGTH: ${events.count}")
    // NullPointerException on the call to .map
    val df = events.map(e => {
      ...
    })
  }
}

EDIT: Updated to note that I'm using Kinesis and the WAL is enabled. Is WAL checkpointing supported on S3? I'm reading elsewhere that it is not well supported. https://issues.apache.org/jira/browse/SPARK-9215
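For reference, the WAL is enabled via the spark.streaming.receiver.writeAheadLog.enable flag, which must be set before the StreamingContext is created; the log files are then written under the checkpoint directory. A minimal sketch (the app name is a placeholder; batchInterval and checkpointDir are as in the code above):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext

// Sketch only: with the WAL enabled, received Kinesis data is also
// logged under the checkpoint directory (S3 or HDFS here).
val conf = new SparkConf()
  .setAppName("kinesis-recovery-example") // placeholder name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val spark = SparkSession.builder().config(conf).getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, batchInterval)
ssc.checkpoint(checkpointDir)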

EDIT: I'm experiencing similar results with HDFS.

There are 2 answers below.

Answer 1:

The solution was to call rdd.checkpoint() after each transformation within foreachRDD; every RDD transformation must be checkpointed.
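A minimal sketch of that fix, applied to the question's process method (the body of the original map is elided in the question, so the transformation below is purely illustrative):

def process(events: RDD[Event]): Unit = {
  if (!events.isEmpty()) {
    logger.info(s"RDD LENGTH: ${events.count}")
    // Illustrative transformation; the real map body is elided above.
    val transformed = events.map(identity)
    // Checkpoint after the transformation, as described in this answer.
    transformed.checkpoint()
    // ... any further transformations, each followed by .checkpoint()
  }
}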

Answer 2:

I ran into a similar problem. Let me explain the problem first and then how I was able to solve it.

Problem statement: consuming Kinesis data using Spark Streaming. When Spark Streaming runs on top of Kinesis, we still get an unstructured stream (DStream) rather than the Structured Stream we get when listening to Kafka.
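For context, such a byte-level DStream is created roughly like this (a sketch against the spark-streaming-kinesis-asl KinesisUtils API; the app name, stream name, endpoint, and region are all placeholders):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.{Duration, StreamingContext}

// Sketch: yields the ReceiverInputDStream[Array[Byte]] consumed below.
def kinesisByteStream(ssc: StreamingContext, batchInterval: Duration) =
  KinesisUtils.createStream(
    ssc,
    "my-app",                                  // Kinesis application name (placeholder)
    "my-stream",                               // Kinesis stream name (placeholder)
    "https://kinesis.us-east-1.amazonaws.com", // endpoint (placeholder)
    "us-east-1",                               // region (placeholder)
    InitialPositionInStream.LATEST,
    batchInterval,                             // Kinesis checkpoint interval
    StorageLevel.MEMORY_AND_DISK_2)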

Issue: NullPointerException when converting the RDD to a DataFrame or Dataset. Below is the code with the problem:

def processData(spark: SparkSession, jobArgs: JobArgs, kinesisStream: ReceiverInputDStream[Array[Byte]]): Unit = {
    val filenamesRDD = kinesisStream.map(decodeKinesisMessage)
    // Import spark implicits which add encoders for case classes.
    import spark.implicits._
    val events = filenamesRDD.flatMap(filenameToEvents(new AmazonS3Client))
    events.foreachRDD(rdd => {
      spark.createDataset(rdd)
        .write
        .partitionBy("date") // TODO add hour
        .mode(SaveMode.Append.name())
        .parquet(jobArgs.outputPath)
    })
}

What was the problem: this code works when the checkpoint directory doesn't exist, but fails with a NullPointerException when the checkpoint directory exists (i.e., when the job recovers from a checkpoint).

Why: my theory is that it tries to recover the SQLContext and other objects through deserialization, but they are not available.

How did I solve this: by building the SQLContext again inside foreachRDD, before converting the RDD to a Dataset. See the code below:

def processData(spark: SparkSession, kinesisStream: ReceiverInputDStream[Array[Byte]]): Unit = {
    val filenamesRDD = kinesisStream.map(decodeKinesisMessage)
    // Import spark implicits which add encoders for case classes.
    import spark.implicits._
    val events = filenamesRDD.flatMap(filenameToEvents(new AmazonS3Client))

    events.foreachRDD(rdd => {
      val sqlContext = SparkSession.builder().getOrCreate().sqlContext
      import sqlContext.implicits._

      val outputPath: String = sqlContext.sparkSession.conf.get("output.path")
      sqlContext.createDataset(rdd)
        .write
        .partitionBy("date") // TODO add hour
        .mode(SaveMode.Append.name())
        .parquet(outputPath)
    })
}
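This matches the pattern recommended in the Spark Streaming programming guide for combining DataFrames with checkpointing: lazily get or create a singleton SparkSession inside foreachRDD, rather than relying on a reference captured in the checkpointed closure.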

Let me know if it helps.

Thanks, Hussain Bohra