I created a tiny worker system to run parallel jobs with maximum multi-core processor utilization. It seems to work fine, but at some point, when working with a larger number of jobs, it fails (no error message, it just hangs), which I suspect is a low-level race condition. I cannot decide whether this is the fault of cats-effect, which I use to implement parallelism, or of Atomic or TrieMap.
Here's a minified implementation which can be used to illustrate and test the issue:
import cats.effect.IO
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.collection.concurrent.TrieMap
import cats.effect.unsafe.implicits.global
import java.util.concurrent.ConcurrentHashMap
object ThreadingError extends App:
  val jobIdsAdded = (0L until 10000L).toList

  for (_ <- jobIdsAdded.iterator) {
    ParallelJobs.addJob(() => {})
  }

  while (ParallelJobs.count.get() < 10000L) {
    print(s"${ParallelJobs.count.get()}\r")
    Thread.sleep(200)
  }

object ParallelJobs:
  private val allCores = Runtime.getRuntime.availableProcessors()
  private val availableCores = allCores - 1
  private val assignedTillJobId: AtomicLong = AtomicLong(0L)

  val jobsTrieMap: TrieMap[Long, () => Any] = TrieMap.empty[Long, () => Any]
  val jobsConcurrentHashMap: ConcurrentHashMap[Long, () => Any] = ConcurrentHashMap[Long, () => Any]()
  val locked: AtomicBoolean = AtomicBoolean(false)
  val count: AtomicLong = AtomicLong(0L)

  workerGroup.unsafeRunAsync(either => {
    if (either.isLeft) {
      println(either.left.get.getMessage)
      either.left.get.printStackTrace()
    }
  })

  def addJob(jobFn: () => Any): Unit =
    val jobId = jobsTrieMap.size
    jobsTrieMap(jobId) = jobFn
    //val jobId = jobsConcurrentHashMap.size()
    //jobsConcurrentHashMap.put(jobId, jobFn)

  private def workerGroup: IO[Unit] =
    (0 until availableCores).map(_ => worker).reduce(_ &> _)

  private def worker: IO[Unit] =
    IO {
      while (true) {
        if (!locked.get() && jobsTrieMap.nonEmpty) {
        //if (!locked.get() && !jobsConcurrentHashMap.isEmpty) {
          locked.set(true)
          val jobId = assignedTillJobId.getAndIncrement()
          val toDo = jobsTrieMap(jobId)
          //val toDo = jobsConcurrentHashMap.get(jobId)
          jobsTrieMap -= jobId
          //jobsConcurrentHashMap.remove(jobId)
          locked.set(false)
          toDo() // long running job
          count.incrementAndGet()
        } else {
          Thread.sleep(100)
        }
      }
    }
As you can see, I also tried ConcurrentHashMap; using it simply stopped the app from running. I came up with a locking mechanism to test whether the issue was caused by multiple workers writing to the TrieMap, but that did not help either.
I use Scala 3.3 and cats-effect 3.5.1.
This code doesn't seem to be aware of how the concept of an IO type is supposed to work. The idea behind all these IO types on Scala (Future, Cats Effect's IO, ZIO, Monix's Task...) is that you write kinda declarative code (and surely higher level than individual Threads and locks) and it is sent to a runner with a Scheduler. Since all of your code is supposed to be built with map, flatMap, recover, traverse, and other combinators, the assumption is that each individual uninterruptible piece inside an IO is small and non-blocking. That allows the Scheduler to interleave many fibers on a small thread pool, suspend and resume them cheaply, and cancel them between those pieces.
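For illustration, here is a minimal sketch: each step below is one small suspended piece, and the runtime can reschedule or cancel the fiber between the steps:

import cats.effect.IO

val program: IO[Unit] =
  IO(scala.io.StdIn.readLine())         // step 1: a small, self-contained effect
    .map(_.trim.toUpperCase)            // step 2: pure transformation
    .flatMap(line => IO(println(line))) // step 3: another small effect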
This means, among others, avoiding:

- Thread.sleep - there are instructions to let the fiber sleep without blocking a whole Thread in a thread pool (and potentially blocking a whole thread pool with operations that sleep naively); see the sketch after this list
- while(true) - because it hijacks a whole Thread and makes it impossible to cancel the operation
- hand-rolled locks - there are Refs, Semaphores, etc. for coordinating shared state
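For example (a sketch; poll is a made-up name), the question's polling loop could look like this, with the fiber sleeping instead of the thread:

import cats.effect.IO
import cats.syntax.all.*
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.*

def poll(queue: TrieMap[Long, () => Any]): IO[Unit] =
  IO(queue.headOption).flatMap {
    case Some((id, _)) =>
      // TrieMap.remove is atomic, so only one fiber wins each job
      IO(queue.remove(id)).flatMap(_.traverse_(job => IO(job())))
    case None =>
      IO.sleep(100.millis) // suspends the fiber; the Thread stays free
  }.foreverM // a cancelable loop instead of while(true)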
It doesn't mean that it is absolutely impossible to use these with IO monads, just that these libraries were constructed to make it easy to build concurrent apps... as long as you use them as intended, because they expect that you don't break the assumptions they are built upon (similarly to how you might have a bad experience with Spring Framework + Hibernate if you decide that Thread pinning is not a thing and start running your own thing on the side). So, messing around at the low level is something I recommend postponing until you understand how the runtime works underneath. Meanwhile, I recommend rewriting this whole code to something which uses the built-in combinators and operations.
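Here is a minimal sketch of such a rewrite (ParallelJobsApp and runJob are made-up names, and I'm assuming each job can be expressed as an IO value):

import cats.effect.{IO, IOApp}
import cats.syntax.all.*

object ParallelJobsApp extends IOApp.Simple:

  // stand-in for the question's `() => Any` jobs
  def runJob(id: Long): IO[Unit] =
    IO(()) // long-running work goes here

  def run: IO[Unit] =
    val jobIds = (0L until 10000L).toList
    // all jobs are scheduled in parallel on the default compute pool,
    // which is already sized by availableProcessors()
    jobIds.parTraverse(runJob).void

No manual queue, no lock, no busy-waiting: the runtime distributes the fibers across its thread pool for you.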
All of the monads for side effects - scala.concurrent.Future, cats.effect.IO, monix.eval.Task, zio.ZIO - have a default Scheduler which uses Runtime.getRuntime.availableProcessors() to decide the size of the Thread pool. They are additionally aware of things like how to handle blocking (declared explicitly, as sketched below) in a way that prevents all of the threads from getting stuck in a "waiting" state where the job that could "notify" them cannot get onto the thread pool. They also handle SIGTERM signals to cancel ongoing tasks in a predictable manner (jobs that need to finish can be made uncancelable, streams can be adjusted to safely finish processing the current task and then not take another one, Resources can be safely closed and cleaned up, etc.).
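For example (a sketch), a blocking call can be declared as such, and the runtime will shift it to a dedicated blocking pool:

import cats.effect.IO

def readFileBlocking(path: String): IO[String] =
  IO.blocking {
    // runs on the blocking pool, so compute threads stay free for fibers
    val src = scala.io.Source.fromFile(path)
    try src.mkString
    finally src.close()
  }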
Meaning that the entirety of your code can be replaced with just some parTraverse or something similar, to get the same result but without race conditions. If you want to control the degree of your parallelism more granularly, there are bounded variants of these operations.
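For example (a sketch; BoundedJobsApp and runJob are made-up names), cats-effect 3's parTraverseN caps how many jobs run simultaneously, here mirroring the question's availableCores:

import cats.effect.{IO, IOApp}

object BoundedJobsApp extends IOApp.Simple:

  def runJob(id: Long): IO[Unit] =
    IO(()) // long-running work goes here

  def run: IO[Unit] =
    val jobIds = (0L until 10000L).toList
    val limit = math.max(1, Runtime.getRuntime.availableProcessors() - 1)
    // at most `limit` jobs run at once; the rest wait their turn
    IO.parTraverseN(limit)(jobIds)(runJob).void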
Long story short: don't roll your own low-level solution (until you understand what the library does under the hood) and you will be fine. There are enough helpers to roll your own solution on top of the library's API and deliver the same thing easier, faster, and without race conditions or other issues.