I am trying to use the GraphX `stronglyConnectedComponents` algorithm, and I'm getting StackOverflowError failures using 5M edges on 10M vertices (the graph is a forest of small graphs). The repeating frames in the stack trace indicate that serialization is involved:
at jdk.internal.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1100)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2423)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
at scala.collection.generic.DefaultSerializationProxy.readObject(DefaultSerializationProxy.scala:58)
This is running on a local cluster with 20 GB of JVM memory and 16 cores. This example builds with Maven and JDK 17.
Source follows:
package net.redpoint.test;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.graphx.Edge;
import org.apache.spark.graphx.GraphOps;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;
import scala.reflect.ClassTag$;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Locale;
import java.util.Random;
/**
 * Reproduction of a {@link StackOverflowError} raised while deserializing Spark tasks during
 * GraphX {@code stronglyConnectedComponents} on a large forest of small graphs.
 *
 * <p>The program writes a random edge list to {@code test.csv} under {@code java.io.tmpdir},
 * loads it with Spark SQL, adds the reverse of every edge, builds a GraphX graph and runs
 * {@code stronglyConnectedComponents} on it.
 */
public class Test1 {

    /** Number of distinct vertex ids that random edge endpoints are drawn from. */
    private static final int NBR_VERTICES = 1_000_000;

    /** Number of random edges written to the CSV file (before reverse edges are added). */
    private static final int NBR_EDGES = NBR_VERTICES / 2;

    /** Maximum number of iterations passed to {@code stronglyConnectedComponents}. */
    private static final int SCC_MAX_ITERATIONS = 100;

    /** Value for {@code spark.graphx.pregel.checkpointInterval} (Pregel supersteps). */
    private static final String PREGEL_CHECKPOINT_INTERVAL = "10";

    /**
     * Runs the reproduction in a local Spark session using all available cores.
     *
     * @param args ignored
     * @throws Exception if the Spark job fails or the session cannot be closed
     */
    public static void main(String[] args) throws Exception {
        String tmpDir = System.getProperty("java.io.tmpdir");
        // GraphX only checkpoints inside Pregel when this property is set (it is disabled by
        // default), regardless of setCheckpointDir; without it the RDD lineage of the Pregel
        // working graph grows with every superstep. NOTE(review): the deeply nested readObject
        // frames in the reported trace are most likely that lineage being deserialized. If
        // overflows persist, try a smaller interval or a larger thread stack (-Xss) — confirm
        // on the real data set.
        var conf = new SparkConf()
                .set("spark.graphx.pregel.checkpointInterval", PREGEL_CHECKPOINT_INTERVAL);
        // SparkSession is Closeable; closing it stops the SparkContext even if the job fails.
        try (var session = SparkSession.builder()
                .config(conf)
                .appName("Test")
                .master("local[*]")
                .getOrCreate()) {
            // A dedicated subdirectory keeps checkpoint data out of the tmpdir root.
            session.sparkContext().setCheckpointDir(Paths.get(tmpDir, "spark-checkpoints").toString());

            Path csvPath = Paths.get(tmpDir, "test.csv");
            writeRandomEdgesCsv(csvPath, NBR_VERTICES, NBR_EDGES, 0L);
            JavaRDD<Edge<Long>> edges = loadSymmetricEdges(session, csvPath.toString());

            var longTag = ClassTag$.MODULE$.<Long>apply(Long.class);
            org.apache.spark.graphx.Graph<Long, Long> graph = org.apache.spark.graphx.Graph.fromEdges(
                    edges.rdd(), 1L, StorageLevel.MEMORY_AND_DISK(), StorageLevel.MEMORY_AND_DISK(),
                    longTag, longTag);

            // Group ids by treating each CSV row (a matched pair of ids) as an edge and finding the
            // strongly connected components. Because every edge is present in both directions,
            // these coincide with the connected components; ops.connectedComponents() would give
            // the same grouping with a single Pregel run.
            var ops = new GraphOps<Long, Long>(graph, longTag, longTag);
            var scc = ops.stronglyConnectedComponents(SCC_MAX_ITERATIONS);
            // Materialize the result so no part of the computation is left unevaluated.
            scc.vertices().count();
        }
    }

    /**
     * Writes {@code nbrEdges} random edges to {@code path} as an {@code id1,id2} CSV file with a
     * header row. Each endpoint is drawn uniformly from {@code [0, nbrVertices)} and written
     * zero-padded to eight ASCII digits; self-loops and duplicate edges are possible.
     *
     * @param path        file to create or overwrite
     * @param nbrVertices exclusive upper bound for endpoint ids
     * @param nbrEdges    number of data rows to write
     * @param seed        seed for the random generator, so runs are reproducible
     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be written
     */
    private static void writeRandomEdgesCsv(Path path, int nbrVertices, int nbrEdges, long seed) {
        try (var os = new BufferedOutputStream(Files.newOutputStream(path));
             var ps = new PrintStream(os)) {
            ps.println("id1,id2");
            var random = new Random(seed);
            for (int j = 0; j < nbrEdges; j++) {
                // Locale.ROOT guarantees ASCII digits that Spark can parse as LONG.
                String id1 = String.format(Locale.ROOT, "%08d", random.nextInt(nbrVertices));
                String id2 = String.format(Locale.ROOT, "%08d", random.nextInt(nbrVertices));
                ps.println(id1 + "," + id2);
            }
            // PrintStream never throws; checkError() flushes and reports any swallowed failure.
            if (ps.checkError()) {
                throw new IOException("Write to " + path + " failed");
            }
        } catch (IOException ex) {
            throw new RuntimeException("Error making CSV file " + path, ex);
        }
    }

    /**
     * Loads an {@code id1,id2} CSV file (with header) and returns every row as an edge in both
     * directions, each with attribute {@code 1L}.
     *
     * <p>GraphX edges are always directed; adding the reverse of each edge makes the graph
     * symmetric.
     *
     * @param session active Spark session
     * @param pathStr location of the CSV file
     * @return forward edges followed by their reversed copies
     */
    private static JavaRDD<Edge<Long>> loadSymmetricEdges(SparkSession session, String pathStr) {
        var schema = StructType.fromDDL("id1 LONG, id2 LONG");
        Dataset<Row> csvData = session.read().format("csv").schema(schema).option("header", "true").load(pathStr);
        JavaRDD<Row> rows = csvData.toJavaRDD();
        JavaRDD<Edge<Long>> forward = rows.map(r -> new Edge<Long>(r.getLong(0), r.getLong(1), 1L));
        JavaRDD<Edge<Long>> backward = rows.map(r -> new Edge<Long>(r.getLong(1), r.getLong(0), 1L));
        return forward.union(backward);
    }
}