I want to use Spark Streaming with Kafka to process data (Maven artifact: spark-streaming-kafka-0-10_2.11). Spark Streaming consumes the stream from Kafka, and I have already finished a producer with Spring Boot. But when I run my Kafka/Spark Streaming code, it keeps logging this:
24/03/01 13:42:39 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=spark-streaming-application] Discovered group coordinator Host2:9092 (id: 2147483645 rack: null)
24/03/01 13:42:39 INFO ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=spark-streaming-application] Revoking previously assigned partitions []
24/03/01 13:42:39 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=spark-streaming-application] (Re-)joining group
24/03/01 13:42:39 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=spark-streaming-application] Group coordinator Host2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
24/03/01 13:42:39 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=spark-streaming-application] Discovered group coordinator Host2:9092 (id: 2147483645 rack: null)
24/03/01 13:42:39 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=spark-streaming-application] Group coordinator Host2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
24/03/01 13:42:40 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=spark-streaming-application] Discovered group coordinator Host2:9092 (id: 2147483645 rack: null)
24/03/01 13:42:40 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=spark-streaming-application] (Re-)joining group
24/03/01 13:42:40 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=spark-streaming-application] Group coordinator Host2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
24/03/01 13:42:40 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=spark-streaming-application] Discovered group coordinator Host2:9092 (id: 2147483645 rack: null)
24/03/01 13:42:40 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=spark-streaming-application] Group coordinator Host2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
24/03/01 13:42:40 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=spark-streaming-application] Discovered group coordinator Host2:9092 (id: 2147483645 rack: null)
24/03/01 13:42:40 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=spark-streaming-application] (Re-)joining group
24/03/01 13:42:40 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=spark-streaming-application] Group coordinator Host2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
My environment:
I use three virtual machines, referred to as [Host0, Host1, Host2].
OS: Ubuntu 22.04 on all of them
Code: Java 8, Maven project
Kafka: 2.0.0 with Scala 2.11, both server and client
Zookeeper: 3.4.12
Spark: 2.4.0
Working topic: test-topic
kafka-topics.sh --describe returns this:
root@ProjectTest:/opt/kafka_2.11-2.0.0# bin/kafka-topics.sh --zookeeper Host1:2181,Host0:2181,Host2:2181/kafka --describe
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:1 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
Topic: __consumer_offsets Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 4 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 5 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 6 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 7 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 8 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 9 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 10 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 11 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 12 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 13 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 14 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 15 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 16 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 17 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 18 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 19 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 20 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 21 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 22 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 23 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 24 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 25 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 26 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 27 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 28 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 29 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 30 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 31 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 32 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 33 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 34 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 35 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 36 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 37 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 38 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 39 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 40 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 41 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 42 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 43 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 44 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 45 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 46 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 47 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 48 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 49 Leader: 1 Replicas: 1 Isr: 1
Topic:test-topic PartitionCount:4 ReplicationFactor:3 Configs:
Topic: test-topic Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,0
Topic: test-topic Partition: 1 Leader: 1 Replicas: 0,2,1 Isr: 1,0,2
Topic: test-topic Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: test-topic Partition: 3 Leader: 2 Replicas: 2,0,1 Isr: 2,0
And my client properties:
# kafka conf
kafka.bootstrap.servers=Host0:9092,Host1:9092,Host2:9092
kafka.topic.name=test-topic
kafka.group.id=spark-streaming-application
kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.value.deserializer=org.groupm.sparkstreaming.deserialize.JSONDeserializer
kafka.auto.offset.reset=latest
kafka.enable.auto.commit=false
# spark streaming conf
spark.app.name=SparkStreaming-Kafka-Application
spark.master=spark://Host0:7077
# No Hadoop; standalone mode
spark.streaming.batch.duration=500
spark.streaming.checkpoint.dir=file:///opt/tmp/spark/checkpoint
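For reference, the `kafkaConfig` map passed to `ConsumerStrategies.Subscribe` is built by stripping the `kafka.` prefix from the properties above. A sketch of that helper (the class and method names `KafkaParamsBuilder` / `buildKafkaParams` are just placeholders for my own code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Builds the Map<String, Object> that ConsumerStrategies.Subscribe expects,
// by stripping the "kafka." prefix from the application properties.
// "kafka.topic.name" is excluded because it is not a consumer config key.
public class KafkaParamsBuilder {
    public static Map<String, Object> buildKafkaParams(Properties appConfig) {
        Map<String, Object> params = new HashMap<>();
        for (String name : appConfig.stringPropertyNames()) {
            if (name.startsWith("kafka.") && !name.equals("kafka.topic.name")) {
                // e.g. "kafka.bootstrap.servers" -> "bootstrap.servers"
                params.put(name.substring("kafka.".length()), appConfig.getProperty(name));
            }
        }
        return params;
    }
}
```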
My Kafka-related code:
SparkConf sparkConf = new SparkConf()
        .setAppName(appConfig.getProperty("spark.app.name"))
        .setMaster(appConfig.getProperty("spark.master"));
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaStreamingContext ssc = new JavaStreamingContext(sc,
        Durations.milliseconds(Long.parseLong(appConfig.getProperty("spark.streaming.batch.duration"))));
ssc.checkpoint(appConfig.getProperty("spark.streaming.checkpoint.dir"));
Collection<String> topics = Arrays.asList(appConfig.getProperty("kafka.topic.name"));
JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaConfig)
        );
It uses the DStream API.
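For completeness, the remainder of the driver just prints each batch and starts the context; there is no other consumer logic. Roughly:

```java
// Continues from the `stream` created above: print each record's value
// per batch, then start the streaming context and block until stopped.
stream.map(ConsumerRecord::value)
      .print();

ssc.start();
ssc.awaitTermination();
```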
I tried connecting to Host2:9092 and it works. Then I looked in $KAFKA_HOME/logs/server.log, but there are no log messages about the coordinator.
I cleaned /tmp/kafka-logs/__consumer* and restarted the server and client, but the problem is still there.
I tried deleting the topic __consumer_offsets, which was suggested in another question, but Kafka refused to delete it.
The most puzzling thing: to check whether something is wrong with my Kafka setup, I wrote a new program, a simple consumer with the same config as my Kafka/Spark Streaming app. I ran it on the same computer, in the same environment, and it received the messages successfully. (I don't know whether the reason is the difference between KafkaUtils and KafkaConsumer.)
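The test program is roughly this (a sketch; the class name `PlainConsumerTest` is arbitrary, and here I use StringDeserializer for the value instead of my JSONDeserializer):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Plain kafka-clients consumer using the same bootstrap servers and
// group id as the Spark app; this loop does receive the produced messages.
public class PlainConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "Host0:9092,Host1:9092,Host2:9092");
        props.put("group.id", "spark-streaming-application");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "latest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```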
So now I really don't know what the root problem is or how to solve it.