Database-backed job processing

Handle jobs in batches using mongo and bee-queue

Wednesday, Dec 6th, 2017

Mixmax is a communications platform that brings professional communication & email into the 21st century.

This blog post is part of the Mixmax 2017 Advent Calendar. The previous post on December 5th was Recruiting Engineers Using Online Challenges.

At Mixmax, we take advantage of job queues for handling our high volume data processing, adding fault tolerance between our microservices, and driving our background automation. However, not all jobs are created equal, and sometimes normal processing doesn’t quite cut it.

Why batch jobs?

In general, job queues function as one way channels to process discrete units of work. As such, they are usually designed for sequentially publishing and processing single jobs where new work is added to the end of the queue and consumers continuously handle those jobs, taking actions based on the data within the job. However, some jobs don’t easily fit into this paradigm. Examples of jobs that can be problematic when processed individually:

  • Jobs that are ingested at high volumes and require long processing times eventually back up the queue as the input is higher than the throughput.
  • Jobs where the downstream provider is limited (i.e. databases or rate limited API calls) simply cannot process individual volume at the required rate.

If either of these is a hard limit on job processing or if job processing is not time sensitive, we can instead batch operations together to make processing each job more efficient.

If we’re going to batch jobs, it’s important to note that the FIFO order of queues is likely not the best way to consolidate operations for efficiency. To get around this, we need to rethink the structure of our queues so we can efficiently batch operations, while not sacrificing throughput for incoming operations. At Mixmax, our solution was to decouple our queue’s input operations from the batched processing using a database as the layer of abstraction between the two.

Structuring the queues

At Mixmax, we use bee-queue (check out the project), a Redis-backed queue written in node, to handle all our jobs. For this queue, we have functions that create jobs to add to the queue (let’s call these publishers) and register functions to process those jobs as they arrive (these are consumers).

To batch our jobs, we use need a set of publishers and consumers to store queued operations, and a set to process the batches of operations (we call these the ‘store queue’ and the ‘flush queue’). We also need to choose a value to partition our batches into disjoint sets. For example, if each job is performed per-user, we split jobs by the user’s unique Id.

Store Operations

The store queue’s role is to transform incoming jobs, add any data necessary for processing, and then store whatever is required for the flush queue to perform the operation. This makes the code for our store queue really simple:

// storeQueue.js
const { Queue } = require('bee-queue');
const storeQueue = new Queue('storeQueue');

// Publish a new job to the store queue.
async function publish(user, data) {
  await storeQueue.createJob({ userId: user.id, task: data }).save();
}

// Process jobs as they arrive.
async function consume(job) {
  const { userId, task } = job.data;

  await db.stored.insert({ userId, task });
  // Publish a flush job.
  await flush.publish({ userId });
}
storeQueue.process(consume);

Notice that the last line of our store queue consumer actually publishes a job to the flush queue. The simplest trigger for the flush queue’s check is when the store queue finishes storing an operation. To trigger the flush queue, we publish a new flush queue job when the store queue completes its processing, and the only information passed to the flush queue publisher is a key that uniquely identifies the batch that should be flushed (in this case, a unique user id).

Flushing Operations

The flush queue takes stored operations, checks to see if the batch is large enough, and completes the batch if it’s over the threshold for batch operations. The code to publish to our flush queue is only marginally more complex than the store queue:

// flushQueue.js
const { Queue } = require('bee-queue');
const flushQueue = new Queue('flushQueue');

// Publish a flush job and set the static job id.
async function publish({ userId }) {
  await flushQueue.createJob({ userId })
    .setId(`flush_${userId}`)
    .save();
}

The first thing to note is that we manually set the id of published jobs in the flush queue. We set this, unique to each batch of operations, using bee-queue’s setId method. It’s important to set the job id here to prevent the same batch from being processed more than once concurrently. By ensuring each job id relates to exactly one batch and that all batches of operations are disjoint (that no stored operation can be in two batches), we effectively lock each batch until the previous batch completes. This could also be done using a distributed lock for each unique batch of operations.

Now, we’ve naturally split our operations into batches, so consuming them is just a matter of retrieving those batches and determining if the batch is large enough for us to perform the operations it contains. When consuming the job, we want to make sure it’s sufficiently large to make the batching worth it. If batches are too small, then we’ve added a layer of complexity without actually improving our throughput or avoiding the limits of a downstream provider.

// flushQueue.js
async function consume(job) {
  const { userId } = job.data;

  const jobs = await db.stored.find({ userId });
  if (jobs.length < MIN_BATCH_SIZE) return;

  // Perform operations in bulk.

  // Remove completed operations.
}
flushQueue.process(consume);

We consume the flush job, use the partitioning value to retrieve the size of the unique set of operations, and check if it’s larger than our minimum threshold. If it’s larger, we can do whatever work is necessary, removing the operations when complete. Otherwise, there’s nothing to do and the current operations can wait until the next one is added to the batch.

With those few bits of code, you can use bee-queue and mongo to process incoming operations in batches, rather than performing the same (expensive, slow or limited) operation on each one.

Even better batches

While it’s likely that the simple approach above can solve many batching problems, there are a few minor tweaks to the batching algorithm that can make it even better:

Error handling

One shortcoming of the above implementation is that it doesn’t handle errors during processing very well. The only options are either retrying the whole batch if one operation fails or not retrying at all. Neither of those are particularly appealing options, but by adding an additional error property to stored operations and detecting failed operations after attempting to complete a batch of operations, you can remove successful operations from the stored batch, but keep failed operations to retry, incrementing their error count. Then, only retrieve operations below a certain error threshold. Then it’s possible to retry a few times, without retrying indefinitely. With the addition of error handling, the flush queue code might look like this:

// storeQueue.js
async function consume(job) {
  const { userId, task } = job.data;

  await db.stored.insert({ userId, task, errors: 0 });
  // Publish a flush job.
  await flush.publish({ userId });
}
storeQueue.process(consume);

// flushQueue.js
async function consume(job) {
  const { userId } = job.data;

  // Only allow 5 errors.
  const jobs = await db.stored.find({
    userId,
    $or: [
      { errors: { $exists: false } },
      { errors: { $lte: 5 } }
    ]
  });
  if (jobs.length < MIN_BATCH_SIZE) return;

  const succeeded = [];
  for (let i = 0; i < jobs.length; i++) {
    cosnt job = job[i];
    try {
      // Perform operation.
    } catch (err) {
      await db.stored.update({ _id: job.id }, { $inc: errors: 1 });
      continue;
    }
    succeeded.push(job);
  }

  // Remove successful operations.
}
flushQueue.process(consume);

Avoid trapped jobs

An additional shortcoming of the simple approach is that it is sub-optimal for low volume batches. While some batches might fill up quickly, others might sit below the MIN_BATCH_SIZE indefinitely. So, we should add a timeout condition to operations as well. That is - if an operation has been in the queue for too long, automatically perform it regardless of the batch size. By adding this timeout, we guarantee that items in our queue are never outside the acceptable range of staleness. You can simply track this using a Date field on operations like this:

// storeQueue.js
async function consume(job) {
  const { userId, task } = job.data;

  await db.stored.insert({ userId, task, errors: 0, createdAt: new Date() });
  // Publish a flush job.
  await flush.publish({ userId });
}
storeQueue.process(consume);

// flushQueue.js
async function consume(job) {
  const { userId } = job.data;

  let jobs = await db.stored.find({
    userId,
    $or: [
      { errors: { $exists: false } },
      { errors: { $lte: 5 } }
    ]
  });
  if (jobs.length < MIN_BATCH_SIZE) {
    // Filter jobs that must be flushed.
    const flushThreshold = new Date(Date.now() - 10 * 60 * 1000) // 10 minutes ago.
    jobs = jobs.filter((job) => job.createdAt < flushThreshold);
  }
  if (jobs.length === 0) return;

  const succeeded = [];
  for (let i = 0; i < jobs.length; i++) {
    cosnt job = job[i];
    try {
      // Perform operation.
    } catch (err) {
      await db.stored.update({ _id: job.id }, { $inc: errors: 1 });
      continue;
    }
    succeeded.push(job);
  }

  // Remove completed operations.
}
flushQueue.process(consume);

Though we implemented this pattern with bee-queue and mongo at Mixmax, it’d apply equally well for other queue and database combinations. So, next time you have queues with lots of input and slow consumers (or limited usage), fear not! You can use existing queues together with your database to help compose single operations into more manageable and efficient batches.

Interested in building high-volume queues for all kinds of work? Join us!