应用场景
目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如:
- 淘宝七天自动确认收货。在签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能。
- 12306 购票支付确认页面。在选好票点击确定跳转的页面中往往都会有倒计时,代表着 30分钟内订单不确认的话将会自动取消订单。其实在下订单那一刻开始购票业务系统就会发送一个延时消息给订单系统,延时30分钟,告诉订单系统订单未完成,如果我们在30分钟内完成了订单,则可以通过逻辑代码判断来忽略掉收到的消息。
在上面两种场景中,如果使用下面两种传统解决方案无疑大大降低了系统的整体性能和吞吐量:
- 使用 redis 给订单设置过期时间,最后通过判断 redis中是否还有该订单来决定订单是否已经完成。这种解决方案相较于消息的延迟推送性能较低,因为我们知道redis都是存储于内存中,我们遇到恶意下单或者刷单的将会给内存带来巨大压力。
- 使用传统的数据库轮询来判断数据库表中订单的状态,这无疑增加了IO次数,性能极低。
- 使用 jvm 原生的 DelayQueue,也是大量占用内存,而且没有持久化策略,系统宕机或者重启都会丢失订单信息。
问题分析
消息从生产端到消费端消费要经过3个步骤:
- 生产端发送消息到RabbitMQ;
- RabbitMQ发送消息到消费端;
- 消费端消费这条消息;
这3个步骤中的每一步都有可能导致消息丢失,所以要有一些措施来保证系统的可靠性。磁盘损坏,机房爆炸等等都能导致数据丢失,当然这种都是极小概率发生,能做到99.999999%消息不丢失,就是可靠的了。
生产端可靠性投递
比如消息在网络传输的过程中发生网络故障消息丢失,或者消息投递到RabbitMQ时RabbitMQ挂了,那消息也可能丢失。针对以上情况,RabbitMQ本身提供了一些机制。
事务消息机制
事务消息机制由于会严重降低性能
,所以一般不采用这种方法,在此不扩展了,而采用另一种轻量级的解决方案——confirm消息确认机制
。
confirm消息确认机制
什么是confirm消息确认机制?
就是生产端投递的消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息给生产端,让生产端知道已经收到消息了,否则这条消息就可能已经丢失了,需要生产端重新发送消息了。
RabbitMQ消息处理
消息持久化
什么是消息持久化呢?
RabbitMQ收到消息后将这个消息暂时存在了内存中,如果RabbitMQ挂了,那重启后数据就丢失了,所以相关的数据应该持久化到硬盘中,这样就算RabbitMQ重启后也可以到硬盘中取数据恢复。
如何持久化呢?
message消息到达RabbitMQ后先是到exchange交换机中,然后路由给queue队列,最后发送给消费端。所有需要给exchange、queue和message都进行持久化,这样,如果RabbitMQ收到消息后挂了,重启后会自行恢复消息。
消息入库
前面提到了会有极端情况,比如RabbitMQ收到消息还没来得及将消息持久化到硬盘时,RabbitMQ挂了
,这样消息还是丢失了,或者RabbitMQ在发送确认消息给生产端的过程中,由于网络故障而导致生产端没有收到确认消息
,这样生产端就不知道RabbitMQ到底有没有收到消息,就不好做接下来的处理。
所以除了RabbitMQ提供的一些机制外,也要做一些消息补偿机制,以应对一些极端情况。
消息入库,就是将要发送的消息保存到数据库中
。
首先发送消息前先将消息保存到数据库中,有一个状态字段status=0,表示生产端将消息发送给了RabbitMQ但还没收到确认
;在生产端收到确认后将status设为1,表示RabbitMQ已收到消息
。这里有可能会出现上面说的两种情况,所以生产端这边开一个定时器
,定时检索消息表,将status=0并且超过固定时间
后,还没收到确认的消息取出重发(第二种情况下这里会造成消息重复,消费者端要做幂等性
),可能重发还会失败,所以可以做一个最大重发次数,超过就做另外的处理。
这样消息就可以可靠性投递到RabbitMQ中了,而生产端也可以感知到了。
注:本文案例没实现消息入库方案,可自行加逻辑,不难
消费端消息不丢失
默认情况下,以下3种情况会导致消息丢失:
- 在RabbitMQ将消息发出后,消费端还没接收到消息之前,发生网络故障,消费端与RabbitMQ断开连接,此时消息会丢失;
- 在RabbitMQ将消息发出后,消费端还没接收到消息之前,消费端挂了,此时消息会丢失;
- 消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。
其实,上述3中情况导致消息丢失归根结底是因为RabbitMQ的自动ack机制
,即默认RabbitMQ在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时RabbitMQ自己又没有这条消息了。
所以就需要将自动ack机制改为手动ack机制。
对于RabbitMQ服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费端的消息
;一部分是已经投递给消费端,但是还没有收到消费端确认信号的消息
。如果RabbitMQ一直没有收到消费端的确认信号,并且消费此消息的消费端已经断开连接或宕机(RabbitMQ会自己感知到),则RabbitMQ会安排该消息重新进入队列(放在队列头部),等待投递给下一个消费者,当然也有能还是原来的那个消费端,当然消费端也需要确保幂等性
。
之前一般采用死信队列+TTL过期时间来实现延迟队列,现在RabbitMQ 官方提供了延迟队列的插件,这个插件是实验性的,但相当稳定,废话不多说开整
死信队列
死信(Dead Letter)
是Rabbitmq 提供的一种机制。当一条消息满足下列条件之一那么它会成为死信:
- 消息被否定确认 (如
channel.basicNack
) 并且此时requeue
属性被设置为false
- 消息在队列的存活时间超过设置的
TTL
时间
- 消息队列的消息数量已经超过最大队列长度
若配置了死信队列,死信会被 Rabbitmq 投到死信队列中。
在Rabbitmq 中创建死信队列的操作流程
大概是:
- 创建一个交换机作为死信交换机
- 在业务队列中配置
x-dead-letter-exchange
和 x-dead-letter-routing-key
,将第一步的交换机设为业务队列的死信交换机
- 在死信交换机上创建队列,并监听此队列
死信队列的设计目的:为了存储没有被正常消费的消息,便于排查和重新投递
。
死信队列同样也没有对投递时间做出保证
,在第一条消息成为死信之前,后面的消息即使过期也不会投递为死信。
为了解决这个问题,Rabbit 官方推出了延迟投递插件 rabbitmq-delayed-message-exchange
,推荐使用官方插件来做延时消息。
插件安装
注:延迟插件 rabbitmq-delayed-message-exchange 是在 RabbitMQ 3.5.7 及以上的版本才支持的,依赖 Erlang/OPT 18.0 及以上运行环境。
此插件需要 Erlang 23.2 或更高版本。Erlang windows下载
最新版本针对 RabbitMQ3.8.x,较早的系列已不受支持。RabbitMQ下载
RabbitMQ 官方提供的延迟队列插件,下载放置到 RabbitMQ 根目录下的 plugins 内。延迟队列插件下载
进入RabbitMQ安装目录的sbin目录下,在cmd窗口使用命令启用延迟插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
|
- 禁用插件,使用如下命令,
但请注意,所有尚未交付的延迟消息都将丢失
。
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
|
注:本demo在windows环境下模拟,插件采用rabbitmq-delayed-message-exchange v3.8.x版本;插件启用或禁用时,若Rabbitmq服务一直是启用状态的话,需重启使其生效
实现延迟推送信息
依赖文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>spring-boot-project</artifactId> <groupId>cn.goitman</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion>
<artifactId>rabbitmq-delayed-demo</artifactId>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.1.4.RELEASE</version> </dependency>
<dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency>
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.76</version> </dependency>
</dependencies> </project>
|
配置文件
一般来说消息生产和消费是两个独立的项目,配置应该分开,这里为了方便就整合在一块啦
server: port: 8080 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / connection-timeout: 15000 publisher-confirm-type: correlated publisher-returns: true template: mandatory: true listener: simple: acknowledge-mode: manual concurrency: 1 max-concurrency: 10 prefetch: 2
|
- 在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。
- 在springboot2.2.0.RELEASE版本之前是amqp正式支持的属性,用来配置消息发送到交换器之后是否触发回调方法,在2.2.0及之后该属性过期使用spring.rabbitmq.publisher-confirm-type属性配置代替,用来配置更多的确认类型;
- NONE值是禁用发布确认模式,是默认值
- CORRELATED值是发布消息成功到交换器后会触发回调方法
- SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
引导类
package cn.goitman;
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication public class RabbitmqApplication { public static void main(String[] args) { SpringApplication.run(RabbitmqApplication.class, args); } }
|
实体类
模拟一个订单对象,发送、接收对象信息
package cn.goitman.pojo;
public class Order {
public String orderId;
public String orderStatus;
public Order() { }
public Order(String orderId, String orderStatus) { this.orderId = orderId; this.orderStatus = orderStatus; }
public String getOrderId() { return orderId; }
public void setOrderId(String orderId) { this.orderId = orderId; }
public String getOrderStatus() { return orderStatus; }
public void setOrderStatus(String orderStatus) { this.orderStatus = orderStatus; } }
|
配置类
package cn.goitman.config;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitmqConfig {
private final static Logger logger = LoggerFactory.getLogger(RabbitmqConfig.class);
public static final String DELAYED_EXCHANGE = "delayedExchange"; public static final String DELAYED_QUEUE = "delayedQueue"; public static final String DELAYED_KEY = "delayed.#";
@Bean public TopicExchange delayedExchange() { TopicExchange exchange = new TopicExchange(DELAYED_EXCHANGE, true, false); exchange.setDelayed(true); return exchange; }
@Bean public Queue delayedQueue() { return new Queue(DELAYED_QUEUE, true); }
@Bean public Binding delayedBinding(){ return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_KEY); }
@Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } }
|
消息生产者
package cn.goitman.component;
import cn.goitman.config.RabbitmqConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
import java.util.UUID;
@Component public class ProducerSender {
private final static Logger logger = LoggerFactory.getLogger(ProducerSender.class);
@Autowired private RabbitTemplate rabbitTemplate;
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info("correlationData:{},ack:{},cause:{}",correlationData.toString(), ack, cause); } };
final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("returnedMessage:{},replyCode:{},replyText:{},exchange:{},routingKey:{}", new String(message.getBody()), replyCode, replyText, exchange, routingKey); } };
public void sendMessage(Object message) { rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(returnCallback); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.convertAndSend(RabbitmqConfig.DELAYED_EXCHANGE, "delayed.boot", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); message.getMessageProperties().setDelay(6000); return message; } }, new CorrelationData(UUID.randomUUID().toString().replace("-", ""))); } }
|
消息消费者
package cn.goitman.component;
import cn.goitman.pojo.Order; import com.alibaba.fastjson.JSONObject; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
import java.io.IOException;
@Component public class ConsumerReceiver {
private final static Logger logger = LoggerFactory.getLogger(ConsumerReceiver.class);
@RabbitListener(queues = "delayedQueue") public void receiverMessage(Message msg , Channel channel) throws IOException { Order order = JSONObject.parseObject(new String(msg.getBody(),"UTF-8"), Order.class); logger.info("order:{}",order.toString());
boolean flag = true;
if (flag) {
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false); logger.info("消费成功,ID:{}",msg.getMessageProperties().getDeliveryTag()); }else {
channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,true); logger.info("消费失败,ID:{}",msg.getMessageProperties().getDeliveryTag()); } } }
|
测试类
当然也可以写个测试方法,这里就这样啦
package cn.goitman.controller;
import cn.goitman.component.ProducerSender; import cn.goitman.pojo.Order; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController;
@RestController public class MessageSendTest {
@Autowired private ProducerSender producerSender;
@PostMapping("/delayedSend") public void delayedSend(@RequestBody Order order){ producerSender.sendMessage(order); } }
|
测试结果
交换机
绑定交换机和队列之间的联系,并配置路由键字符
队列
6秒后交换机将数据发送到队列,队列即时发送给消费端消费数据
源码地址:https://github.com/wangdaicong/spring-boot-project/tree/master/rabbitmq-delayed-demo