There’s nothing new about the idea of an asynchronous task to perform complex or long-running operations against your data. These “background jobs” often process or produce large amounts of data, and ideally, you’d like to be flexible about when and where this code runs. One such recent instance I helped write was a periodic import of a large amount of data from a file, followed by individual processing of each new record created from that file. We devised a way to breakdown the job into (very) many relatively inexpensive operations. We liked the pattern so much that we’re sharing it here.
Version 1
Our initial implementation was to run the whole thing end-to-end. There wasn’t much code, but it wasn’t trivial to test. Also, though our test data ran quickly, production data would be dozens or even hundreds of times larger. Perhaps worse was a subtle bug that would be triggered should this import fail and need to be retried. Read on for the solution, or wait and try to figure it out yourself.
# importer.rb
class Importer
def self.schedule
Delayed::Job.enqueue Importer.new
end
def perform
# Setup import source and enumerate through it...
import_source.each do |import_row|
record = Record.create_from_import(import_row)
# process record...
end
end
end
To handle scheduling, we chose Delayed::Job
. It takes care of queuing tasks to run in the background and keeping track of their current status—whether they’re yet to run, running, complete, or failed. You could just as easily use Resque
or Sidekiq
to the same end.
Version 2
The problem with the Version 1 implementation is that even though the import process wasn’t expensive, each individual processing step was. For this reason, Ryan Cromwell suggested that we defer the processing of new records until later and that we queue up a separate job for each new record created from the file.
# importer.rb
class Importer
def self.schedule
Delayed::Job.enqueue Importer.new
end
def perform
# Setup import source and enumerate through it...
import_source.each do |import_row|
record = Record.create_from_import(import_row)
Processor.schedule record.id
end
end
end
# processor.rb
class Processor
def self.schedule(record_id)
Delayed::Job.enqueue Processor.new record_id
end
def initialize(record_id)
@record_id = record_id
end
def perform
return if complete? # For idempotence
# Process record...
end
def complete?
# check for completeness conditions
end
end
Now that we have each operation as its own job, it can run anytime and anywhere. Should this process grow beyond the ability of the application server to handle, we can spin up a collection of worker processes and run several of these in parallel, moving the constraint from the app server to the database. It also allows us to replace the import with an API in the future without touching the processing step. It also insulates the process itself from any one failure, as successful processing jobs don’t need to be rerun if a member of their batch fails.
Asynchronous Processing Wins
Testing, as it turned out, got really simple too. These were two separate processes all along, and removing the dependency meant less stubbing of methods involved in how they interacted with each other. It also exposed the subtle bug I mentioned before. The import operation itself isn’t likely to generate failures, but the record processing is. Once we started passing the newly minted record to its processing job, we noticed duplicate records associated with the processing step.
To make processing idempotent, we stopped passing the entire record into the processing step. Using only its ID meant we could store that ID and retrieve the object’s properties each time. Now that we always have the latest state of the object, we can check it for completeness before proceeding.
I think this pattern could be adapted to any process that needs to either create or retrieve a large number of records. Making expensive processes like this more asynchronous means you have options as to when and how they are completed.