2

rabbitmq(7)通配符模式

 2 years ago
source link: https://wakzz.cn/2018/03/25/rabbitmq/(7)%E9%80%9A%E9%85%8D%E7%AC%A6%E6%A8%A1%E5%BC%8F/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

rabbitmq(7)通配符模式

祈雨的博客
2018-03-25

topic模式实际上是direct模式的升级版:交换机会把消息的routingKey与各队列绑定时指定的bindingKey做通配符匹配,匹配成功的队列才会收到消息,再由该队列的消费者消费。
匹配规则如下:routingKey是由点(.)分隔的单词列表;#(井号)可以匹配零个或多个单词,*(星号)只能匹配恰好一个单词。

(此处原有示意图,图片未能保留,请到原文链接查看。)

2.1、生产者

private static final String EXCHANGE_NAME = "test_topic_exchange";

/**
 * Producer: publishes one "Hello world" message to the topic exchange
 * {@code test_topic_exchange} using the routing key {@code key1.key2}.
 *
 * <p>The connection and channel are opened in a try-with-resources statement so that
 * both are closed (channel first, then connection) even when declaring the exchange
 * or publishing fails.
 *
 * @param args unused
 * @throws Exception if connecting to the broker, declaring the exchange, publishing,
 *         or closing the resources fails
 */
public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.1.7");
    factory.setUsername("root");
    factory.setPassword("123456");

    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {

        // Declare the exchange with type "topic".
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String message = "Hello world";

        /*
         * A routing key is not arbitrary: it must be a list of words separated by dots,
         * for example "stock.usd.nyse", "nyse.vmw" or "quick.orange.rabbit", and it may
         * be at most 255 bytes long.
         * Topic mode is an extended form of direct mode: messages are delivered according
         * to a pattern match on the routing key.
         * Matching rules: # (hash) matches zero or more words, * (star) matches exactly
         * one word.
         * Example: key1.key2 is matched by key1.* and by key1.key2.#, but not by #.key3.
         */
        String routingKey = "key1.key2";

        // Encode explicitly as UTF-8: the consumers decode the body with "UTF-8", so
        // relying on the platform default charset here could corrupt non-ASCII text.
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
}

2.2、消费者1

private static final String EXCHANGE_NAME = "test_topic_exchange";

/**
 * Consumer 1: binds a freshly declared queue to the topic exchange
 * {@code test_topic_exchange} with the binding key {@code key1.*}, then prints and
 * manually acknowledges every message delivered to that queue.
 *
 * <p>The connection and channel are intentionally left open so deliveries keep arriving.
 *
 * @param args unused
 * @throws Exception if connecting to the broker or setting up the exchange, queue,
 *         binding or consumer fails
 */
public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.1.7");
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("123456");

    final Channel ch = connectionFactory.newConnection().createChannel();

    ch.exchangeDeclare(EXCHANGE_NAME, "topic");

    // Use whatever queue name the no-argument queueDeclare() reports back.
    final String boundQueue = ch.queueDeclare().getQueue();
    ch.queueBind(boundQueue, EXCHANGE_NAME, "key1.*");

    System.out.println(" [1] Waiting for messages.");

    // autoAck is false: each delivery is acknowledged explicitly inside the handler.
    ch.basicConsume(boundQueue, false, new DefaultConsumer(ch) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
            String text = new String(body, "UTF-8");
            System.out.println(" [1] Received '" + text + "'");

            // Acknowledge only this delivery (multiple = false).
            ch.basicAck(envelope.getDeliveryTag(), false);
        }
    });
}

2.3、消费者2

private static final String EXCHANGE_NAME = "test_topic_exchange";

/**
 * Consumer 2: binds a freshly declared queue to the topic exchange
 * {@code test_topic_exchange} with the binding key {@code #.key3}, then prints and
 * manually acknowledges every message delivered to that queue.
 *
 * <p>The connection and channel are intentionally left open so deliveries keep arriving.
 *
 * @param args unused
 * @throws Exception if connecting to the broker or setting up the exchange, queue,
 *         binding or consumer fails
 */
public static void main(String[] args) throws Exception {
    ConnectionFactory brokerFactory = new ConnectionFactory();
    brokerFactory.setHost("192.168.1.7");
    brokerFactory.setUsername("root");
    brokerFactory.setPassword("123456");

    Connection conn = brokerFactory.newConnection();
    final Channel topicChannel = conn.createChannel();
    topicChannel.exchangeDeclare(EXCHANGE_NAME, "topic");

    AMQP.Queue.DeclareOk declared = topicChannel.queueDeclare();
    String queue = declared.getQueue();

    // One consumer queue may be bound with several routing keys; only one is bound here.
    topicChannel.queueBind(queue, EXCHANGE_NAME, "#.key3");

    System.out.println(" [2] Waiting for messages.");

    Consumer printingConsumer = new DefaultConsumer(topicChannel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(" [2] Received '" + new String(body, "UTF-8") + "'");

            // Manual acknowledgement of this single delivery (multiple = false).
            long deliveryTag = envelope.getDeliveryTag();
            topicChannel.basicAck(deliveryTag, false);
        }
    };

    // Second argument is autoAck; false means the handler above must ack explicitly.
    topicChannel.basicConsume(queue, false, printingConsumer);
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK