When resuming a failed job from a checkpoint, the application logic is invoked correctly and the RDDs are re-instantiated; however, a call to RDD.map results in a NullPointerException.
lazy val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)

private def createStreamingContext: StreamingContext = {
  val ssc = new StreamingContext(spark.sparkContext, batchInterval)
  ssc.checkpoint(checkpointDir)
  consumeStreamingContext(ssc)
  ssc
}
def consumeStreamingContext(ssc: StreamingContext) = {
  //... create dstreams
  val dstream = KinesisUtil.createStream(....
  ...
  dstream.checkpoint(batchInterval)
  dstream.foreachRDD(process)
}
def process(events: RDD[Event]) = {
  if (!events.isEmpty()) {
    logger.info("Transforming events for processing")
    //the recovered RDD supports some operations: count succeeds
    logger.info(s"RDD LENGTH: ${events.count}")
    //NullPointerException is thrown here, on the call to .map
    val df = events.map(e => {
      ...
    })
  }
}
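For context, KinesisUtil.createStream above appears to wrap the standard KinesisUtils.createStream API from the spark-streaming-kinesis-asl module. A minimal sketch of a direct call; the application name, stream name, endpoint, and region below are placeholders, not values from the original code:

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils

// All literal values below are placeholders.
val dstream = KinesisUtils.createStream(
  ssc,
  "my-kinesis-app",                           // Kinesis application name (used for the DynamoDB checkpoint table)
  "my-stream",                                // Kinesis stream name
  "https://kinesis.us-east-1.amazonaws.com",  // endpoint URL
  "us-east-1",                                // region name
  InitialPositionInStream.LATEST,             // where to start reading on a fresh run
  batchInterval,                              // Kinesis checkpoint interval
  StorageLevel.MEMORY_AND_DISK_2)             // replicate received blocks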
EDIT: Updated to note that I'm using Kinesis and that the WAL is enabled. Is WAL checkpointing supported on S3? I'm reading elsewhere that it is not well supported. https://issues.apache.org/jira/browse/SPARK-9215
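For reference, the receiver WAL I mean here is the one enabled via the standard SparkConf flag; a minimal sketch of the relevant configuration (the app name is a placeholder), set before the StreamingContext is created:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("my-app") // placeholder
  // Persist received blocks to a write-ahead log under the checkpoint
  // directory so buffered data survives a driver failure. The WAL expects
  // a filesystem with flush semantics, which is what makes S3 problematic.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")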
EDIT: I'm experiencing similar results with HDFS.
The solution was to call rdd.checkpoint after each transformation within foreachRDD. Every RDD transformation must be checkpointed.
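A sketch of what that looks like inside the foreachRDD handler; transform and enrich are hypothetical stand-ins for the actual per-event logic:

def process(events: RDD[Event]) = {
  if (!events.isEmpty()) {
    val transformed = events.map(e => transform(e)) // hypothetical first transformation
    transformed.checkpoint()                        // checkpoint before the next step
    val enriched = transformed.map(e => enrich(e))  // hypothetical second transformation
    enriched.checkpoint()                           // checkpoint this result as well
    // ... continue processing / writing output
  }
}

Note that a checkpoint directory must already be configured (as it is via ssc.checkpoint in createStreamingContext above) for rdd.checkpoint to have somewhere to write.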