I am trying to read the saved Spark Streaming Kafka offsets into my consumer, but I cannot seem to do it correctly. Here is my code:
val dfoffset = hiveContext.sql(s"select * from $db")
dfoffset.show()
val dfoffsetArray = dfoffset.collect()
println("printing array of data")
dfoffsetArray.foreach(println)
val fromOffsets = collection.mutable.Map[TopicAndPartition, Long]()
for (i <- dfoffsetArray) {
  val topicAndPartition = (TopicAndPartition(i(1).toString, i(0).toString.toInt) -> (i(2).toString.toLong))
  fromOffsets += topicAndPartition
}
val kafkaParams = Map[String, String]("bootstrap.servers" -> serverName, "group.id" -> "test")
val topics = Array(topicName).toSet
//stuck here
var directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
directKafkaStream.foreachRDD(rdd1 => { ..
Here is the output from showing the DataFrame:
+----------------+----------+--------------+
|partition_number|topic_name|current_offset|
+----------------+----------+--------------+
|               0|TOPIC_NAME|          4421|
+----------------+----------+--------------+
Any help is greatly appreciated. I am using Spark 1.6, Scala 2.10.5, and Kafka 0.10.
As the official documentation of KafkaUtils.createDirectStream shows, you should pass fromOffsets as the 3rd parameter of createDirectStream (and don't forget about the 4th parameter, messageHandler).

The fromOffsets parameter is supposed to be a collection.immutable.Map[TopicAndPartition, Long]; in Scala we usually prefer immutable over mutable collections wherever possible. You may transform dfoffsetArray into an immutable.Map[TopicAndPartition, Long] with the following:
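A sketch of that transformation, assuming the column order shown by your show() output (partition_number, topic_name, current_offset):

import kafka.common.TopicAndPartition

// Build an immutable Map[TopicAndPartition, Long] from the collected rows.
// row(0) = partition_number, row(1) = topic_name, row(2) = current_offset
val fromOffsets: Map[TopicAndPartition, Long] = dfoffsetArray.map { row =>
  TopicAndPartition(row(1).toString, row(0).toString.toInt) -> row(2).toString.toLong
}.toMap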
And the messageHandler is of type (MessageAndMetadata[K, V]) => R, which deals with the key and value of each message. You can define a simple handler as follows:
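One minimal sketch, assuming String keys and values, that keeps both the key and the message body (so R becomes (String, String)):

import kafka.message.MessageAndMetadata

// Extract the key and value of each record as a pair.
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)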
Then your createDirectStream will look like this:
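A sketch matching the handler above; note that with fromOffsets you no longer pass the topics set, because the topics to consume come from the keys of fromOffsets:

// R = (String, String) to match messageHandler above.
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)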
Now you are free to do some transformations on your stream. Happy streaming!
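One more tip: since you are persisting offsets in Hive, you will probably want to read each batch's offset ranges inside foreachRDD and write them back. A sketch using HasOffsetRanges (the cast only works on the RDDs produced directly by the stream, before any transformation):

import org.apache.spark.streaming.kafka.HasOffsetRanges

directKafkaStream.foreachRDD { rdd =>
  // Cast before transforming rdd; only the stream's own RDDs carry offsets.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    // o.topic, o.partition, o.fromOffset, o.untilOffset are what you would
    // save back to your Hive table so the next run can resume from here.
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}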
I learned this from this article months ago. Maybe you will find it helpful.