Threaded DelayedJob workers dry up ActiveRecord connection pool


I want to run job scheduling (with rufus-scheduler) and job processing (with delayed_job, DJ, using the ActiveRecord backend) from the same JRuby process, but in different threads.

This works fine when just one DJ worker is in use, but when I set up multiple queues (with one worker per queue), the scheduler terminates with the following message:

rufus-scheduler intercepted an error:
   job:
     Rufus::Scheduler::CronJob "*/10 * * * * *" {}
   error:
     ActiveRecord::ConnectionTimeoutError
     could not obtain a connection from the pool within 5.000 seconds (waited 5.000 seconds); all pooled connections were in use

This leads me to believe that the workers do not release their database connections when a job finishes, but I have yet to find the relevant code. The connection pool is configured with 50 connections.

The script is supposed to keep running until it receives SIGTERM, and it needs access to the Rails environment. A minimal working example (MWE) is given below:

require './config/environment'

module Jobs
  class ExampleJob < ActiveJob::Base
    queue_as :system

    def perform
      puts 'Performing ExampleJob'
    end
  end
end

QUEUES = %i(system database).freeze
NUM_QUEUES = QUEUES.size

workers = []
worker_threads = []

NUM_QUEUES.times do |queue_number|
  worker = Delayed::Worker.new(quiet: false)
  worker.name = queue_number
  workers.append(worker)
  worker_threads.append(Thread.new do
    worker.start
  end)
end

scheduler = Rufus::Scheduler.new

scheduler.cron '*/10 * * * * *' do
  puts 'Scheduled ExampleJob'
  Jobs::ExampleJob.perform_later
end

Signal.trap('TERM') do
  scheduler.shutdown
  workers.each do |worker|
    worker.stop
  end
end

scheduler.join
worker_threads.each do |thread|
  thread.join
end

Is there a way to get this to run properly?

jmettraux On

How about returning the job's connection to the pool in an after_perform callback:

module Jobs
  class ExampleJob < ActiveJob::Base
    queue_as :system

    def perform
      puts 'Performing ExampleJob'
    end

    after_perform do
      ActiveRecord::Base.clear_active_connections!
    end
  end
end

?
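
To illustrate why releasing the connection after each job might help, here is a minimal sketch using only the Ruby standard library. ToyPool, run_jobs, the pool size of 2 and the 0.2 second checkout timeout are all hypothetical names and values made up for this example; this is a toy model, not ActiveRecord's implementation. The only behaviour it copies is that a checked-out connection stays tied to the thread that asked for it until that thread gives it back. It ignores everything else a real pool does (for example, reclaiming connections from threads that have died), so the job threads here are deliberately kept alive, the way long-running worker threads would be.

```ruby
# Toy stand-in for a connection pool (hypothetical, NOT ActiveRecord's code).
# A connection belongs to the thread that checked it out until that thread
# explicitly releases it.
class ToyPool
  class TimeoutError < StandardError; end

  def initialize(size:, checkout_timeout:)
    @idle = Array.new(size) { |i| "conn-#{i}" }
    @held = {}                        # thread => connection
    @checkout_timeout = checkout_timeout
    @lock = Mutex.new
    @freed = ConditionVariable.new
  end

  # Returns the calling thread's connection, waiting for a free one if needed.
  def connection
    @lock.synchronize do
      return @held[Thread.current] if @held.key?(Thread.current)

      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @checkout_timeout
      while @idle.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise TimeoutError, 'all pooled connections were in use' if remaining <= 0
        @freed.wait(@lock, remaining)
      end
      @held[Thread.current] = @idle.pop
    end
  end

  # Hands the calling thread's connection (if any) back to the pool.
  def release_connection
    @lock.synchronize do
      conn = @held.delete(Thread.current)
      if conn
        @idle.push(conn)
        @freed.signal
      end
    end
  end
end

# Runs `jobs` threads against a pool of 2 connections and returns how many of
# them timed out while waiting for a connection.
def run_jobs(jobs:, release:)
  pool = ToyPool.new(size: 2, checkout_timeout: 0.2)
  timeouts = Queue.new
  done = Queue.new
  parked = Queue.new                  # threads block here so they stay alive

  threads = Array.new(jobs) do
    Thread.new do
      begin
        pool.connection               # the "job" touches the database
      rescue ToyPool::TimeoutError
        timeouts << true
      ensure
        pool.release_connection if release   # plays the role of after_perform
      end
      done << true
      parked.pop                      # keep the thread (and its claim) around
    end
  end

  jobs.times { done.pop }             # wait until every job has finished
  jobs.times { parked << true }       # then let the threads exit
  threads.each(&:join)
  timeouts.size
end

puts "without release: #{run_jobs(jobs: 5, release: false)} of 5 jobs timed out"
puts "with release:    #{run_jobs(jobs: 5, release: true)} of 5 jobs timed out"
```

In this toy, the threads that never release keep both connections claimed, so every later job waits out the timeout; once each job releases in its ensure block, the same two connections are enough for all five. Whether the real setup behaves this way depends on which threads actually hold the connections, so treat this as an illustration of the mechanism rather than a diagnosis.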