While working on my undergraduate design project, I ran into a strange problem: when I run my program in single-process mode it finishes normally, but when I switch it to multi-process mode the program hangs and never recovers. The terminal log contains entries like these:
23/04/23 10:29:07 INFO Executor: 1 block locks were not released by TID = 2:
[rdd_2_0]
23/04/23 10:29:07 INFO Executor: 1 block locks were not released by TID = 3:
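For context, as far as I can tell the only thing I change between the two runs is how much parallelism I request in the master URL. This is just a sketch of the toggle, assuming that is what "single process" vs. "multiple process" amounts to in my setup:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("SimpleGraphX")
  // single-process run: one local worker thread, the job finishes normally
  // .setMaster("local[1]")
  // multi-process run: four local worker threads, the job hangs
  .setMaster("local[4]")
val sc = SparkContext.getOrCreate(conf)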
My code is as follows:
def sample_process_test2(file_path: String): Unit = {
  Logger.getLogger("org.apache.spark").setLevel(Level.WARNING)
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

  val conf = new SparkConf().setAppName("SimpleGraphX")
    .setMaster("local[4]")
  val sc = SparkContext.getOrCreate(conf)

  // Load the edge list and normalize it: drop self edges and canonicalize edge directions.
  val graph = GraphLoader.edgeListFile(sc, file_path)
    .partitionBy(PartitionStrategy.RandomVertexCut)
  val canonicalGraph = graph.removeSelfEdges().convertToCanonicalEdges()

  // Sort the edges by degree and build a map from degree to its (start, end) index range.
  val sortDegreeEdges = optimize.sort_edges_with_degree(canonicalGraph, sc)
  val totalEdgeCount = sortDegreeEdges.length
  val degreeLocationDict = optimize.gen_degree_location_Dictionary(canonicalGraph, sortDegreeEdges)
  val sortDegreeDict = mutable.LinkedHashMap(degreeLocationDict.toSeq.sortBy(_._1): _*)

  // Walk the degree map in ascending order until the accumulated edge count exceeds
  // the requested proportion of all edges; return the degree reached and the count.
  def collect_edge(proportion: Double,
                   sortDegreeDict: mutable.LinkedHashMap[Int, (Int, Int)]): (Int, Int) = {
    var cnt = 0
    for ((key, value) <- sortDegreeDict) {
      cnt = cnt + (value._2 - value._1)
      if (cnt * 1.0 / totalEdgeCount > proportion)
        return (key, cnt)
    }
    (-1, cnt)
  }

  val degreeMap = canonicalGraph.degrees.collect().toMap
  var subGraphMap = new mutable.HashMap[Int, Graph[Int, Int]]()

  val begin = System.currentTimeMillis()
  val (targetDegree, endIdx) = collect_edge(0.45, sortDegreeDict)

  // Build one subgraph per degree threshold on the driver, in parallel, and store them in the map.
  (1 to targetDegree).par.map(it => {
    val subGraph = canonicalGraph.subgraph(vpred = (id, attr) => degreeMap.get(id).get < it)
    subGraphMap.put(it, subGraph)
    Unit
  })
  val subGraphMapBroadcast = sc.broadcast(subGraphMap)

  // For every degree threshold in the partition, look up its subgraph from the
  // broadcast map and count its triangles.
  def par_compute(iterator: Iterator[Int]): Iterator[(Int, Int)] = {
    iterator.toArray.par.map(it => {
      val subGraph = subGraphMapBroadcast.value.get(it).get
      val triangleCount = subGraph.triangleCount().vertices.map(_._2).reduce(_ + _) / 3
      println(it, triangleCount)
      (it, triangleCount)
    }).toIterator
  }

  val rdd = sc.parallelize(1 to targetDegree, 4)
  val rdd_result = rdd.mapPartitions(par_compute).collect().foreach(println)

  val end = System.currentTimeMillis()
  println(end - begin)
}
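In case it helps to reproduce this without my `optimize` helpers and my edge-list file, here is a stripped-down sketch that keeps the same structure as the code above: build a few small graphs on the driver, broadcast them, then call triangleCount() from inside mapPartitions. The tiny hard-coded edge list and the object name are placeholders I made up for this post; I have only observed the hang with the full program above.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}
import scala.collection.mutable

object MinimalSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("NestedTriangleCount").setMaster("local[4]")
    val sc = SparkContext.getOrCreate(conf)

    // A tiny triangle graph: 1-2, 2-3, 1-3.
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(1L, 3L, 1)))
    val tinyGraph = Graph.fromEdges(edges, defaultValue = 0)

    // Same shape as my real code: a broadcast map from key to Graph.
    val graphMap = mutable.HashMap(1 -> tinyGraph, 2 -> tinyGraph, 3 -> tinyGraph, 4 -> tinyGraph)
    val graphMapBroadcast = sc.broadcast(graphMap)

    // One partition per core, like the real job, with a nested triangleCount() inside each task.
    val result = sc.parallelize(1 to 4, 4).mapPartitions { iter =>
      iter.map { key =>
        val g = graphMapBroadcast.value(key)
        (key, g.triangleCount().vertices.map(_._2).reduce(_ + _) / 3)
      }
    }.collect()

    result.foreach(println)
  }
}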
Here is my Spark web UI for the task; it looks like the jobs never release the CPU.
Can someone help me figure out why this task never finishes?
