When should I add a dependency to `libraryDependencies` without "provided", and when should I use `--packages`?

I have a simple Spark application doing structured streaming.

Initially, my build.sbt looked like this:

name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2" % "provided",
  "org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.454" % "provided"
)

sbt assembly succeeded, but later when I ran spark-submit ..., I got this error:

Exception in thread "main" org.apache.spark.sql.AnalysisException:  Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".        
    at org.apache.spark.sql.errors.QueryCompilationErrors$.failedToFindKafkaDataSourceError(QueryCompilationErrors.scala:1070)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:673)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:725)
    at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:864)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
    at com.hongbomiao.IngestFromS3ToKafka$.main(IngestFromS3ToKafka.scala:40)
    at com.hongbomiao.IngestFromS3ToKafka.main(IngestFromS3ToKafka.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Then I removed "provided" from spark-sql-kafka-0-10:

name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2",
  "org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.454" % "provided"
)

But this time when I ran sbt assembly, I got this error:

[error] 1 error(s) were encountered during the merge:

[error] java.lang.RuntimeException: 
[error] Deduplicate found different file contents in the following:
[error]   Jar name = spark-sql-kafka-0-10_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error]   Jar name = spark-tags_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error]   Jar name = spark-token-provider-kafka-0-10_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error]   Jar name = unused-1.0.0.jar, jar org = org.spark-project.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error]     at sbtassembly.Assembly$.merge(Assembly.scala:624)
[error]     at sbtassembly.Assembly$.$anonfun$assemble$36(Assembly.scala:330)
[error]     at sbtassembly.Assembly$.timed$1(Assembly.scala:228)
[error]     at sbtassembly.Assembly$.$anonfun$assemble$35(Assembly.scala:329)
[error]     at sbtassembly.Assembly$.$anonfun$cachedAssembly$2(Assembly.scala:523)
[error]     at sbt.util.Tracked$.$anonfun$lastOutput$1(Tracked.scala:73)
[error]     at sbtassembly.Assembly$.cachedAssembly(Assembly.scala:527)
[error]     at sbtassembly.Assembly$.assemble(Assembly.scala:414)
[error]     at sbtassembly.Assembly$.$anonfun$assemblyTask$1(Assembly.scala:196)
[error]     at scala.Function1.$anonfun$compose$1(Function1.scala:49)
[error]     at sbt.internal.util.$tilde$greater.$anonfun$$u2219$1(TypeFunctions.scala:62)
[error]     at sbt.std.Transform$$anon$4.work(Transform.scala:68)
[error]     at sbt.Execute.$anonfun$submit$2(Execute.scala:282)
[error]     at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:23)
[error]     at sbt.Execute.work(Execute.scala:291)
[error]     at sbt.Execute.$anonfun$submit$1(Execute.scala:282)
[error]     at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265)
[error]     at sbt.CompletionService$$anon$2.call(CompletionService.scala:64)
[error]     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[error]     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
[error]     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[error]     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[error]     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[error]     at java.base/java.lang.Thread.run(Thread.java:833)
[error] (assembly) 
[error] Deduplicate found different file contents in the following:
[error]   Jar name = spark-sql-kafka-0-10_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error]   Jar name = spark-tags_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error]   Jar name = spark-token-provider-kafka-0-10_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error]   Jar name = unused-1.0.0.jar, jar org = org.spark-project.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error] Total time: 3 s, completed Apr 24, 2023, 11:13:44 PM
make: *** [sbt-clean-compile-assembly] Error 1

Usually I see the "Deduplicate" error when sbt assembly bundles multiple dependencies, but this time there is only one non-provided dependency, spark-sql-kafka-0-10 (the rest are "provided"). Is it because classes in its transitive dependencies conflict?
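
One way to check this (assuming sbt 1.4+, which bundles a dependency-tree plugin; the exact coordinates below are just what the error message lists) would be to enable the plugin and ask which jars pull in the conflicting module:

// project/plugins.sbt
addDependencyTreePlugin

// then, in the sbt shell:
// > dependencyTree
// > whatDependsOn org.apache.spark spark-tags_2.12 3.3.2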

Then I found --packages in this doc.

And this time spark-submit --packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2 ... succeeded.

My question is: when should I add a dependency to libraryDependencies without "provided", and when should I use --packages? Thanks!

Best answer (by OneCricketeer):

Try the following for that specific error:

// don't bundle the Scala standard library; the Spark runtime already ships it
// (sbt-assembly 2.x syntax, which your error output suggests; with 1.x this was
//  assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false))
assembly / assemblyOption ~= { _.withIncludeScala(false) }

assembly / assemblyMergeStrategy := {
  // the UnusedStubClass placeholder ships in several Spark jars; any copy will do
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case x =>
    // everything else keeps the default behaviour
    val oldStrategy = (assembly / assemblyMergeStrategy).value
    oldStrategy(x)
}

when should I add a dependency to libraryDependencies without "provided", and when should I use --packages?

Use --packages if your Spark cluster has open access to the internet or to an internal Maven mirror. This downloads any necessary libraries at runtime, letting you keep your application JAR smaller, with the tradeoff that you have to remember to always add the extra CLI arguments to spark-submit.
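
As a sketch of that route (jar path illustrative, not your exact layout): build.sbt stays as in your first version and the connector only shows up at submit time.

// build.sbt: the connector can stay "provided" (or be dropped entirely here,
// since this app only refers to it via .format("kafka") and needs nothing
// from it at compile time), so the assembled JAR stays small
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2" % "provided"

// submit time: the artifact id carries the Scala binary version (_2.12) and the
// version should match the cluster's Spark version
// spark-submit \
//   --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2 \
//   target/scala-2.12/IngestFromS3ToKafka-assembly-1.0.jar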

Otherwise, adding non-provided (i.e. compile-scope) dependencies to libraryDependencies and building with the assembly plugin will bundle everything into one JAR, but you'll need to define merge strategies for the duplicate entries.
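
A slightly broader merge-strategy sketch for that route (assuming sbt-assembly 2.x slash syntax; adjust to taste), which also covers the META-INF clashes that tend to appear once more non-provided jars get bundled:

assembly / assemblyMergeStrategy := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  // keep service registrations from every jar; the kafka source is discovered via
  // META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
  case PathList("META-INF", "services", _*) => MergeStrategy.concat
  case PathList("META-INF", _*)             => MergeStrategy.discard  // manifests, signatures
  case x =>
    val oldStrategy = (assembly / assemblyMergeStrategy).value
    oldStrategy(x)
}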

spark-sql-kafka-0-10 is part of the "contrib"-style external modules in the Spark source code, so it is not shipped as part of the common Spark runtime and is therefore not normally "provided". This depends on your Spark cluster, however: if you copy the jar onto each executor's classpath, then marking it as provided would work fine.
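
A sketch of that last variant (paths illustrative): keep the connector "provided" and hand the jars to the cluster yourself.

// build.sbt: compiled against, but not bundled
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2" % "provided"

// the connector jar, and its transitive jars (roughly: kafka-clients,
// spark-token-provider-kafka-0-10, commons-pool2), must then already be on every
// node, e.g. dropped into $SPARK_HOME/jars/, or shipped at submit time:
// spark-submit --jars /path/to/spark-sql-kafka-0-10_2.12-3.3.2.jar,... your-app.jar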