Gracefully killing your RabbitMQ consumers

 1 year ago
source link: https://ingelbrechtrobin.medium.com/gracefully-killing-your-rabbitmq-consumers-2f0ab48a1e7f
If your application relies on AMQP, you are most likely using multiple consumers and workers to process data and commands. Each of these consumers has to be restarted every time you deploy new code to any of your environments, so that they run the latest version of your code. Killing and restarting consumers is easy, but what if a consumer is in the middle of processing a message?

Note: I’m a PHP developer, so in this article, I will be using PHP examples.

The problem

If your application relies on a message broker such as RabbitMQ, you will have to kill and restart consumers at some point. If you keep your workers alive with systemd, you can do this by running:

systemctl --user stop NAME-OF-YOUR-PROCESS
systemctl --user start NAME-OF-YOUR-PROCESS

Easy enough, right? But what if that consumer was in the middle of processing a message? Depending on your RabbitMQ configuration (for example, whether messages are acknowledged automatically or manually), that message might be lost forever, or it might be re-queued and picked up again when the consumer restarts.

Both situations can cause problems:

  • The message is lost and the process never finishes, leaving (for example) your database in a corrupt state.
  • The message is re-queued and processed a second time. This can also corrupt your database, because the message was partially processed the first time and completely processed the second time.

It should be obvious that killing and restarting consumers without any thought can have nasty consequences.

The solution

The solution to this problem is actually quite simple. It comes down to making sure that your consumer (which is a PHP script):

  • Blocks any incoming exit signals
  • Processes the message it has in memory
  • Unblocks any exit signals that might have come in
  • Dies peacefully ☠️

PHP’s pcntl extension provides Unix-style process control, which lets you override the default behavior for various exit signals:

  • CTRL + C corresponds to the exit signal SIGINT
  • kill [PID] corresponds to the exit signal SIGTERM
  • systemctl stop corresponds to the exit signal SIGTERM as well
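
To make the idea of "overriding the default behavior" concrete, here is a minimal sketch. It assumes the pcntl and posix extensions are loaded and that the script runs on the CLI; the use of posix_kill() to send signals to the script itself is only a stand-in for CTRL + C or kill [PID], and the variable names are made up for illustration. The pcntl functions used here are described right after this example.

```php
<?php

declare(strict_types=1);

// Assumption: pcntl and posix extensions are available (Linux/macOS CLI).
$received = [];

// Without these handlers, SIGINT and SIGTERM would terminate the script.
$handler = function (int $signal) use (&$received): void {
    $received[] = $signal;
};
pcntl_signal(SIGINT, $handler);
pcntl_signal(SIGTERM, $handler);

// Stand-ins for CTRL + C and `kill [PID]`: signal our own process.
posix_kill(posix_getpid(), SIGINT);
posix_kill(posix_getpid(), SIGTERM);

// PHP only queues the signals; our handler runs when we dispatch them.
pcntl_signal_dispatch();

// The script is still alive and knows which signals came in.
echo 'Still running after ' . count($received) . " exit signal(s)\n";
```

The script keeps running after both signals, which is exactly the control we need to let a consumer finish its work before it exits.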

The pcntl extension contains a lot of functionality, but we only need 3 functions to gracefully kill consumers:

  • pcntl_signal(): installs a new signal handler for the given signal, replacing the current one. In other words, this function allows us to override the default behavior when an exit signal comes in.
  • pcntl_sigprocmask(): sets and retrieves blocked signals. Allows for exit signals to be disabled and re-enabled.
  • pcntl_signal_dispatch(): calls signal handlers for pending signals. They will not be processed until this function is called.
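
The interplay between these three functions can be tried out without RabbitMQ. The following is a small sketch, assuming the pcntl and posix extensions are available; posix_kill() is used here only to simulate an exit signal arriving, and the variable names are illustrative.

```php
<?php

declare(strict_types=1);

// Assumption: pcntl and posix extensions are available (Linux/macOS CLI).
$received = [];

pcntl_signal(SIGTERM, function (int $signal) use (&$received): void {
    $received[] = $signal;
});

// Block SIGTERM: the signal is held back as "pending" while we work.
pcntl_sigprocmask(SIG_BLOCK, [SIGTERM]);

// Simulate an exit signal arriving in the middle of our work.
posix_kill(posix_getpid(), SIGTERM);

// Dispatching now does nothing, because the signal is still blocked.
pcntl_signal_dispatch();
$seenWhileBlocked = count($received);

// Unblock SIGTERM: the pending signal is delivered to PHP...
pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM]);

// ...and our handler runs as soon as we dispatch.
pcntl_signal_dispatch();
$seenAfterUnblock = count($received);

echo "While blocked: {$seenWhileBlocked}, after unblock: {$seenAfterUnblock}\n";
```

The signal is not lost while it is blocked. It is only postponed until the script decides it is ready to deal with it.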

Example consumer without gracefully killing it 🥺

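<?php

// Assuming php-amqplib; AMQPChannelFactory, Queue and process() are not shown here.
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class Consumer
{
    public function __construct(
        private readonly AMQPStreamConnection $AMQPStreamConnection,
        private readonly AMQPChannelFactory $AMQPChannelFactory,
    ) {
    }

    public function consume(Queue $queue): void
    {
        $channel = $this->AMQPChannelFactory->getForQueue($queue);
        // Must not be a static closure: it needs $this to call process().
        $callback = function (AMQPMessage $message): void {
            $this->process($message);
        };

        $channel->basic_consume($queue->getName(), '', false, false, false, false, $callback);

        // Keep waiting for messages for as long as the channel is open.
        while ($channel->is_open()) {
            $channel->wait();
        }

        $channel->close();
        $this->AMQPStreamConnection->close();
    }
}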

This is a basic example of a consumer that is not aware of any signals and thus will be killed as soon as an exit signal comes in. Now let’s make this worker smarter and ensure it is killed properly by using pcntl functions.

Example consumer with a proper killing 🔫

<?php

// Assuming php-amqplib; AMQPChannelFactory, Queue and process() are not shown here.
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class Consumer
{
    private bool $forceShutDown = false;

    public function __construct(
        private readonly AMQPStreamConnection $AMQPStreamConnection,
        private readonly AMQPChannelFactory $AMQPChannelFactory,
    ) {
        // Install signal handlers for the signals we want to override.
        pcntl_signal(SIGTERM, [$this, 'shutdown']);
        pcntl_signal(SIGINT, [$this, 'shutdown']);
    }

    public function shutdown(): void
    {
        $this->forceShutDown = true;
    }

    public function consume(Queue $queue): void
    {
        $channel = $this->AMQPChannelFactory->getForQueue($queue);
        // Must not be a static closure: it needs $this to call process().
        $callback = function (AMQPMessage $message): void {
            // Block any incoming exit signals to make sure the current message can be processed.
            pcntl_sigprocmask(SIG_BLOCK, [SIGTERM, SIGINT]);
            $this->process($message);
            // Unblock any incoming exit signals, message has been processed at this point.
            pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM, SIGINT]);
            // Dispatch the exit signals that might've come in.
            pcntl_signal_dispatch();
        };

        $channel->basic_consume($queue->getName(), '', false, false, false, false, $callback);

        // Stop consuming once the channel closes or a shutdown was requested.
        while ($channel->is_open() && !$this->forceShutDown) {
            $channel->wait();
            // Dispatch incoming exit signals.
            pcntl_signal_dispatch();
        }

        $channel->close();
        $this->AMQPStreamConnection->close();
    }
}

This consumer blocks any incoming exit signal in its message callback, finishes processing the message it currently has in memory, and dispatches the exit signal afterward, making sure a message is never interrupted halfway through processing.
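
If you want to see this behavior without a running broker, the same pattern can be simulated with an in-memory list of messages. This is only a sketch under a few assumptions: InMemoryWorker is a hypothetical class made up for this illustration (it is not part of any library), the pcntl and posix extensions are loaded, and posix_kill() stands in for a deploy script sending SIGTERM while message "b" is being processed.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the consumer; no RabbitMQ involved.
final class InMemoryWorker
{
    private bool $forceShutDown = false;

    /** @var list<string> Messages that were processed completely. */
    public array $processed = [];

    public function __construct()
    {
        pcntl_signal(SIGTERM, function (): void {
            $this->forceShutDown = true;
        });
    }

    /** @param list<string> $messages */
    public function run(array $messages): void
    {
        foreach ($messages as $message) {
            if ($this->forceShutDown) {
                break; // Exit between messages, never in the middle of one.
            }

            // Same block / process / unblock / dispatch sequence as the consumer.
            pcntl_sigprocmask(SIG_BLOCK, [SIGTERM, SIGINT]);
            $this->process($message);
            pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM, SIGINT]);
            pcntl_signal_dispatch();
        }
    }

    private function process(string $message): void
    {
        if ($message === 'b') {
            // Simulate SIGTERM arriving halfway through this message.
            posix_kill(posix_getpid(), SIGTERM);
        }

        $this->processed[] = $message;
    }
}

$worker = new InMemoryWorker();
$worker->run(['a', 'b', 'c']);

echo implode(',', $worker->processed) . "\n"; // prints "a,b"
```

Message "b" is completed even though the signal arrived while it was being processed, and "c" is never started. In the real consumer, "c" would simply stay in the queue until the restarted consumer picks it up.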

🔥PRO TIP🔥: Symfony has built-in support for signals in its Console component, in the form of “Signal events”.

If you have any additions to this approach, please let me know. I’d be glad to improve on this solution.
