消息丢失的情况
消息丢失的情况主要有以下三种:
- 生产者向消息代理传递消息的过程中,消息丢失了
- 消息代理( RabbitMQ )把消息弄丢了
- 消费者把消息弄丢了

那怎么保证消息的可靠性呢,我们可以从消息丢失的情况入手——从生产者、消息代理( RabbitMQ )、消费者三个方面来保证消息的可靠性
生产者的可靠性
2.1 生产者重连
由于网络问题,可能会出现客户端连接 RabbitMQ 失败的情况,我们可以通过配置开启连接 RabbitMQ 失败后的重连机制
application.yml(将 host 更改为部署 RabbitMQ 的服务器的地址)
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /blog
username: CaiXuKun
password: T1rhFXMGXIOYCoyi
connection-timeout: 1s # 连接超时时间
template:
retry:
enabled: true # 开启连接超时重试机制
initial-interval: 1000ms # 连接失败后的初始等待时间
multiplier: 1 # 连接失败后的等待时长倍数,下次等待时长 = (initial-interval) * multiplier
max-attempts: 3 # 最大重试次数
注意事项:
当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率,但 SpringAMOP 提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,线程会被阻塞,影响业务性能
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长(比如 200 ms)和重试次数,也
可以考虑使用异步线程来执行发送消息的代码
2.2 生产者确认
RabbitMQ 提供了 Publisher Confirm 和 Publisher Return 两种确认机制。开启确机制认后,如果 MQ 成功收到消息后,会返回确认消息给生产者,返回的结果有以下几种情况:
消息投递到了 MQ,但是路由失败,此时会通过 PublisherReturn 机制返回路由异常的原因,然后返回 ACK,告知生产者消息投递成功
临时消息投递到了 MQ,并且入队成功,返回 ACK,告知生产者消息投递成功
持久消息投递到了MQ,并且入队完成持久化,返回 ACK,告知生产者消息投递成功
其它情况都会返回 NACK,告知生产者消息投递失败

2.3 生产者确认机制的代码实现
在 publisher 服务中编写与生产者确认机制有关的配置信息( application.yml 文件)
spring:
rabbitmq:
publisher-returns: true
publisher-confirm-type: correlated
publisher-confirm-type 有三种模式:
- none:关闭 confirm 机制
- simple:以同步阻塞等待的方式返回 MQ 的回执消息
- correlated:以异步回调方式的方式返回 MQ 的回执消息
每个 RabbitTemplate 只能配置一个 ReturnCallback
publisher 模块新增一个名为 RabbitMQConfig 的配置类,并让该类实现 ApplicationContextAware 接口
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置回调
rabbitTemplate.setReturnsCallback((returnedMessage) -> {
System.out.println("收到消息的return callback, " +
"exchange = " + returnedMessage.getExchange() + ", " +
"routingKey = " + returnedMessage.getRoutingKey() + ", " +
"replyCode = " + returnedMessage.getReplyCode() + ", " +
"replyText = " + returnedMessage.getReplyText() + ", " +
"message = " + returnedMessage.getMessage());
});
}
}
publisher 模块添加一个测试类,测试 ReturnCallback 的效果
@Test
void testConfirmCallback() throws InterruptedException {
CorrelationData correlationData = new CorrelationData();
correlationData.getFuture().whenCompleteAsync((confirm, throwable) -> {
if (confirm.isAck()) {
// 消息发送成功
System.out.println("消息发送成功,收到ack");
} else {
// 消息发送失败
System.err.println("消息发送失败,收到nack,原因是" + confirm.getReason());
}
if (throwable != null) {
// 消息回调失败
System.err.println("消息回调失败");
}
});
rabbitTemplate.convertAndSend("blog.direct", "red", "Hello, confirm callback", correlationData);
// 测试方法执行结束后程序就结束了,所以这里需要阻塞线程,否则程序看不到回调结果
Thread.sleep(2000);
}
如果交换机不存在会怎么样呢,我们故意使用一个不存在的交换机,观察控制台的输出结果

如果 routingKey 不存在会怎么样呢,我们故意使用一个不存在的 routingKey ,观察控制台的输出结果

可以看到,confirmCallback 和 ReturnCallback 都返回了回调信息(deliveryTag 为 0 表示消息无法路由到队列)
2.4 如何看待和处理生产者的确认信息
生产者确认需要额外的网络开销和系统资源开销,尽量不要使用
如果一定要使用,无需开启 Publisher-Return 机制,因为路由失败一般是业务出了问题
对于返回 nack 的消息,可以尝试重新投递,如果依然失败,则记录异常消息
消息代理(RabbitMQ)的可靠性
在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟,这样会导致两个问题:
一旦 RabbitMQ 宕机,内存中的消息会丢失
内存空间是有限的,当消费者处理过慢或者消费者出现故障或时,会导致消息积压,引发 MQ 阻塞( Paged Out 现象)
怎么理解 MQ 阻塞呢,当队列的空间被消息占满了之后,RabbitMQ 会先把老旧的信息存到磁盘,为新消息腾出空间,在这个过程中,整个 MQ 是被阻塞的,也就是说,在 MQ 完成这一系列工作之前,无法处理已有的消息和接收新的消息


