Using Messages In Elixir To Avoid Polling
In the last post we talked about the Transactional Outbox pattern, which can help keep DB and queue writes consistent by using the database as a staging area for your queue.
That post left it up to you to figure out how you'd trigger the code that gets messages out of your outbox table and into your queue. In this post we'll look at a specific implementation based on Elixir messages.
Generalizing from the last post, we essentially have some work that needs to be done asynchronously. In our specific case, the work is queued in a database, but the solutions we'll look at here aren't tied to that. The simplest solution is to poll:
def run() do
  :timer.sleep(1000)
  rows = DB.query("delete from outbox ... returning *")
  process_rows(rows)
  run()
end
The problem with polling is that you're wasting resources whenever there's no work to do.
An alternative approach is to spawn a thread/goroutine/process as the work arrives. Or, in the case of Go and Elixir, we could use channels or messages, respectively, to trigger the action in a dedicated goroutine/process. For example, we can modify our transaction code from the last post to include the signal:
DB.transaction(fn conn ->
  DB.query!(conn, "insert into users ....")
  DB.query!(conn, "
    insert into outbox (route, payload)
    values ($1, $2)
  ", ["user.create", payload])
end)
# signal only after the transaction has committed, so the new row is visible
Outbox.process() # <--- added
# Our new process
defmodule Outbox do
  use GenServer

  def process() do
    GenServer.cast(__MODULE__, :process)
  end

  def handle_cast(:process, state) do
    run()
    {:noreply, state}
  end

  defp run() do
    rows = DB.query("delete from outbox ... returning *")
    process_rows(rows)
  end
end
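The snippet above leaves out how the process gets started and registered under its module name, which the cast to __MODULE__ relies on. Here is a minimal sketch of the full loop, assuming the real work is handed in as a zero-arity function. The OutboxSketch module and work_fun are made up for illustration; work_fun stands in for the delete query and process_rows from the post.

```elixir
defmodule OutboxSketch do
  use GenServer

  # Hypothetical: `work_fun` stands in for the "delete from outbox ... returning *"
  # query plus process_rows/1.
  def start_link(work_fun) when is_function(work_fun, 0) do
    # registering under the module name lets process/0 find us without a pid
    GenServer.start_link(__MODULE__, work_fun, name: __MODULE__)
  end

  # fire-and-forget signal: returns :ok immediately
  def process() do
    GenServer.cast(__MODULE__, :process)
  end

  @impl true
  def init(work_fun) do
    {:ok, work_fun}
  end

  @impl true
  def handle_cast(:process, work_fun) do
    work_fun.()
    {:noreply, work_fun}
  end
end
```

With this in place, OutboxSketch.start_link(fn -> ... end) starts the worker and any caller can signal it with OutboxSketch.process() without holding on to a pid.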
One problem here is that if the processing fails (or the entire app crashes or is redeployed without a clean shutdown), we have to wait until the next time Outbox.process/0 is called to do our work.
The Elixir "way" to solve this is to "let it crash". If we make it so the only way for our handle_cast/2 to fail is to crash, then our Outbox will be restarted by its supervisor. During startup, Outbox can check for any pending work:
defmodule Outbox do
  use GenServer

  # the GenServer callback is init/1; we don't need the argument here
  def init(_args) do
    {:ok, nil, {:continue, :recover}}
  end

  def handle_continue(:recover, state) do
    run()
    {:noreply, state}
  end

  # same as before
end
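To see the restart-and-recover cycle end to end, here is a small sketch under a few assumptions: RecoveringOutbox is a made-up module, the notify pid stands in for the real work (it just gets a :ran message on every run), and the :crash message exists only to simulate a failure mid-processing.

```elixir
defmodule RecoveringOutbox do
  use GenServer

  # Hypothetical: `notify` is a pid that receives :ran on every run. It stands
  # in for the delete query and process_rows/1 from the post.
  def start_link(notify) do
    GenServer.start_link(__MODULE__, notify, name: __MODULE__)
  end

  @impl true
  def init(notify) do
    # return right away and let handle_continue/2 do the recovery
    {:ok, notify, {:continue, :recover}}
  end

  @impl true
  def handle_continue(:recover, notify) do
    run(notify)
    {:noreply, notify}
  end

  @impl true
  def handle_cast(:process, notify) do
    run(notify)
    {:noreply, notify}
  end

  # simulate a failure in the middle of processing
  def handle_cast(:crash, _notify) do
    raise "simulated failure"
  end

  defp run(notify) do
    send(notify, :ran)
  end
end
```

Started with Supervisor.start_link([{RecoveringOutbox, self()}], strategy: :one_for_one), the caller gets one :ran at boot. After GenServer.cast(RecoveringOutbox, :crash), the supervisor restarts the process and the caller gets another :ran from the recovery step, without anyone calling process/0.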
If you're willing to live with a bit of latency, we can improve this by adding some batching. The simplest solution would be to make a small change to the original, from:
def process() do
  GenServer.cast(__MODULE__, :process)
end

def handle_cast(:process, state) do
  run()
  {:noreply, state}
end
to:
def process() do
  pid = Process.whereis(__MODULE__)
  Process.send_after(pid, :process, 2_000)
end

def handle_info(:process, state) do
  run()
  {:noreply, state}
end
We're delaying the signal by 2 seconds. However, if process/0 is called 100 times during those 2 seconds, we still call run/0 100 times. This isn't optimal. Consider this tweak:
def process() do
  GenServer.cast(__MODULE__, :process)
end

# our state is nil, there's no pending signal
def handle_cast(:process, nil) do
  ref = Process.send_after(self(), :process, 2_000)
  {:noreply, ref}
end

# our state is not nil, there's already a pending signal, don't do anything
def handle_cast(:process, ref) do
  {:noreply, ref}
end

# after we're done processing, we set the state back to nil to allow
# new signals
def handle_info(:process, _ref) do
  run()
  {:noreply, nil}
end
We use our process' state to track whether we have a pending signal or not. Because a process handles 1 message at a time, we're guaranteed that our handle_info/2 and handle_cast/2 will never execute concurrently. This means that we'll never miss a signal and won't needlessly execute run/0 (depending on how you're pulling your batch of work, run/0 might be called needlessly 1 extra time, but that's a lot better than N extra times).
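To check the batching behaviour, here is a self-contained sketch of the same idea. It makes a few assumptions that aren't in the code above: the BatchedOutbox name, a map for the state (so the delay and the work can be passed in), and the :delay and :run options, where :run is a zero-arity function standing in for run/0.

```elixir
defmodule BatchedOutbox do
  use GenServer

  # Hypothetical options: :delay is the batching window in milliseconds and
  # :run is a zero-arity function standing in for run/0 from the post.
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def process() do
    GenServer.cast(__MODULE__, :process)
  end

  @impl true
  def init(opts) do
    delay = Keyword.get(opts, :delay, 2_000)
    run = Keyword.fetch!(opts, :run)
    {:ok, %{timer: nil, delay: delay, run: run}}
  end

  # no pending signal: schedule one
  @impl true
  def handle_cast(:process, %{timer: nil} = state) do
    ref = Process.send_after(self(), :process, state.delay)
    {:noreply, %{state | timer: ref}}
  end

  # a signal is already pending: nothing to do
  def handle_cast(:process, state) do
    {:noreply, state}
  end

  # the window elapsed: do the work once, then allow new signals
  @impl true
  def handle_info(:process, state) do
    state.run.()
    {:noreply, %{state | timer: nil}}
  end
end
```

If we start it with a short window, say BatchedOutbox.start_link(delay: 100, run: fn -> ... end), and call BatchedOutbox.process() 100 times in a tight loop, the :run function should fire once when the window closes rather than once per call.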
The explicit interface for handling messages (handle_XXX) and the process state, combined with powerful pattern matching, make the flow of our code based on the state equally explicit. The code is easier to reason about and easier to test.
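As a rough illustration of the testing point: since the callbacks are plain functions of a message and a state, we can call them directly without starting a process. The OutboxCallbacks module below is hypothetical. It repeats the batching clauses with run/0 left out so the example stands on its own.

```elixir
defmodule OutboxCallbacks do
  # The batching clauses, minus the call to run/0, so they can be exercised
  # as plain functions.
  def handle_cast(:process, nil) do
    ref = Process.send_after(self(), :process, 2_000)
    {:noreply, ref}
  end

  def handle_cast(:process, ref) do
    {:noreply, ref}
  end

  def handle_info(:process, _ref) do
    {:noreply, nil}
  end
end

# no pending signal: a timer is scheduled and its ref becomes the new state
{:noreply, ref} = OutboxCallbacks.handle_cast(:process, nil)
true = is_reference(ref)

# a signal is already pending: the state passes through untouched
{:noreply, ^ref} = OutboxCallbacks.handle_cast(:process, ref)

# the timer fired: the state resets to nil so new signals are accepted
{:noreply, nil} = OutboxCallbacks.handle_info(:process, ref)

# clean up the timer we scheduled in the calling process
Process.cancel_timer(ref)
```

Each match raises if a callback returns something unexpected, so the same lines translate directly into assertions in whatever test framework you use.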