MQ相关学习(高级篇)
本文最后更新于92 天前,其中的信息可能已经过时,如有错误请发送邮件到big_fw@foxmail.com

消息丢失的情况

消息丢失的情况主要有以下三种:

  1. 生产者向消息代理传递消息的过程中,消息丢失了
  2. 消息代理( RabbitMQ )把消息弄丢了
  3. 消费者把消息弄丢了

那怎么保证消息的可靠性呢,我们可以从消息丢失的情况入手——从生产者、消息代理( RabbitMQ )、消费者三个方面来保证消息的可靠性

 生产者的可靠性

2.1 生产者重连

由于网络问题,可能会出现客户端连接 RabbitMQ 失败的情况,我们可以通过配置开启连接 RabbitMQ 失败后的重连机制

application.yml(将 host 更改为部署 RabbitMQ 的服务器的地址)

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /blog
    username: CaiXuKun
    password: T1rhFXMGXIOYCoyi
    connection-timeout: 1s # 连接超时时间
    template:
      retry:
        enabled: true # 开启连接超时重试机制
        initial-interval: 1000ms # 连接失败后的初始等待时间
        multiplier: 1 # 连接失败后的等待时长倍数,下次等待时长 = (initial-interval) * multiplier
        max-attempts: 3 # 最大重试次数

注意事项:

当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率,但 SpringAMOP 提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,线程会被阻塞,影响业务性能
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长(比如 200 ms)和重试次数,也
可以考虑使用异步线程来执行发送消息的代码

2.2 生产者确认


RabbitMQ 提供了 Publisher Confirm 和 Publisher Return 两种确认机制。开启确机制认后,如果 MQ 成功收到消息后,会返回确认消息给生产者,返回的结果有以下几种情况:

消息投递到了 MQ,但是路由失败,此时会通过 PublisherReturn 机制返回路由异常的原因,然后返回 ACK,告知生产者消息投递成功
临时消息投递到了 MQ,并且入队成功,返回 ACK,告知生产者消息投递成功
持久消息投递到了MQ,并且入队完成持久化,返回 ACK,告知生产者消息投递成功
其它情况都会返回 NACK,告知生产者消息投递失败

2.3 生产者确认机制的代码实现

在 publisher 服务中编写与生产者确认机制有关的配置信息( application.yml 文件)

spring:
  rabbitmq:
    publisher-returns: true
    publisher-confirm-type: correlated

publisher-confirm-type 有三种模式:

  1. none:关闭 confirm 机制
  2. simple:以同步阻塞等待的方式返回 MQ 的回执消息
  3. correlated:以异步回调方式的方式返回 MQ 的回执消息

每个 RabbitTemplate 只能配置一个 ReturnCallback

 publisher 模块新增一个名为 RabbitMQConfig 的配置类,并让该类实现 ApplicationContextAware 接口

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

        // 配置回调
        rabbitTemplate.setReturnsCallback((returnedMessage) -> {
            System.out.println("收到消息的return callback, " +
                    "exchange = " + returnedMessage.getExchange() + ", " +
                    "routingKey = " + returnedMessage.getRoutingKey() + ", " +
                    "replyCode = " + returnedMessage.getReplyCode() + ", " +
                    "replyText = " + returnedMessage.getReplyText() + ", " +
                    "message = " + returnedMessage.getMessage());
        });
    }

}

publisher 模块添加一个测试类,测试 ReturnCallback 的效果





@Test
void testConfirmCallback() throws InterruptedException {
    CorrelationData correlationData = new CorrelationData();
    correlationData.getFuture().whenCompleteAsync((confirm, throwable) -> {
        if (confirm.isAck()) {
            // 消息发送成功
            System.out.println("消息发送成功,收到ack");
        } else {
            // 消息发送失败
            System.err.println("消息发送失败,收到nack,原因是" + confirm.getReason());
        }

        if (throwable != null) {
            // 消息回调失败
            System.err.println("消息回调失败");
        }
    });

    rabbitTemplate.convertAndSend("blog.direct", "red", "Hello, confirm callback", correlationData);

    // 测试方法执行结束后程序就结束了,所以这里需要阻塞线程,否则程序看不到回调结果
    Thread.sleep(2000);
}

