I want to understand a basic issue. Here is my code:
def createStreamingContext(sparkCheckpointDir: String, batchDuration: Int) = {
  val ssc = new StreamingContext(spark.sparkContext, Seconds(batchDuration))
  ssc
}

val ssc = StreamingContext.getOrCreate(sparkCheckpointDir, () => createStreamingContext(sparkCheckpointDir, batchDuration))

val inputDirectStream = EventHubsUtils.createDirectStreams(ssc, namespace, progressDir, Map(name -> eventhubParameters))
  .map(receivedRecord => new String(receivedRecord.getBody))

inputDirectStream.foreachRDD { (rdd: RDD[String], time: Time) =>
  val df = spark.read.json(rdd)
  df.show(truncate = false)
}

ssc.start()
ssc.awaitTermination()
The above code works, and I can see the DataFrame.
The issue: if I enable checkpointing like this
def createStreamingContext(sparkCheckpointDir: String, batchDuration: Int) = {
  val ssc = new StreamingContext(spark.sparkContext, Seconds(batchDuration))
  ssc.checkpoint(sparkCheckpointDir)
  ssc
}
then ssc.start() fails with: "DStream checkpointing has been enabled but the DStreams with their functions are not serializable"
What am I doing wrong? I want to process the DataFrame with checkpointing enabled.
Spark version: 2.0.2.2.5.4.2-7
Launch: spark-shell --jars spark-streaming-eventhubs_2.11-2.1.1.jar
I think the question "Why is my Spark Streaming application throwing a NotSerializableException when I enable checkpointing?" will solve your problem.
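Possible solution:
In your code, what is EventHubsUtils.createDirectStreams() doing? Once checkpointing is enabled, the DStreams it returns, together with the functions you pass to map and foreachRDD, have to be serializable, so maybe you can make them serializable.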
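To illustrate the failure mode, here is a minimal plain-Scala sketch (no Spark needed). With checkpointing on, Spark writes the DStream graph, including the functions passed to map and foreachRDD, with Java serialization, so a function that holds a reference to a non-serializable object breaks it. All names here (`Session`, `CapturingFn`, `LookingUpFn`, `SerializationCheck`) are hypothetical; `Session` only stands in for a non-serializable handle such as a StreamingContext.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable handle such as a StreamingContext.
class Session {
  def describe(s: String): String = s"session saw $s"
}

object Session {
  // Created lazily in whichever JVM runs the function; never serialized itself.
  lazy val instance: Session = new Session
}

// Keeps a reference to a Session in a field, so serializing the function
// also tries to serialize the Session and fails.
class CapturingFn(session: Session) extends (String => String) with Serializable {
  def apply(s: String): String = session.describe(s)
}

// Looks the Session up only when it runs, so it carries no non-serializable state.
class LookingUpFn extends (String => String) with Serializable {
  def apply(s: String): String = Session.instance.describe(s)
}

object SerializationCheck {
  // Returns true if `obj` can be written with Java serialization.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    println(isSerializable(new CapturingFn(new Session))) // false
    println(isSerializable(new LookingUpFn))              // true
  }
}
```

Applied to the Spark code in the question, the same idea suggests two changes: set up the DStreams and the foreachRDD inside createStreamingContext, so StreamingContext.getOrCreate can recreate or restore them, and inside foreachRDD obtain the session with SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate() (the pattern used in the Spark Streaming programming guide) instead of closing over the shell's `spark`. In spark-shell, closures can also pick up REPL-defined values such as `ssc`, so running the job as a compiled application may be the more reliable route.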