I'm trying to use Spark Streaming with Kafka in a Java application, and I'm running into problems when submitting with spark-submit on Spark 2.3.0 (the Scala 2.11 build). I've set up my dependencies in Maven as follows:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
</dependencies>
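Should I be packaging test.jar as an uber-jar so the Kafka integration classes are included? This maven-shade-plugin setup is what I'd try — the plugin version and the excludes are just a sketch on my part, not something I've verified:

<!-- Sketch only: shade-plugin config I'd try; version and excludes are guesses. -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <!-- Spark itself is provided by spark-submit; only the
                             Kafka integration and its transitive dependencies
                             should need to be bundled. -->
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.spark:spark-core_2.11</exclude>
                                <exclude>org.apache.spark:spark-sql_2.11</exclude>
                                <exclude>org.apache.spark:spark-streaming_2.11</exclude>
                            </excludes>
                        </artifactSet>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>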
I've also written a SparkKafkaMain class that reads from Kafka and performs a word count. It runs without errors locally, but fails when I submit it with spark-submit.
Here's the SparkKafkaMain class:
package spark.kafka;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class SparkKafkaMain {
    private static final Pattern SPACE = Pattern.compile(" ");

    private SparkKafkaMain() {
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 4) {
            System.err.println("Usage: SparkKafkaMain <zkQuorum> <group> <topics> <numThreads>");
            System.exit(1);
        }

        SparkConf sparkConf = new SparkConf().setAppName("SparkKafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        // Consume each topic with the same number of receiver threads.
        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        // Receiver-based stream from the 0.8 Kafka integration
        // (args[0] = ZooKeeper quorum, args[1] = consumer group).
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        JavaDStream<String> lines = messages.map(Tuple2::_2);
        JavaDStream<String> words =
                lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
        JavaPairDStream<String, Integer> wordCounts =
                words.mapToPair(s -> new Tuple2<>(s, 1))
                     .reduceByKey((i1, i2) -> i1 + i2);
        wordCounts.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
When I submit the application using the following command:
spark-submit --class spark.kafka.SparkKafkaMain --master local[2] test.jar localhost:2181 test MOVIE 1 >> out
I get this error:

java.lang.NoClassDefFoundError: Could not initialize class kafka.utils.Json$
    at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$registerConsumerInZK(ZookeeperConsumerConnector.scala:251)
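Would something like the following be the right way to supply the Kafka integration at submit time instead of bundling it into test.jar? I'm assuming --packages resolves the transitive Kafka dependencies from Maven Central, but I haven't confirmed that:

spark-submit --class spark.kafka.SparkKafkaMain --master local[2] \
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 \
    test.jar localhost:2181 test MOVIE 1 >> out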
I've tried different combinations of dependencies, but nothing seems to work. I'm new to Kafka and Spark, so any help would be appreciated.