Spark GraphX deadlock when executing my program in multi-process mode

While working on my undergraduate design project, I ran into a strange problem: when I run my program in single-process mode, it finishes normally, but when I switch to multi-process mode, it hangs and never recovers. I found lines like these in the terminal log:

23/04/23 10:29:07 INFO Executor: 1 block locks were not released by TID = 2:
[rdd_2_0]
23/04/23 10:29:07 INFO Executor: 1 block locks were not released by TID = 3:
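
In case it matters, the only thing I change between the two modes is the master URL (assuming "single process" vs. "multiple process" is the right way to describe local[1] vs. local[4]):

// single-process run: finishes normally
val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local[1]")
// multi-process run: hangs
val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local[4]")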

My code is as follows:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Graph, GraphLoader, PartitionStrategy}

import scala.collection.mutable

def sample_process_test2(file_path: String): Unit = {
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN) // log4j uses WARN, not WARNING
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
  val conf = new SparkConf().setAppName("SimpleGraphX")
    .setMaster("local[4]")
  val sc = SparkContext.getOrCreate(conf)
  // Load the edge list, then canonicalize: drop self-loops, orient all edges srcId < dstId.
  val graph = GraphLoader.edgeListFile(sc, file_path)
    .partitionBy(PartitionStrategy.RandomVertexCut)
  val canonicalGraph = graph.removeSelfEdges().convertToCanonicalEdges()
  val sortDegreeEdges = optimize.sort_edges_with_degree(canonicalGraph, sc)
  val totalEdgeCount = sortDegreeEdges.length
  val degreeLocationDict = optimize.gen_degree_location_Dictionary(canonicalGraph, sortDegreeEdges)
  val sortDegreeDict = mutable.LinkedHashMap(degreeLocationDict.toSeq.sortBy(_._1): _*)

  // Walk the degree -> (start, end) ranges in ascending degree order and return the
  // first degree at which the accumulated edge count exceeds the given proportion.
  def collect_edge(proportion: Double,
                   sortDegreeDict: mutable.LinkedHashMap[Int, (Int, Int)]): (Int, Int) = {
    var cnt = 0
    for ((key, value) <- sortDegreeDict) {
      cnt += value._2 - value._1
      if (cnt * 1.0 / totalEdgeCount > proportion)
        return (key, cnt)
    }
    (-1, cnt)
  }

  val degreeMap = canonicalGraph.degrees.collect().toMap
  val subGraphMap = new mutable.HashMap[Int, Graph[Int, Int]]()

  val begin = System.currentTimeMillis()

  val (targetDegree, endIdx) = collect_edge(0.45, sortDegreeDict)
  // Build one subgraph per degree threshold, in parallel on the driver
  // (note: subGraphMap is mutated from multiple threads here).
  (1 to targetDegree).par.foreach { it =>
    val subGraph = canonicalGraph.subgraph(vpred = (id, attr) => degreeMap(id) < it)
    subGraphMap.put(it, subGraph)
  }
  val subGraphMapBroadcast = sc.broadcast(subGraphMap)

  // Count the triangles of each subgraph; this runs inside mapPartitions, i.e. on executors.
  def par_compute(iterator: Iterator[Int]): Iterator[(Int, Int)] = {
    iterator.toArray.par.map { it =>
      val subGraph = subGraphMapBroadcast.value(it)
      val triangleCount = subGraph.triangleCount().vertices.map(_._2).reduce(_ + _) / 3
      println(it, triangleCount)
      (it, triangleCount)
    }.toIterator
  }

  val rdd = sc.parallelize(1 to targetDegree, 4)
  rdd.mapPartitions(par_compute).collect().foreach(println)
  val end = System.currentTimeMillis()
  println(end - begin)
}
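
If it helps, here is a stripped-down, hypothetical sketch of the pattern I suspect (names like edges.txt are made up): a broadcast Graph's triangleCount() action gets invoked from inside mapPartitions of another RDD, so each task tries to launch new Spark jobs of its own:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.GraphLoader

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("repro").setMaster("local[4]"))
// the graph is built on the driver, then broadcast to executors
val graph = GraphLoader.edgeListFile(sc, "edges.txt")
val graphBroadcast = sc.broadcast(graph)
sc.parallelize(1 to 4, 4)
  .mapPartitions { it =>
    // each element triggers a nested Spark action from inside a running task
    it.map(i => (i, graphBroadcast.value.triangleCount()
      .vertices.map(_._2).reduce(_ + _) / 3))
  }
  .collect()
  .foreach(println)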

Here is the Spark web UI for this run; it looks like the jobs never finish and the CPU is never released.

[Spark web UI screenshot]

Can someone help me figure out why these tasks never make progress?
