Learning Elixir's Supervisors
One of the things I struggled with when building our first Elixir application was getting our Rabbit code to be properly supervised and resilient to failure. The pattern we generally follow is 1 connection and N workers, where each worker has a channel derived from the connection. The workers, which can be publishers (we normally only have 1) or consumers (we normally have many), are dependent on the connection but independent of each other.
My first iteration worked, but wasn't the most elegant. I've since had a chance to revisit the code and thought it would make for good writing (and perhaps good reading).
Given the dependencies described above, the first thing we can do is build a supervisor tree:
defmodule App.Queue.Supervisor do
  use Supervisor

  def start_link() do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    children = [
      worker(App.Queue.Connection, []),
      supervisor(App.Queue.WorkerSupervisor, [])
    ]

    supervise(children, strategy: :rest_for_one)
  end
end
defmodule App.Queue.WorkerSupervisor do
  use Supervisor

  def start_link() do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    children = [
      worker(App.Queue.Publisher, []),
      worker(App.Queue.Consumers.Spice, []),
      worker(App.Queue.Consumers.Saiyans, [])
    ]

    supervise(children, strategy: :one_for_one)
  end
end
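For completeness, here's a rough sketch of how the top-level supervisor might be hooked into the application callback. The App.Application module and the App.Supervisor name are placeholders of mine, not part of the original code:

defmodule App.Application do
  use Application

  def start(_type, _args) do
    # the old-style child spec helpers live in Supervisor.Spec
    import Supervisor.Spec, warn: false

    children = [
      supervisor(App.Queue.Supervisor, [])
      # ... the rest of the app's children
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: App.Supervisor)
  end
end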
The two supervisors are split up so that the correct strategy can be applied to each group. Starting from the bottom, if one worker is forced to restart, there's no reason to also restart the other workers. Thus, we pick the one_for_one strategy. The rest_for_one strategy that we use in our first Supervisor means that if one process fails and is restarted, all processes defined after it are also restarted. For this reason, we put the connection first. If the App.Queue.Connection process is restarted, then the App.Queue.WorkerSupervisor will also be restarted. But the reverse isn't true.
If we were to put all of the above in a single supervisor and kept the rest_for_one strategy, it would mean that the Saiyans consumer would be restarted whenever the Spice consumer was. This isn't what we want.
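To make that concrete, here's a sketch of the flattened version we'd want to avoid, using the same Supervisor.Spec helpers as above. With rest_for_one, a crash in the Spice consumer would needlessly restart Saiyans:

def init(_) do
  children = [
    worker(App.Queue.Connection, []),
    worker(App.Queue.Publisher, []),
    worker(App.Queue.Consumers.Spice, []),
    # a restart of Spice would also restart this consumer
    worker(App.Queue.Consumers.Saiyans, [])
  ]

  supervise(children, strategy: :rest_for_one)
end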
This supervision tree is a start, but now we need to dig into our actual processes. The first one we'll look at, and also the most critical, is the App.Queue.Connection process. We'll be using the AMQP library. The examples that follow will expose you to the parts of its API relevant to this post.
A naïve implementation of our connection worker could look like:
defmodule App.Queue.Connection do
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    # returns {:ok, conn} on success and {:error, err} on failure
    AMQP.Connection.open("amqp://....")
  end
end
The issue with this code is that if the connection fails, nothing happens until we try to use it. The supervisor is monitoring our process, not the connection. We could change the code to do:
def init(_) do
  {:ok, conn} = AMQP.Connection.open("amqp://....")
  Process.link(conn.pid)
  {:ok, conn}
end
By linking our Connection process to the underlying connection, we ensure that when the underlying connection dies, our connection process will also die. At first glance, this might seem like the right approach: our process will die, the supervisor will restart it, and every worker will be restarted along with it, picking up new channels from the new connection. However, it probably isn't what you want.
By default, if a process terminates 3 times in 5 seconds, the parent process, in our case App.Queue.Supervisor, terminates. This cascades up: the parent of App.Queue.Supervisor will try to restart that supervisor and will itself terminate if we hit the 3-in-5 threshold. This goes all the way up to the application.
It's common for such connection failures, whether they're to a queue, database or web service, to happen quickly. Failing 3 times in 5 seconds? We're likely to fail 1000 times in a second, taking down our entire app. Sadly, there's no backoff functionality in supervisors. In some cases, maybe you really do want to crash the entire app, but I think that for many apps, retrying indefinitely is a more desirable behaviour.
One quick solution could be to specify very high max_restarts and very low max_seconds parameters along with the strategy.
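As a rough sketch, that would mean passing those options to supervise in our top-level supervisor; the numbers here are arbitrary and would need tuning:

# tolerate up to 10_000 restarts per second before the supervisor gives up
supervise(children, strategy: :rest_for_one, max_restarts: 10_000, max_seconds: 1)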
But I'd like to explore something a little more elegant.
Rather than linking to the underlying connection process, we can monitor/1 it. Instead of the termination bubbling up to us, we'll receive a message. Consider:
def init(_) do
  {:ok, connect()}
end

defp connect() do
  case AMQP.Connection.open() do
    {:ok, conn} ->
      Process.monitor(conn.pid)
      conn # this will be our state

    {:error, err} ->
      Logger.error("failed to connect: #{inspect err}")
      :timer.sleep(1000)
      connect() # keep trying
  end
end

# we get this message because of the call to monitor we did
def handle_info({:DOWN, _, :process, _pid, reason}, _) do
  Logger.error("queue connection is down: #{inspect reason}")
  {:noreply, connect()}
end
Rather than terminating, we handle the :DOWN notification and try to reconnect, waiting 1 second between attempts. Essentially, we're taking over the opinionated Supervisor's restart strategy: retrying indefinitely and applying whatever backoff algorithm we want.
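For example, a variant of connect with a simple exponential backoff might look like the following sketch; the doubling delay and the 30-second cap are my own assumptions, not part of the original code:

defp connect(delay \\ 1000) do
  case AMQP.Connection.open() do
    {:ok, conn} ->
      Process.monitor(conn.pid)
      conn

    {:error, err} ->
      Logger.error("failed to connect: #{inspect err}")
      :timer.sleep(delay)
      # double the delay on each failed attempt, capping it at 30 seconds
      connect(min(delay * 2, 30_000))
  end
end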
We could apply the same strategy to our workers, having each monitor their channel's process. Instead, I want to show why, in this case, it's OK to link rather than monitor:
defmodule App.Queue.Publisher do
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    {:ok, channel} = App.Queue.Connection.open_channel()
    Process.link(channel.pid)
    {:ok, channel}
  end
end
We'll look at open_channel briefly, but as you read it, I want you to try to figure out why the above is safe. If Rabbit goes down, why won't this fail 3 times in 5 seconds?
defmodule App.Queue.Connection do
  ...

  def open_channel() do
    GenServer.call(__MODULE__, :open_channel)
  end

  def handle_call(:open_channel, _from, conn) do
    {:reply, AMQP.Channel.open(conn), conn}
  end

  ...
end
Do you see it? It's a little tricky to follow, so let's walk through it. Let's assume the app is running when suddenly our connection to Rabbit dies. This causes our channel to terminate (in Rabbit, a channel dies when its connection dies), which in turn causes our Publisher to terminate (since it was linked to the channel process). The WorkerSupervisor detects this and restarts the Publisher. As part of its startup, in its init/1 function, the process asks the Connection for a channel. This is a blocking call, and one of two things will happen: either a channel will be returned, or the call will time out (after the default 5 seconds, since we didn't specify a timeout when we called GenServer.call).
Why will it time out? Because as long as our connection to Rabbit is down, the Connection process is stuck in its connect loop. If the connection isn't established, pending calls will time out. If the connection is re-established, a channel is returned. With a timeout of 5 seconds, each worker will only fail once per 5 seconds, keeping us safely under the supervisor's restart limits.
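If once per 5 seconds isn't the failure rate you want, the timeout can be made explicit. Here's a small sketch with an arbitrary 10-second value:

def open_channel() do
  # the third argument overrides GenServer.call's default 5 second timeout,
  # so a worker stuck waiting for a channel fails at most once per 10 seconds
  GenServer.call(__MODULE__, :open_channel, 10_000)
end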
Hopefully this not only provides some practical code for keeping Rabbit connections and channels alive, but also illustrates how supervisors, process linking and process monitoring work and can be leveraged.