
Broadway Source Reading (Part 4 - Batching)

 3 years ago
source link: http://www.thinkingincrowd.me/2021/07/16/broadway-source-reading-batching/

GenStage pipelines

When batcher options are configured, the processor becomes a :producer_consumer stage, and the GenStage pipeline becomes much more complex than the simple one shown in the post about the processor.

[Diagram: GenStage pipeline with batchers. Nodes, from upstream to downstream:]
Producer
Processor_1, Processor_2
DemandDispatcher/PartitionDispatcher
Batcher:S3, Batcher:SQS
BatchProcessor_S3_1, BatchProcessor_S3_2, BatchProcessor_SQS_1
Acknowledger

Note: The stages communicate with each other through process message passing, while the interactions between the stages and the DemandDispatcher or the Acknowledger are direct function calls. The Dispatcher and the Acknowledger are not part of the GenStage pipeline, so the diagram uses dotted lines for their interactions to keep the separation clear.
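For reference, a configuration along the following lines should give a topology like the one in the diagram: two processors, an S3 batcher with two batch processors, and an SQS batcher with one. This is only a sketch. It assumes the :broadway dependency is already in the project, it uses Broadway.DummyProducer as a stand-in for a real producer, and the routing rule in handle_message/3 is made up for illustration.

```elixir
defmodule MyBW do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      # Stand-in producer (assumption); a real pipeline would use an actual source.
      producer: [module: {Broadway.DummyProducer, []}],
      # Two processor stages: Processor_1 and Processor_2 in the diagram.
      processors: [default: [concurrency: 2]],
      # Declaring batchers is what turns the processors into :producer_consumer stages.
      batchers: [
        s3: [concurrency: 2, batch_size: 10, batch_timeout: 1_000],
        sqs: [concurrency: 1, batch_size: 10, batch_timeout: 1_000]
      ]
    )
  end

  @impl true
  def handle_message(_processor, %Message{data: data} = message, _context) do
    # Hypothetical routing rule: binaries go to :s3, everything else to :sqs.
    batcher = if is_binary(data), do: :s3, else: :sqs
    Message.put_batcher(message, batcher)
  end

  @impl true
  def handle_batch(_batcher, messages, _batch_info, _context) do
    # Returning the messages unchanged marks the whole batch as successful.
    messages
  end
end
```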

When there are multiple batchers, the processors built in build_processors_specs/2 use PartitionDispatcher instead of DemandDispatcher, and :partitions is set to the keys of the batchers' config.
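The choice can be sketched roughly as below. This is not Broadway's actual code: the module and function names are hypothetical, and the :hash function is my assumption about how a message is mapped to the partition named after its batcher. Only the :partitions and :hash option names come from GenStage.PartitionDispatcher itself.

```elixir
defmodule DispatcherChoiceSketch do
  # Hypothetical helper: returns the stage type and the dispatcher
  # a processor would use for a given batchers config.
  def choose(batchers_config) do
    case Keyword.keys(batchers_config) do
      [] ->
        # No batchers: the processor is the last stage, so it needs no dispatcher.
        {:consumer, nil}

      [_single] ->
        # One batcher: plain demand-based dispatching is enough.
        {:producer_consumer, GenStage.DemandDispatcher}

      [_ | _] = keys ->
        # Several batchers: one partition per batcher key.
        # The hash function must return {event, partition}.
        hash = fn message -> {message, message.batcher} end
        {:producer_consumer, {GenStage.PartitionDispatcher, partitions: keys, hash: hash}}
    end
  end
end
```

Calling DispatcherChoiceSketch.choose(s3: [], sqs: []) returns a PartitionDispatcher spec whose :partitions is [:s3, :sqs].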

The interactions with the dispatcher register the BatcherStage and BatchProcessorStage processes in the dispatcher_state of the ProcessorStage and BatcherStage processes respectively, so that events can be dispatched to them. In the source code, this is the handle_info/2 clause in the ProcessorStage and BatcherStage processes that matches the {:"$gen_producer", {consumer_pid, ref} = from, {:subscribe, cancel, opts}} message. It invokes producer_subscribe/3, which in turn invokes subscribe(opts, from, dispatcher_state) of the DemandDispatcher or PartitionDispatcher.
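A heavily simplified model of that registration is shown below. The module name and the shape of dispatcher_state (a plain list of subscribers) are assumptions made for the sketch; the real dispatchers keep more bookkeeping, and the real handle_info/2 goes through producer_subscribe/3 first. Only the message shape and the {:ok, demand, state} return of subscribe/3 follow GenStage.

```elixir
defmodule SubscribeSketch do
  # Simplified stand-in for a dispatcher's subscribe/3:
  # remember the consumer and ask for no initial demand.
  def subscribe(_opts, {pid, ref}, subscribers) do
    {:ok, 0, subscribers ++ [{pid, ref}]}
  end

  # Simplified stand-in for the producer-side handle_info/2 clause
  # that matches the subscription message sent by a consumer.
  def handle_info({:"$gen_producer", {_consumer_pid, _ref} = from, {:subscribe, _cancel, opts}}, state) do
    {:ok, _demand, dispatcher_state} = subscribe(opts, from, state.dispatcher_state)
    %{state | dispatcher_state: dispatcher_state}
  end
end

# Usage: the calling process plays the role of a subscribing BatcherStage.
ref = make_ref()
message = {:"$gen_producer", {self(), ref}, {:subscribe, nil, []}}
state = SubscribeSketch.handle_info(message, %{dispatcher_state: []})
IO.inspect(length(state.dispatcher_state), label: "registered consumers")
```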

Message Consuming as a Producer Consumer (with Batcher)

The sequence flow that starts with ProducerStage changes slightly because the ProcessorStage becomes a :producer_consumer when batchers are configured. The take_pc_events/3 function actually calls consumer_dispatch/6, which we covered in the post about the processor.

[Sequence diagram. Participants: ProducerStage, DemandDispatcher, ProcessorStage]
Calls, in the order shown:
  noreply_callback(:handle_demand, [counter, state], stage)
  handle_demand/2
  handle_noreply_callback/2
  dispatch_events/3
  dispatch/3
  handle_info({:"$gen_consumer", {producer_pid, ref}, events}, %{type: :producer_consumer} = stage)
  send_pc_events/3
  consumer_dispatch/6
  take_pc_events/3
Loop label: take_pc_events/3 until the event queue is empty
Alt labels: as Producer / as ProducerConsumer

After processing, the messages are forwarded to the batchers without being acknowledged. As a result, successful_messages_to_ack is [] and successful_messages_to_forward contains all the messages going to the batchers. Below is the sub-sequence flow of dispatch_events/3:

[Sequence diagram. Participants: ProcessorStage, DemandDispatcher, PartitionDispatcher, BatcherStage]
Alt: with only one Batcher (DemandDispatcher)
  dispatch(successful_messages_to_forward)
  split_events/3
  send(batcher_pid, {:"$gen_consumer", {self(), ref}, deliver_now}, [:noconnect])
  Loop: dispatch_demand/3 until every batcher's demand is met
Alt: with multiple Batchers (PartitionDispatcher)
  dispatch(successful_messages_to_forward)
  split_events/4
  dispatch_per_partition/1
  maybe_send/3
  send(batcher_pid, {:"$gen_consumer", {self(), ref}, :lists.reverse(events)}, [:noconnect])
  Loop: until every partition is dispatched
Then, in BatcherStage:
  handle_info({:"$gen_consumer", {processor_pid, ref}, events}, %{type: :producer_consumer} = stage)
  take_pc_events/3
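The forwarding step can be modelled in a few lines. The sketch below is my own simplification, not the library code: the module and function names are hypothetical, messages are plain maps with a :batcher key, and demand tracking and buffering are ignored entirely. It only shows the two ideas in this section: a :producer_consumer processor forwards everything and acknowledges nothing, and with several batchers the events are grouped per partition, accumulated in reverse and flipped back before sending (hence the :lists.reverse(events) in the flow).

```elixir
defmodule ForwardSketch do
  # Returns {to_forward, to_ack} for the successfully processed messages.
  # Without batchers the processor acknowledges them itself;
  # with batchers it forwards all of them and acknowledges none.
  def split_successful(:consumer, successful), do: {[], successful}
  def split_successful(:producer_consumer, successful), do: {successful, []}

  # Groups messages by partition, keeping the original order inside each one.
  # `hash` must return {event, partition}; an unknown partition raises a KeyError.
  def by_partition(messages, partitions, hash) do
    empty = Map.new(partitions, &{&1, []})

    messages
    |> Enum.reduce(empty, fn message, acc ->
      {event, partition} = hash.(message)
      # Prepend for cheap accumulation; order is restored below.
      Map.update!(acc, partition, &[event | &1])
    end)
    |> Map.new(fn {partition, reversed} -> {partition, :lists.reverse(reversed)} end)
  end
end

# Usage with made-up messages:
messages = [
  %{id: 1, batcher: :s3},
  %{id: 2, batcher: :sqs},
  %{id: 3, batcher: :s3}
]

{to_forward, _to_ack} = ForwardSketch.split_successful(:producer_consumer, messages)
grouped = ForwardSketch.by_partition(to_forward, [:s3, :sqs], fn m -> {m, m.batcher} end)
IO.inspect(grouped, label: "events per batcher")
```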

Again, the take_pc_events/3 function of the BatcherStage process is triggered. As you can see, the GenStage pipeline works the same way at every stage. Hence, we know that the Dispatcher will dispatch the events to the BatchProcessorStage, and that the process receiving the message will invoke consumer_dispatch/6 and handle_events/3.

Below is the sub-sequence flow of handle_events/3 in BatchProcessorStage:

[Sequence diagram. Participants: BatchProcessorStage, Acknowledger, MyBW]
Calls, in the order shown:
  handle_batch/4
  handle_batch/4
  maybe_handle_failed_messages/3
  handle_failed/2
  ack_messages(successful_messages_to_ack, failed_messages)
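In plain Elixir, the shape of this flow is roughly the following. It is a sketch under assumptions: the module name is hypothetical, messages are plain maps whose :status is :ok on success, and the two callbacks are passed in as functions instead of being looked up on the MyBW module. The returned pair is what would be handed to ack_messages.

```elixir
defmodule BatchAckSketch do
  # Runs the user's batch callback, separates failures from successes,
  # gives the failures to the handle_failed callback (only when there are any),
  # and returns {successful, failed}, ready to be acknowledged.
  def run(messages, handle_batch, handle_failed) do
    returned = handle_batch.(messages)
    {successful, failed} = Enum.split_with(returned, &(&1.status == :ok))
    failed = if failed == [], do: failed, else: handle_failed.(failed)
    {successful, failed}
  end
end

# Usage with made-up callbacks: fail the messages with an even id,
# then tag every failed message as handled.
fail_even = fn messages ->
  Enum.map(messages, fn message ->
    if rem(message.id, 2) == 0, do: %{message | status: {:failed, :even}}, else: message
  end)
end

tag_handled = fn failed -> Enum.map(failed, &Map.put(&1, :handled, true)) end

batch = [%{id: 1, status: :ok}, %{id: 2, status: :ok}]
{successful, failed} = BatchAckSketch.run(batch, fail_even, tag_handled)
IO.inspect({length(successful), length(failed)}, label: "successful/failed")
```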

Now we basically know how Broadway works under the hood with different pipeline options. However, that is still far from really understanding it from the bottom up. While working on this post, I came across an interesting article about Broadway's concurrency and how misconfiguration can have a big impact on it. The author did a lot of debugging to figure it out. I will see whether I can reach the same conclusions at the source code level with the help of that article.