数据持久化
RabbitMQ 实现数据持久化包括 3 个方面:
交换机持久化
队列持久化
消息持久化
注意事项:
利用 SpringAMQP 创建的交换机、队列、消息,默认都是持久化的
在 RabbitMQ 控制台创建的交换机、队列默认是持久化的,而消息默认是存在内存中( 3.12 版本之前默认存放在内存,3.12 版本及之后默认先存放在磁盘,消费者处理消息时才会将消息取出来放到内存中)
延迟消息
什么是延迟消息
延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才会收到消息
延迟任务:一定时间之后才会执行的任务

死信交换机
当一个队列中的某条消息满足下列情况之一时,就会成为死信(dead letter):
消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
过期消息(达到了队列或消息本身设置的过期时间),消息超时后无人消费
要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

利用死信交换机的特点,可以实现发送延迟消息的功能
延迟消息插件(推荐使用)
5.3.1 下载并安装延迟插件
RabbitMQ 的官方推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后,可以将消息暂存一段时间,时间到了之后再将消息投递到队列中
插件的下载地址:rabbitmq-delayed-message-exchange
延迟消息的原理和缺点
RabbitMQ 的延迟消息是怎么实现的呢?RabbitMQ 会自动维护一个时钟,这个时钟每隔一秒就跳动一次,如果对时钟的精度要求比较高的,可能还要精确到毫秒,甚至纳秒
RabbitMQ 会为发送到交换机的每一条延迟消息创建一个时钟,时钟运行的过程中需要 CPU 不断地进行计算。发送到交换机的延迟消息数越多,RabbitMQ 需要维护的时钟就越多,对 CPU 的占用率就越高(Spring 提供的定时任务的原理也是类似)
定时任务属于 CPU 密集型任务,中间涉及到的计算过程对 CPU 来说压力是很大的,所以说,采用延迟消息会给服务器的 CPU 带来更大的压力。当交换机中有非常多的延迟消息时,对 CPU 的压力就会特别大
所以说,延迟消息适用于延迟时间较短的场景
取消超时订单
设置 30 分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:
- 如果并发较高,30分钟可能堆积消息过多,对 MQ 压力很大
- 大多数订单在下单后 1 分钟内就会支付,但消息需要在 MQ 中等待30分钟,浪费资源


发送延迟检测订单的消息
我们定义一个实体类,用于记录延迟消息的内容和延迟消息的延迟时间列表(该实体类也是延迟消息的类型)
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class MultipleDelayMessage<T> {
private T data;
private List<Long> delayMillis;
public MultipleDelayMessage() {
}
public MultipleDelayMessage(T data, Long... delayMillis) {
this.data = data;
this.delayMillis = new ArrayList<>(Arrays.asList(delayMillis));
}
public MultipleDelayMessage(T data, List<Long> delayMillis) {
this.data = data;
this.delayMillis = delayMillis;
}
public static <T> MultipleDelayMessage<T> of(T data, Long... delayMillis) {
return new MultipleDelayMessage<>(data, new ArrayList<>(Arrays.asList(delayMillis)));
}
public static <T> MultipleDelayMessage<T> of(T data, List<Long> delayMillis) {
return new MultipleDelayMessage<>(data, delayMillis);
}
public boolean hasNextDelay() {
return !delayMillis.isEmpty();
}
public Long removeNextDelay() {
return delayMillis.remove(0);
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
public List<Long> getDelayMillis() {
return delayMillis;
}
public void setDelayMillis(List<Long> delayMillis) {
this.delayMillis = delayMillis;
}
@Override
public String toString() {
return "MultipleDelayMessage{" +
"data=" + data +
", delayMillis=" + delayMillis +
'}';
}
}
我们再定义一个发送延迟消息的消息处理器,供所有服务使用
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
public class DelayMessagePostProcessor implements MessagePostProcessor {
private final Integer delay;
public DelayMessagePostProcessor(Integer delay) {
this.delay = delay;
}
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(delay);
return message;
}
}
改造后的发送延迟消息的测试方法
“delay.direct”:交换机的名称
“delay”:路由键(routing key),交换机会将消息发送到绑定到这个路由键的队列
“Hello, DelayQueue!”:实际要发送的消息内容
new DelayMessagePostProcessor(10000):消息后处理器(Message Post Processor),用于在消息发送之前对消息进行修改
@Test
void testSendDelayMessage() {
rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new DelayMessagePostProcessor(10000));
SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");
System.out.println("发送消息成功!发送时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}






