Troubleshooting .readStream not working in Kafka-Spark streaming (PySpark in a Colab notebook)


I am trying to set up Kafka and Spark Structured Streaming in Colab, using the MovieLens 1M dataset. (Download Here)

I am having trouble with .readStream when reading data from Kafka into my Spark session; the failure happens at the .load() line:

# Read data from Kafka as a DataFrame
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
  .option("subscribe", kafka_topic_name) \
  .load()

I am getting this error:

---------------------------------------------------------------------------

Py4JJavaError                             Traceback (most recent call last)

<ipython-input-55-97fbf29e82ce> in <cell line: 14>()
     17   .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
     18   .option("subscribe", kafka_topic_name) \
---> 19   .load()
     20 
     21 # Convert value column from Kafka to string

3 frames

/usr/local/lib/python3.10/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o179.load.
: java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.kafka010.KafkaSourceProvider$
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:338)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:233)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:36)
    at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:169)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:145)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)


To solve the issue, I tried builds of Kafka and Spark compiled for Scala 2.13, and nothing changed. The versions of Kafka and Spark I'm currently using:

Kafka: kafka_2.12-3.5.1 (for Scala 2.12)

Spark: spark-3.5.1-bin-hadoop3

Maven: "https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.5.1/spark-sql-kafka-0-10_2.12-3.5.1.jar"
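To rule out a Scala-version mismatch, I also sanity-checked that the connector jar's Scala suffix and version line up with the Spark build (as far as I know, the prebuilt spark-3.5.1-bin-hadoop3 ships with Scala 2.12). A quick throwaway check, nothing official:

```python
import re

# Connector jar filename from Maven and the Spark build directory name.
jar = "spark-sql-kafka-0-10_2.12-3.5.1.jar"
spark_build = "spark-3.5.1-bin-hadoop3"  # prebuilt with Scala 2.12 by default

# Pull the Scala suffix and connector version out of the jar name.
m = re.match(r"spark-sql-kafka-0-10_(\d+\.\d+)-(\d+\.\d+\.\d+)\.jar", jar)
scala_suffix, connector_version = m.groups()

# The connector version should match the Spark version exactly.
assert connector_version in spark_build
```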

I also checked the environment variables, which are set correctly, so that isn't the issue.
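For reference, the environment setup in the notebook looks roughly like this (the paths are from my Colab instance and will differ on other setups; the PYSPARK_SUBMIT_ARGS line is one variant I experimented with, so the connector is on the classpath before the JVM starts):

```python
import os

# Paths below are assumptions from a typical Colab layout; adjust to yours.
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

# Pass the Kafka connector to spark-submit so it is resolved from Maven
# before the JVM starts (must end with "pyspark-shell").
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 pyspark-shell"
)
```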

I also tried including the kafka-clients dependency explicitly:

# Pull the Kafka connector and client from Maven when the session starts
spark = SparkSession.builder \
    .appName("StructuredStreamingExample") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.apache.kafka:kafka-clients:3.5.1") \
    .getOrCreate()

No luck.
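On the theory that "Could not initialize class" means the provider class itself loaded but one of its own runtime dependencies didn't, I also tried spelling out the full dependency list (the token-provider and commons-pool2 artifacts, and their versions, are my guess at what the connector needs beyond kafka-clients):

```python
# Scala suffix must match the Spark build (2.12 for the prebuilt binaries).
scala_suffix = "2.12"
spark_version = "3.5.1"

packages = ",".join([
    f"org.apache.spark:spark-sql-kafka-0-10_{scala_suffix}:{spark_version}",
    f"org.apache.spark:spark-token-provider-kafka-0-10_{scala_suffix}:{spark_version}",
    "org.apache.kafka:kafka-clients:3.5.1",
    "org.apache.commons:commons-pool2:2.11.1",  # version is an assumption
])

# spark.jars.packages is only honored when the JVM starts, so any existing
# session has to be stopped before rebuilding:
# spark.stop()
# spark = SparkSession.builder \
#     .appName("StructuredStreamingExample") \
#     .config("spark.jars.packages", packages) \
#     .getOrCreate()
```

Same error either way.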

What can I try to get this to work?

Link to the full colab notebook: kafka-spark-streaming.colab
