A Multithreaded Job Queue For Ruby (on Rails)

Posted 1040 days ago.

The Job Queue Problem

I have a Ruby on Rails application with a rapidly growing database of jobs. Currently 50,000 jobs per day have to be executed. Each job fetches data over the internet and takes about 1-10 seconds to execute. The jobs are scheduled for specific times and should be executed with as little delay as possible.

I experimented with different solutions, but they were either too slow, too complex, or too hard to integrate into my code. Since I like solving problems and had some time for the project, I rolled my own job queue.

A Multithreaded Queue is Needed For The Job

It was clear to me that the solution had to be multithreaded, so that multiple jobs could be processed at the same time and no job would have to wait unnecessarily. The problem with reading jobs from a database and processing them in separate threads is that you are very likely to read jobs that are already being processed. You could solve this by locking the job rows in the database, but then other parts of the application could not read them either. You could also write a "being processed" flag into each job row, but that would again slow down the application, because it requires one extra write per job.

A Simple Solution

[Diagram: Ruby on Rails Job Queue]

The first solution I came up with works like this. The main loop reads jobs from the database into a thread-safe IN-queue. The worker threads pull their jobs from this queue, and when they are done, they push the processed job into the thread-safe OUT-queue. Writing the jobs back into the database is also done in the main loop. To prevent jobs that are already being processed from being read again, the main loop keeps a list of their primary IDs and excludes them from the query like this:

"SELECT * FROM jobs WHERE id NOT IN (5,7,9,20,8,33,...)"

Look again at the diagram above. Everything to the left of the thin pink dividing line is done in the main loop, and everything to the right is done in the threaded workers. There is only one problem with this solution: writing back to the database happens sequentially, in one place inside the main loop. When many jobs have to be written back, the IN-queue runs empty and the workers starve.
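The flow of the simple design can be tried out without Rails or a database. The following is a minimal, hypothetical simulation. `SimJob` and the in-memory `jobs_table` stand in for the `Job` model and its table, and setting `done` stands in for `job.save`. The main loop locks IDs before queueing them and unlocks them only after the write-back. Because of that, no job is handed to a worker twice, even though the workers run concurrently:

```ruby
require 'thread'

# Hypothetical in-memory simulation of the simple design: no Rails, no database.
# SimJob stands in for one row of the jobs table.
SimJob = Struct.new(:id, :done)

jobs_table = (1..50).map { |id| SimJob.new(id, false) }
queue_in   = Queue.new   # main loop -> workers
queue_out  = Queue.new   # workers -> main loop
locked_ids = []          # only touched by the main thread
processed  = Queue.new   # ids the workers ran, used to check for duplicates

workers = Array.new(4) do
  Thread.new do
    loop do
      job = queue_in.pop          # blocks until the main loop supplies a job
      break if job == :stop
      processed << job.id         # stands in for job.work!
      queue_out << job
    end
  end
end

until jobs_table.all?(&:done)
  # Read step: pending jobs that are not locked, like "... WHERE id NOT IN (...)"
  batch = jobs_table.reject { |j| j.done || locked_ids.include?(j.id) }.first(8)
  batch.each do |job|
    locked_ids << job.id
    queue_in << job
  end

  # Write-back step, done sequentially in the main loop as in the simple design
  until queue_out.empty?
    job = queue_out.pop
    job.done = true               # stands in for job.save
    locked_ids.delete(job.id)
  end

  sleep 0.01                      # give the workers time to run
end

workers.size.times { queue_in << :stop }
workers.each(&:join)

ids = []
ids << processed.pop until processed.empty?
puts "Processed #{ids.size} jobs, #{ids.uniq.size} unique"
```

In this sketch the main loop is also the only place where `done` is written. That is the sequential write-back the paragraph above identifies as the bottleneck.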

A Slightly More Complex Solution

With Rails, Active Record is thread safe and supports multiple concurrent connections. So why not do the saving inside the worker? The OUT-queue is then only used to update the list of locked IDs. For me this runs really smoothly. Just make sure the database connection pool is larger than WORKER_THREADS. I use an Active Record pool of 25 connections and 20 threads for maximum performance. If you test this code, I recommend benchmarking your application with the saving done inside the main loop vs. inside the workers. In the diagram below, the grey arrows represent the data used for updating the locked IDs list, while the black arrows represent the actual job data.
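The reason the pool must be at least as large as the number of worker threads can be illustrated without Rails. In the sketch below, a pool hands out connections from a `SizedQueue`, so a worker has to wait whenever all connections are checked out. `FakePool` and `run` are hypothetical stand-ins, not Active Record's own pool. The real pool differs in detail: it waits only for a limited time and then raises an error.

```ruby
require 'thread'

# Hypothetical stand-in for a database connection pool: a SizedQueue
# holding connection objects. pop blocks while every connection is in use.
class FakePool
  def initialize(size)
    @connections = SizedQueue.new(size)
    size.times { |i| @connections << "connection-#{i}" }
  end

  # Check out a connection, run the block, and always check it back in
  def with_connection
    conn = @connections.pop
    yield conn
  ensure
    @connections << conn if conn
  end
end

# Starts thread_count workers that each "save" one job taking save_time
# seconds, and returns the wall-clock time until all of them are finished.
def run(pool_size, thread_count, save_time)
  pool = FakePool.new(pool_size)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  threads = Array.new(thread_count) do
    Thread.new { pool.with_connection { |_conn| sleep(save_time) } }
  end
  threads.each(&:join)
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

printf("pool 25, 20 threads: %.2fs\n", run(25, 20, 0.2))
printf("pool  5, 20 threads: %.2fs\n", run(5, 20, 0.2))
```

With a pool of 5, the 20 workers can only save in rounds of five, so the second run should take roughly four times as long as the first.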

[Diagram: A Multithreaded Ruby on Rails Queue (more complex version)]

It's Scalable

That's basically it. The job queue is scalable to some extent. It would be possible to access the queues from workers on other machines in a cloud. However, this would slow down the main loop, because a queue is locked internally while a thread accesses it, possibly over a slow network connection. A better approach might be for each worker to act as a distributor itself and hand some of its jobs over to other workers in the cloud.

For my project I will probably not scale this job queue over multiple machines. I ran some benchmarks, and the results show that in theory 500,000 jobs could be processed per day on one server. That's almost 6 jobs per second, each running ~5 seconds on average. If I get close to this number, I will put new clients' jobs onto another server setup.
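The per-second figure comes from dividing by the 86,400 seconds in a day. As an added sanity check, this rate can be combined with the average job duration using Little's law (the original post does not use it). The result estimates how many jobs must be running at the same time, which is the load the worker threads have to cover:

```ruby
SECONDS_PER_DAY = 24 * 60 * 60  # 86_400

jobs_per_day   = 500_000
avg_duration_s = 5.0

jobs_per_second = jobs_per_day.to_f / SECONDS_PER_DAY
# Little's law: average jobs in flight = arrival rate * average time per job
concurrent_jobs = jobs_per_second * avg_duration_s

printf("%.2f jobs/s, about %.1f jobs running at once\n", jobs_per_second, concurrent_jobs)
```

This prints about 5.79 jobs per second and roughly 29 jobs in flight at once. Sustaining that rate with ~5 second jobs would presumably need more than the 20 worker threads used in the code below.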

It's Stable in Production

I should note that this job queue code (with some more details) has been running stably in production for some months now and has never shown any problems. I use external monitoring to make sure the script is running and that the number of "over-aged" jobs in the database stays below a certain limit. If that should ever become necessary, I restart the script.
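Below is a minimal sketch of the "over-aged jobs" check. It assumes that a job counts as over-aged when its scheduled time lies more than a fixed delay in the past, and that the external monitor only needs a healthy/unhealthy answer. The names `overaged_count`, `queue_healthy?`, `MAX_DELAY` and `MAX_OVERAGED`, and both threshold values, are hypothetical. A real check would query the jobs table instead of taking an array of times:

```ruby
# Hypothetical thresholds for the external monitor
MAX_DELAY    = 5 * 60   # seconds a due job may wait before it counts as over-aged
MAX_OVERAGED = 100      # more over-aged jobs than this means "restart the script"

# Counts jobs whose scheduled time lies more than MAX_DELAY seconds in the past.
def overaged_count(scheduled_times, now = Time.now)
  scheduled_times.count { |t| now - t > MAX_DELAY }
end

# Returns true while the number of over-aged jobs is within the limit.
def queue_healthy?(scheduled_times, now = Time.now)
  overaged_count(scheduled_times, now) <= MAX_OVERAGED
end

now   = Time.now
times = [now - 10, now - 400, now - 3600, now + 60]
puts overaged_count(times, now)   # 2
puts queue_healthy?(times, now)
```

A monitor script could run a check like this periodically and restart the queue process when it returns false. How the restart is done depends on the process supervisor in use.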

Please do comment if you have suggestions on how to improve it. If you use the following code in your project, I would be happy if you let me know.

The Full Code

Here is the full code of the job queue for Ruby on Rails. It is slightly simplified from the version I run in production, to make it clearer how it works. Do with it whatever you want. With a few changes you could also run it as a plain Ruby script without the Rails environment:

require 'thread'

RAILS_ENV = ARGV[0] || 'development'
require File.dirname(__FILE__) + '/../config/environment.rb'

queue_in   = Queue.new # Thread safe
queue_out  = Queue.new # Thread safe
locked_ids = Array.new # NOT thread safe, so it is only used by the main loop

WORKER_THREADS = 20
worker_threads = []

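def start_worker_threads(worker_threads, queue_in, queue_out)

  # Drop threads that have died, so they are replaced below
  worker_threads.delete_if { |thread| !thread.alive? }

  # Start threads until WORKER_THREADS are running
  while worker_threads.size < WORKER_THREADS

    # Start thread
    worker_threads << Thread.new {

      # Do the worker stuff!
      loop do
        # pop blocks until a job is available, so an idle worker needs no sleep or polling
        job = queue_in.pop
        begin
          job.work!
          # job.save # Save back to the database here instead of in the main loop
          queue_out.push job
        rescue StandardError => e
          # Log error... The job's id stays in locked_ids until the stale-ID cleanup
        end
      end
    }
  end
end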

# The main loop
while true
  begin
   
    # Don't read from the database if the queue is already packed
    if queue_in.length < WORKER_THREADS * 2
     
      # Create SQL for locked ids (an empty NOT IN () list would be invalid SQL)
      locked_ids_sql = locked_ids.collect { |locked_id| locked_id[0] }.join(', ')
      locked_ids_sql = "AND id NOT IN (" + locked_ids_sql + ")" if locked_ids_sql.length > 0

      # Read jobs from database. The scheduled_at test comes first, so the
      # optional "AND id NOT IN (...)" part can simply be appended.
      jobs = Job.find(
        :all,
        :conditions => "UTC_TIMESTAMP() >= scheduled_at #{locked_ids_sql}",
        :order => "scheduled_at ASC",
        :limit => WORKER_THREADS * 2
      )

      # Push jobs into queue_in, oldest first, and lock their ids
      jobs.each do |job|
        queue_in << job
        locked_ids << [job.id, Time.now]
      end
    end

    # Slow it down a little, to give more resources to the workers
    sleep(1)
 
    # Start threads
    start_worker_threads(worker_threads, queue_in, queue_out)
   
    # Pop jobs from queue_out and delete locked IDs.
    # Only the main loop pops from queue_out, so pop never blocks here.
    until queue_out.empty?
      job = queue_out.pop
      # job.save # Save back to database. Can also be done inside the worker.
      locked_ids.delete_if { |locked_id| locked_id[0] == job.id }
    end

    # Delete stale IDs from locked_ids (just in case, e.g. a job whose worker raised an error)
    locked_ids.delete_if { |locked_id| locked_id[1] < 5.minutes.ago }
  rescue StandardError => e
    # Log error...
  end
end
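The condition passed to `Job.find` is plain string building, so it can be checked outside Rails. The sketch below assumes that `locked_ids` holds `[id, locked_at]` pairs, as in the main loop above, and that ids are integers, so interpolating them is safe. `due_jobs_condition` is a hypothetical helper name. It puts the always-present `scheduled_at` test first and appends `AND id NOT IN (...)` only when there is something to exclude, because an empty `NOT IN ()` list is a syntax error in MySQL:

```ruby
# Hypothetical helper: builds the :conditions string for due, unlocked jobs.
# locked_ids holds [id, locked_at] pairs, as in the main loop above.
def due_jobs_condition(locked_ids)
  condition = "UTC_TIMESTAMP() >= scheduled_at"
  ids = locked_ids.map { |locked_id| Integer(locked_id[0]) } # Integer() rejects non-numeric ids
  # An empty NOT IN () list is invalid SQL, so only add it when needed
  condition += " AND id NOT IN (#{ids.join(', ')})" unless ids.empty?
  condition
end

puts due_jobs_condition([])
puts due_jobs_condition([[5, Time.now], [7, Time.now], [9, Time.now]])
```

In the main loop, the result would be passed as `:conditions => due_jobs_condition(locked_ids)`.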





Comment on this article [4]

  1. dan said 783 days ago:

    sleeping and checking queue_in.length is unnecessary. .pop suspends the thread until the queue has something to pop

  2. Niko (www.rubyrobot.org) said 783 days ago:

    Dan, thanks for the info. I will update the article.

  3. Rony Anderson Chaves Freitas said 506 days ago:

    Great article!

  4. Mousavi (fd) said 217 days ago:

    Hi, very helpful article. Thanks for great sharing knowledge.
