Building a Robust Elixir Dispatcher
source link: https://railsadventures.wordpress.com/2018/02/01/building-a-robust-elixir-dispatcher/
I’m pretty new to Elixir. This language fascinates me as it is based on a paradigm I never experienced before.
The ideas of functional programming, processes, message passing and fault tolerance are bundled together into a language and ecosystem that is fun and productive to work with.
Recently, while working on a feature, I had to write an Elixir module that receives and dispatches tasks. I want to share the journey that led to the final module.
The Spec
The requirements for the module were the following:
- The module receives and runs tasks.
- Each task is composed of a task identifier and a function to run.
- While a task is being executed, its result should be :pending.
- A task that takes more than 10 seconds is considered timed out. Its result should be :timeout.
- A task that throws an unhandled exception should return {:error, the_error} as its result.
- Each task result should be saved until requested. This includes tasks that timed out, threw an exception or succeeded.
First Iteration — A Naive Dispatcher
My first thought was to create a GenServer and use GenServer.cast/2 to dispatch the task.
A task would be a simple zero-argument function passed to the GenServer.
defmodule Dispatcher do
  use GenServer

  # Client

  def start() do
    GenServer.start(__MODULE__, [])
  end

  def result(s, task_id), do: GenServer.call(s, {:result, task_id})
  def run(s, task_id, f), do: GenServer.cast(s, {:run, {task_id, f}})

  # Server

  def init(_args), do: {:ok, %{}}

  def handle_call({:result, task_id}, _from, results) do
    case Map.fetch(results, task_id) do
      {:ok, :pending} -> {:reply, :pending, results}
      {:ok, result} -> {:reply, result, Map.delete(results, task_id)}
      :error -> {:reply, nil, results}
    end
  end

  def handle_cast({:run, {task_id, f}}, results) do
    new_state = Map.put(results, task_id, :pending)
    res = f.()
    {:noreply, Map.put(new_state, task_id, res)}
  end
end
This is the simplest solution I could come up with at first. The GenServer exposes a run function, which receives a task id and a function to perform. To retrieve a task's result, one calls the result function with the task id. The state of the GenServer is a task_id => result map.
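To make the lookup rules concrete, here is a minimal sketch that mirrors the three branches of handle_call as a pure function over the state map. The ResultStore module and its take/2 function are hypothetical names used only for illustration; they are not part of the Dispatcher.

```elixir
defmodule ResultStore do
  # Hypothetical helper, not part of the original Dispatcher: it mirrors the
  # three branches of handle_call/3 as a pure function over the state map.
  # Returns {reply, new_results}.
  def take(results, task_id) do
    case Map.fetch(results, task_id) do
      # Still running: report :pending and keep the entry.
      {:ok, :pending} -> {:pending, results}
      # Finished: hand the result over and forget the task.
      {:ok, result} -> {result, Map.delete(results, task_id)}
      # Unknown (or already retrieved) task id.
      :error -> {nil, results}
    end
  end
end

results = %{"a" => 2, "b" => :pending}
{2, results} = ResultStore.take(results, "a")
{nil, _} = ResultStore.take(results, "a")
{:pending, _} = ResultStore.take(results, "b")
```

One consequence of this design is that a task whose function itself returns :pending or nil cannot be told apart from a running or unknown task.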
Let’s run $ iex dispatcher.exs and start playing with the Dispatcher module.
iex> {:ok, d} = Dispatcher.start()
{:ok, #PID<0.96.0>}
iex> Dispatcher.run(d, "a", fn -> 1 + 1 end)
:ok
iex> Dispatcher.run(d, "b", fn -> 2 + 2 end)
:ok
iex> Dispatcher.result(d, "a")
2
iex> Dispatcher.result(d, "b")
4
iex> Dispatcher.result(d, "b")
nil
We can see that our dispatcher handles simple calculations well. I performed two tasks: task a, which resolves to the value 2, and task b, which resolves to 4. Once a value is retrieved, the task is removed from the Dispatcher.
Let’s try dispatching a long-running task:
iex> Dispatcher.run(d, "c", fn -> :timer.sleep(8000); "finished" end)
:ok
iex> Dispatcher.result d, "c"
** (exit) exited in: GenServer.call(#PID<0.96.0>, {:result, "c"}, 5000)
    ** (EXIT) time out
    (elixir) lib/gen_server.ex:831: GenServer.call/3
I dispatched a task that waits for 8 seconds and returns "finished". When I tried to retrieve the task result, the call exited with a timeout after 5 seconds instead of getting back :pending from the Dispatcher.
That’s expected, since the Dispatcher currently performs the task synchronously. The task blocks the Dispatcher process, so it cannot answer the result call until the task is done. We can use processes to make the Dispatcher perform the task asynchronously. The simplest way would be to use spawn/1 and run the given task in the spawned process. This way the Dispatcher stays responsive regardless of how long a task takes to complete.
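As a minimal sketch of that idea outside a GenServer: the caller spawns a process for the work and later picks the result up from its own mailbox. The AsyncSketch module and its function names are assumptions made for this illustration only.

```elixir
defmodule AsyncSketch do
  # Hypothetical helper: runs f in a new process and sends the result back
  # to the caller as {:ok, task_id, result}. spawn/1 returns immediately.
  def run_async(task_id, f) do
    parent = self()
    spawn(fn -> send(parent, {:ok, task_id, f.()}) end)
  end

  # Waits up to wait_ms for the result message; returns :pending otherwise.
  def await(task_id, wait_ms) do
    receive do
      {:ok, ^task_id, value} -> value
    after
      wait_ms -> :pending
    end
  end
end

AsyncSketch.run_async("c", fn -> :timer.sleep(200); "finished" end)
# The caller is not blocked: the task is still sleeping at this point.
IO.inspect(AsyncSketch.await("c", 0))
# Waiting long enough picks up the message sent by the spawned process.
IO.inspect(AsyncSketch.await("c", 1_000))
```

The Dispatcher applies the same pattern, except that the result message is handled by a handle_info callback instead of an explicit receive.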
Second Iteration — Async
defmodule Dispatcher do
  # ...
  # Everything until handle_call stays the same

  def handle_call({:result, task_id}, _from, results) do
    case Map.fetch(results, task_id) do
      {:ok, :pending} -> {:reply, :pending, results}
      {:ok, result} -> {:reply, result, Map.delete(results, task_id)}
      :error -> {:reply, nil, results}
    end
  end

  def handle_cast({:run, {task_id, f}}, results) do
    me = self()

    spawn(fn ->
      res = f.()
      send(me, {:ok, task_id, res})
    end)

    {:noreply, Map.put(results, task_id, :pending)}
  end

  def handle_info({:ok, task_id, res}, results) do
    {:noreply, Map.put(results, task_id, res)}
  end
end
The new handle_cast callback spawns a process that completes the task and sends the result back to the Dispatcher along with its task_id.
To process the result message from the spawned process, we had to implement the handle_info callback.
Let’s try the async version of the Dispatcher:
iex> Dispatcher.run(d, "a", fn -> :timer.sleep(8000); "done" end)
:ok
iex> Dispatcher.result(d, "a")
:pending
iex> Dispatcher.result(d, "a")
:pending
iex> Dispatcher.result(d, "a")
"done"
iex> Dispatcher.result(d, "a")
nil
Now the Dispatcher replies immediately with :pending, since the task is performed asynchronously in a spawned process that does not block the Dispatcher process itself.
When the task is done, the value is stored and we can retrieve it.
The Dispatcher still doesn’t handle task timeouts, though: when a task takes longer than 10 seconds, we should set its result to :timeout.
My strategy was to start the task and also send the Dispatcher a delayed message indicating that the task's run period was over. If that message arrives before the task has sent its result, the Dispatcher marks the task as timed out.
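The race between the result and the delayed message can be sketched with plain processes. This is an illustration under assumptions, not the Dispatcher itself: the TimeoutSketch module, its run/2 function and the make_ref/0 tag are introduced here only to keep the example self-contained.

```elixir
defmodule TimeoutSketch do
  # Hypothetical helper: runs f in a spawned process and returns its value,
  # or :timeout if the delayed message arrives first.
  def run(f, timeout_ms) do
    me = self()
    # A unique tag so messages from different runs cannot be confused.
    tag = make_ref()

    spawn(fn -> send(me, {:ok, tag, f.()}) end)
    # Schedule a message to ourselves after timeout_ms milliseconds.
    Process.send_after(me, {:timeout, tag}, timeout_ms)

    # Whichever of the two messages reaches the mailbox first wins.
    # The losing message, if it is ever sent, stays in the mailbox unread.
    receive do
      {:ok, ^tag, value} -> value
      {:timeout, ^tag} -> :timeout
    end
  end
end

IO.inspect(TimeoutSketch.run(fn -> :fast end, 100))
IO.inspect(TimeoutSketch.run(fn -> :timer.sleep(300); :slow end, 100))
```

Note that the spawned process is not stopped when the timeout wins; it keeps running and its late result is simply never read. A GenServer cannot leave messages unread like this, so it needs an explicit rule for ignoring the late one.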
Third Iteration — Handling Timeouts
defmodule Dispatcher do
  use GenServer

  @timeout 10000

  # ...
  # Same stuff as before

  def handle_cast({:run, {task_id, f}}, results) do
    me = self()

    spawn(fn ->
      res = f.()
      send(me, {:ok, task_id, res})
    end)

    Process.send_after(me, {:timeout, task_id}, @timeout)
    {:noreply, Map.put(results, task_id, :pending)}
  end

  def handle_info({:ok, task_id, res}, results) do
    case Map.fetch(results, task_id) do
      {:ok, :pending} -> {:noreply, Map.put(results, task_id, res)}
      _other -> {:noreply, results}
    end
  end

  def handle_info({:timeout, task_id}, results) do
    case Map.fetch(results, task_id) do
      {:ok, :pending} -> {:noreply, Map.put(results, task_id, :timeout)}
      _other -> {:noreply, results}
    end
  end
end
The Dispatcher calls Process.send_after/3 to receive a delayed message indicating that the task has timed out. I added another handle_info clause to handle those timeout messages. It sets the result of task_id to :timeout only if the task is still marked as :pending. Likewise, the {:ok, task_id, res} clause now stores a result only while the task is still :pending. This way, whichever message arrives first decides the outcome: a task is marked :timeout only if the timeout message arrives before the spawned process replies with the task’s result.
Let’s test the new Dispatcher:
iex> Dispatcher.run(d, "a", fn -> :timer.sleep(12000); "done" end)
:ok
iex> Dispatcher.result(d, "a")
:pending
iex> Dispatcher.result(d, "a")
:pending
iex> Dispatcher.result(d, "a")
:timeout
iex> Dispatcher.result(d, "a")
nil
Looks like we’re good. The task ran for 12 seconds, which is more than our 10-second timeout threshold, and was marked as :timeout.
The only requirement I haven’t fulfilled yet is dealing with crashed tasks. Let’s see how the Dispatcher currently handles crashed tasks:
iex> Dispatcher.run(d, "a", fn -> 1/0 end)
:ok
iex>
17:38:19.429 [error] Process #PID<0.152.0> raised an exception
** (ArithmeticError) bad argument in arithmetic expression
:erlang./(1, 0)
lib/dispatcher.ex:27: anonymous fn/3 in Dispatcher.handle_cast/2
iex> Dispatcher.result(d, "a")
:pending
iex> Dispatcher.result(d, "a")
:pending
iex> Dispatcher.result(d, "a")
:timeout
The process we spawned for the task crashed because of an exception. It did not report back to the Dispatcher, so after 10 seconds the task was marked as :timeout.
Final Iteration — Handling Exceptions
We would like to know when a process we spawned has ended its life and what the cause was. Did it end normally or with an exception?
Luckily, Elixir provides an easy way to get notified about processes we care about: monitors. We can ask to be notified when a process ends by calling Process.monitor/1 on the spawned process's PID. When the spawned process ends, the process that called Process.monitor/1 receives a message of the following format: {:DOWN, ref, :process, pid, reason}
- ref is the value returned from the Process.monitor/1 call.
- reason is :normal if the process ended without an uncaught exception; otherwise it holds the exit reason, which for an exception is the error together with its stacktrace.
The strategy here is to spawn a process, monitor it and handle the incoming :DOWN messages to identify processes that threw an exception.
The Dispatcher has to maintain a mapping between the ref returned by Process.monitor/1 and the task_id, so that when it receives a :DOWN message it knows which task_id it refers to.
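Here is a small sketch of what a monitor delivers for a normal exit and for a crash. The MonitorSketch module is a hypothetical helper, and the :go handshake is an assumption added only so the monitor is guaranteed to be in place before the function runs; the Dispatcher does not do this.

```elixir
defmodule MonitorSketch do
  # Hypothetical helper: spawns f, monitors it, and returns the exit reason
  # carried by the :DOWN message.
  def exit_reason(f) do
    # The spawned process waits for :go before running f, so the monitor
    # below is always set up while the process is still alive.
    pid =
      spawn(fn ->
        receive do
          :go -> f.()
        end
      end)

    ref = Process.monitor(pid)
    send(pid, :go)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> reason
    after
      1_000 -> :no_down_message
    end
  end
end

# A function that returns normally yields the reason :normal.
IO.inspect(MonitorSketch.exit_reason(fn -> :ok end))

# An uncaught exception yields {exception, stacktrace} as the reason
# (the runtime also logs the crash as an error).
{error, _stacktrace} = MonitorSketch.exit_reason(fn -> raise "boom" end)
IO.inspect(error)
```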
This is the final and complete version of the Dispatcher:
defmodule Dispatcher do
  use GenServer

  @timeout 10000

  def start() do
    GenServer.start(__MODULE__, [])
  end

  def result(s, task_id), do: GenServer.call(s, {:result, task_id})
  def run(s, task_id, f), do: GenServer.cast(s, {:run, {task_id, f}})

  def init(_args), do: {:ok, %{results: %{}, refs: %{}}}

  def handle_call({:result, task_id}, _from, state) do
    case Map.fetch(state.results, task_id) do
      {:ok, :pending} -> {:reply, :pending, state}
      {:ok, result} -> {:reply, result, Map.update!(state, :results, fn m -> Map.delete(m, task_id) end)}
      :error -> {:reply, nil, state}
    end
  end

  def handle_cast({:run, {task_id, f}}, state) do
    me = self()

    pid =
      spawn(fn ->
        res = f.()
        send(me, {:ok, task_id, res})
      end)

    ref = Process.monitor(pid)
    Process.send_after(me, {:timeout, task_id}, @timeout)
    new_state = state |> put_in([:results, task_id], :pending) |> put_in([:refs, ref], task_id)
    {:noreply, new_state}
  end

  def handle_info({:ok, task_id, res}, state) do
    case Map.fetch(state.results, task_id) do
      {:ok, :pending} -> {:noreply, put_in(state, [:results, task_id], res)}
      _other -> {:noreply, state}
    end
  end

  def handle_info({:timeout, task_id}, state) do
    case Map.fetch(state.results, task_id) do
      {:ok, :pending} -> {:noreply, put_in(state, [:results, task_id], :timeout)}
      _other -> {:noreply, state}
    end
  end

  def handle_info({:DOWN, ref, :process, _pid, :normal}, state) do
    {:noreply, Map.update!(state, :refs, fn m -> Map.delete(m, ref) end)}
  end

  def handle_info({:DOWN, ref, :process, _pid, error}, state) do
    task_id = state.refs[ref]

    new_state =
      state
      |> put_in([:results, task_id], {:error, error})
      |> Map.update!(:refs, fn m -> Map.delete(m, ref) end)

    {:noreply, new_state}
  end
end
The 3 interesting points in the above code are:
- In handle_cast, the Process.monitor(pid) call creates the monitor, and the following put_in calls update the ref => task_id mapping.
- The handle_info clause that matches a :normal reason handles :DOWN messages of tasks that ended normally.
- The last handle_info clause handles processes that threw an exception by setting the corresponding task result to {:error, the_exception}.
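One caveat, offered as a side note rather than as part of the original design: spawn/1 followed by Process.monitor/1 is two separate steps. If the spawned process has already exited by the time the monitor is created, the :DOWN message is delivered with the reason :noproc, which the last handle_info clause would record as an error. Kernel.spawn_monitor/1 creates the process and the monitor in one step and avoids that window. The SpawnMonitorSketch module below is a hypothetical helper showing the call on its own.

```elixir
defmodule SpawnMonitorSketch do
  # Hypothetical helper: runs f in a monitored process and returns the
  # exit reason carried by the :DOWN message.
  def exit_reason(f) do
    # spawn_monitor/1 returns {pid, ref}; the monitor exists from the very
    # start of the process, so even an instant exit is reported correctly.
    {pid, ref} = spawn_monitor(f)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> reason
    after
      1_000 -> :no_down_message
    end
  end
end

# Even a function that finishes immediately is reported as :normal.
IO.inspect(SpawnMonitorSketch.exit_reason(fn -> :ok end))
```

In the Dispatcher, this would mean replacing the spawn and Process.monitor calls in handle_cast with a single spawn_monitor call and using the returned ref as the key in the refs map.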
Let’s test the Dispatcher’s exception handling:
iex> Dispatcher.run d, "a", fn -> 1/0 end
:ok
iex>
17:41:39.336 [error] Process #PID<0.97.0> raised an exception
** (ArithmeticError) bad argument in arithmetic expression
:erlang./(1, 0)
lib/dispatcher.ex:27: anonymous fn/3 in Dispatcher.handle_cast/2
iex> Dispatcher.result d, "a"
{:error,
{:badarith,
[
{:erlang, :/, [1, 0], []},
{Dispatcher, :"-handle_cast/2-fun-0-", 3,
[file: 'lib/dispatcher.ex', line: 27]}
]}}
iex> Dispatcher.result d, "a"
nil
The exception was stored as the result of task "a", as we wanted.
Summary
I learnt a lot about GenServers, processes, monitors and how processes in Elixir communicate with each other. Building the Dispatcher was both fun and insightful.
There is no doubt I’m going to explore Elixir some more in the future!