I am trying to run Kafka with Spark Structured Streaming in Google Colab, using the MovieLens 1M dataset. (Download Here)
I am having trouble with the .readStream call when reading data from Kafka into my Spark session; it fails at the .load() line:
# Read data from Kafka as a DataFrame
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .load()
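For context, the failing cell continues by casting the Kafka value column to a string; a sketch of that next step (it never executes, since .load() throws first):

# Convert value column from Kafka's binary format to a string
# (never reached in my runs, because .load() fails first)
df = df.selectExpr("CAST(value AS STRING)")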
I am getting this error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-55-97fbf29e82ce> in <cell line: 14>()
17 .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
18 .option("subscribe", kafka_topic_name) \
---> 19 .load()
20
21 # Convert value column from Kafka to string
/usr/local/lib/python3.10/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o179.load.
: java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.kafka010.KafkaSourceProvider$
at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:338)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:233)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:36)
at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:169)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:145)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
To solve the issue, I tried switching to Kafka and Spark builds compiled for Scala 2.13, and nothing changed. These are the versions of Kafka and Spark I'm currently using:
Kafka: kafka_2.12-3.5.1 (for Scala 2.12)
Spark: spark-3.5.1-bin-hadoop3
Maven: "https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.5.1/spark-sql-kafka-0-10_2.12-3.5.1.jar"
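The jar itself gets fetched in a setup cell and dropped into Spark's jars directory, roughly like this (the /content path is an assumption based on where I unpacked Spark):

# Colab setup cell (sketch): download the Kafka connector jar into Spark's classpath
!wget -q "https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.5.1/spark-sql-kafka-0-10_2.12-3.5.1.jar" -P /content/spark-3.5.1-bin-hadoop3/jars/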
I also checked that the environment variables are set correctly, so that isn't the issue.
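For reference, the notebook sets them up roughly like this (the exact paths come from my setup cells and may differ in yours; findspark is what I use to wire up PySpark):

import os

# Paths assumed from where the setup cells unpack Spark and install Java
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

import findspark
findspark.init()  # makes the pyspark under SPARK_HOME importable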
I also tried including the kafka-clients dependency explicitly, as follows:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("StructuredStreamingExample") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.apache.kafka:kafka-clients:3.5.1") \
    .getOrCreate()
No luck.
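One variant I haven't tested yet is supplying the package via PYSPARK_SUBMIT_ARGS before anything Spark-related is imported, in case the JVM is already running by the time spark.jars.packages is applied. A sketch of what I mean (assembled from common Colab recipes, not yet verified on my side):

import os

# Must be set before pyspark is imported, so the JVM starts with the package
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 "
    "pyspark-shell"
)

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StructuredStreamingExample").getOrCreate()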
What can I try to get this to work?
Link to the full colab notebook: kafka-spark-streaming.colab