Spark GraphX stronglyConnectedComponents crashing with StackOverflow


I am trying to use the GraphX stronglyConnectedComponents algorithm, and I'm getting StackOverflowError failures with 5M edges on 10M vertices (it is a forest of small graphs). The repeating frames in the stack trace indicate that serialization is involved:

    at jdk.internal.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1100)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2423)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
    at scala.collection.generic.DefaultSerializationProxy.readObject(DefaultSerializationProxy.scala:58)

This is running on a local cluster with 20 GB of JVM memory and 16 cores. The example builds with Maven and JDK 17.
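For reference, the java.io.ObjectInputStream frames above show that the default Java serializer is on this path. A minimal sketch of pointing the conf below at Kryo instead, in case that is relevant (whether it avoids the deep recursion, and whether the GraphX registration helper is callable from Java like this, are assumptions on my part):

    // Sketch only: switch from Java serialization to Kryo before building the session.
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    // Assumption: GraphX's helper for registering its internal classes with Kryo.
    org.apache.spark.graphx.GraphXUtils.registerKryoClasses(conf);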

Source follows:

package net.redpoint.test;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.graphx.Edge;
import org.apache.spark.graphx.GraphOps;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;
import scala.reflect.ClassTag$;
import java.io.BufferedOutputStream;
import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Random;
public class Test1 {
    public static void main(String[] args) throws Exception {
        var conf = new SparkConf();
        var session = SparkSession.builder()
            .config(conf)
            .appName("Test")
            .master("local[*]")
            .getOrCreate();
        session.sparkContext().setCheckpointDir(System.getProperty("java.io.tmpdir"));
        final int NBR_VERTICES = 1000000;
        final int NBR_EDGES = NBR_VERTICES / 2;
        String pathStr = System.getProperty("java.io.tmpdir") + "/test.csv";
        var path = Paths.get(pathStr);
        try (var os = new BufferedOutputStream(Files.newOutputStream(path));
            var ps = new PrintStream(os);
        ) {
            ps.println("id1,id2");
            var r = new Random(0);
            for (int j = 0; j < NBR_EDGES; j++) {
                String id1 = String.format("%08d", r.nextInt(NBR_VERTICES));
                String id2 = String.format("%08d", r.nextInt(NBR_VERTICES));
                ps.println(id1 + "," + id2);
            }
        } catch (Exception ex) {
            throw new RuntimeException("Error making CSV file " + pathStr, ex);
        }
        var schema = StructType.fromDDL("id1 LONG, id2 LONG");
        Dataset<Row> csvData = session.read().format("csv").schema(schema).option("header","true").load(pathStr);
        var rows = csvData.toJavaRDD();
        JavaRDD<Edge<Long>> edges = rows.map(r -> new Edge<>(r.getLong(0), r.getLong(1), 1L));
        // GraphX edges are directed, so add the reverse of each edge to make every pair mutually reachable
        edges = edges.union(rows.map(r -> new Edge<>(r.getLong(1), r.getLong(0), 1L)));
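        // GraphX is a Scala API, so explicit ClassTag instances for the vertex and edge
        // attribute types (Long here) have to be supplied when calling it from Java.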
        org.apache.spark.graphx.Graph<Long,Long> graph = org.apache.spark.graphx.Graph.fromEdges(edges.rdd(), 1L, StorageLevel.MEMORY_AND_DISK(), StorageLevel.MEMORY_AND_DISK(), ClassTag$.MODULE$.apply(Long.class), ClassTag$.MODULE$.apply(Long.class));
        // Transform into groups by treating the ids as vertices and each match pair as an edge, then finding the strongly connected components of the graph.
        var ops = new GraphOps<Long,Long>(graph, ClassTag$.MODULE$.apply(Long.class), ClassTag$.MODULE$.apply(Long.class));
        var scc = ops.stronglyConnectedComponents(100);
    }
}
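For completeness: setCheckpointDir is called at the top, but nothing here explicitly checkpoints, so the iterative SCC/Pregel work can build up a long lineage over the 100 iterations. A minimal sketch of turning on GraphX's periodic Pregel checkpointing (the config key is what I believe controls it; the interval value is purely illustrative):

    // Sketch only: checkpoint the graphs inside GraphX's Pregel loop every few iterations
    // (assumption: this also covers the Pregel phase used by stronglyConnectedComponents).
    conf.set("spark.graphx.pregel.checkpointInterval", "2");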
