Spark 1.6 Streaming consumer reading in kafka offset stuck at createDirectStream


I am trying to read saved Kafka offsets into my Spark Streaming consumer, but I cannot seem to do it correctly.

Here is my code.

// Read the previously saved offsets from Hive
val dfoffset = hiveContext.sql(s"select * from $db")
dfoffset.show()
val dfoffsetArray = dfoffset.collect()
println("printing array of data")
dfoffsetArray.foreach(println)

// Build a (topic, partition) -> offset map from the collected rows
val fromOffsets = collection.mutable.Map[TopicAndPartition, Long]()
for (i <- dfoffsetArray) {
  val topicAndPartition = (TopicAndPartition(i(1).toString, i(0).toString.toInt) -> i(2).toString.toLong)
  fromOffsets += topicAndPartition
}

val kafkaParams = Map[String, String]("bootstrap.servers" -> serverName, "group.id" -> "test")
val topics = Array(topicName).toSet
//stuck here 
var directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

directKafkaStream.foreachRDD(rdd1 => { ..

Here is the output from showing the DataFrame:

+----------------+----------+--------------+
|partition_number|topic_name|current_offset|
+----------------+----------+--------------+
|               0|TOPIC_NAME|          4421|
+----------------+----------+--------------+

Any help is greatly appreciated.

I am using Spark 1.6, Scala 2.10.5, and Kafka 0.10.

BEST ANSWER

As shown in the official documentation for KafkaUtils.createDirectStream, you should pass fromOffsets as the third parameter of createDirectStream (and don't forget the fourth parameter, messageHandler).

The fromOffsets parameter is supposed to be a collection.immutable.Map[TopicAndPartition, Long]; in Scala we usually prefer immutable collections over mutable ones whenever possible.
You can transform dfoffsetArray into an immutable.Map[TopicAndPartition, Long] like this:

val fromOffsets = dfoffsetArray.map( i =>
  TopicAndPartition(i(1).toString, i(0).toString.toInt) -> (i(2).toString.toLong)
).toMap

The messageHandler has type MessageAndMetadata[K, V] ⇒ R and decides how the key and value of each message are extracted. You can define a simple handler like this:

val messageHandler =
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

Your createDirectStream call will then look like this:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
  (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
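
For completeness, the snippets above rely on the Kafka 0.8 direct-stream connector that ships with Spark 1.6 (spark-streaming-kafka_2.10), so you need roughly these imports:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils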

Now you are free to apply whatever transformations you need to your stream. Happy streaming!
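
Since you are restoring offsets from Hive, you will presumably also want to record the latest offsets after each batch so the next run can resume from them. Below is a minimal sketch using HasOffsetRanges, which the RDDs produced by the direct stream implement; how and where you persist the offsets is up to you (the println is just a placeholder):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

directKafkaStream.foreachRDD { rdd =>
  // Grab the offset ranges first; only the RDD handed to foreachRDD by the
  // direct stream (before any transformation) carries this information.
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the (key, value) pairs of this batch here ...

  // o.untilOffset is where the next run should start; persist it back to your
  // offsets table with whatever mechanism you already use.
  offsetRanges.foreach { o =>
    println(s"${o.topic} partition ${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
  }
}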


I learned this approach from this article months ago. Maybe you will find it helpful.