
RabbitMQ (9): Delayed Messages

source link: https://wakzz.cn/2018/03/25/rabbitmq/(9)%E5%BB%B6%E6%97%B6%E6%B6%88%E6%81%AF/

Qiyu's Blog (祈雨的博客)
2018-03-25

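RabbitMQ does not directly support delayed messages, but the same effect can be achieved with a dead-letter queue. The message is published with an expiration (TTL) to a queue that has no consumers. When the TTL runs out, the broker dead-letters the message to the configured exchange, which routes it to the queue that consumers actually read from.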
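The flow described above can be sketched as a small in-memory model. This is not RabbitMQ client code, and every name in it (`DeadLetterDelayModel`, `publish`, `tick`) is hypothetical; it only mimics how expired messages move from the delay queue to the main queue. It also illustrates one consequence worth knowing: with per-message TTLs, a message is only expired once it reaches the head of the queue, so a long delay published first holds back a shorter delay published after it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory model only; no broker is involved and all names are hypothetical.
class DeadLetterDelayModel {

    // A message waiting in the delay queue, with the absolute time its TTL runs out.
    static final class Pending {
        final String body;
        final long expiresAtMillis;

        Pending(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Deque<Pending> delayQueue = new ArrayDeque<>();
    private final List<String> mainQueue = new ArrayList<>();

    // Publish to the delay queue at time nowMillis with a per-message TTL.
    void publish(String body, long nowMillis, long ttlMillis) {
        delayQueue.addLast(new Pending(body, nowMillis + ttlMillis));
    }

    // Advance the clock. Expired messages are dead-lettered to the main queue,
    // but only while they sit at the head of the delay queue.
    void tick(long nowMillis) {
        while (!delayQueue.isEmpty() && delayQueue.peekFirst().expiresAtMillis <= nowMillis) {
            mainQueue.add(delayQueue.pollFirst().body);
        }
    }

    List<String> mainQueue() {
        return mainQueue;
    }

    public static void main(String[] args) {
        DeadLetterDelayModel model = new DeadLetterDelayModel();
        model.publish("slow", 0, 5000);
        model.publish("fast", 0, 1000);

        model.tick(1000);
        System.out.println(model.mainQueue()); // [] because "slow" is still at the head

        model.tick(5000);
        System.out.println(model.mainQueue()); // [slow, fast]
    }
}
```

If every message uses the same delay, as in the producer in this post, the ordering effect does not matter. When delays differ, one common arrangement is a separate delay queue per delay value.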

2.1 Producer

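import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// The class name is illustrative; the original listing shows only the members.
public class DelayProducer {
    private final static String QUEUE_NAME = "MAIN_QUEUE";
    private final static String _DIRECT_NAME = "_delay_delay";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.1.7");
        factory.setUsername("root");
        factory.setPassword("123456");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Main queue that consumers read from, bound to amq.direct under its own name.
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, "amq.direct", QUEUE_NAME);

        // Delay queue with no consumers: expired messages are dead-lettered
        // to amq.direct and routed on to the main queue.
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "amq.direct");
        arguments.put("x-dead-letter-routing-key", QUEUE_NAME);
        channel.queueDeclare(_DIRECT_NAME, true, false, false, arguments);

        // Delay of 5 seconds, in milliseconds.
        long delay = 5 * 1000;

        String message = "Hello World!";
        // The per-message TTL is passed as a string of milliseconds.
        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        AMQP.BasicProperties properties = builder.expiration(String.valueOf(delay)).build();
        // Publish through the default exchange straight into the delay queue.
        channel.basicPublish("", _DIRECT_NAME, properties, message.getBytes(StandardCharsets.UTF_8));
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}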
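The queue arguments and the expiration string are the two pieces of the producer that are easy to get wrong, so they can be pulled into a small helper. The sketch below is a hypothetical convenience class, not part of the RabbitMQ client: only the argument keys (`x-dead-letter-exchange`, `x-dead-letter-routing-key`, `x-message-ttl`) come from RabbitMQ, and the class and method names are assumptions made for illustration. The three-argument overload sets a queue-wide TTL instead of a per-message expiration, which suits the case where every message waits the same amount of time.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; only the argument keys are defined by RabbitMQ.
class DelayQueueArgs {

    // Arguments for a queue whose expired messages are re-routed
    // to the given exchange with the given routing key.
    static Map<String, Object> deadLetterTo(String exchange, String routingKey) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", exchange);
        arguments.put("x-dead-letter-routing-key", routingKey);
        return arguments;
    }

    // Same as above, plus a queue-wide TTL in milliseconds,
    // so no per-message expiration is needed.
    static Map<String, Object> deadLetterTo(String exchange, String routingKey, long ttlMillis) {
        requireNonNegative(ttlMillis);
        Map<String, Object> arguments = deadLetterTo(exchange, routingKey);
        arguments.put("x-message-ttl", ttlMillis);
        return arguments;
    }

    // A per-message expiration is carried as a string of milliseconds.
    static String expiration(long delayMillis) {
        requireNonNegative(delayMillis);
        return Long.toString(delayMillis);
    }

    private static void requireNonNegative(long millis) {
        if (millis < 0) {
            throw new IllegalArgumentException("delay must not be negative: " + millis);
        }
    }

    public static void main(String[] args) {
        System.out.println(deadLetterTo("amq.direct", "MAIN_QUEUE", 5000L));
        System.out.println(expiration(5 * 1000));
    }
}
```

With such a helper, the producer would pass `DelayQueueArgs.deadLetterTo("amq.direct", QUEUE_NAME)` as the last argument of `queueDeclare` and `DelayQueueArgs.expiration(delay)` to the properties builder. Note that redeclaring an existing queue with different arguments is rejected by the broker, so switching `_delay_delay` to a queue-wide TTL means deleting and recreating it or using a new queue name.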

2.2 Consumer

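import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// The class name is illustrative; the original listing shows only the members.
public class DelayConsumer {
    private final static String QUEUE_NAME = "MAIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.1.7");
        factory.setUsername("root");
        factory.setPassword("123456");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Same declaration as in the producer, so either side can start first.
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages.");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        // Second argument is autoAck: messages are acknowledged as soon as they are delivered.
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}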
