1

rabbitmq(8)事务

 2 years ago
source link: https://wakzz.cn/2018/03/25/rabbitmq/(8)%E4%BA%8B%E5%8A%A1/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

rabbitmq(8)事务

祈雨的博客
2018-03-25

rabbitmq的事务机制可以保证生产者发送的消息成功传递到broker。如果在事务提交前由于本地或broker的问题发生异常,则可以回滚事务。

2.1、生产者

private final static String QUEUE_NAME = "hello";

@Test
public void testSend() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.7");
factory.setUsername("root");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

/**
* 事务模式吞吐量低
*/
try {
// 开启事务
channel.txSelect();

String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

/**
* 如果在提交事务前发生异常,则通过回滚事务,消费者将不会接收到该消息
*/

// 提交事务
channel.txCommit();
} catch (Exception e) {
// 回滚事务
channel.txRollback();
e.printStackTrace();
}

channel.close();
connection.close();
}

2.2、消费者

private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.7");
factory.setUsername("root");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages.");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK