
Simple, organized queue with Resque

After using many different queuing systems, from the venerable BackgrounDRb to DelayedJob to rolling my own solutions, I settled on the excellent Resque.

A well-known blog post by Chris Wanstrath of GitHub says it all, and the README has everything you could hope for in terms of getting up and running. However, Resque leaves a lot to the imagination as to exactly how to integrate it into your app. This is, of course, one of its strengths, as it provides a high degree of flexibility.

In this article, I'm going to introduce one such integration strategy that I found particularly useful, clean, and most importantly, simple.

Native namespace

To avoid a dependency on your database that could lead to performance issues down the road, Resque uses Redis as its data store (note: if you want to learn more about how to get started with Redis, read my previous article here).

Assuming you've already configured Redis in your application, you probably already have a global variable $redis that represents the connection. Resque uses the redis-namespace gem by default to avoid polluting your Redis server's key space, but I personally like to control important details like this.

Fortunately, Resque allows this, so initializing a connection is as easy as adding the following to config/initializers/resque.rb:

Resque.redis = Redis::Namespace.new(:resque, :redis => $redis)

Queuing jobs

Inspired by DelayedJob, all Resque needs to run code in the background is a class or module that responds to the perform method. These objects also specify the name of the queue that will process them.

Therefore, the most obvious solution for Resque integration is to have one of these objects per background task. For example, you might have one class to handle user-uploaded images, another class to send your monthly newsletter to all users, and another class to update search indexes.

As you can imagine, the number of these job classes will grow over time. In addition, Resque prioritizes queues based solely on the order in which they are listed for the worker, so you will need to remember which workers are handling which queues. If you have an app that constantly adds, removes, or reprioritizes background tasks, this is not only confusing, it also requires a lot of maintenance.

Clean approach

Instead of focusing on what should be happening in the background, let's focus on when. That is, my approach is to have priority as the guiding factor when designing an interface for Resque.

The first step is to note that in 99% of cases, background jobs fall into one of three possible priorities: high, normal, and low. While it will be easy to add more priorities later, this should only be necessary in very rare cases.

Having only three priorities also makes it much easier to set up workers. Every worker is always assigned to these three queues, so except for the number of workers, the command is always the same:

QUEUE=high,normal,low rake resque:work
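
To make the ordering rule concrete, here is a toy model of how a worker started with QUEUE=high,normal,low picks its next job. It is only a sketch in plain Ruby, not Resque's actual implementation: the next_job helper, the in-memory hash of arrays, and the job names are all made up for illustration.

```ruby
# Toy model of priority by queue order. This is NOT Resque's code; the
# queues are plain arrays and the job names are hypothetical.
def next_job(queues, order)
  # Always start from the first queue in the list, so a lower-priority
  # queue is only consulted when every queue before it is empty.
  order.each do |name|
    job = queues[name].shift
    return [name, job] if job
  end
  nil
end

queues = {
  'high'   => ['resize_avatar'],
  'normal' => ['send_receipt', 'reindex'],
  'low'    => ['newsletter']
}
order = %w[high normal low]

processed = []
while (picked = next_job(queues, order))
  processed << picked
end

processed.each { |queue, job| puts "#{queue}: #{job}" }
```

Because the search restarts from the first queue for every job, anything waiting in high is handled before normal or low is even looked at.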

 

If you need more processing power, simply start more workers:

COUNT=3 QUEUE=high,normal,low rake resque:workers

 

Queue methods instead

The next step in this approach is to make it easy to add any instance or class method to one of these queues. This is where the dynamic nature of Ruby helps a lot. Starting from the end result, we want to be able to enqueue something like this:

Queue::Normal.enqueue(some_object, :some_method, { 'some_arg' => 1, 'some_other_arg' => 2 })
Queue::High.enqueue(some_object, :some_really_important_method, { 'some_arg' => 1, 'some_other_arg' => 2 })
Queue::Low.enqueue(some_object, :some_method_that_can_take_its_time, { 'some_arg' => 1, 'some_other_arg' => 2 })

 

Also, it doesn't matter whether some_object is a class, a module, or an instance. We can change the priority of a method by simply switching between Queue::Normal, Queue::High, and Queue::Low.

Let's see how we can code this. The interface above clearly dictates that each class will need to define an enqueue method. We also know that each class needs to specify a queue name and a perform method, so this is a good place to start:

# app/models/queue/high.rb
class Queue::High
  @queue = :high
  def self.enqueue(object, method, *args)
    # ... to be continued
  end
  def self.perform
    # ... to be continued
  end
end
# app/models/queue/normal.rb
class Queue::Normal
  @queue = :normal
  def self.enqueue(object, method, *args)
    # ... to be continued
  end
  def self.perform
    # ... to be continued
  end
end
# app/models/queue/low.rb
class Queue::Low
  @queue = :low
  def self.enqueue(object, method, *args)
    # ... to be continued
  end
  def self.perform
    # ... to be continued
  end
end

We can already see that these classes share the same interface, so let's extract it into a superclass:

# app/models/queue/base.rb
class Queue::Base
  class << self
    def enqueue(object, method, *args)
      # ... to be continued
    end
    def perform
      # ... to be continued
    end
  end
end
# app/models/queue/high.rb
class Queue::High < Queue::Base
  @queue = :high
end
# app/models/queue/normal.rb
class Queue::Normal < Queue::Base
  @queue = :normal
end
# app/models/queue/low.rb
class Queue::Low < Queue::Base
  @queue = :low
end
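
One detail worth spelling out is why @queue stays in each subclass while the methods move to the superclass: a class-level instance variable belongs to the class that sets it and is not inherited. The sketch below shows this in plain Ruby. The class names and the queue_name reader are hypothetical and exist only for this illustration.

```ruby
# Hypothetical classes, used only to show how class-level instance
# variables behave under inheritance.
class BaseJob
  # Reads @queue from whichever class receives the call.
  def self.queue_name
    @queue
  end
end

class HighJob < BaseJob
  @queue = :high
end

class LowJob < BaseJob
  @queue = :low
end

# A subclass that forgets to set @queue does not pick one up from
# its siblings or its parent.
class ForgetfulJob < BaseJob
end

puts HighJob.queue_name.inspect
puts LowJob.queue_name.inspect
puts ForgetfulJob.queue_name.inspect
```

Shared behavior can live in the base class, but every priority class has to declare its own queue name.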

 

The only job of the enqueue method is to provide data for the perform method that will be called by Resque. The perform method must find the object by id, call the queued method on it, and pass it the queued arguments. Since everything passed to Resque must be serializable to JSON, we need to pass the object's class name, the method, and the object's id as a special "meta" argument:

def enqueue(object, method, *args)
  meta = { 'class' => object.class.name, 'method' => method, 'id' => object.id }
  # splat the arguments so that perform receives them individually
  Resque.enqueue(self, meta, *args)
end

 

 

 

Then, in the perform method, we can use the Rails constantize extension to get the class by its name, find the object, and send it the method along with its arguments:

def perform(meta = {}, *args)
  if model = meta['class'].constantize.find_by_id(meta['id'])
    model.send(meta['method'], *args)
  end
end

 

And with that, we're ready to queue any instance method. Here's an example of what the big picture might look like:

# app/models/my_model.rb
class MyModel < ActiveRecord::Base
  def async_background_method(arg1, arg2)
    Queue::Normal.enqueue(self, :background_method, arg1, arg2)
  end
  # this happens in the background
  def background_method(arg1, arg2)
    # do something that takes a long time...
  end
end

 

That's all there is to it. The only caveat is that all arguments passed to background_method will be serialized to JSON and then deserialized back into Ruby. This usually doesn't cause problems, but one big difference is that all hashes with symbol keys will have string keys in background_method.
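
The effect of that round trip is easy to reproduce with Ruby's standard JSON library. The snippet below is only an illustration of the serialization step, with a made-up json_round_trip helper and made-up arguments; it does not touch Resque itself.

```ruby
require 'json'

# Simulate what happens to job arguments between enqueue and perform:
# they are dumped to a JSON string and parsed back into Ruby objects.
def json_round_trip(args)
  JSON.parse(JSON.generate(args))
end

original = [{ :user_id => 42, :notify => true }, :weekly]
restored = json_round_trip(original)

puts original.inspect
puts restored.inspect

# Symbol keys come back as strings, so look values up by string key.
options = restored.first
puts options['user_id'].inspect
puts options[:user_id].inspect
```

Symbol values are affected in the same way as symbol keys, so a background method should expect strings wherever the caller passed symbols.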

Queuing class methods

Only one last step remains. We also want to queue the methods of a class or module. That is, we don't always need to find an instance by id to perform a background task. For example, we may want to send an email to every registered user. The code would look something like this:

# app/models/user.rb
class User
  class << self
    def async_spam_all(email_text)
      Queue::Normal.enqueue(self, :spam_all, email_text)
    end
    def spam_all(email_text)
      find_each { |user| UserMailer.spam(user, email_text).deliver }
    end
  end
end

 

This will require some minor modifications to the enqueue and perform methods, as we need to be able to distinguish between a queued class or module and a queued model instance.

To do this, we check whether the object's class responds to :find_by_id. This works because if the object is a class or module, its class is Class or Module, neither of which responds to :find_by_id. If the object is not a model instance, we do not add the 'id' key to the meta information.

Therefore, the perform method only needs to check for the presence of this key to determine whether to call the method directly on the object or to first find an instance by id:

# app/models/queue/base.rb
class Queue::Base
  class << self
    def enqueue(object, method, *args)
      meta = { 'method' => method }
      if is_model?(object)
        Resque.enqueue(self, meta.merge('class' => object.class.name, 'id' => object.id), *args)
      else
        Resque.enqueue(self, meta.merge('class' => object.name), *args)
      end
    end
    def perform(meta = {}, *args)
      if meta.has_key?('id')
        if model = meta['class'].constantize.find_by_id(meta['id'])
          model.send(meta['method'], *args)
        end
      else
        meta['class'].constantize.send(meta['method'], *args)
      end
    end
    def is_model?(object)
      object.class.respond_to?(:find_by_id)
    end
  end
end

 

Note: This article assumes the use of ActiveRecord ORM. Depending on your application, you may need to change the definition of is_model? to more precisely indicate what constitutes a model instance.
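
To watch the whole round trip without Redis or Rails, the following self-contained sketch swaps in stand-ins for both. Everything in it is an assumption made for illustration: InMemoryBackend plays the part of Resque, Note plays the part of an ActiveRecord model, SketchQueue mirrors the enqueue and perform logic described here, and Object.const_get replaces constantize.

```ruby
require 'json'

# Stand-in for Resque: jobs are stored as JSON strings in an array.
class InMemoryBackend
  def initialize
    @jobs = []
  end

  def enqueue(job_class, *args)
    @jobs << JSON.generate('class' => job_class.name, 'args' => args)
  end

  # Run every stored job in order, as a worker would.
  def work_off
    until @jobs.empty?
      payload = JSON.parse(@jobs.shift)
      Object.const_get(payload['class']).perform(*payload['args'])
    end
  end
end

BACKEND = InMemoryBackend.new

# Stand-in for a model: instances are looked up by id.
class Note
  STORE = {}
  attr_reader :id, :log

  def self.find_by_id(id)
    STORE[id]
  end

  def self.purge_all(reason)
    STORE.each_value { |note| note.log << "purged: #{reason}" }
  end

  def initialize(id)
    @id = id
    @log = []
    STORE[id] = self
  end

  def publish(channel)
    @log << "published to #{channel}"
  end
end

# Mirrors the enqueue/perform pair: the 'id' key is present only
# when the queued object is a model instance.
class SketchQueue
  def self.enqueue(object, method, *args)
    meta = { 'method' => method.to_s }
    if object.class.respond_to?(:find_by_id)
      meta = meta.merge('class' => object.class.name, 'id' => object.id)
    else
      meta = meta.merge('class' => object.name)
    end
    BACKEND.enqueue(self, meta, *args)
  end

  def self.perform(meta, *args)
    target = Object.const_get(meta['class'])
    if meta.key?('id')
      target = target.find_by_id(meta['id'])
      return unless target # the record vanished before the job ran
    end
    target.send(meta['method'], *args)
  end
end

note = Note.new(1)
SketchQueue.enqueue(note, :publish, 'rss')       # instance method
SketchQueue.enqueue(Note, :purge_all, 'cleanup') # class method
BACKEND.work_off
puts note.log.inspect
```

The first job carries an 'id' and is dispatched to the instance, while the second carries only the class name and is dispatched to the class itself.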

Robustness

We have a working queue interface, but there is still plenty of room for programmer error. Ideally, the enqueue method should raise an exception when a programmer tries to queue a nonexistent method or passes too many or too few arguments to that method.

By adding a few quick checks, you can significantly reduce the number of failures that only surface in the background workers. Let's add a method ensure_queueable! to Queue::Base that raises an exception if the method does not exist or the wrong number of arguments is passed. With these changes, the entire Queue::Base class looks like this:

# app/models/queue/base.rb
class Queue::Base
  class << self
    def enqueue(object, method, *args)
      meta = { 'method' => method }
      ensure_queueable!(object, method, *args)
      if is_model?(object)
        Resque.enqueue(self, meta.merge('class' => object.class.name, 'id' => object.id), *args)
      else
        Resque.enqueue(self, meta.merge('class' => object.name), *args)
      end
    end
    def perform(meta = {}, *args)
      if meta.has_key?('id')
        if model = meta['class'].constantize.find_by_id(meta['id'])
          model.send(meta['method'], *args)
        end
      else
        meta['class'].constantize.send(meta['method'], *args)
      end
    end
    def is_model?(object)
      object.class.respond_to?(:find_by_id)
    end
    private
    def ensure_queueable!(object, method, *args)
      ensure_responds_to!(object, method)
      ensure_arity!(object, method, args.length)
    end
    def ensure_responds_to!(object, method)
      unless object.respond_to?(method)
        raise "object must respond to #{method}"
      end
    end
    def ensure_arity!(object, method, arity)
      required = object.method(method).arity
      # a negative arity of -n - 1 means n required arguments plus optional ones
      if required < 0 && arity < -required - 1
        raise "#{method}: #{arity} of at least #{-required - 1} arguments given"
      elsif required >= 0 && required != arity
        raise "#{method}: #{arity} of #{required} arguments given"
      end
    end
  end
end

 

Note: Checking the arity of a method is somewhat complicated because methods that accept optional or variable arguments report a negative arity of -n - 1, where n is the number of required arguments. See the Ruby Method#arity documentation for more information.
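
The arity rules are easier to see on a few concrete signatures. The following sketch uses a made-up Report class along with two hypothetical helpers, minimum_args and check_args!, to show what Method#arity reports and how a pre-enqueue check can be built on top of it.

```ruby
# Hypothetical class covering the common method signatures.
class Report
  def fixed(a, b); end
  def with_default(a, b = nil); end
  def with_splat(*rest); end
  def mixed(a, *rest); end
end

# Smallest number of arguments a method with the given arity accepts.
# A negative arity of -n - 1 means n required arguments.
def minimum_args(arity)
  arity < 0 ? -arity - 1 : arity
end

# Raise before enqueuing if the call could never succeed.
def check_args!(object, method_name, args)
  arity = object.method(method_name).arity
  minimum = minimum_args(arity)
  if arity >= 0 && args.length != arity
    raise ArgumentError, "#{method_name}: #{args.length} of #{arity} arguments given"
  elsif arity < 0 && args.length < minimum
    raise ArgumentError, "#{method_name}: #{args.length} of at least #{minimum} arguments given"
  end
  true
end

report = Report.new
arities = [:fixed, :with_default, :with_splat, :mixed].map do |name|
  report.method(name).arity
end
puts arities.inspect # fixed, with_default, with_splat, mixed

check_args!(report, :with_splat, []) # zero arguments is fine for a bare splat
check_args!(report, :mixed, [1, 2, 3])

begin
  check_args!(report, :fixed, [1])
rescue ArgumentError => e
  puts e.message
end
```

Note that arity alone cannot tell an optional argument from a splat, so this check catches too few arguments for such methods but not too many.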

That's it.

With this, we have a simple and intuitive Resque interface that lets us queue any instance or class method with minimal effort. We can also add a new priority in a few lines of code by simply defining a new Queue::Base subclass.

Also, we've provided a single queue entry point that uses Resque as its implementation, but if we decide to swap Resque for another solution in the future, we'll only need to make changes to Queue::Base.