Spark-Cassandra data retrieval error, although I can describe columns: java.lang.NoClassDefFoundError


(first StackOverflow post - I've searched but may have overlooked an answer - if so, I apologize...)

I have a local Spark standalone instance running, and have tested it quite thoroughly using Sparklyr (writing to and from memory, creating tables, pipelines, etc.). I have a Kafka installation working from which I can stream data, and the idea is to process this and save some results in Cassandra (eventually a cluster).
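For context, the kind of Sparklyr round trip that does work without any trouble looks roughly like this (a minimal sketch; it assumes the connection sc created in the configuration section further down, and mtcars is just an example data set):

library(sparklyr)
library(dplyr)
# Copy a local data frame into Spark, query it, and pull the result back into R
cars_tbl <- copy_to(sc, mtcars, name = "mtcars_test", overwrite = TRUE)
cars_tbl %>% filter(cyl == 6) %>% collect()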

I have installed a standalone Cassandra (listening on localhost) and created the following keyspace and table via cqlsh 127.0.0.1 9042 (so the connection to Cassandra works), with authentication switched off so that it cannot be a source of error:

CREATE KEYSPACE IF NOT EXISTS SparkTestKeyspace
WITH REPLICATION = { 
  'class' : 'SimpleStrategy', 
  'replication_factor' : 1 
};
USE SparkTestKeyspace;
CREATE TABLE emp(
  emp_id int PRIMARY KEY,
  emp_name text,
  emp_city text,
  emp_sal varint,
  emp_phone varint
);
INSERT INTO emp (emp_id, emp_name, emp_city, emp_phone, emp_sal)
  VALUES (1, 'ram',    'Hyderabad', 9848022338, 50000);
INSERT INTO emp (emp_id, emp_name, emp_city, emp_phone, emp_sal)
  VALUES (2, 'robin',  'Hyderabad', 9848022339, 40000);
INSERT INTO emp (emp_id, emp_name, emp_city, emp_phone, emp_sal)
  VALUES (3, 'rahman', 'Chennai',   9848022330, 45000);
SELECT * FROM emp;

 emp_id | emp_city  | emp_name | emp_phone  | emp_sal
--------+-----------+----------+------------+---------
      1 | Hyderabad |      ram | 9848022338 |   50000
      2 | Hyderabad |    robin | 9848022339 |   40000
      3 |   Chennai |   rahman | 9848022330 |   45000

I've configured the Sparklyr environment as follows and get a connection to Spark with no problem. (I've added options(sparklyr.log.console = TRUE) to the environment to see as many errors as possible; see below.)

# Paranoid that the environment is correct, so initialize it explicitly in R
Sys.setenv(JAVA_HOME = "/usr/lib/jvm/java-8-openjdk-amd64")
Sys.setenv(SPARK_HOME = "/home/x/spark/spark-3.2.1-bin-hadoop3.2")
# Added jar paths for Cassandra and Kafka (didn't help)
Sys.setenv(CLASSPATH = "/usr/share/cassandra/lib/:/opt/kafka/libs/*")

library(sparklyr)
LConfig <- spark_config()
LConfig$spark.executor.memory <- "16G"
LConfig$spark.eventLog.dir <- "file:///home/x/spark/spark-3.2.1-bin-hadoop3.2/log"
LConfig$spark.history.fs.logDirectory <- "file:///home/x/spark/spark-3.2.1-bin-hadoop3.2/log"
LConfig$'spark.cassandra.connection.host' <- "127.0.0.1" 
LConfig$'spark.cassandra.connection.port' <- "9042"
LConfig$'spark.cassandra.output.batch.grouping.key' <- "none"
LConfig$'sparklyr.connect.packages' <- "com.datastax.spark:spark-cassandra-connector_2.12:3.3.0"

sc <- spark_connect(master = "local", version = "3.2.1",
                    config = LConfig
)

I generate a reference to the table (mapped, not in memory - see below) as follows:

EmpPointer <- spark_read_source(
  sc,
  name = "emp",
  source = "org.apache.spark.sql.cassandra",
  options = list(keyspace = "sparktestkeyspace", table = "emp"),
  memory = FALSE)

After which I can "see" the table and query its columns (metadata):

library(dplyr)
tbl_vars(EmpPointer)

<dplyr:::vars>
[1] "emp_id"    "emp_city"  "emp_name"  "emp_phone" "emp_sal"  

However, when I try to pull data from the table, I get the following Java error:

SparkTables <- src_tbls(sc)
tbl_name <- SparkTables[1]
tbl_name

[1] "emp_939777ab_c0f4_4494_bc14_31b9b3356459"

library(DBI)
dbGetQuery(sc, paste("SELECT emp_city FROM", tbl_name))


Error in value[[3L]](cond) : 
  Failed to fetch data: java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/read/partitioning/KeyGroupedPartitioning
    at com.datastax.spark.connector.datasource.CassandraScan.outputPartitioning(CassandraScanBuilder.scala:311)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.outputPartitioning(DataSourceV2ScanExecBase.scala:87)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.outputPartitioning$(DataSourceV2ScanExecBase.scala:81)
    at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.outputPartitioning(BatchScanExec.scala:35)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$ensureDistributionAndOrdering$1(EnsureRequirements.scala:53)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:

The same thing happens if I try to describe the table with

sdf_describe(EmpPointer)

or

glimpse(EmpPointer)

or if I try to load it into memory with

EmpPointer <- spark_read_source(
  sc,
  name = "emp",
  source = "org.apache.spark.sql.cassandra",
  options = list(keyspace = "sparktestkeyspace", table = "emp"),
  memory = TRUE)

As said, I can import and export text/JSON/XML/CSV, but connecting to Cassandra eludes me. It's probably obvious, but does anyone know where the error comes from and how I can fix it?

23/06/14 20:45:58 ERROR sparklyr: Gateway (6527) failed calling describe on 31: java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/read/partitioning/KeyGroupedPartitioning 

(There are about 5 feet of Java errors following this one that I can post if useful.)

I have also removed all of the cached Spark packages (under spark/work and the Ivy cache) so that fresh ones are downloaded directly from the repos. No change.
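Roughly what I did from R to clear those caches (a sketch; the exact cache locations depend on the installation, so treat the paths as illustrative):

# Remove the per-application work directories and the downloaded connector jars
unlink("~/spark/spark-3.2.1-bin-hadoop3.2/work", recursive = TRUE)
unlink("~/.ivy2/jars", recursive = TRUE)  # Ivy cache used for sparklyr.connect.packages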

I then used this article as a guide, https://dzone.com/articles/spark-cassandra-connector-on-spark-shell, and connected directly with spark-shell:

~/spark/spark-3.2.1-bin-hadoop3.2/bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.12:3.3.0 --conf spark.cassandra.connection.host=127.0.0.1
sc.stop
import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)
sc.cassandraTable("sparktestkeyspace", "emp").select("emp_id").as((id:Int) => (id,1)).reduceByKey(_ + _).collect.foreach(println)

(1,1)                                                                           
(2,1)
(3,1)

So it would seem that the problem lies with Sparklyr or with how I'm doing things, since the connector is the same version and the connection to localhost works, just as it does for cqlsh.

Best regards, Les James

Here is my R environment:

> print(sessionInfo())
R version 4.3.0 (2023-04-21)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Linux Mint 21.1

Matrix products: default
BLAS:   /usr/lib/x86_64-linux-gnu/openblas-pthread/libblas.so.3 
LAPACK: /usr/lib/x86_64-linux-gnu/openblas-pthread/libopenblasp-r0.3.20.so;  LAPACK version 3.10.0

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C               LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8    
 [5] LC_MONETARY=de_DE.UTF-8    LC_MESSAGES=en_US.UTF-8    LC_PAPER=de_DE.UTF-8       LC_NAME=C                 
 [9] LC_ADDRESS=C               LC_TELEPHONE=C             LC_MEASUREMENT=de_DE.UTF-8 LC_IDENTIFICATION=C       

time zone: Europe/Berlin
tzcode source: system (glibc)

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] DBI_1.1.3      dplyr_1.1.2    sparklyr_1.8.1

There are 3 answers below.

Answer from stevenlacerda:

That's a version problem with your connector:

  Failed to fetch data: java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/read/partitioning/KeyGroupedPartitioning

You'll have to verify the version of Spark and match it with the connector:

https://github.com/datastax/spark-cassandra-connector/tree/master#version-compatibility
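For example, one way to check the running Spark version from Sparklyr and compare it against that table (a sketch):

# The version reported here has to line up with a Spark line the connector supports
sparklyr::spark_version(sc)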

Answer from Andrew:

As per https://github.com/datastax/spark-cassandra-connector, the 3.3.0 release of the connector is compatible with Spark 3.3, but you are spinning up and connecting to Spark 3.2.1.

Can you try aligning the versions, either by using Spark 3.3 or version 3.2 of the connector, and retest?

The oddity is that it works in spark-shell, but this appears to be a version mismatch.

Answer from James75:

I switched to the correct version, com.datastax.spark:spark-cassandra-connector_2.12:3.2.0, and it worked correctly. Couldn't see the wood for the trees.
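For reference, the only change this needs in the Sparklyr configuration posted above is the connector coordinate (everything else stays as it was):

LConfig$'sparklyr.connect.packages' <- "com.datastax.spark:spark-cassandra-connector_2.12:3.2.0"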