A Multithreaded Job Queue For Ruby (on Rails)
Tags: job queue, load balancing, mysql, ruby on rails
Posted 1040 days ago.
The Job Queue Problem
I have a Ruby on Rails application with a rapidly growing database that currently holds 50,000 jobs per day to be executed. Each job fetches data over the internet and takes about 1-10 seconds to execute. These jobs are scheduled for specific times and should be executed with as little delay as possible.
I experimented with different solutions. They were either too slow or too complex or too hard to integrate into my code. Since I like solving problems and I had some time for the project, I rolled my own job queue.
A Multithreaded Queue is Needed For The Job
It was clear to me that the solution had to be multithreaded, so that multiple jobs could be processed at the same time and no job would have to wait unnecessarily. The problem with reading jobs from a database and processing them in separate threads is that you are very likely to read jobs that are already being processed. You could solve this by read-locking the jobs in the database, but then the jobs would not be readable for other parts of the application either. You could also write a "being processed" flag into each job row in the database, but that would slow down the application by requiring one extra write access for each job.
A Simple Solution
The first solution I came up with: in the main loop, jobs are read from the database into a thread-safe IN-queue. The worker threads pull their jobs from this queue. When they are done, they push the processed job onto a thread-safe OUT-queue. Writing the jobs back into the database is also done in the main loop. To prevent jobs that are already being processed from being read from the database again, the main loop keeps a list of these jobs' primary ids, updates it on every pass, and excludes those ids from every read.
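The cycle described above can be tried without Rails or a database. The following is only a minimal sketch under stated assumptions: `DemoJob` and `run_demo_queue` are hypothetical names, an in-memory array stands in for the jobs table, and `work!` merely counts how often a job was executed.

```ruby
require 'thread' # Queue; already loaded on current Rubies, harmless otherwise

# Hypothetical stand-in for the Job model; work! only counts executions.
DemoJob = Struct.new(:id, :runs) do
  def work!
    self.runs += 1
  end
end

# Runs the IN/OUT queue cycle over an in-memory "table" until every job has run.
# Returns the number of main-loop passes that were needed.
def run_demo_queue(table, worker_count)
  queue_in   = Queue.new # thread safe: main loop -> workers
  queue_out  = Queue.new # thread safe: workers -> main loop
  locked_ids = []        # NOT thread safe, only touched by the main loop

  workers = Array.new(worker_count) do
    Thread.new do
      # pop blocks while the queue is empty; nil is the stop signal
      while (job = queue_in.pop)
        job.work!
        queue_out << job
      end
    end
  end

  passes = 0
  until table.all? { |job| job.runs > 0 }
    passes += 1
    # "Read from the database": pending jobs whose ids are not locked
    batch = table.reject { |job| job.runs > 0 || locked_ids.include?(job.id) }
    batch.first(worker_count * 2).each do |job|
      locked_ids << job.id
      queue_in << job
    end
    # Unlock the jobs the workers have handed back
    locked_ids.delete(queue_out.pop.id) until queue_out.empty?
    sleep(0.01)
  end

  worker_count.times { queue_in << nil } # stop the workers
  workers.each(&:join)
  passes
end

table = (1..50).map { |i| DemoJob.new(i, 0) }
run_demo_queue(table, 4)
puts table.all? { |job| job.runs == 1 } # true: no job was executed twice
```

A job's id stays in `locked_ids` from the moment it is queued until the main loop sees it in the OUT-queue, which is what keeps a second read from picking it up again.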
Look again at the diagram at the top: everything you see to the left of the thin pink dividing line is done in the main loop. Everything to the right is done in the threaded workers. There's only one problem with this solution: writing back to the database is done sequentially in one place inside the main loop. While many jobs are being written back into the database, the main loop cannot refill the IN-queue, so it runs empty and the workers starve.
A Slightly More Complex Solution
With Rails, Active Record is thread safe and supports multiple concurrent connections. So why not do the saving inside the worker? The OUT-queue is then used only to update the locked ids list. For me this runs really smoothly. Just make sure the pool of database connections is larger than the number of WORKER_THREADS. I use an Active Record pool of 25 connections and 20 threads for maximum performance. If you test this code, I recommend benchmarking your application with the database saving done inside the main loop vs. inside the worker. In this diagram, the grey arrows represent the data used for updating the locked ids list, whereas the black arrows represent the actual job data.
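To get a feeling for that comparison before touching a real database, the two variants can be simulated. This is a rough sketch, not a real benchmark: `fake_save`, `save_in_main_loop`, `save_in_workers` and the 10 ms write cost are assumptions made for illustration, and `sleep` only imitates the waiting time of a database write.

```ruby
require 'benchmark'
require 'thread'

SAVE_SECONDS = 0.01 # assumed cost of one database write, simulated with sleep

# Hypothetical stand-in for job.save
def fake_save
  sleep(SAVE_SECONDS)
end

# Variant A: the main loop saves all processed jobs one after another.
def save_in_main_loop(job_count)
  job_count.times { fake_save }
end

# Variant B: each worker thread saves its own jobs, so the writes overlap.
def save_in_workers(job_count, thread_count)
  queue = Queue.new
  job_count.times { |i| queue << i }
  thread_count.times { queue << nil } # one stop signal per worker
  threads = Array.new(thread_count) do
    Thread.new do
      # pop returns nil once this worker's stop signal arrives
      while queue.pop
        fake_save
      end
    end
  end
  threads.each(&:join)
end

main_loop_time = Benchmark.realtime { save_in_main_loop(40) }
worker_time    = Benchmark.realtime { save_in_workers(40, 20) }
puts format("main loop: %.2fs, workers: %.2fs", main_loop_time, worker_time)
```

In the simulation the worker variant wins simply because the waits overlap. A real database adds lock contention and connection limits, so only a measurement against the actual application can settle the question.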
It's Scalable
That's basically it. The job queue is scalable to some extent. While it would be possible to access the queues from other workers in a cloud, this would slow down the main loop, because the queue needs to be locked internally while a thread is accessing it over a possibly slow network connection. Maybe it would be a better approach if each worker acted as a distributor itself and gave some of its jobs away to other workers in the cloud.
For my project I will probably not scale this job queue over multiple machines. I ran some benchmarks and the results show that in theory 500,000 jobs could be processed per day on one server. That's almost 6 jobs per second, each running ~5 seconds on average. If I get close to this number, I will put new clients' jobs onto another server setup.
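The arithmetic behind those numbers can be checked in a few lines. This sketch only restates the figures from the paragraph above; the method names are made up for illustration, and the concurrency estimate assumes jobs are spread evenly over the day, which real schedules rarely are.

```ruby
SECONDS_PER_DAY = 24 * 60 * 60 # 86,400

# Average number of jobs that must finish per second to reach a daily total.
def jobs_per_second(jobs_per_day)
  jobs_per_day / SECONDS_PER_DAY.to_f
end

# Little's law: jobs in flight = throughput * average time per job.
def jobs_in_flight(jobs_per_day, avg_job_seconds)
  jobs_per_second(jobs_per_day) * avg_job_seconds
end

puts jobs_per_second(500_000).round(2)   # 5.79
puts jobs_in_flight(500_000, 5).round(1) # 28.9
```

Under these assumptions roughly 29 jobs are running at any moment, so the number of worker threads and the connection pool would have to be sized for that. With 20 threads and exactly 5 seconds per job, the ceiling would be 4 jobs per second.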
It's Stable in Production
I should note that this job queue code (with some more details) has been running stably in production for some months now and has never shown any problems. External monitoring makes sure that the script is running and that the number of "overaged" jobs in the database stays under a certain limit, and it restarts the script if that should ever be necessary.
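Such a check could look roughly like the sketch below. Everything in it is an assumption: the post does not say how the monitoring works, so the method names and both thresholds are hypothetical, and the scheduled times are passed in as a plain array instead of being read from the database.

```ruby
# Hypothetical thresholds; the real values are not stated in this post.
MAX_DELAY_SECONDS = 300 # a job is "overaged" once it is this late
MAX_OVERAGED_JOBS = 100 # restart the script above this many overaged jobs

# scheduled_times: scheduled_at values of jobs that have not been executed yet
def overaged_count(scheduled_times, now = Time.now)
  scheduled_times.count { |scheduled_at| now - scheduled_at > MAX_DELAY_SECONDS }
end

# True when the backlog suggests the queue script is stuck or overloaded.
def restart_needed?(scheduled_times, now = Time.now)
  overaged_count(scheduled_times, now) > MAX_OVERAGED_JOBS
end

now = Time.now
pending = [now - 10, now - 400, now - 3600]
puts overaged_count(pending, now)  # 2
puts restart_needed?(pending, now) # false
```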
Please do comment if you have suggestions on how to improve it. If you use the following code in your project, I would be happy if you let me know.
The Full Code
Here is the full code of the Job Queue for Ruby on Rails. It's a little simplified from the version I run in production to make it clearer how it works. Do with it whatever you want. With a few changes you could also run it as a Ruby script without the Rails environment:
RAILS_ENV = ARGV[0] || 'development'
require File.dirname(__FILE__) + '/../config/environment.rb'

queue_in = Queue.new    # Thread safe
queue_out = Queue.new   # Thread safe
locked_ids = Array.new  # NOT thread safe, only touched by the main loop

WORKER_THREADS = 20
worker_threads = []

def start_worker_threads(worker_threads, queue_in, queue_out)
  # Start all threads at once
  while worker_threads.size < WORKER_THREADS
    # Start thread
    worker_threads << Thread.new {
      # Do the worker stuff!
      while true
        sleep(0.1) # Take a little break, so an idle running worker doesn't keep the system busy
        if queue_in.length > 0
          job = queue_in.pop
          job.work!
          queue_out.push job
        end
      end
    }
  end
end

# The main loop
while true
  begin
    # Don't read from the database if the queue is already packed
    if queue_in.length < WORKER_THREADS * 2
      # Create SQL for locked ids (empty string if nothing is locked)
      locked_ids_sql = locked_ids.collect { |locked_id| locked_id[0] }.join(', ')
      locked_ids_sql = " AND id NOT IN (" + locked_ids_sql + ")" if locked_ids_sql.length > 0
      # Read jobs from database. The time condition comes first, so the
      # WHERE clause never starts with a dangling AND.
      jobs = Job.find(
        :all,
        :conditions => "UTC_TIMESTAMP() >= scheduled_at#{locked_ids_sql}",
        :order => "scheduled_at ASC",
        :limit => WORKER_THREADS * 2
      )
      # Push jobs into queue_in, oldest scheduled_at first
      jobs.each do |job|
        queue_in << job
        locked_ids << [job.id, Time.now]
      end
    end
    # Slow it down a little, to give more resources to the workers
    sleep(1)
    # Start threads
    start_worker_threads(worker_threads, queue_in, queue_out)
    # Pop jobs from queue_out and delete locked IDs.
    while queue_out.length > 0 and job = queue_out.pop
      # job.save # Save back to database. Can also be done inside the worker.
      locked_ids.delete_if { |locked_id| locked_id[0] == job.id }
    end
    # Delete stale IDs from locked_ids (just in case - this should never happen)
    locked_ids.delete_if { |locked_id| locked_id[1] < 5.minutes.ago }
  rescue
    # Log error...
  end
end
