Building a Robust Elixir Dispatcher

source link: https://railsadventures.wordpress.com/2018/02/01/building-a-robust-elixir-dispatcher/

I’m pretty new to Elixir. This language fascinates me, as it is based on a paradigm I had never experienced before.
The ideas of functional programming, processes, message passing and fault tolerance are bundled together into a language and ecosystem that is fun and productive to work with.

Recently, while working on a feature, I had to write an Elixir module that receives and dispatches tasks. I want to share with you the journey that led to the final module I ended up with.

The Spec

The requirements from the module were the following:

  1. The module receives and runs tasks.
  2. Each task is composed of a task identifier and a function to run.
  3. While a task is being executed its result should be :pending.
  4. A task that takes more than 10 seconds is considered timed out. Its result should be :timeout.
  5. A task that throws an unhandled exception should return {:error, the_error} as its result.
  6. Each task result should be saved until requested. This includes tasks that timed out, threw an exception or succeeded.

First Iteration — A Naive Dispatcher

My first thought was to create a GenServer and use GenServer.cast/2 to dispatch tasks.
A task would be a simple zero-argument function passed to the GenServer.

defmodule Dispatcher do
  use GenServer

  # Client

  def start() do
    GenServer.start(__MODULE__, [])
  end

  def result(s, task_id), do: GenServer.call(s, {:result, task_id})

  def run(s, task_id, f), do: GenServer.cast(s, {:run, {task_id, f}})

  # Server

  # The state is a task_id => result map
  def init(_args), do: {:ok, %{}}

  def handle_call({:result, task_id}, _from, results) do
    case Map.fetch(results, task_id) do
      {:ok, :pending} -> {:reply, :pending, results}
      # A finished result is returned once and then removed
      {:ok, result} -> {:reply, result, Map.delete(results, task_id)}
      :error -> {:reply, nil, results}
    end
  end

  def handle_cast({:run, {task_id, f}}, results) do
    new_state = Map.put(results, task_id, :pending)
    # f runs inside the GenServer process, so the server is blocked until it returns
    res = f.()
    {:noreply, Map.put(new_state, task_id, res)}
  end
end

This is the simplest solution I could come up with at first. The GenServer has a run function, which receives a task id and a function to perform. To retrieve a task result, one calls the result function with the task id. The state of the GenServer is a task_id => result map.
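The read-once behaviour of handle_call/3 can be tried out without starting a process at all. The sketch below pulls the same case expression into a pure function; the module name ResultStore and the function take/2 are hypothetical and not part of the original Dispatcher.

```elixir
defmodule ResultStore do
  # Hypothetical pure helper mirroring the Dispatcher's handle_call/3 logic.
  # Returns {reply, new_results}.
  def take(results, task_id) do
    case Map.fetch(results, task_id) do
      # Still running: report :pending and keep the entry
      {:ok, :pending} -> {:pending, results}
      # Finished: hand back the result and forget it
      {:ok, result} -> {result, Map.delete(results, task_id)}
      # Unknown or already consumed task id
      :error -> {nil, results}
    end
  end
end

{first, results} = ResultStore.take(%{"a" => 2, "b" => :pending}, "a")
{second, _results} = ResultStore.take(results, "a")
IO.inspect({first, second})
# prints {2, nil}
```

Keeping the lookup in a plain function like this makes the consume-on-read rule easy to unit-test separately from the GenServer plumbing.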
Let’s run $ iex dispatcher.exs and start playing with the Dispatcher module.

iex> {:ok, d} = Dispatcher.start()
{:ok, #PID<0.96.0>}
iex> Dispatcher.run(d, "a", fn -> 1 + 1 end)
:ok
iex> Dispatcher.run(d, "b", fn -> 2 + 2 end)
:ok
iex> Dispatcher.result(d, "a")
2
iex> Dispatcher.result(d, "b")
4
iex> Dispatcher.result(d, "b")
nil

We can see that our dispatcher handles simple calculations well: I performed two tasks, task a, which resolves to the value 2, and task b, which resolves to 4. Once a value is retrieved, the task is removed from the Dispatcher.
Let’s try dispatching a long-running task:

iex> Dispatcher.run(d, "c", fn -> :timer.sleep(8000); "finished" end)
:ok
iex> Dispatcher.result d, "c"
** (exit) exited in: GenServer.call(#PID<0.96.0>, {:result, "c"}, 5000)
    ** (EXIT) time out
    (elixir) lib/gen_server.ex:831: GenServer.call/3

I dispatched a task that waits for 8 seconds and returns "finished". When I tried to retrieve the task result, the GenServer.call inside result/2 exited with a timeout instead of getting back :pending from the Dispatcher.
That’s expected since the Dispatcher currently performs the task in a synchronous way. The task blocks the Dispatcher so it becomes unresponsive. We can use processes to make the Dispatcher perform the task asynchronously. The simplest way would be to use spawn/1 and run the given task in the spawned process. This way the Dispatcher stays responsive regardless of the time it takes a task to complete.
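The 5000 in the error above is the default GenServer.call timeout, in milliseconds. The following sketch reproduces the same failure with a hypothetical Slow server whose handle_cast/2 blocks, and catches the resulting exit; the module name, the 2-second sleep and the 500 ms call timeout are illustrative choices, not values from the original.

```elixir
defmodule Slow do
  use GenServer

  def init(state), do: {:ok, state}

  # Simulates a long synchronous task running inside the server process
  def handle_cast(:block, state) do
    Process.sleep(2_000)
    {:noreply, state}
  end

  def handle_call(:ping, _from, state), do: {:reply, :pong, state}
end

{:ok, pid} = GenServer.start(Slow, nil)
GenServer.cast(pid, :block)

# The server is busy in handle_cast/2, so it cannot answer within 500 ms
outcome =
  try do
    GenServer.call(pid, :ping, 500)
  catch
    :exit, {:timeout, _} -> :call_timed_out
  end

IO.inspect(outcome)
# prints :call_timed_out
```

Raising the call timeout would only hide the problem; the server itself has to stay free to answer, which is what running the task in a separate process achieves.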

Second Iteration — Async

defmodule Dispatcher do
  # ...
  # Everything until handle_call stays the same
  # ...

  def handle_call({:result, task_id}, _from, results) do
    case Map.fetch(results, task_id) do
      {:ok, :pending} -> {:reply, :pending, results}
      {:ok, result} -> {:reply, result, Map.delete(results, task_id)}
      :error -> {:reply, nil, results}
    end
  end

  def handle_cast({:run, {task_id, f}}, results) do
    # Capture the Dispatcher's pid: inside the spawned fn, self() is the new process
    me = self()

    # Run the task in a separate process so the Dispatcher is never blocked
    spawn(fn ->
      res = f.()
      send(me, {:ok, task_id, res})
    end)

    {:noreply, Map.put(results, task_id, :pending)}
  end

  # Stores the result message sent back by the spawned process
  def handle_info({:ok, task_id, res}, results) do
    {:noreply, Map.put(results, task_id, res)}
  end
end

The new handle_cast/2 callback spawns a process that runs the task and sends the result back to the Dispatcher along with its task_id.
In order to process the result message from the spawned process, we had to implement the handle_info/2 callback, which stores the result in the state.
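Stripped of the GenServer, the message round trip between handle_cast/2 and handle_info/2 looks roughly like this. The sketch is a plain script that uses receive in place of handle_info/2; the 1-second after clause is an arbitrary guard so the script cannot hang.

```elixir
# Capture the caller's pid before spawning; inside the fn, self() is the new process
me = self()
task_id = "a"

spawn(fn ->
  res = 1 + 1
  send(me, {:ok, task_id, res})
end)

# In the Dispatcher, this message is delivered to handle_info/2 instead
reply =
  receive do
    {:ok, ^task_id, res} -> {task_id, res}
  after
    1_000 -> :no_reply
  end

IO.inspect(reply)
# prints {"a", 2}
```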

Let’s try the async version of the Dispatcher:

iex> Dispatcher.run(d, "a", fn -> :timer.sleep(8000); "done" end)
:ok
iex> Dispatcher.result(d, "a")
:pending
iex> Dispatcher.result(d, "a")
:pending
iex> Dispatcher.result(d, "a")
"done"
iex> Dispatcher.result(d, "a")
nil

Now the Dispatcher replies immediately with :pending, since the task is being performed asynchronously in a spawned process that does not block the Dispatcher process itself.
When the task is done, the value is stored and we can retrieve it.
The Dispatcher still doesn’t handle task timeouts, though: when a task takes longer than 10 seconds, its result should be set to :timeout.
My strategy was to start the task and send the Dispatcher a delayed message indicating that the task’s run period was over. If that message arrived before the task had sent its result, the Dispatcher marked the task as timed out.
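This strategy boils down to a race between two messages: whichever reaches the mailbox first decides the outcome. A small script showing the race with Process.send_after (the 50 ms timer and the 200 ms "task" are arbitrary values chosen so that the timer fires first):

```elixir
me = self()

# A "task" that takes longer than the allowed time
spawn(fn ->
  Process.sleep(200)
  send(me, {:ok, "a", "done"})
end)

# Ask the runtime to deliver a timeout message to ourselves after 50 ms
Process.send_after(me, {:timeout, "a"}, 50)

# Whichever message arrives first wins
first =
  receive do
    {:ok, "a", res} -> {:result, res}
    {:timeout, "a"} -> :timeout
  end

IO.inspect(first)
# prints :timeout
```

The slow task's {:ok, "a", "done"} message still arrives later, so the receiving side has to be prepared to ignore messages for tasks that are no longer pending.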

Third Iteration — Handling Timeouts

defmodule Dispatcher do
  use GenServer

  # Maximum time, in milliseconds, a task may run before it is marked :timeout
  @timeout 10000

  # ...
  # Same stuff as before (client API, init/1, handle_call/3)
  # ...

  def handle_cast({:run, {task_id, f}}, results) do
    me = self()

    spawn(fn ->
      res = f.()
      send(me, {:ok, task_id, res})
    end)

    # Deliver {:timeout, task_id} to the Dispatcher @timeout ms from now
    Process.send_after(me, {:timeout, task_id}, @timeout)

    {:noreply, Map.put(results, task_id, :pending)}
  end

  # A result only counts if the task has not already timed out
  def handle_info({:ok, task_id, res}, results) do
    case Map.fetch(results, task_id) do
      {:ok, :pending} -> {:noreply, Map.put(results, task_id, res)}
      _other -> {:noreply, results}
    end
  end

  # A timeout only counts if the task has not already produced a result
  def handle_info({:timeout, task_id}, results) do
    case Map.fetch(results, task_id) do
      {:ok, :pending} -> {:noreply, Map.put(results, task_id, :timeout)}
      _other -> {:noreply, results}
    end
  end
end

In handle_cast/2, the Dispatcher calls Process.send_after/4 to receive a delayed message indicating that the task has timed out. I added another handle_info/2 clause to handle those timeout messages. It sets the result of task_id to :timeout only if the task is still marked as :pending, so a task is marked :timeout only when the timeout message arrives before the spawned process replies with the task’s result. The {:ok, task_id, res} clause now performs the same check, so a result that arrives after the timeout is ignored.
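The pending check shared by both handle_info/2 clauses amounts to a "first message wins" rule, which is easy to check once it is written as a pure function. FirstWins and apply_event/2 are hypothetical names used only for this sketch.

```elixir
defmodule FirstWins do
  # Hypothetical pure version of the two handle_info/2 clauses above.
  # Each event settles a task only while it is still :pending.
  def apply_event(results, {:ok, task_id, res}), do: settle(results, task_id, res)
  def apply_event(results, {:timeout, task_id}), do: settle(results, task_id, :timeout)

  defp settle(results, task_id, value) do
    case Map.fetch(results, task_id) do
      {:ok, :pending} -> Map.put(results, task_id, value)
      # Already settled (or already consumed): ignore the late event
      _other -> results
    end
  end
end

%{"a" => :pending}
|> FirstWins.apply_event({:timeout, "a"})
|> FirstWins.apply_event({:ok, "a", "done"})
|> IO.inspect()
# prints %{"a" => :timeout}
```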
Let’s test the new Dispatcher:

iex> Dispatcher.run(d, "a", fn -> :timer.sleep(12000); "done" end)
:ok
iex> Dispatcher.result(d, "a")
:pending
iex> Dispatcher.result(d, "a")
:pending
iex> Dispatcher.result(d, "a")
:timeout
iex> Dispatcher.result(d, "a")
nil

Looks like we’re good. The task ran for 12 seconds, which is more than our 10-second timeout threshold, so it was marked with :timeout.
The only requirement I haven’t fulfilled yet is dealing with crashed tasks. Let’s see how the Dispatcher currently handles crashed tasks:

iex> Dispatcher.run(d, "a", fn -> 1/0 end)
:ok
iex>
17:38:19.429 [error] Process #PID<0.152.0> raised an exception
** (ArithmeticError) bad argument in arithmetic expression
    :erlang./(1, 0)
    lib/dispatcher.ex:27: anonymous fn/3 in Dispatcher.handle_cast/2
iex> Dispatcher.result(d, "a")
:pending
iex> Dispatcher.result(d, "a")
:pending
iex> Dispatcher.result(d, "a")
:timeout

The process we spawned for the task crashed because of an exception. It never reported back to the Dispatcher, so after 10 seconds the task was marked with :timeout.

Final Iteration — Handling Exceptions

We would like to know when a process we spawned has ended its life and what the cause was. Did it end normally or with an exception?
Luckily, Elixir provides an easy way to get notified about processes we care about: monitors. We can ask Elixir to notify us when a process ends by calling Process.monitor/1 with the spawned process’s PID. When the spawned process ends, the process that called Process.monitor/1 receives a message of the following format: {:DOWN, ref, :process, pid, reason}
ref is the value returned from the Process.monitor/1 call.
reason is :normal if the process ended normally; otherwise it is the exit reason, which for an uncaught exception contains the error together with its stacktrace.
The strategy here is to spawn a process, monitor it, and handle the incoming :DOWN messages to identify processes that threw an exception.
The Dispatcher has to maintain a mapping between the ref returned by Process.monitor/1 and the task_id, so that when it receives a :DOWN message it knows which task_id it refers to.
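To see a :DOWN message and its reason in isolation, here is a small script that monitors a process and then makes it crash. The process waits for a :go message before raising, which guarantees the monitor is in place while it is still alive; the "boom" error is only an example.

```elixir
# A process that crashes only when told to, so the monitor is set up first
pid =
  spawn(fn ->
    receive do
      :go -> raise "boom"
    end
  end)

ref = Process.monitor(pid)
send(pid, :go)

reason =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> reason
  after
    1_000 -> :no_down_message
  end

# For a raised exception, the exit reason is {exception, stacktrace}
{exception, _stacktrace} = reason
IO.inspect(exception)
```

The crash is also logged as an error report, just like in the iex session above; the monitoring process itself is unaffected.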
This is the final and complete version of the Dispatcher:

defmodule Dispatcher do
  use GenServer

  @timeout 10000

  # Client

  def start() do
    GenServer.start(__MODULE__, [])
  end

  def result(s, task_id), do: GenServer.call(s, {:result, task_id})

  def run(s, task_id, f), do: GenServer.cast(s, {:run, {task_id, f}})

  # Server

  # results: task_id => result, refs: monitor ref => task_id
  def init(_args), do: {:ok, %{results: %{}, refs: %{}}}

  def handle_call({:result, task_id}, _from, state) do
    case Map.fetch(state.results, task_id) do
      {:ok, :pending} -> {:reply, :pending, state}
      {:ok, result} -> {:reply, result, Map.update!(state, :results, fn m -> Map.delete(m, task_id) end)}
      :error -> {:reply, nil, state}
    end
  end

  def handle_cast({:run, {task_id, f}}, state) do
    me = self()

    pid =
      spawn(fn ->
        res = f.()
        send(me, {:ok, task_id, res})
      end)

    # Get a {:DOWN, ...} message when the task process exits, for any reason
    ref = Process.monitor(pid)
    Process.send_after(me, {:timeout, task_id}, @timeout)

    new_state =
      state
      |> put_in([:results, task_id], :pending)
      |> put_in([:refs, ref], task_id)

    {:noreply, new_state}
  end

  def handle_info({:ok, task_id, res}, state) do
    case Map.fetch(state.results, task_id) do
      {:ok, :pending} -> {:noreply, put_in(state, [:results, task_id], res)}
      _other -> {:noreply, state}
    end
  end

  def handle_info({:timeout, task_id}, state) do
    case Map.fetch(state.results, task_id) do
      {:ok, :pending} -> {:noreply, put_in(state, [:results, task_id], :timeout)}
      _other -> {:noreply, state}
    end
  end

  # Normal exit: the {:ok, ...} message was sent before exiting, so only clean up the ref
  def handle_info({:DOWN, ref, :process, _pid, :normal}, state) do
    {:noreply, Map.update!(state, :refs, fn m -> Map.delete(m, ref) end)}
  end

  # Abnormal exit: record {:error, reason}, unless a result or :timeout
  # was already stored (or already consumed) for this task
  def handle_info({:DOWN, ref, :process, _pid, error}, state) do
    task_id = state.refs[ref]
    state = Map.update!(state, :refs, fn m -> Map.delete(m, ref) end)

    case Map.fetch(state.results, task_id) do
      {:ok, :pending} -> {:noreply, put_in(state, [:results, task_id], {:error, error})}
      _other -> {:noreply, state}
    end
  end
end

The three interesting points in the above code are:

  • In handle_cast/2, Process.monitor/1 creates the monitor and put_in/3 records the ref => task_id mapping.
  • The handle_info/2 clause matching {:DOWN, ref, :process, _pid, :normal} handles :DOWN messages of tasks that ended normally.
  • The handle_info/2 clause matching any other :DOWN reason handles processes that threw an exception, setting the corresponding task result to {:error, the_exception}.
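One caveat about spawning first and monitoring afterwards: a very fast task can finish before Process.monitor/1 is called, and in that case the monitor reports the reason :noproc instead of :normal. Kernel.spawn_monitor/1 spawns and monitors in one atomic step and returns {pid, ref}. A sketch of how it behaves:

```elixir
# spawn_monitor/1 returns the pid and the monitor reference together
{pid, ref} = spawn_monitor(fn -> 1 + 1 end)

reason =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> reason
  after
    1_000 -> :no_down_message
  end

# Even for an instant task the reason is :normal, never :noproc
IO.inspect(reason)
# prints :normal
```

In handle_cast/2, the spawn/1 plus Process.monitor/1 pair could be replaced by {_pid, ref} = spawn_monitor(fn -> ... end) with the rest of the Dispatcher unchanged; this is a suggested variation, not part of the original design.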

Let’s test the Dispatcher’s exception handling:

iex> Dispatcher.run d, "a", fn -> 1/0 end
:ok
iex>
17:41:39.336 [error] Process #PID<0.97.0> raised an exception
** (ArithmeticError) bad argument in arithmetic expression
    :erlang./(1, 0)
    lib/dispatcher.ex:27: anonymous fn/3 in Dispatcher.handle_cast/2
iex> Dispatcher.result d, "a"
{:error,
 {:badarith,
  [
    {:erlang, :/, [1, 0], []},
    {Dispatcher, :"-handle_cast/2-fun-0-", 3,
     [file: 'lib/dispatcher.ex', line: 27]}
  ]}}
iex> Dispatcher.result d, "a"
nil

The exception was stored as the result of task “a”, as we wanted.

Summary

I learnt a lot about GenServers, Processes, Monitors and how processes in Elixir communicate between themselves. Building the Dispatcher was both fun and insightful.
There is no doubt I’m going to explore Elixir some more in the future!

