After compiling my code with sbt package and submitting it with spark-submit:
sudo -u spark spark-submit --master yarn --deploy-mode client --executor-memory 2G --num-executors 6 --class viterbiAlgorithm.viterbiAlgo ./target/scala-2.11/vibertialgo_2.11-1.3.4.jar
I got this error:
Exception in thread "main" java.lang.NoSuchMethodError: breeze.linalg.DenseVector$.tabulate$mDc$sp(ILscala/Function1;Lscala/reflect/ClassTag;)Lbreeze/linalg/DenseVector;
at viterbiAlgorithm.User$$anonfun$eval$2.apply(viterbiAlgo.scala:84)
at viterbiAlgorithm.User$$anonfun$eval$2.apply(viterbiAlgo.scala:80)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at viterbiAlgorithm.User.eval(viterbiAlgo.scala:80)
at viterbiAlgorithm.viterbiAlgo$.main(viterbiAlgo.scala:28)
at viterbiAlgorithm.viterbiAlgo.main(viterbiAlgo.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:851)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:926)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:935)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
The sbt build file is as follows:
name := "vibertiAlgo"
version := "1.3.4"
scalaVersion := "2.11.2"
libraryDependencies ++= Seq(
  "org.scalanlp" %% "breeze" % "1.0",
  "org.apache.spark" %% "spark-core" % "2.4.0",
  "org.apache.spark" %% "spark-sql" % "2.4.0")
I can successfully run the code locally with sbt run, so I don't think there is anything wrong with my code. Also, the compile-time and run-time versions of Scala and Spark are the same.
The code for viterbiAlgo.scala is:
package viterbiAlgorithm

import breeze.linalg._
// import org.apache.spark.sql.SparkSession

object viterbiAlgo {
  def main(arg: Array[String]) {
    val A = DenseMatrix((0.5,0.2,0.3),
                        (0.3,0.5,0.2),
                        (0.2,0.3,0.5))
    val B = DenseMatrix((0.5,0.5),
                        (0.4,0.6),
                        (0.7,0.3))
    val pi = DenseVector(0.2,0.4,0.4)
    val o = DenseVector[Int](0,1,0) // Hive time + cell_id
    val model = new Model(A,B,pi)
    val user = new User("Jack", model, o) // Hive
    user.eval() // run algorithm
    user.printResult()

    // spark sql
    // val warehouseLocation = "spark-warehouse"
    // val spark = SparkSession.builder().appName("Spark.sql.warehouse.dir").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()
    // import spark.implicits._
    // import spark.sql
    // val usr = "1"
    // val model = new Model(A,B,pi)
    // val get_statement = "SELECT * FROM viterbi.observation"
    // val df = sql(get_statement)
    // val o = DenseVector(df.filter(df("usr")===usr).select(df("obs")).collect().map(_.getInt(0)))
    // val user = new User(usr, model, o)
    // user.eval()
    // user.printResult()
  }
}
class Model (val A: DenseMatrix[Double], val B: DenseMatrix[Double], val pi: DenseVector[Double]) {
  def info(): Unit = {
    println("The model is:")
    println("A:")
    println(A)
    println("B:")
    println(B)
    println("Pi:")
    println(pi)
  }
}
class User (val usr_name: String, val model: Model, val o: DenseVector[Int]) {
  val N = model.A.rows // number of hidden states
  val M = model.B.cols // number of observation symbols
  val T = o.length     // number of time steps
  val delta = DenseMatrix.zeros[Double](N,T)
  val psi = DenseMatrix.zeros[Int](N,T)
  val best_route = DenseVector.zeros[Int](T)

  def eval(): Unit = {
    // 1. Initialization
    delta(::,0) := model.pi * model.B(::, o(0))
    psi(::,0) := DenseVector.zeros[Int](N)

    // 2. Induction
    val tempDelta = DenseMatrix.zeros[Double](N,N)
    val tempB = DenseMatrix.zeros[Double](N,N)
    for (t <- 1 to T-1) {
      // delta: best score of any state path ending in state i at time t
      tempDelta := DenseMatrix.tabulate(N, N){case (i, j) => delta(i,t-1)}
      tempB := DenseMatrix.tabulate(N, N){case (i, j) => model.B(j, o(t))}
      delta(::, t) := DenseVector.tabulate(N){i => max((tempDelta *:* model.A *:* tempB).t.t(::,i))}
      // psi: predecessor state achieving that maximum, needed for the backward pass
      psi(::, t) := DenseVector.tabulate(N){i => argmax((tempDelta *:* model.A).t.t(::,i))}
    }

    // 3. Termination
    val P_star = max(delta(::, T-1))
    val i_star_T = argmax(delta(::, T-1))
    best_route(T-1) = i_star_T

    // 4. Backtracking
    for (t <- T-2 to 0 by -1) {
      best_route(t) = psi(best_route(t+1),t+1)
    }
  }

  def printResult(): Unit = {
    println("User: " + usr_name)
    model.info()
    println
    println("Observed: ")
    printRoute(o)
    println("Best_route is: ")
    printRoute(best_route)
    println("delta is")
    println(delta)
    println("psi is: ")
    println(psi)
  }

  def printRoute(v: DenseVector[Int]): Unit = {
    for (i <- v(0 to -2)) {
      print(i + "->")
    }
    println(v(-1))
  }
}
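For context, the induction step in eval() is the standard Viterbi recurrence (my notation, not from the algorithm reference itself):

  delta_t(i) = max_j [ delta_{t-1}(j) * A(j,i) ] * B(i, o_t)
  psi_t(i)   = argmax_j [ delta_{t-1}(j) * A(j,i) ]

The two tabulate calls build N x N matrices so that delta_{t-1}(j) * A(j,i) * B(i, o_t) can be formed elementwise in one product and then reduced column by column with max/argmax.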
I also tried the --jars argument and passed the location of the breeze library, but got the same error.
I should mention that I tested the code "locally" on the server, and also tested all the methods in spark-shell on the server (I can import the breeze library in spark-shell there).
The server's Scala version matches the one in the sbt build file. The server's Spark version is 2.4.0-cdh6.2.1, although sbt would not resolve the dependency if I appended "-cdh6.2.1" after "2.4.0".
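For what it's worth, one way to check which breeze version the cluster itself puts on the classpath is to ask the JVM where the class was loaded from (standard reflection; the printed path is installation-specific):

  // in spark-shell on the cluster: locate the JAR providing breeze's DenseVector;
  // the returned URL usually contains the breeze version bundled with Spark
  scala> classOf[breeze.linalg.DenseVector[_]].getProtectionDomain.getCodeSource.getLocation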
I tried the two possible solutions Victor provided, but did not succeed. However, when I changed the breeze version in the sbt build file from 1.0 to 0.13.2, everything worked, though I have no idea what went wrong.
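For reference, this is the build file that worked; the only change is the breeze version. (Spark 2.4 ships breeze 0.13.2 with its MLlib jars, so the cluster classpath already contains that version, and classes compiled against breeze 1.0 hit methods that do not exist in 0.13.2.)

  name := "vibertiAlgo"

  version := "1.3.4"

  scalaVersion := "2.11.2"

  libraryDependencies ++= Seq(
    // pinned to the breeze version bundled with the Spark 2.4 distribution
    "org.scalanlp" %% "breeze" % "0.13.2",
    "org.apache.spark" %% "spark-core" % "2.4.0",
    "org.apache.spark" %% "spark-sql" % "2.4.0")

Marking the Spark dependencies as "provided" is also common when submitting to a cluster, since spark-submit supplies them at runtime.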
If the code runs locally but not on the server, that means you are not providing the libraries on the classpath of the job you are submitting. A NoSuchMethodError at runtime is the classic symptom of the cluster loading a different, binary-incompatible version of a library than the one you compiled against.
You have two choices:
- Use the --jars argument and pass the location of all libraries (in your case, it seems to be the breeze library).
- Use the sbt assembly plugin, which will generate a fat JAR with all the dependencies needed, and then submit that JAR with the job. See the sketch below.
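A rough sketch of both options; the paths and the plugin version are illustrative, not taken from the question.

Option 1, --jars (note that breeze pulls in transitive dependencies such as breeze-macros, which all need to be listed, so the fat-JAR route is usually simpler):

  sudo -u spark spark-submit --master yarn --deploy-mode client \
    --jars /path/to/breeze_2.11-0.13.2.jar,/path/to/breeze-macros_2.11-0.13.2.jar \
    --class viterbiAlgorithm.viterbiAlgo ./target/scala-2.11/vibertialgo_2.11-1.3.4.jar

Option 2, sbt-assembly. Add the plugin to project/plugins.sbt:

  // pick a plugin version compatible with your sbt release
  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

Then run sbt assembly and submit the resulting fat JAR (default name is {name}-assembly-{version}.jar):

  sudo -u spark spark-submit --master yarn --deploy-mode client \
    --class viterbiAlgorithm.viterbiAlgo \
    ./target/scala-2.11/vibertiAlgo-assembly-1.3.4.jar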