java rabbitmq

RabbitMQ

1、简介

消息队列(Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。

1.1、实现

消息队列常常保存在链表结构中。拥有权限的进程可以向消息队列中写入或读取消息。

目前,有很多消息队列有很多开源的实现,包括 JBoss MessagingJORAMApache ActiveMQ Sun open Message QueueIBM MQApache QpidHttpSQS

当前使用较多的消息队列有 RabbitMQRocketMQActiveMQKafkazeroMQMetaMQ等,而部分数据库如 RedisMysql以及 phxsql 也可实现消息队列的功能。

1.2、特点

MQ是消费者生产者模型的一个典型的代表,端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。

MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。
注意:
1.AMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
2.JMS,Java消息服务(Java Message Service)应用程序接口,是个 java 平台中关于面向消息中间件的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的API,绝大多数 MOM 提供商都对 JMS 提供支持。常见的消息队列,大部分都实现了 JMS API,如 ActiveMQ, Redis 以及 Rabbit MQ 等。

1.3、优点

解耦:将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而原系统不需要做任何修改。

异步:把非必要的业务逻辑写入消息队列,需要消息的系统自己从消息队列中订阅,提高网站的响应效率。

流量削峰:系统 A 慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。

1.4、使用场景

当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候

2、安装

安装 Rabbit MQ 之前,先安装 erlang ,两者之间版本要对应。

rabbitmq-plugins enable rabbitmq_management 启动插件

2.1、管理界面

1
2
3
4
5
6
7
- 启动服务后,通过 http://localhost:15672/#/ 进入后台管理
- Overview:查看概要信息
- Connections:查看连接信息
- Channels:通道列表
- Exchanges:发送消息时,背后的交换机
- Queues:队列
- Admin:对应当前 Rabbit MQ 中的用户

2.2、使用

Admin 中创建用户,在 Virtual Hosts 中创建 virtual host ,并为用户分配权限

3、专业术语

3.1、Direct Exchange

直连型交换机,根据消息携带的路由键将消息投递给对应队列。

大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。
然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。

3.2、Fanout Exchange

扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

3.3、Topic Exchange

主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
简单地介绍下规则:

  • (星号 *) 用来表示一个单词 (必须出现的)
  • (井号 #) 用来表示任意数量(零个或多个)单词

通配的绑定键是跟队列进行绑定的

举个小例子
队列Q1 绑定键为 .TT.

队列Q2绑定键为 TT.#
如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;

主题交换机是非常强大的
当一个队列的绑定键为 “#”(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。

另外还有 Header Exchange 头交换机 ,Default Exchange 默认交换机,Dead Letter Exchange 死信交换机,这几个该篇暂不做讲述。

4、简单队列

有一个消费者和生产者,消费者生产消息,消费者消费消息,同时监听队列。

不适用于生产环境

5、工作队列

5.1、轮询模式

5.2、公平模式

公平分发,能者多劳

  1. 消费者设置每次接收的消息 <=1
  2. 设置回执消息为手动 basicConsume 方法中 autoAck 设为 false
  3. 在具体的方法处理中手动回执消息,调用 basicAck 方法

适用一个消息被一个消费者消费的情况

6、Publish/Subscribe 发布/订阅队列

  • 消息产生后不是直接投送到队列中,而是将消息先投送给 Exchange 交换机,然后消息经过 Exchange 交换机投递到相关队列。
  • 多个消费者消费的不再是同一个队列,而是每个消费者消费属于自己的队列。

7、Routing 路由队列

  1. 生产者生产的消息投给交换机
  2. 交换机投送消息时的 Exchange Types 为 direct 类型
  3. 消息通过

8、Topics 主题队列

  • routing key 为一个句点号 . 分隔的字符串(将被句点号 . 分隔开的每一段独立的字符串称为一个单词),如 ”stock.usd.nyse“、”nyse.vmw”
  • routing key 中可以存在两种特殊的字符 *# ,用于做模糊匹配,其中 * 用于匹配一个单词,# 用于匹配多个单词(可以是零个)

img

9、RPC队列

img

Rabbit MQ 中实现 RPC 的机制是:

  1. 客户端发送请求(消息)时,在消息的属性(MessageProperties,在 AMQP 协议中定义了14种 properties ,这些属性会随着消息一起发送)中设置两个值 replyTo (一个 Queue 名称,用于告诉服务器处理完成之后将通知我的消息发送到这个 Queue 中)和 correlationId (此次请求的标识号,服务器处理完成之后需要将此属性返还,客户端将根据这个 id 了解哪条请求被成功执行了或执行失败。
  2. 服务端收到消息并处理
  3. 服务端处理完消息之后,将生成一条应答消息到 replyTo 指定的 Queue ,同时携带 correlationId属性。

客户端之前已订阅 replyTo 指定的 Queue,从中收到服务器的应答消息之后,根据其中的 correlationId 属性分析哪条请求被执行了,根据执行结果进行后续业务处理。

10、Rabbit MQ 消息的事务机制

  • 通过 AMQP 事务机制实现,这也是 AMQP 协议层面提供的解决方案
  • txSelect() 开启事务,txCommit() 提交事务,txRollback() 回滚事务
    • 缺点:影响 Rabbit MQ 的性能
  • 通过将 channel 设置成 confirm 模式来实现 confirmSelect() 方法
    • 三种方式:
      • 普通 confirm 模式:每发送一条消息后,调用 waitForConfirms() 方法,等待服务器 confirm。实际上是一种串行 confirm。
      • 批量 confirm 模式:每发送一批消息后,调用 waitForConfirmsOrDie() 方法,等待服务器端 confirm。
      • 异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后 Client 端会调用这个方法。
    • 同步:
    • 异步:

11、确认模式

confirm

confirm ACK

channel.basicAck(deliveryTag,false) 方法确认签收消息

channel.basicNacl(…) 拒收消息

12、SpringAMQP

依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
- @RabbitListener 标注在类上,需要配合标注了 @RabbitHandler 的方法使用
- 多个方法标注 @RabbitHandler 通过方法参数区分

- @RabbitListener 标注在方法上
- @RabbitListener(bindings={
@QueueBinding(
exchange = @Exchange(name = "spring-rabbitmq-topic",type = "topic"),
value = @Queue("spring-rabbitmq-queue"),
key = {"t"}
)
})

13、消费端限流

  1. 确保 ack 机制为手动确认
  2. 配置属性 perfetcj 一次消费多少条消息

14、TTL(Time To Live)

存活时间/过期时间

当消息到达存活时间之后,还没有被消费,会被自动清除。

RabbitMQ 可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

整个队列过期(设置 x-message-ttl 参数),单个消息过期(设置 message.getMessageProperties().setExpiration(“毫米数”)

两者都设置,以时间短的为准

队列过期,会将队列所有的消息全部移除

消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)

15、死信队列

英文缩写 DLX,Dead Letter Exchange(死信交换机),当消息成为 Dead message 后,可以被重新发送到另一个交换机,这个交换机就是 DLX。

消息成为死信的情况:

  1. 队列消息长度达到限制
  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false
  3. 原队列存在消息过期设置,消息到达超时时间未被消费

队列绑定死信交换机:

给队列设置参数:x-dead-letter-exchangex-dead-letter-routing-key

image-20210928141301129

  • 死信交换机和死信队列和普通的交换机和队列没有区别
  • 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列。

16、延迟队列

即消息进入消息队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。
  2. 新用户注册成功 7 天后,发送短信问候。

实现方式:

  1. 定时器
  2. 延迟队列

image-20210928145131461

rabbitmq 并没有直接提供延迟队列的功能,但可以通过 TTL + 死信队列 组合实现延迟队列的效果。

image-20210928151823342

17、日志与监控

18、消息可靠性分析与追踪

Firehose :消息追踪,通过 rabbitmqctl trace_on / rabbitmqctl_off 开启和关闭。

rabbitme_tracing : 插件

19、管理

20、消息可靠性保障

  • 消息补偿机制

image-20210928162355791

21、消息幂等性处理

  • 乐观锁解决方案

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果,也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

在 MQ 中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

image-20210928163049716

22、RabbitMQ高可用集群

部署多个 rabbitmq 实例,使用 HAProxy 做代理转发

image-20210928163434630

  • Copyrights © 2022-2023 hqz

请我喝杯咖啡吧~

支付宝
微信