Unordered Queues

Feb 04, 2015

I've previously advocated the use of queues to track data changes as the foundation for building complex systems that have low coupling and are thus easier to manage. For such use-cases, message ordering is critical. Imagine a user is created and then promptly deleted; having messages arrive in the wrong order (delete then create) is going to be a problem.

Besides tracking data changes, queues can be used to solve a number of other problems. And, in some cases, ordering doesn't matter. It's pretty common that, for some topics, you'll care about ordering and for others, you won't (or maybe different consumers will have different requirements). This variation in requirements leaves you with three options:

1. Use a system that guarantees ordering for everything
2. Use two systems, one that guarantees ordering and one that doesn't, and pick per use-case
3. Use a system with no ordering guarantees and push the extra work to the consumer

The first option is the simplest and probably the one you'll end up using. It does limit your choice of tools somewhat, but a lot of the leading options guarantee ordering. The second option lets you optimize for each use-case, but is probably a bigger pain to manage: you'll likely need to deploy and maintain two different solutions, since I'm not aware of any solution that lets you toggle ordering. It's worth pointing out here that systems which don't guarantee ordering might have benefits in terms of availability and/or performance.

The last option is to use a system with no ordering guarantees. Of course, for processing messages such as changed data, we need some mechanism to ensure that we don't process a delete before the corresponding create. We do this by pushing more work to the consumer.

Your first instinct might be to build a mini-queue within the consumer to keep a window of messages and order them in batches. It's a poor solution though. It's a lot of code to write and it still doesn't really guarantee that you'll process messages in order.
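
To see why the window doesn't help, here's a rough sketch of such a buffer. The `WindowBuffer` class and its window size are made up for illustration; the point is only that a message delayed by more than one window still comes out late.

```ruby
# Hypothetical windowed reorder buffer: collect `size` messages, sort them, emit the batch.
class WindowBuffer
  def initialize(size)
    @size = size
    @pending = []
  end

  # Returns the sorted batch once the window is full, otherwise an empty array.
  def push(message)
    @pending << message
    return [] if @pending.length < @size

    batch = @pending.sort_by { |m| m[:_id] }
    @pending = []
    batch
  end
end

buffer = WindowBuffer.new(2)
arrivals = [23, 28, 4, 30].map { |id| { _id: id } }
emitted = arrivals.flat_map { |m| buffer.push(m) }.map { |m| m[:_id] }
# emitted is [23, 28, 4, 30]: message 4 is still handled after message 28
```

A bigger window only moves the problem: whatever size you pick, a message can always be delayed by a little more than that.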

A better approach is to maintain a timestamp or incrementing message id with each object and to make your consumer able to handle out-of-order messages (this is a step beyond our previous requirement of making consumption idempotent). As you'll see, this requires us to save state. Imagine that we get the following messages (in the order shown):

{"type": "user", "id": 9001, "event": "update", "_id": 23}
{"type": "user", "id": 9001, "event": "delete", "_id": 28}
{"type": "user", "id": 9001, "event": "create", "_id": 4}

Each message acts on the same object: user 9001. For each message, we'll check our state manager (any persisted key-value store will do, including the file system, a relational database, redis, llmdb, ...) to get the object's last known state:

def handle(message)
  key = "#{message.type}:#{message.id}"

  # last message id we processed for this object; 0 if we've never seen it
  last_id = storage.get(key) || 0

  # have we already processed this message, or a newer one, for this object? If so, no-op
  if message._id <= last_id
    return
  end

  # process the message and, only if that succeeds, record it as the latest state
  if process(message)
    storage.set(key, message._id)
  end
end
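
To make this concrete, here's a small runnable sketch that feeds the three messages above through the same check. `MemoryStorage` is a hypothetical in-memory stand-in for the persisted store, and appending to `applied` stands in for real processing; neither is part of any real library.

```ruby
require "json"

# Hypothetical in-memory stand-in for the persisted state store.
class MemoryStorage
  def initialize
    @data = {}
  end

  def get(key)
    @data.fetch(key, 0) # 0 means "nothing seen yet for this object"
  end

  def set(key, value)
    @data[key] = value
  end
end

# Same check as above, with the storage and a log of applied events passed in.
def handle(message, storage, applied)
  key = "#{message["type"]}:#{message["id"]}"
  return if message["_id"] <= storage.get(key) # stale or duplicate message

  applied << message["event"] # stand-in for real processing
  storage.set(key, message["_id"])
end

lines = [
  '{"type": "user", "id": 9001, "event": "update", "_id": 23}',
  '{"type": "user", "id": 9001, "event": "delete", "_id": 28}',
  '{"type": "user", "id": 9001, "event": "create", "_id": 4}'
]

storage = MemoryStorage.new
applied = []
lines.each { |line| handle(JSON.parse(line), storage, applied) }
# applied is ["update", "delete"]: the late create (_id 4) is skipped
```

The update arrives before its create and is still applied, which is why the handlers themselves have to cope with a missing object.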

There's more to this than just maintaining state. Your handlers will need to be written defensively. Specifically, both your update and delete handlers need to handle the case where the create event hasn't been received yet. How you do that is implementation-specific, but for a delete, it probably means no changes, and for an update, it means doing an upsert (update if exists, insert otherwise).
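
As a rough sketch of what "defensive" means here, assume (purely for illustration) that users live in a Hash keyed by id and that update messages carry the changed attributes. `UserHandlers` is a hypothetical name, and a real consumer would be talking to its own database.

```ruby
# Hypothetical handlers backed by an in-memory Hash of users.
class UserHandlers
  attr_reader :users

  def initialize
    @users = {}
  end

  # Upsert: merge into the existing record, or insert a new one if the
  # create hasn't been seen yet.
  def update(id, attributes)
    @users[id] = (@users[id] || {}).merge(attributes)
  end

  # Deleting a user we never created changes nothing and raises nothing.
  def delete(id)
    @users.delete(id)
  end
end

handlers = UserHandlers.new
handlers.delete(9001)                                 # no create yet: no-op
handlers.update(9001, { "email" => "a@example.com" }) # no create yet: inserts
handlers.update(9001, { "name" => "Leto" })           # exists: updates
```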

As a bonus, with statefulness in place, you might want consumers to track their own object versions. Some consumers might not care about certain object updates: you might only care when the user's email changes. We can use our fictional storage class to skip updates that are meaningless to us, by calculating a hash of the object and comparing it to our saved hash.
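
One way this might look, assuming the message carries the object's current fields and that we hash only the fields this consumer cares about (both are assumptions for the sketch). `relevant_digest` and `changed?` are hypothetical helpers, and a plain Hash stands in for the storage class.

```ruby
require "digest"
require "json"

# Digest of only the fields this consumer cares about.
def relevant_digest(object, fields)
  relevant = fields.sort.map { |field| [field, object[field]] }
  Digest::SHA256.hexdigest(JSON.generate(relevant))
end

# Returns true (and saves the new digest) when the relevant fields changed.
def changed?(storage, key, object, fields)
  digest = relevant_digest(object, fields)
  return false if storage[key] == digest

  storage[key] = digest
  true
end

storage = {} # stand-in for the persisted store
key = "user:9001:hash"
first  = changed?(storage, key, { "email" => "a@example.com", "name" => "Leto" }, ["email"])
second = changed?(storage, key, { "email" => "a@example.com", "name" => "Leto II" }, ["email"])
third  = changed?(storage, key, { "email" => "b@example.com", "name" => "Leto II" }, ["email"])
# first and third are true (the email is new or changed); second is false (only the name changed)
```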

If you're processing a high volume of messages, you'll want an in-process cache for your state. But the state definitely needs to be persisted; otherwise a system restart or crash means that you risk processing messages out of order (since all the "last states" will be 0).
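
A minimal sketch of that idea, assuming a single consumer process and using a JSON file as the persisted store purely for illustration (`CachedState` is a hypothetical name; in practice you'd more likely put the cache in front of a database or key-value store, and you'd want the write to be atomic):

```ruby
require "json"
require "tmpdir"

# Hypothetical state store: an in-process Hash in front of a JSON file.
class CachedState
  def initialize(path)
    @path = path
    # Reload persisted state on startup so "last seen" ids survive a restart.
    @cache = File.exist?(path) ? JSON.parse(File.read(path)) : {}
  end

  def get(key)
    @cache.fetch(key, 0) # served from memory, no disk read
  end

  def set(key, value)
    @cache[key] = value
    File.write(@path, JSON.generate(@cache)) # write-through on every update
  end
end

path = File.join(Dir.mktmpdir, "state.json")
CachedState.new(path).set("user:9001", 28)

# A fresh instance simulates the consumer restarting.
after_restart = CachedState.new(path).get("user:9001")
# after_restart is 28, so the late create (_id 4) would still be skipped
```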