如果交换机不存在会怎么样呢,我们故意使用一个不存在的交换机,观察控制台的输出结果

如果 routingKey 不存在会怎么样呢,我们故意使用一个不存在的 routingKey ,观察控制台的输出结果

可以看到,confirmCallback 和 ReturnCallback 都返回了回调信息(deliveryTag 为 0 表示消息无法路由到队列)

2.4 如何看待和处理生产者的确认信息


生产者确认需要额外的网络开销和系统资源开销,尽量不要使用
如果一定要使用,无需开启 Publisher-Return 机制,因为路由失败一般是业务出了问题
对于返回 nack 的消息,可以尝试重新投递,如果依然失败,则记录异常消息

消息代理(RabbitMQ)的可靠性


在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟,这样会导致两个问题:

    一旦 RabbitMQ 宕机,内存中的消息会丢失
    内存空间是有限的,当消费者处理过慢或者消费者出现故障或时,会导致消息积压,引发 MQ 阻塞( Paged Out 现象)
    怎么理解 MQ 阻塞呢,当队列的空间被消息占满了之后,RabbitMQ 会先把老旧的信息存到磁盘,为新消息腾出空间,在这个过程中,整个 MQ 是被阻塞的,也就是说,在 MQ 完成这一系列工作之前,无法处理已有的消息和接收新的消息

    数据持久化


    RabbitMQ 实现数据持久化包括 3 个方面:

    交换机持久化
    队列持久化
    消息持久化


    注意事项:

    利用 SpringAMQP 创建的交换机、队列、消息,默认都是持久化的
    在 RabbitMQ 控制台创建的交换机、队列默认是持久化的,而消息默认是存在内存中( 3.12 版本之前默认存放在内存,3.12 版本及之后默认先存放在磁盘,消费者处理消息时才会将消息取出来放到内存中)

    延迟消息

    什么是延迟消息

    延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才会收到消息

    延迟任务:一定时间之后才会执行的任务

    死信交换机
    当一个队列中的某条消息满足下列情况之一时,就会成为死信(dead letter):

    消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
    过期消息(达到了队列或消息本身设置的过期时间),消息超时后无人消费
    要投递的队列消息堆积满了,最早的消息可能成为死信

    如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

    利用死信交换机的特点,可以实现发送延迟消息的功能

    延迟消息插件(推荐使用)


    5.3.1 下载并安装延迟插件
    RabbitMQ 的官方推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后,可以将消息暂存一段时间,时间到了之后再将消息投递到队列中

    插件的下载地址:rabbitmq-delayed-message-exchange

    延迟消息的原理和缺点
    RabbitMQ 的延迟消息是怎么实现的呢?RabbitMQ 会自动维护一个时钟,这个时钟每隔一秒就跳动一次,如果对时钟的精度要求比较高的,可能还要精确到毫秒,甚至纳秒

    RabbitMQ 会为发送到交换机的每一条延迟消息创建一个时钟,时钟运行的过程中需要 CPU 不断地进行计算。发送到交换机的延迟消息数越多,RabbitMQ 需要维护的时钟就越多,对 CPU 的占用率就越高(Spring 提供的定时任务的原理也是类似)

    定时任务属于 CPU 密集型任务,中间涉及到的计算过程对 CPU 来说压力是很大的,所以说,采用延迟消息会给服务器的 CPU 带来更大的压力。当交换机中有非常多的延迟消息时,对 CPU 的压力就会特别大

    所以说,延迟消息适用于延迟时间较短的场景

    取消超时订单

    设置 30 分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:

    1. 如果并发较高,30分钟可能堆积消息过多,对 MQ 压力很大
    2. 大多数订单在下单后 1 分钟内就会支付,但消息需要在 MQ 中等待30分钟,浪费资源

    发送延迟检测订单的消息

    我们定义一个实体类,用于记录延迟消息的内容和延迟消息的延迟时间列表(该实体类也是延迟消息的类型)

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    public class MultipleDelayMessage<T> {
    
        private T data;
    
        private List<Long> delayMillis;
    
        public MultipleDelayMessage() {
    
        }
    
        public MultipleDelayMessage(T data, Long... delayMillis) {
            this.data = data;
            this.delayMillis = new ArrayList<>(Arrays.asList(delayMillis));
        }
    
        public MultipleDelayMessage(T data, List<Long> delayMillis) {
            this.data = data;
            this.delayMillis = delayMillis;
        }
    
        public static <T> MultipleDelayMessage<T> of(T data, Long... delayMillis) {
            return new MultipleDelayMessage<>(data, new ArrayList<>(Arrays.asList(delayMillis)));
        }
    
        public static <T> MultipleDelayMessage<T> of(T data, List<Long> delayMillis) {
            return new MultipleDelayMessage<>(data, delayMillis);
        }
    
        public boolean hasNextDelay() {
            return !delayMillis.isEmpty();
        }
    
        public Long removeNextDelay() {
            return delayMillis.remove(0);
        }
    
        public T getData() {
            return data;
        }
    
        public void setData(T data) {
            this.data = data;
        }
    
        public List<Long> getDelayMillis() {
            return delayMillis;
        }
    
        public void setDelayMillis(List<Long> delayMillis) {
            this.delayMillis = delayMillis;
        }
    
        @Override
        public String toString() {
            return "MultipleDelayMessage{" +
                    "data=" + data +
                    ", delayMillis=" + delayMillis +
                    '}';
        }
    
    }
    

    我们再定义一个发送延迟消息的消息处理器,供所有服务使用

    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    
    public class DelayMessagePostProcessor implements MessagePostProcessor {
    
        private final Integer delay;
    
        public DelayMessagePostProcessor(Integer delay) {
            this.delay = delay;
        }
    
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setDelay(delay);
            return message;
        }
    
    }
    

    改造后的发送延迟消息的测试方法

    “delay.direct”:交换机的名称
    “delay”:路由键(routing key),交换机会将消息发送到绑定到这个路由键的队列
    “Hello, DelayQueue!”:实际要发送的消息内容
    new DelayMessagePostProcessor(10000):消息后处理器(Message Post Processor),用于在消息发送之前对消息进行修改

    @Test
    void testSendDelayMessage() {
        rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new DelayMessagePostProcessor(10000));
    
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
        simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");
        System.out.println("发送消息成功!发送时间:" + simpleDateFormat.format(System.currentTimeMillis()));
    }
    
    文末附加内容
    暂无评论

    发送评论 编辑评论

    
    				
    |´・ω・)ノ
    ヾ(≧∇≦*)ゝ
    (☆ω☆)
    (╯‵□′)╯︵┴─┴
     ̄﹃ ̄
    (/ω\)
    ∠( ᐛ 」∠)_
    (๑•̀ㅁ•́ฅ)
    →_→
    ୧(๑•̀⌄•́๑)૭
    ٩(ˊᗜˋ*)و
    (ノ°ο°)ノ
    (´இ皿இ`)
    ⌇●﹏●⌇
    (ฅ´ω`ฅ)
    (╯°A°)╯︵○○○
    φ( ̄∇ ̄o)
    ヾ(´・ ・`。)ノ"
    ( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
    (ó﹏ò。)
    Σ(っ °Д °;)っ
    ( ,,´・ω・)ノ"(´っω・`。)
    ╮(╯▽╰)╭
    o(*////▽////*)q
    >﹏<
    ( ๑´•ω•) "(ㆆᴗㆆ)
    😂
    😀
    😅
    😊
    🙂
    🙃
    😌
    😍
    😘
    😜
    😝
    😏
    😒
    🙄
    😳
    😡
    😔
    😫
    😱
    😭
    💩
    👻
    🙌
    🖕
    👍
    👫
    👬
    👭
    🌚
    🌝
    🙈
    💊
    😶
    🙏
    🍦
    🍉
    😣
    Source: github.com/k4yt3x/flowerhd
    颜文字
    Emoji
    小恐龙
    花!
    上一篇
    下一篇