8

rabbit mq

 4 years ago
source link: http://baixin.io/2020/04/rabbitMq/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

1.入门

1.简介

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现。 工作原理:现在索引中找到对应的值,然后根据匹配的索引记录找到对应的数据行。 select *from student where id = 5 ;先在索引上按id=5查找,然后返回包含该值的数据行。

2.应用场景

异步处理

场景:用户注册后,需要发注册邮件和注册短信。 传统方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。

引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理。 应用解耦

场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口。缺点:当库存系统出现故障时,订单就会失败;订单系统和库存系统高耦合。

消息队列:订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。库存系统:订阅下单的消息,获取下单消息,进行库操作。

流量削峰 场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。 消息队列:1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.2.秒杀业务根据消息队列中的请求信息,再做后续处理。

3.架构

rEzIvyE.jpg!web

Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输。

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue:消息的载体,每个消息都会被投到一个或多个队列。

Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.

Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。

Producer:消息生产者,就是投递消息的程序。

Consumer:消息消费者,就是接受消息的程序。

Channel:消息通道,在客户端的每个连接里,可建立多个channel。

2.特性

1.任务分发机制

Round-robin(轮询分发)

在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。 RabbbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的,如果现在load加重,那么只需要创建更多的Consumer来进行任务处理。 Fair dispatch(公平分发)

轮训不看消费者为应答的数目,只是盲目的将第n条消息发给第n个消费者。可能存在有些服务很忙依然分发给它,有些很轻松却没有任务。为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。 当消息处理完毕后,有了反馈,才会进行第二次发送。还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。

2.消息应答

为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ可以删除它了。

如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会不丢失任何消息了。

没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。

消息应答是默认打开的。我们明确地把它们关掉了(autoAck=true)。现在将应答打开,一旦我们完成任务,消费者会自动发送消息应答。

boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

3.消息持久化

当消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍将失去!当RabbitMQ退出或者崩溃,将会丢失队列和消息。除非你不要队列和消息。两件事儿必须保证消息不被丢失:我们必须把“队列”和“消息”设为持久化。

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

4.发布/订阅

生产者只能发送消息给Exchanges(转发器),转发器是非常简单的。一方面它接受生产者的消息,另一方面向队列推送消息。转发器必须清楚的知道如何处理接收到的消息。附加一个特定的队列吗?附加多个队列?或者是否丢弃?这些规则通过转发器的类型进行定义。

类型有:Direct、Topic、Headers和Fanout。

channel.exchangeDeclare("logs", "fanout");

转发器转发消息到队列。关联转发器和队列的叫Binding。

channel.queueBind(queueName, "logs", "");

Direct exchange(直接转发) 准确匹配 Topic exchange(主题转发器) 模糊匹配

(星号)可以代替任意一个标识符 ;#(井号)可以代替零个或多个标识符。

.orange. ”,Q2绑定键是“ .*.rabbit”,Q3绑定键是“lazy.#”。这些绑定可以概括为:Q1只对橙色的动物感兴趣。Q2则是关注兔子和所有懒的动物。

路由键为“quick.orange.rabbit”的消息会被路由到2个队列中去。而“lazy.orange.elephant”的消息同样会发往2个队列。另外“quick.orange.fox” 仅仅发往第一个队列,而”lazy.brown.fox”则只发往第二个队列。 “quick.brown.fox”则所有的绑定键都不匹配而被丢弃。

3.代码

producer:

String EXCHANGE_NAME = "direct_logs";
		/**
		 * 创建连接连接到MabbitMQ
		 */
		ConnectionFactory factory = new ConnectionFactory();
		// 设置MabbitMQ所在主机ip或者主机名
		factory.setHost("127.0.0.1");
		// 创建一个连接
		Connection connection = factory.newConnection();
		// 创建一个频道
		Channel channel = connection.createChannel();
		// 指定转发——广播
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
 
		//所有日志严重性级别
		String[] severities={"error","info","warning"};
		for(int i=0;i<3;i++){
			String severity = severities[i%3];//每一次发送一条不同严重性的日志
			
			// 发送的消息
			String message = "Hello World"+Strings.repeat(".", i+1);
			//参数1:exchange name
			//参数2:routing key
			channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
			system.out.println(message);
		}
		// 关闭频道和连接
		channel.close();

consumer:

ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();
 
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();
	    
	    String severity="error";//只关注error级别的日志,然后记录到文件中去。
	    channel.queueBind(queueName, EXCHANGE_NAME, severity);
	    
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    system.out.println(message);
			  }
			};
			channel.basicConsume(queueName, true, consumer);

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK