I am sure this is due to my not fully grasping how .par works under the hood, but I am seeing something a bit strange when using it along with a ForkJoinPool.
I have an ETL process that uses multi-threading to make parallel requests to the source system (normal stuff - Postgres, SQL Server, Oracle, etc.), and then does quite a bit of work before putting the data into Azure Data Lake Storage, with the tables visualized through Databricks. We are using Databricks for our processing as well. The ETL process is written in Scala and packaged as a JAR that gets pulled from our internal Nexus repo when a given ETL pipeline starts.
So, we have a bit of code like this:
val jobConcurrency = 5
// Get all the "things" we want to ingest for a given pipeline. (This is a Spark DataFrame.)
val configDF = getIngestionObjects(....)
// Collect and make par
val configDFPAR = configDF.toJSON.collect().par
// Attach a new pool as the task support. (I have used both the Scala and Java ForkJoinPool, with the exact same results.)
configDFPAR.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(jobConcurrency))
// Do things
configDFPAR.foreach { element =>
// GO TO SOURCE
// PARTITION AND CLEAN DATA
// INSERT OR MERGE DATA
// RUN OPTIMIZE STATEMENTS
// CLEAN UP STAGING TABLES
// BUILD BRONZE/SILVER/GOLD TABLES
// DO ANY POST-PROCESSING
// NEXT
}
So, using grouped(x) with .par, it works exactly as I would expect: 5 tables come in, those 5 run under .par, and the next 5 drop in once all of the original 5 finish. The issue is that one slow buffalo of a table causes the entire batch to wait. I would rather keep everything under a single .par and get a queue-based approach, where a new table starts as soon as one finishes. But what I am actually seeing is that the job kicks off 5 tables, then another 5, then another 5, and so on, without waiting for any of them to finish. I can watch the Spark logs and see that at some point I might have 8 or 16 or even 32 tables ingesting at the same time (depending on my cluster size).
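For reference, the grouped(x) variant I'm describing is roughly this sketch (same names and imports as the snippet above, with jobConcurrency doubling as the batch size):
// Run the tables in fixed batches of jobConcurrency, only starting the next
// batch once the whole current batch has finished.
configDF.toJSON.collect().grouped(jobConcurrency).foreach { batch =>
  val batchPar = batch.par
  batchPar.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(jobConcurrency))
  batchPar.foreach { element =>
    // same ingestion steps as above
  }
}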
So, is my understanding of .par wrong? I assume it hits some "checkpoint" where it says, "OK, next", but I am surprised that it's using more than 5 threads at a time. Again, I am probably doing something wrong here.
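To make sure I'm not just misreading the Spark logs, the kind of standalone check I have in mind is something like this (a toy sketch, not the real job - the item count, the sleep, and the seenThreads set are placeholders, and it assumes Scala 2.12 where .par works without extra imports):
import java.util.concurrent.ConcurrentHashMap
import scala.collection.parallel.ForkJoinTaskSupport

// Record which worker threads actually execute the loop body for a pool of parallelism 5.
val seenThreads = ConcurrentHashMap.newKeySet[String]()
val items = (1 to 50).par
items.tasksupport = new ForkJoinTaskSupport(new java.util.concurrent.ForkJoinPool(5))
items.foreach { _ =>
  seenThreads.add(Thread.currentThread().getName)
  Thread.sleep(200) // stand-in for the real ingestion work
}
println(s"parallelismLevel = ${items.tasksupport.parallelismLevel}")
println(s"distinct worker threads = ${seenThreads.size}")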
Thanks for any suggestions!