Spark streaming, java.lang.NoClassDefFoundError: org/apache/spark/sql/avro/functions$


I am trying to read data from a Kafka stream that uses Avro serialization for the value. Reading the data and deserializing the key (a string) works fine, but when I try to deserialize the value using the from_avro function I get this exception:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/avro/functions$
    at DataFrameExample$.main(DataFrameExample.scala:41)
    at DataFrameExample.main(DataFrameExample.scala)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
    at java.base/java.lang.reflect.Method.invoke(Method.java:578)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.avro.functions$
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
    ... 12 more

sbt build file:

name := "test"
organization := "com.databricks"
version := "1"
scalaVersion := "2.12.17"
// Spark Information
val sparkVersion = "3.3.0"
// allows us to include spark packages
resolvers += "bintray-spark-packages" at
  "https://dl.bintray.com/spark-packages/maven/"
resolvers += "Typesafe Simple Repository" at
  "https://repo.typesafe.com/typesafe/simple/maven-releases/"
resolvers += "MavenRepository" at
  "https://mvnrepository.com/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-streaming" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-avro" % sparkVersion % Provided
)

Code:

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.avro.functions._
import java.nio.file.{Files, Paths}

object DataFrameExample extends Serializable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Spark Example")
      .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
      .getOrCreate()

    import spark.implicits._

    spark.sparkContext.setLogLevel("WARN")

    val currentDirectory = new java.io.File(".").getCanonicalPath
    println(currentDirectory)

    val df = spark.readStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "localhost:9091,localhost:9093,localhost:9094"
      )
      .option("startingOffsets", "latest")
      .option("subscribe", "test-person-activity-partitions-replication-qwe")
      .load()

    val jsonFormatSchema = new String(
      Files.readAllBytes(
        Paths.get("./src/main/resources/avro/person-activity.avsc")
      )
    )

    val df2 = df.select(
      df.col("key").cast("string"),
      from_avro($"value", jsonFormatSchema).as("value")
    )

    df2.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}

When running the application with spark-submit I also provide spark-sql-kafka-0-10, which I cannot bundle via sbt because I get some other errors. That is not directly related to this problem, but if anyone knows what could cause it, feel free to answer that as well.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 --class DataFrameExample --master local target/scala-2.12/test_2.12-1.jar

Sorry if this is a duplicate, but I have looked at every answer on SO and elsewhere while searching for similar errors. Thanks.


2 Answers

Kashyap (accepted answer):

As documented, add the packages on the command line, and make sure the version values are correct:

spark-submit --packages "org.apache.spark:spark-avro_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0" \
             --class DataFrameExample \
             --master local target/scala-2.12/test_2.12-1.jar
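
The same coordinates can also be supplied through the `spark.jars.packages` configuration property instead of the `--packages` flag (a sketch with equivalent behaviour; jar path and coordinates are taken from the question):

```shell
spark-submit \
  --conf "spark.jars.packages=org.apache.spark:spark-avro_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0" \
  --class DataFrameExample \
  --master local \
  target/scala-2.12/test_2.12-1.jar
```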

Zied Yazidi:

You are specifying the dependency in your sbt file, but you have marked it as Provided: "org.apache.spark" %% "spark-avro" % sparkVersion % Provided
This means the dependency is excluded from the default class path and is not included in a fat-jar artifact, which is why the spark-submit command cannot find it.
To solve this you have two options:

  • Pass the jar to the spark-submit command using the --packages option:

    --packages "org.apache.spark:spark-avro_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0"

  • Change the scope of the dependency to Compile:
    "org.apache.spark" %% "spark-avro" % sparkVersion % Compile
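
If you go with the second option, you also need a plugin such as sbt-assembly to build the fat jar, since `sbt package` alone does not bundle dependencies. A minimal sketch (the plugin version and merge strategy are assumptions, not from the question):

```scala
// project/plugins.sbt  (assumed plugin version)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")

// build.sbt -- keep the Spark runtime Provided, bundle only what the cluster lacks
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"           % sparkVersion % Provided,
  "org.apache.spark" %% "spark-sql"            % sparkVersion % Provided,
  "org.apache.spark" %% "spark-avro"           % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)

// avoid META-INF conflicts when merging dependency jars
ThisBuild / assemblyMergeStrategy := {
  case PathList("META-INF", _*) => MergeStrategy.discard
  case _                        => MergeStrategy.first
}
```

Build with `sbt assembly` and submit the resulting jar without `--packages`.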