Checkpointing With NotSerializable


Want to understand a basic issue. Here is my code:

def createStreamingContext(sparkCheckpointDir: String, batchDuration: Int) = {
  val ssc = new StreamingContext(spark.sparkContext, Seconds(batchDuration))
  ssc
}

val ssc = StreamingContext.getOrCreate(sparkCheckpointDir, () => createStreamingContext(sparkCheckpointDir, batchDuration))


val inputDirectStream = EventHubsUtils
  .createDirectStreams(ssc, namespace, progressDir, Map(name -> eventhubParameters))
  .map(receivedRecord => new String(receivedRecord.getBody))


inputDirectStream.foreachRDD { (rdd: RDD[String], time: Time) =>
  val df = spark.read.json(rdd)
  df.show(truncate = false)
}

ssc.start()
ssc.awaitTermination()

The above code works, and I can see the DataFrame.

The issue is: if I enable checkpointing by changing the factory to

def createStreamingContext(sparkCheckpointDir: String, batchDuration: Int) = {
  val ssc = new StreamingContext(spark.sparkContext, Seconds(batchDuration))
  ssc.checkpoint(sparkCheckpointDir)
  ssc
}

then ssc.start() fails with:

"DStream checkpointing has been enabled but the DStreams with their functions are not serializable"

What am I doing wrong? I want to process the DataFrame with checkpointing enabled.

Spark version: 2.0.2.2.5.4.2-7
Launch: spark-shell --jars spark-streaming-eventhubs_2.11-2.1.1.jar


2 Answers


I think the answer to "Why is my Spark Streaming application throwing a NotSerializableException when I enable checkpointing?" will solve your problem:

If you enable checkpointing in Spark Streaming, then objects used in a function called in foreachRDD should be Serializable

Solutions:

  • Turn off checkpointing by removing the ssc.checkpoint(...) line.
  • Make the object being used Serializable.
  • Declare the NotSerializable object inside the foreachRDD function; a sketch of this pattern follows below.
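
A minimal sketch of that third option. NotSerializable and doSomething are hypothetical placeholders for whatever non-serializable object you actually need; constructing it inside foreachRDD (here one level deeper, inside foreachPartition, so it is built on the executor) keeps it out of the closure that checkpointing tries to serialize:

inputDirectStream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // Hypothetical placeholder class: created fresh per partition on the
    // executor, so it is never captured by the checkpointed DStream closure.
    val notSerializable = new NotSerializable()
    records.foreach(record => notSerializable.doSomething(record))
  }
}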

In your code, what is EventHubsUtils.createDirectStreams() doing? Maybe you can make it serializable.


You can try transforming the DStream[EventHub] into a DStream of Scala/Java primitive types, whose serialization and deserialization Spark handles itself. You are hitting this serialization error because in foreachRDD you use the show action, which pulls data from the executors to the driver over the wire, and that requires serialization. You can convert the stream to a DStream of Scala/Java types (such as a DStream of tuples, which would suit your case) and map each attribute of your EventHub object to a tuple element.
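
As a hedged sketch of this suggestion (only the getBody call is taken from the question's own code; the tuple layout and the derived length field are illustrative):

val tupleStream = EventHubsUtils
  .createDirectStreams(ssc, namespace, progressDir, Map(name -> eventhubParameters))
  .map { record =>
    val body = new String(record.getBody)
    (body, body.length) // plain (String, Int) tuple of Scala/Java types
  }

tupleStream.foreachRDD { rdd =>
  val df = spark.read.json(rdd.map(_._1)) // parse only the body element
  df.show(truncate = false)
}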