Broadway Source Reading (Part 3 - Processor)
source link: http://www.thinkingincrowd.me/2021/06/19/broadway-source-reading-processor/
Normally, to use Broadway we only need to define the pipeline options for the messaging middleware Producer and the Processor, and implement the handle_message/3 callback.
The complexity of how handle_message/3 is called, how messages get acknowledged, and how failed messages are handled is all hidden behind the scenes. That is the part I would like to understand by reading the source code. As we have already gone through the Producer part, I bet $10 that the questions above can be answered after reading the Processor part.
Startup Call Sequence
Having traced the startup call sequence of the Producer, we can see that the one for the Processor is quite similar:
[Sequence diagram. Participants: Supervisor, ProcessorStage, Subscriber, GenStage. Calls and return values, in order:]
start_link(args, stage_options)
start_link(ProcessorStage, ..., stage_opts)
start_link(Subscriber, {ProcessorStage, ...}, stage_opts)
init({Subscriber, args})
init({ProcessorStage, ...})
init/1
alt [w/o Batcher]:
  {:consumer, state, []}
  {:consumer, state, []}
  init_consumer(ProcessorStage, opts, state)
alt [w/ Batcher]:
  {:producer_consumer, state, dispatcher}
  {:producer_consumer, state, dispatcher}
  init_producer_consumer(ProcessorStage, opts, state)
consumer_init_subscribe(subscribe_to, stage)
consumer_subscribe/7
ask/3
At the end of the call sequence, the processor calls ask/3 to request messages immediately after subscribing to the producers. The ProcessorStage process sends a :"$gen_producer" message to the producer process it subscribes to. In the producer's handle_info/2, it calls dispatcher_callback/3, which redirects the call to the dispatcher module's ask/3 function and passes the result to handle_dispatcher_result/2.
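The exchange described above can be modelled with two plain processes. This is only a toy sketch, not GenStage's implementation: AskSketch and its functions are hypothetical names, there is no subscription step or dispatcher, and the reference is created on the spot instead of coming from a subscription. Only the two message shapes, :"$gen_producer" with {:ask, demand} and :"$gen_consumer" with the events, follow what GenStage sends.

```elixir
defmodule AskSketch do
  # Hypothetical toy producer: a plain process, NOT a real GenStage.
  def start_producer(events) do
    spawn(fn -> producer_loop(events) end)
  end

  defp producer_loop(events) do
    receive do
      # Message shape a consumer sends when it asks for `demand` events.
      {:"$gen_producer", {consumer_pid, ref}, {:ask, demand}} ->
        {batch, rest} = Enum.split(events, demand)
        # Message shape a producer uses to deliver events to its consumer.
        send(consumer_pid, {:"$gen_consumer", {self(), ref}, batch})
        producer_loop(rest)
    end
  end

  # Plays the consumer role: ask once, then wait for the events.
  def ask_and_receive(producer, demand) do
    ref = make_ref()
    send(producer, {:"$gen_producer", {self(), ref}, {:ask, demand}})

    receive do
      {:"$gen_consumer", {^producer, ^ref}, events} -> events
    after
      1_000 -> :timeout
    end
  end
end

producer = AskSketch.start_producer([1, 2, 3, 4, 5])
IO.inspect(AskSketch.ask_and_receive(producer, 2)) # prints [1, 2]
IO.inspect(AskSketch.ask_and_receive(producer, 2)) # prints [3, 4]
```

In the real library the events pass through the dispatcher before they are sent, which is the part the next section follows.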
Message Consuming as a Consumer (w/o Batcher)
There are two branches in the case statement in handle_dispatcher_result/2. In the simplest configuration, the process receiving the message is the :producer process, so the second branch, the Producer path, is taken.
The start of this flow in the Producer matches what we have explored in the Producer part.
[Sequence diagram. Participants: ProducerStage, DemandDispatcher, ProcessorStage, Acknowledger, MyBroadway. Calls, in order:]
alt [as Producer]:
  noreply_callback(:handle_demand, [counter, state], stage)
  handle_demand/2
  handle_noreply_callback/2
  dispatch_events/3
  dispatch/3
  handle_info({:"$gen_consumer", {producer_pid, ref}, events}, %{type: :consumer} = stage)
  consumer_dispatch/6
  handle_events/3
  maybe_prepare_messages/2
  prepare_messages/2
  handle_messages/4
  handle_message/3
  maybe_handle_failed_messages/3
  handle_failed/2
  ack_messages(successful_messages_to_ack, failed_messages)
alt [as ProducerConsumer]:
  take_pc_events(queue, counter, stage)
Here is the tricky part: when DemandDispatcher.dispatch is called, it sends a :"$gen_consumer" message from the Producer process to its subscriber, the consumer, which is the ProcessorStage process. The ProcessorStage process is a GenStage. In its consumer_dispatch/6 function, it delegates the call to mod.handle_events/3, where mod is the ProcessorStage module.
As a novice Elixir programmer, I always get confused about which functions run inside the module's process and which are called as plain module functions.
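One way to untangle this is to check self() in each kind of function. The sketch below uses a plain GenServer rather than GenStage, and WhereAmI is a hypothetical module made up for illustration. The same rule applies to ProcessorStage: callbacks such as handle_events/3 are invoked by the behaviour's loop inside the stage process, while any other function in the module runs in whichever process calls it.

```elixir
defmodule WhereAmI do
  use GenServer

  # Plain module function: runs in whichever process calls it.
  def caller_pid, do: self()

  # Client function: also runs in the caller, then messages the server.
  def server_pid(server), do: GenServer.call(server, :pid)

  # Callbacks: invoked by the GenServer loop inside the server process.
  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_call(:pid, _from, state), do: {:reply, self(), state}
end

{:ok, server} = GenServer.start_link(WhereAmI, :ok)
IO.inspect(WhereAmI.caller_pid() == self())        # prints true
IO.inspect(WhereAmI.server_pid(server) == server)  # prints true
```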
Failed Messages Handling and Acknowledging
If we look into ProcessorStage.handle_events/3, it's clear that the messages are handled by the callbacks we implement in our MyBroadway module, prepare_messages/2 and handle_message/3. The messages are then separated into the successful_messages and failed_messages categories.
The failed_messages are first passed to handle_failed/2 of our MyBroadway module so that we can do whatever is necessary, such as saving them to a DB or forwarding them to another exception queue, before the Acknowledger acknowledges them.
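The flow described above (handle each message, split by outcome, give the failed ones to a failure hook before acking) can be sketched in plain Elixir. This is a simplified model, not Broadway's code: HandleSketch is a hypothetical module, messages are plain maps with :data and :status keys standing in for Broadway messages, and the callbacks take fewer arguments than the real handle_message/3 and handle_failed/2.

```elixir
defmodule HandleSketch do
  # Hypothetical stand-in for the user's handle_message callback:
  # odd numbers are marked as failed, even numbers are doubled.
  def handle_message(%{data: data} = message) when rem(data, 2) == 1 do
    %{message | status: {:failed, :odd_number}}
  end

  def handle_message(%{data: data} = message), do: %{message | data: data * 2}

  # Hypothetical stand-in for handle_failed: tag each failed message.
  def handle_failed(messages) do
    Enum.map(messages, &Map.put(&1, :handled_failure, true))
  end

  # Handle every message, split by status, then run the failure hook.
  def process(messages) do
    {successful, failed} =
      messages
      |> Enum.map(&handle_message/1)
      |> Enum.split_with(&(&1.status == :ok))

    {successful, handle_failed(failed)}
  end
end

messages = for n <- 1..4, do: %{data: n, status: :ok}
{successful, failed} = HandleSketch.process(messages)
IO.inspect(Enum.map(successful, & &1.data)) # prints [4, 8]
IO.inspect(Enum.map(failed, & &1.data))     # prints [1, 3]
```

Both lists would then go to the acknowledging step, which is what the rest of this section looks at.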
To actually ack them, the messages are grouped by each message's acknowledger. The message's acknowledger comes from the producer, which covers the scenario mentioned in the documentation:
where messages are coming from different producers. Broadway will use this information to correctly identify the acknowledger and pass it among with the messages so you can properly communicate with the source of the data for acknowledgement.
What I do not quite understand is why the messages are not stored in the Map (replacing the true value) but in the Process dictionary instead. I checked the git history and found that the change was made by Jose in a branch named jv-speed-up-ack. Is it faster? Why?
defp group_by_acknowledger(ackers, messages, key) do
  Enum.reduce(messages, ackers, fn %{acknowledger: {acknowledger, ack_ref, _}} = msg, acc ->
    ack_info = {acknowledger, ack_ref}
    pdict_key = {ack_info, key}
    Process.put(pdict_key, [msg | Process.get(pdict_key, [])])
    Map.put(acc, ack_info, true)
  end)
end

defp call_ack({{acknowledger, ack_ref} = ack_info, true}) do
  successful = Process.delete({ack_info, :successful}) || []
  failed = Process.delete({ack_info, :failed}) || []
  acknowledger.ack(ack_ref, Enum.reverse(successful), Enum.reverse(failed))
end
Update: I posted the question in ElixirForum and got Jose’s reply. Take a look there. :D
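To see the grouping in action, the two functions quoted above can be run in isolation. In this sketch AckSketch, EchoAcker, the :id key, and the :queue_a / :queue_b references are all hypothetical, and the ack_messages/2 driver that chains the two functions is my assumption about how they fit together, based on the ack_messages(successful_messages_to_ack, failed_messages) call in the flow above. The fake acknowledger just sends the ids it receives back to the calling process.

```elixir
defmodule AckSketch do
  # Hypothetical acknowledger: echoes what it was asked to ack.
  defmodule EchoAcker do
    def ack(ack_ref, successful, failed) do
      send(self(), {:acked, ack_ref, ids(successful), ids(failed)})
      :ok
    end

    defp ids(messages), do: Enum.map(messages, & &1.id)
  end

  # Assumed driver: group both lists, then ack once per acknowledger.
  def ack_messages(successful, failed) do
    %{}
    |> group_by_acknowledger(successful, :successful)
    |> group_by_acknowledger(failed, :failed)
    |> Enum.each(&call_ack/1)
  end

  # The map only records which acknowledgers were seen; the messages
  # themselves are accumulated in the process dictionary.
  defp group_by_acknowledger(ackers, messages, key) do
    Enum.reduce(messages, ackers, fn %{acknowledger: {acknowledger, ack_ref, _}} = msg, acc ->
      ack_info = {acknowledger, ack_ref}
      pdict_key = {ack_info, key}
      Process.put(pdict_key, [msg | Process.get(pdict_key, [])])
      Map.put(acc, ack_info, true)
    end)
  end

  # Process.delete/1 returns the stored list and cleans up the entry.
  defp call_ack({{acknowledger, ack_ref} = ack_info, true}) do
    successful = Process.delete({ack_info, :successful}) || []
    failed = Process.delete({ack_info, :failed}) || []
    acknowledger.ack(ack_ref, Enum.reverse(successful), Enum.reverse(failed))
  end
end

msg = fn id, ref -> %{id: id, acknowledger: {AckSketch.EchoAcker, ref, nil}} end
successful = [msg.(1, :queue_a), msg.(2, :queue_b), msg.(3, :queue_a)]
failed = [msg.(4, :queue_b)]

AckSketch.ack_messages(successful, failed)

for ref <- [:queue_a, :queue_b] do
  receive do
    {:acked, ^ref, ok_ids, failed_ids} -> IO.inspect({ref, ok_ids, failed_ids})
  end
end
```

Each acknowledger is called exactly once with its own successful and failed messages in their original order, and the process dictionary is left empty of these keys afterwards.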