Broadway Source Reading (Part 4 - Batching)
source link: http://www.thinkingincrowd.me/2021/07/16/broadway-source-reading-batching/
GenStage pipelines
When the batchers option is configured, the processor becomes a :producer_consumer stage. The GenStage pipeline becomes much more complex than the simple one shown in the post about the processor.
Note: The stages communicate with each other through process message passing, while their interactions with the DemandDispatcher and the Acknowledger are direct function calls. The Dispatcher and Acknowledger are not part of the GenStage pipeline, so dotted lines are used for their interactions in the diagram to keep a clearer separation.
When there are multiple batchers, build_processors_specs/2 configures the processors with a PartitionDispatcher instead of a DemandDispatcher, and the :partitions are set to the keys of the batchers' config.
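The way the batcher config shapes the processor stage can be modelled in a few lines. This is a simplified Python sketch, not Broadway's Elixir code: `choose_processor_setup` and `ProcessorSetup` are hypothetical names, and it assumes each message carries its target batcher under a `batcher` key. The `hash_fn` follows the contract of GenStage's PartitionDispatcher `:hash` option, which maps an event to an `{event, partition}` pair.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ProcessorSetup:
    stage_type: str              # ":consumer" or ":producer_consumer"
    dispatcher: Optional[str]    # dispatcher used by the processor, if any
    partitions: list             # batcher names used as partition keys
    hash_fn: Optional[Callable] = None


def choose_processor_setup(batchers: dict) -> ProcessorSetup:
    """Hypothetical model of how the batchers config shapes the processor."""
    if not batchers:
        # No batchers: processors are the last stage and act as consumers.
        return ProcessorSetup(":consumer", None, [])
    keys = list(batchers.keys())
    if len(keys) == 1:
        # One batcher: every processed message goes to the same batcher stage.
        return ProcessorSetup(":producer_consumer", "DemandDispatcher", keys)
    # Several batchers: route each message to the partition named by its batcher
    # (assumption: the message stores its batcher under the "batcher" key).
    return ProcessorSetup(
        ":producer_consumer",
        "PartitionDispatcher",
        keys,
        hash_fn=lambda message: (message, message["batcher"]),
    )
```

With two batchers, `choose_processor_setup({"s3": {}, "sqs": {}})` yields a PartitionDispatcher whose partitions are `["s3", "sqs"]`.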
The interaction with the dispatcher registers the BatcherStage and BatchProcessorStage processes in the dispatcher_state of the ProcessorStage and BatcherStage processes, respectively, so that events can be dispatched to them. In the source code, this is the handle_info/2 clause of the ProcessorStage and BatcherStage processes that matches the {:"$gen_producer", {consumer_pid, ref} = from, {:subscribe, cancel, opts}} message. It invokes producer_subscribe/3, which in turn calls subscribe(opts, from, dispatcher_state) of the DemandDispatcher or PartitionDispatcher.
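The subscription path can be sketched as follows. It is a hypothetical Python model (`ProducerSide` and `SimpleDispatcher` are illustrative names) in which Elixir tuples become Python tuples and atoms become strings; the format of the registered entry is an assumption and does not match the real dispatchers' internal state.

```python
class SimpleDispatcher:
    """Hypothetical stand-in for DemandDispatcher or PartitionDispatcher."""

    def subscribe(self, opts, from_, dispatcher_state):
        # Register the consumer with zero demand; its demand arrives later.
        pid, ref = from_
        entry = {"pid": pid, "ref": ref, "demand": 0, "opts": opts}
        return dispatcher_state + [entry]


class ProducerSide:
    """Models the producer half of a ProcessorStage or BatcherStage process."""

    def __init__(self, dispatcher):
        self.dispatcher = dispatcher
        self.dispatcher_state = []

    def handle_info(self, message):
        # Mirrors the clause matching
        # {:"$gen_producer", {consumer_pid, ref} = from, {:subscribe, cancel, opts}}
        if (
            isinstance(message, tuple)
            and len(message) == 3
            and message[0] == "$gen_producer"
            and isinstance(message[2], tuple)
            and len(message[2]) == 3
            and message[2][0] == "subscribe"
        ):
            _, from_, (_, _cancel, opts) = message
            return self.producer_subscribe(opts, from_)
        return "noreply"

    def producer_subscribe(self, opts, from_):
        # Delegate registration to the dispatcher and keep the returned state.
        self.dispatcher_state = self.dispatcher.subscribe(
            opts, from_, self.dispatcher_state
        )
        return "noreply"
```

Sending `("$gen_producer", ("batcher_pid", "ref1"), ("subscribe", None, {}))` to a `ProducerSide` leaves one entry for `batcher_pid` in its `dispatcher_state`.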
Message Consuming as a Producer Consumer (with Batchers)
The sequence flow that starts with ProducerStage changes a little, because the ProcessorStage becomes a :producer_consumer when batchers are configured. The take_pc_events/3 function actually calls consumer_dispatch/6, which we covered in the post about the processor.
[Sequence diagram (ProducerStage, DemandDispatcher, ProcessorStage): noreply_callback(:handle_demand, [counter, state], stage) → handle_demand/2 → handle_noreply_callback/2 → dispatch_events/3 → DemandDispatcher dispatch/3 → ProcessorStage handle_info({:"$gen_consumer", {producer_pid, ref}, events}, %{type: :producer_consumer} = stage) → send_pc_events/3 → loop [take_pc_events/3, which calls consumer_dispatch/6, until the event queue is empty]; an alt fragment separates the [as Producer] and [as ProducerConsumer] cases.]
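The loop in that flow can be sketched in Python with hypothetical signatures. The real take_pc_events/3 and consumer_dispatch/6 operate on GenStage's internal state and may also account for pending downstream demand, which this sketch leaves out; it only shows the "drain the queue, dispatch each batch" shape.

```python
from collections import deque


def consumer_dispatch(producer, events, handle_events, dispatch):
    # Run the stage's callback on the events, then push the produced
    # events to the downstream dispatcher.
    produced = handle_events(events)
    dispatch(produced)


def take_pc_events(queue, handle_events, dispatch):
    # Drain the queue of (producer, events) entries one at a time,
    # calling consumer_dispatch for each until the queue is empty.
    while queue:
        producer, events = queue.popleft()
        consumer_dispatch(producer, events, handle_events, dispatch)
```

For example, draining `deque([("p1", [1, 2]), ("p2", [3])])` with a callback that multiplies by 10 dispatches `[10, 20]` and then `[30]`.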
Because the messages are forwarded to the batchers after processing instead of being acknowledged, successful_messages_to_ack is [] and successful_messages_to_forward contains all the messages destined for the batchers. Below is the sub-sequence flow of dispatch_events/3:
[Sequence diagram (ProcessorStage, DemandDispatcher, PartitionDispatcher, BatcherStage):
alt [with only one Batcher]: DemandDispatcher dispatch(successful_messages_to_forward) → split_events/3 → send(batcher_pid, {:"$gen_consumer", {self(), ref}, deliver_now}, [:noconnect]), looping over dispatch_demand/3 until every batcher's demand is met.
alt [with multiple Batchers]: PartitionDispatcher dispatch(successful_messages_to_forward) → split_events/4 → dispatch_per_partition/1 → maybe_send/3 → send(batcher_pid, {:"$gen_consumer", {self(), ref}, :lists.reverse(events)}, [:noconnect]), looping until every partition is dispatched.
Then BatcherStage handle_info({:"$gen_consumer", {processor_pid, ref}, events}, %{type: :producer_consumer} = stage) → take_pc_events/3.]
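The two branches of that flow can be approximated as below. This Python sketch is a simplification under stated assumptions, not the GenStage source: `demand_dispatch` serves the consumer with the largest outstanding demand first and returns undelivered events as leftovers, while `partition_dispatch` groups events with a `hash_fn` returning `(event, partition)`. Where the diagram shows the real PartitionDispatcher sending :lists.reverse(events), this sketch simply appends in arrival order.

```python
def demand_dispatch(events, demands):
    """Split events across consumers by demand.

    demands is a list of [demand, consumer] pairs and is updated in place.
    Returns ({consumer: delivered_events}, leftover_events).
    """
    remaining = list(events)
    sent = {}
    # Serve the consumer with the largest outstanding demand first.
    demands.sort(key=lambda entry: entry[0], reverse=True)
    for entry in demands:
        demand, consumer = entry
        if not remaining:
            break
        if demand == 0:
            continue
        # Like split_events/3: deliver up to `demand` now, keep the rest.
        deliver_now, remaining = remaining[:demand], remaining[demand:]
        sent.setdefault(consumer, []).extend(deliver_now)
        entry[0] = demand - len(deliver_now)
    return sent, remaining


def partition_dispatch(events, hash_fn, partitions):
    """Group events per partition, preserving arrival order within each."""
    per_partition = {p: [] for p in partitions}
    for event in events:
        event, partition = hash_fn(event)
        per_partition[partition].append(event)
    # Only partitions that received events get a message.
    return {p: evs for p, evs in per_partition.items() if evs}
```

With demands `[[2, "b1"], [5, "b2"]]` and eight events, "b2" receives the first five, "b1" the next two, and one event is left over for later demand.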
Again, the take_pc_events/3 function of the BatcherStage process is triggered. As you can see, the GenStage pipeline repeats the same pattern at each stage. Hence, the Dispatcher of the BatcherStage dispatches the events to the BatchProcessorStage, and the BatchProcessorStage process that receives the message invokes consumer_dispatch/6 and handle_events/3.
Below is the sub-sequence flow of handle_events/3 in BatchProcessorStage:
[Sequence diagram (BatchProcessorStage, Acknowledger, MyBW): handle_batch/4 → MyBW.handle_batch/4 → maybe_handle_failed_messages/3 → MyBW.handle_failed/2 → Acknowledger ack_messages(successful_messages_to_ack, failed_messages).]
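The same steps can be sketched in Python. This is an illustration, not Broadway's implementation: `handle_batch_events` is a hypothetical name, messages are dicts with a `status` key, and `module` stands for the user's Broadway module (MyBW). The callback arities follow Broadway's handle_batch/4 and handle_failed/2; calling handle_failed only when the module defines it and something failed is an assumption about what maybe_handle_failed_messages/3 does.

```python
def handle_batch_events(messages, batch_info, module, acknowledger, context=None):
    # 1. Hand the whole batch to the user's handle_batch/4 callback.
    processed = module.handle_batch(batch_info["batcher"], messages, batch_info, context)
    successful = [m for m in processed if m.get("status") == "ok"]
    failed = [m for m in processed if m.get("status") != "ok"]
    # 2. Modelled on maybe_handle_failed_messages/3 (assumption): let the module
    #    react to failures only if it defines handle_failed/2 and something failed.
    if failed and hasattr(module, "handle_failed"):
        failed = module.handle_failed(failed, context)
    # 3. Acknowledge successful and failed messages in a single call.
    acknowledger.ack_messages(successful, failed)
    return successful, failed
```

Keeping the acknowledgement as a single call at the end mirrors the diagram, where ack_messages/2 receives both the successful and the failed messages together.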
Now we basically know how Broadway works underneath with different pipeline options. However, that is still far from really understanding it from the bottom up. While working on this post, I came across an interesting article about Broadway's concurrency and how misconfiguration can have a great impact on it. The author did a lot of debugging to figure it out. I will see whether I can explain it at the source code level with the help of that article.