个人 二维码




公众号二维码

目录

为什么我推荐你使用 RabbitMQ 实现订单超时处理(一)


温馨提示:本案例图文并茂,代码齐全,大家只需要跟着文章一步步学习即可实现。想要一步到位直接获取代码的同学,请关注微信公众号「哈喽沃德先生」回复 ordertimeout 即可

  大家平时应该都有过网上购物、看电影、点外卖的经历,有时候也会因为某些原因导致商品不想要了,看电影去不了了,外卖不想吃了等情况发生,此时我们无需做其他操作,即使已经下单,只要不付款即可,过段时间该订单则会自动取消。

  行内话讲,这叫做业务延迟处理或延迟任务,即业务有“在一段时间之后,完成一个工作任务”的需求。既然这个功能这么重要,身为一名合格的程序员必须搞清楚其背后的实现原理。安排!

  本案例分上下集两篇,该文为第一篇,主要讲解 RabbitMQ 如何实现延迟任务处理。下一篇将通过 Spring Boot + RabbitMQ + Vue 实现一个简易版的订单超时处理系统,方便大家理解其背后的原理。

https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210322191636607.png

案例分析

  

  无论是电商购物、购买电影票、还是点外卖,相同的环节都是下单即生成订单,然后唤起支付系统进行支付。这里面肯定会遇到订单超时(下单以后未支付,指定时间内取消订单)的问题,且面试中也会被经常问到。

  如果单拿订单超时这个案例来说,核心逻辑无非以下几点:

  • 用户选择心仪的商品以后提交订单
  • 后台生成订单相关数据并入库(订单编号、订单关联商品、订单关联用户、订单状态、支付状态、发货状态等)
  • 前台进入支付界面
  • 用户放弃支付且未取消订单
  • 通过延迟任务实现订单超时处理自动关闭订单(修改订单相关数据,修改状态,返回库存等)

  

  其实除了订单超时场景之外,以下场景同样可以使用延迟任务来解决:

  • 新用户注册后,3 天内没有登陆,则进行短信提醒
  • 抖音用户 3 天未上线,从该用户关注列表中随机选择一个最新作品并发送短信撩动
  • 手机远程遥控智能设备 1 小时后启动
  • 订单在 15 分钟之内未支付,则自动取消
  • 新创建的店铺,10 天内没有上传商品,则自动发送消息提醒
  • 用户发起退款,3 天内没有得到处理,则通知相关运营人员
  • 预定会议后,在预定的时间点前 10 分钟通知相关人员参加会议
  • ……

  

方案分析

  

  这里先罗列几种延迟任务的解决方案,然后再分析其优缺点:

  • JDK 延迟队列
  • 定时任务
  • 被动取消
  • Redis Sorted Set
  • Redis 事件通知
  • 时间轮算法
  • RabbitMQ
  • ……

  

JDK 延迟队列

  

  该方案是利用 JDK 自带的 java.util.concurrent 包中的 DelayQueue 队列。

1
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>

  这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入 DelayQueue 中的对象,必须实现 Delayed 接口。

https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210406190749640.png
  • offer():添加元素
  • poll():获取并移除队列的超时元素,没有则返回空
  • take():获取并移除队列的超时元素,没有则 wait 当前线程,直到有元素满足超时条件时返回结果

  

优点

  • 效率高,任务触发时间延迟低

  

缺点

  • 服务器重启后,数据全部丢失,怕宕机
  • 集群扩展麻烦,难度较高
  • 由于内存条件限制的原因,下单未付款的订单过多时,容易出现 OOM 异常
  • 代码复杂度较高

  

定时任务

  

  这种方式是最简单的,启动一个计划任务,每隔一定时间(假设 1 分钟)去扫描数据库一次,通过订单时间来判断是否超时,然后进行 UPDATE 或 DELETE 操作。

  

优点

  • 实现简单
  • 高可用性,支持集群(Quartz、TBSchedule、XX-JOB、Elastic-Job、Staurn、LTS 等)

  

缺点

  • 服务器内存消耗大
  • 存在延迟,比如每 1 分钟扫描一次,延迟时间就是 1 分钟。也有可能会更久,比如 1 分钟之内有大量数据,1 分钟没处理完,那么下一分钟的就会顺延
  • 效率低
  • 数据库压力大,订单数据过大时,数据库压力也会增加

  

被动取消

  

  利用懒加载的思想,当用户或商户查询订单时,再判断该订单是否超时,超时则进行业务处理。

  这种方式依赖于用户的查询操作触发,如果用户不进行查询订单的操作,该订单就永远不会被取消。所以,实际应用中,也是 被动取消 + 定时任务 的组合方式来实现。这种情况下定时任务的时间可以设置的稍微“长”一点。

  

优点

  • 实现简单
  • 支持集群(Quartz、TBSchedule、XX-JOB、Elastic-Job、Staurn、LTS 等)

  

缺点

  • 产生额外影响,比如统计订单、统计库存等
  • 影响用户体验,打开订单列表时需要处理数据,从而降低显示的实时性

  

Redis Sorted Set

  

  Redis 有序集合(Sorted Set)每个元素都会关联一个 double 类型的分数 score。Redis 可以通过分数来为集合中的成员进行从小到大的排序。

1
2
3
4
添加元素:ZADD key score member [[score member] [score member]]
按顺序查询元素:ZRANGE key start stop [WITHSCORES]
查询元素score:ZSCORE key member
移除元素:ZREM key member [member …]

  该方案可以将订单超时时间戳与订单编号分别设置为 score 和 member。系统扫描第一个元素判断是否超时,超时则进行业务处理。

  然而,这一版存在一个致命的硬伤,在高并发条件下,多个消费者会取到同一个订单编号,又需要编写 Lua 脚本保证原子性或使用分布式锁,用了分布式锁性能又下降了。

  

优点

  • 可靠性,基于 Redis 自身的持久化特性实现消息持久化
  • 高可用性,支持单机、主从、哨兵、集群多种模式

  

缺点

  • 单个有序集合无法支持太大的数据量
  • 需要额外进行 Redis 的维护

  

Redis 事件通知

  

  该方案是使用 Redis 的 Keyspace Notifications,利用该机制可以在 Key 失效之后,提供一个回调,实际上就是 Redis 会给客户端发送一个消息。需要 Redis 版本 2.8 以上。

  修改 Redis 相关事件配置。找到 Redis 配置文件 redis.conf,查看 notify-keyspace-events 配置项,如果没有,添加 notify-keyspace-events Ex,如果有值,则追加 Ex,相关参数说明如下:

  • K:keyspace 事件,事件以 keyspace@ 为前缀进行发布
  • E:keyevent 事件,事件以 keyevent@ 为前缀进行发布
  • g:一般性的,非特定类型的命令,比如 del,expire,rename 等
  • $:字符串特定命令
  • l:列表特定命令
  • s:集合特定命令
  • h:哈希特定命令
  • z:有序集合特定命令
  • x:过期事件,当某个键过期并删除时会产生该事件
  • e:驱逐事件,当某个键因 maxmemore 策略而被删除时,产生该事件
  • A:g$lshzxe 的别名,因此 AKE 表示所有事件

  利用该机制,可以将订单编号写入 Redis,并设置对应的过期时间,当订单过期后,取到过期的 Key 然后进行业务处理。

  

优点

  • 可靠性,基于 Redis 自身的持久化特性实现消息持久化
  • 高可用性,支持单机、主从、哨兵、集群多种模式

  

缺点

  • 开启键通知会对 Redis 产生额外的开销
  • 目前键通知功能 Redis 并不保证消息必达,Redis 客户端断开连接所有 Key 会丢失
  • 需要额外进行 Redis 的维护

  

时间轮算法

  

https://mrhelloworld.com/resources/articles/why/rabbitmq/471426-20180101114911565-1021558722.png

  时间轮算法可以类比于时钟,如上图箭头(指针)按某一个方向固定频率轮动,每一次跳动称为一个 tick。这样可以看出时间轮有 3 个重要的属性参数:

  • ticksPerWheel:一轮的 tick 数
  • tickDuration:一个 tick 的持续时间
  • timeUnit:时间单位

  当 ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的时钟走动完全类似了。

  如果当前指针指在 1 上面,我有一个任务需要 4 秒以后执行,那么这个执行的线程回调或者消息将会被放在 5 上。那如果需要在 20 秒之后执行怎么办,由于这个环形结构槽数只到 8,如果要 20 秒,指针需要多转 2 圈。位置是在 2 圈之后的 5 上面(20 % 8 + 1)。

  时间轮,是一种实现延迟功能(定时器)的巧妙算法,在 Netty,Zookeeper,Kafka 等各种框架中,甚至 Linux 内核中都有用到。时间轮算法的精髓当然不止这么简单,本文主要讲解 RabbitMQ 的延迟任务实现方案,所以这里简单带过。

  

优点

  • 效率高
  • 如果使用 Netty 的 HashedWheelTimer 来实现,代码复杂度比 JDK 的 DelayQueue 低
  • 如果使用第三方中间件实现,支持集群扩展、高吞吐量、消息持久化等

  

缺点

  • 服务器重启后,数据全部丢失,怕宕机
  • 集群扩展麻烦,难度较高
  • 由于内存条件限制的原因,下单未付款的订单过多时,容易出现 OOM 异常
  • 如果使用第三方中间件实现,需要额外进行第三方中间件的维护

  

RabbitMQ

  

  终于进入本文的主题部分了,首先借用 RabbitMQ 官网的一句话简单介绍一下它:RabbitMQ is the most widely deployed open source message broker.

  

延迟队列

  

  队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。

  延迟队列,最重要的特性就体现在它的延时属性上,跟普通队列不一样的是,普通队列中的元素总是等着希望被早点取出消费,而延迟队列中的元素则是希望在指定时间被取出消费,所以延迟队列中的元素是都是带时间属性的。

  简单来说,延迟队列就是用来存放需要在指定时间被处理的元素的队列。

  本文使用 RabbitMQ 也是通过延迟队列的机制来实现订单超时的处理。然而 RabbitMQ 自身并没有延迟队列这个功能,实现该功能一般有以下两种方式:

  • 利用 TTL(Time To Live)DLX(Dead Letter Exchanges) 实现延迟队列
  • 利用 RabbitMQ 的社区插件 rabbitmq_delayed_message_exchange 实现,作者:Alvaro Videla

  

TTL

  

  TTL 是 RabbitMQ 中一个消息或者队列的存活时间属性,表示一条消息或者队列中所有消息的存活时间,单位毫秒。

  如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息在 TTL 设置的时间内没有被消费的话,则会成为“死信”。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用。

  

DLX

  

  DLX 是 RabbitMQ 中的一种消息机制,表示死信交换机。

  当消息在一个队列中变为死信后,它会被重新发送到死信交换机。以下情况会出现死信:

  • 消息未被签收,消费端使用了 channel.basicNackchannel.basicReject,并且 requeue 属性被设置为 false
  • 消息过期(TTL)
  • 消息队列达到了最大长度

  “死信”消息会被 RabbitMQ 进行特殊处理,如果配置了死信交换机,那么该消息将会被丢进死信交换机中,被死信交换机上绑定的队列所消费,如果没有配置,则该消息被丢弃。

  

优缺点

  

优点

  • 可靠性,消息持久化
  • 高可用性,非常方便部署负载均衡的集群,实现高可用性和吞吐量,轻松联合多个可用性区域和块
  • 易管理和监控,使用 HTTP-API,命令行工具或其他 UI 工具来管理和监控 RabbitMQ

  

缺点

  • 系统可用性降低
  • 系统复杂性变高
  • 系统一致性问题
  • 需要额外进行 RabbitMQ 的维护

  

总结

  

  当然也要分实际情况来决定,如果贵司已经在用 RabbitMQ 的情况下,延迟任务肯定首选使用 RabbitMQ 来实现,如果贵司并没有使用 RabbitMQ,就为了实现这样一个功能而强行使用 RabbitMQ,在一个稳定运行的系统中引入一个第三方中间件是需要考虑很多问题的,否则就会得不偿失。

  目前大型互联网公司多多少少都会引入消息中间件,毕竟它拥有解耦、异步、流量削峰、日志处理等优点及功能,是分布式系统中重要的组件。在这种情况下,使用消息中间件来实现延迟任务就变得理所当然了。

  综上所述,你懂的,废话不多说,下面进入实战环节。

  

准备工作

  

环境

  

  • RabbitMQ:3.8.14
  • Spring Boot:2.4.4
  • JDK:11.0.10
  • IDE:IntelliJ IDEA

  

RabbitMQ

  

  为了方便省事,本文采用单节点 RabbitMQ。

  

Spring Boot

  

创建项目

  

  使用 Spring Initializr 初始化 Spring Boot 项目,添加 Spring WebSpring RabbitMQLombok

https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210408215806811.png
https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210408220702813.png
https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210408220808373.png

  

配置文件

  

  application.yml 配置 RabbitMQ 服务器相关信息。

1
2
3
4
5
6
7
spring:
  rabbitmq:
    host: 192.168.10.101 # 服务器 IP
    port: 5672           # 服务器端口
    username: guest      # 用户名
    password: guest      # 密码
    virtual-host: /      # 虚拟主机地址

  

方案实现

  

RabbitMQ 延迟队列实现

  

  本文会讲解三种 RabbitMQ 实现延迟队列的方式,第一种按下图来进行构建:

https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210408224801574.png

  生产者生产一条延迟消息,根据延迟时间的不同,利用不同的 routingkey 将消息路由到不同的延迟队列,每个队列都设置了不同的 TTL 属性,并绑定到同一个死信交换机中。

  消息过期后,根据 routingkey 的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行消费即可。

  

RabbitMQ 配置类

  

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package com.example.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author 哈喽沃德先生
 * @微信公众号 哈喽沃德先生
 * @website https://mrhelloworld.com
 * @wechat 124059770
 */
@Configuration
public class RabbitMQConfiguration {

    // 延迟交换机
    public static final String DELAY_EXCHANGE = "delay.exchange";
    // 延迟队列
    public static final String DELAY_QUEUE_A = "delay.queue.a";
    public static final String DELAY_QUEUE_B = "delay.queue.b";
    // 延迟队列路由Key
    public static final String DELAY_QUEUE_A_ROUTING_KEY = "delay.queue.a.routingkey";
    public static final String DELAY_QUEUE_B_ROUTING_KEY = "delay.queue.b.routingkey";
    // 死信交换机
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    // 死信队列
    public static final String DEAD_LETTER_QUEUE_A = "dead.letter.queue.a";
    public static final String DEAD_LETTER_QUEUE_B = "dead.letter.queue.b";
    // 死信队列路由Key
    public static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "dead.letter.delay_10s.routingkey";
    public static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "dead.letter.delay_60s.routingkey";

    // 声明延迟交换机
    @Bean("delayExchange")
    public DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE);
    }

    // 声明死信交换机
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    // 声明延迟队列A,延时 10s,并绑定到对应的死信交换机
    @Bean("delayQueueA")
    public Queue delayQueueA() {
        Map<String, Object> args = new HashMap<>();
        // x-dead-letter-exchange 声明队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key 声明队列的死信路由Key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_A_ROUTING_KEY);
        // x-message-ttl 声明队列的消息TTL存活时间
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(DELAY_QUEUE_A).withArguments(args).build();
    }

    // 声明延迟队列B,延时 60s,并绑定到对应的死信交换机
    @Bean("delayQueueB")
    public Queue delayQueueB() {
        Map<String, Object> args = new HashMap<>();
        // x-dead-letter-exchange 声明队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key 声明队列的死信路由Key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_B_ROUTING_KEY);
        // x-message-ttl 声明队列的消息TTL存活时间
        args.put("x-message-ttl", 60000);
        return QueueBuilder.durable(DELAY_QUEUE_B).withArguments(args).build();
    }

    // 声明延迟队列A的绑定关系
    @Bean
    public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
                                 @Qualifier("delayExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_A_ROUTING_KEY);
    }

    // 声明延迟队列B的绑定关系
    @Bean
    public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
                                 @Qualifier("delayExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_B_ROUTING_KEY);
    }

    // 声明死信队列A 用于接收延时 10s 处理的消息
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA() {
        return new Queue(DEAD_LETTER_QUEUE_A);
    }

    // 声明死信队列B 用于接收延时 60s 处理的消息
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB() {
        return new Queue(DEAD_LETTER_QUEUE_B);
    }

    // 声明死信队列A的绑定关系
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY);
    }

    // 声明死信队列B的绑定关系
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_B_ROUTING_KEY);
    }

}

  

枚举类

  

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.example.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;

import java.util.Objects;

/**
 * @author 哈喽沃德先生
 * @微信公众号 哈喽沃德先生
 * @website https://mrhelloworld.com
 * @wechat 124059770
 */
@Getter
@AllArgsConstructor
public enum DelayTypeEnum {

    // 10s
    DELAY_10s(1),
    // 60s
    DELAY_60s(2);

    private Integer type;

    public static DelayTypeEnum getDelayTypeEnum(Integer type) {
        if (Objects.equals(type, DELAY_10s.type)) {
            return DELAY_10s;
        }
        if (Objects.equals(type, DELAY_60s.type)) {
            return DELAY_60s;
        }
        return null;
    }

}

  

消息生产者

  

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.example.producer;

import com.example.enums.DelayTypeEnum;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

import static com.example.config.RabbitMQConfiguration.*;

/**
 * @author 哈喽沃德先生
 * @微信公众号 哈喽沃德先生
 * @website https://mrhelloworld.com
 * @wechat 124059770
 */
@Component
public class DelayMessageProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send(String message, DelayTypeEnum type) {
        switch (type) {
            case DELAY_10s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_A_ROUTING_KEY, message);
                break;
            case DELAY_60s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_B_ROUTING_KEY, message);
                break;
            default:
        }
    }

}

  

消息消费者

  

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.example.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

import static com.example.config.RabbitMQConfiguration.*;

/**
 * @author 哈喽沃德先生
 * @微信公众号 哈喽沃德先生
 * @website https://mrhelloworld.com
 * @wechat 124059770
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer {

    // 监听死信队列A
    @RabbitListener(queues = DEAD_LETTER_QUEUE_A)
    public void receiveA(Message message, Channel channel) {
        // 获取消息
        String msg = new String(message.getBody());
        // 记录日志
        log.info("当前时间:{},死信队列A收到消息:{}", LocalDateTime.now(), msg);
    }

    // 监听死信队列B
    @RabbitListener(queues = DEAD_LETTER_QUEUE_B)
    public void receiveB(Message message, Channel channel) {
        // 获取消息
        String msg = new String(message.getBody());
        // 记录日志
        log.info("当前时间:{},死信队列B收到消息:{}", LocalDateTime.now(), msg);
    }

}

  

控制层

  

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.example.controller;

import com.example.enums.DelayTypeEnum;
import com.example.producer.DelayMessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Objects;

/**
 * @author 哈喽沃德先生
 * @微信公众号 哈喽沃德先生
 * @website https://mrhelloworld.com
 * @wechat 124059770
 */
@Slf4j
@RestController
@RequestMapping("rabbitmq")
public class RabbitMQController {

    @Resource
    private DelayMessageProducer producer;

    @RequestMapping("send")
    public void send(String message, Integer delayType) {
        log.info("当前时间:{},消息:{},延迟类型:{}", LocalDateTime.now(), message, delayType);
        producer.send(message, Objects.requireNonNull(DelayTypeEnum.getDelayTypeEnum(delayType)));
    }

}

  

启动

  

  启动项目后,访问 RabbitMQ 控制台,交换机信息如下:

https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409141659189.png
https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409141802131.png

  

  队列信息如下:

https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409141903334.png
https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409142103836.png
https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409142139186.png

  

测试

  

  接下来发送几条消息:

  • http://localhost:8080/rabbitmq/send?message=测试10s延迟处理&delayType=1
  • http://localhost:8080/rabbitmq/send?message=测试60s延迟处理&delayType=2
https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409142825557.png

  后台日志记录:

1
2
3
4
c.example.controller.RabbitMQController  : 当前时间:2021-03-19T14:21:07.415630800,消息:测试10s延迟处理,延迟类型:1
c.example.controller.RabbitMQController  : 当前时间:2021-03-19T14:21:10.416759900,消息:测试60s延迟处理,延迟类型:2
c.e.consumer.DeadLetterQueueConsumer     : 当前时间:2021-03-19T14:21:17.419796100,死信队列A收到消息:测试10s延迟处理
c.e.consumer.DeadLetterQueueConsumer     : 当前时间:2021-03-19T14:22:10.424021800,死信队列B收到消息:测试60s延迟处理

  这里只有 10s 和 60s 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列?这样下去每扩展一个延迟任务,就需要额外增加一个队列。怎么解决这个问题?上第二种方案。

  

RabbitMQ 延迟队列优化

  

  方案一的问题可以通过将 TTL 设置在消息属性里来解决,然后添加一个延迟队列,用于接收设置为任意延迟时长的消息,再添加一个相应的死信队列和 routingkey 即可,如下图:

https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409144234893.png

  为了保留之前的案例代码,我们创建一个新的项目 delay-message-demo02,项目依赖和配置文件与第一个项目相同。

  

RabbitMQ 配置类

  

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.example.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author 哈喽沃德先生
 * @微信公众号 哈喽沃德先生
 * @website https://mrhelloworld.com
 * @wechat 124059770
 */
@Configuration
public class RabbitMQConfiguration {

    // 延迟交换机
    public static final String DELAY_EXCHANGE = "delay.exchange";
    // 延迟队列
    public static final String DELAY_QUEUE = "delay.queue";
    // 延迟队列路由Key
    public static final String DELAY_QUEUE_ROUTING_KEY = "delay.queue.routingkey";
    // 死信交换机
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    // 死信队列
    public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";
    // 死信队列路由Key
    public static final String DEAD_LETTER_QUEUE_ROUTING_KEY = "dead.letter.routingkey";

    // 声明延迟交换机
    @Bean("delayExchange")
    public DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE);
    }

    // 声明死信交换机
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    // 声明延迟队列,不设置消息TTL存活时间,并绑定到对应的死信交换机
    @Bean("delayQueue")
    public Queue delayQueue() {
        Map<String, Object> args = new HashMap<>();
        // x-dead-letter-exchange 声明队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key 声明队列的死信路由Key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_ROUTING_KEY);
        return QueueBuilder.durable(DELAY_QUEUE).withArguments(args).build();
    }

    // 声明延迟队列的绑定关系
    @Bean
    public Binding delayBinding(@Qualifier("delayQueue") Queue queue,
                                @Qualifier("delayExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY);
    }

    // 声明死信队列
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    // 声明死信队列的绑定关系
    @Bean
    public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue,
                                     @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_ROUTING_KEY);
    }

}

  

消息生产者

  

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.example.producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

import static com.example.config.RabbitMQConfiguration.DELAY_EXCHANGE;
import static com.example.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY;

/**
 * @author 哈喽沃德先生
 * @微信公众号 哈喽沃德先生
 * @website https://mrhelloworld.com
 * @wechat 124059770
 */
@Slf4j
@Component
public class DelayMessageProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send(String message, String delayTime) {
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_ROUTING_KEY, message, msg -> {
            // 设置消息的到期时间
            msg.getMessageProperties().setExpiration(delayTime);
            return msg;
        });
    }

}

  

消息消费者

  

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.example.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

import static com.example.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE;

/**
 * @author 哈喽沃德先生
 * @微信公众号 哈喽沃德先生
 * @website https://mrhelloworld.com
 * @wechat 124059770
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer {

    // 监听死信队列
    @RabbitListener(queues = DEAD_LETTER_QUEUE)
    public void receive(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("当前时间:{},死信队列收到消息:{}", LocalDateTime.now(), msg);
    }

}

  

控制层

  

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.example.controller;

import com.example.producer.DelayMessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.time.LocalDateTime;

/**
 * @author 哈喽沃德先生
 * @微信公众号 哈喽沃德先生
 * @website https://mrhelloworld.com
 * @wechat 124059770
 */
@Slf4j
@RestController
@RequestMapping("rabbitmq")
public class RabbitMQController {

    @Resource
    private DelayMessageProducer producer;

    @RequestMapping("send")
    public void send(String message, String delayTime) {
        log.info("当前时间:{},消息:{},延迟时间:{}", LocalDateTime.now(), message, delayTime);
        producer.send(message, delayTime);
    }

}

  

启动

  

  启动项目后,访问 RabbitMQ 控制台,交换机信息如下:

https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409141659189.png
https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409144926848.png

  

  队列信息如下:

https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409145014927.png
https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409145101520.png

  

测试

  

  接下来发送几条消息:

  • http://localhost:8080/rabbitmq/send?message=测试60s延迟处理&delayTime=60000
  • http://localhost:8080/rabbitmq/send?message=测试10s延迟处理&delayTime=10000
https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409145412164.png

  后台日志记录:

1
2
3
4
c.example.controller.RabbitMQController  : 当前时间:2021-03-19T15:01:16.087730100,消息:测试60s延迟处理,延迟时间:60000
c.example.controller.RabbitMQController  : 当前时间:2021-03-19T15:01:18.209655400,消息:测试10s延迟处理,延迟时间:10000
c.e.consumer.DeadLetterQueueConsumer     : 当前时间:2021-03-19T15:02:16.099414200,死信队列收到消息:测试60s延迟处理
c.e.consumer.DeadLetterQueueConsumer     : 当前时间:2021-03-19T15:02:16.099414200,死信队列收到消息:测试10s延迟处理

  消息的确过期被消费了,但是时间出了问题,原本希望 10s 被处理的消息却在 60s 后才被消费。这是因为消息属性设置 TTL 的方式,消息可能并不会按时过期。因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信交换机,所以如果第一个消息的延迟时长很长,而第二个消息的延迟时长较短,那么第二个消息是不会被优先得到处理的。怎么解决这个问题?上第三种方案。

  

RabbitMQ 延迟队列实现(插件版)

  

  方案二的问题可以通过安装 RabbitMQ 的社区插件 rabbitmq_delayed_message_exchange 来解决。

  安装插件后会生成新的 Exchange 类型 x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列,而是存储在 mnesia(一个分布式数据库) 中,随后监测消息延迟时间,如达到可投递时间时将其通过 x-delayed-type 类型标记的交换机投递至目标队列。

  最终效果如下图:

https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409154951173.png

  为了保留之前的案例代码,我们创建一个新的项目 delay-message-demo03,项目依赖和配置文件与第一个项目相同。

  

准备工作

  

下载插件

  

  访问:https://www.rabbitmq.com/community-plugins.html 下载插件。

  插件 Github 地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

  插件下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409150716133.png

  下载以后得到一个 rabbitmq_delayed_message_exchange-3.8.0.ez 压缩文件,将解压的文件上传到 RabbitMQ 安装目录下的 plugins 目录中。

  

安装插件

  

  这里顺便教大家如何在 Linux 系统中根据软件包查找安装文件。

  查看已安装的 rabbitmq 的包名 rpm -qa | grep rabbitmq

1
2
[root@localhost ~]# rpm -qa | grep rabbitmq
rabbitmq-server-3.8.14-1.el7.noarch

  查看 rabbitmq 软件包安装了哪些文件 rpm -ql rabbitmq-server-3.8.14-1.el7.noarch

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
[root@localhost ~]# rpm -ql rabbitmq-server-3.8.14-1.el7.noarch
/etc/logrotate.d/rabbitmq-server
/etc/profile.d/rabbitmqctl-autocomplete.sh
/etc/rabbitmq
...
...
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.14/plugins/README
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.14/plugins/accept-0.3.5.ez
...
...

  ... 表示查询结果中我省略的部分,相信大家已经知道 plugins 目录在哪里了,现在将解压的文件上传到 RabbitMQ 安装目录下的 plugins 目录中。

  查看:ls /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.14/plugins

https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409153203734.png

  

启动插件

  

  重启 RabbitMQ,输入以下命令加载 rabbitmq_delayed_message_exchange 插件:

1
2
3
4
# 重启 RabbitMQ
systemctl restart rabbitmq-server.service
# 启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

  

RabbitMQ 配置类

  

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author 哈喽沃德先生
 * @微信公众号 哈喽沃德先生
 * @website https://mrhelloworld.com
 * @wechat 124059770
 */
@Configuration
public class RabbitMQConfiguration {

    // 延迟交换机
    public static final String DELAY_EXCHANGE = "delay.exchange";
    // 延迟队列
    public static final String DELAY_QUEUE = "delay.queue";
    // 延迟队列路由Key
    public static final String DELAY_QUEUE_ROUTING_KEY = "delay.queue.routingkey";

    // 声明延迟队列
    @Bean("delayQueue")
    public Queue delayQueue() {
        return new Queue(DELAY_QUEUE);
    }

    // 声明延迟交换机,延迟消息由 rabbitmq_delayed_message_exchange 插件实现
    /*
        安装插件后会生成新的 Exchange 类型 x-delayed-message,
        该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列,
        而是存储在 mnesia(一个分布式数据库) 中,
        随后监测消息延迟时间,如达到可投递时间时将其通过 x-delayed-type 类型标记的交换机投递至目标队列。
     */
    @Bean("delayExchange")
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
    }

    // 声明延迟队列的绑定关系
    @Bean
    public Binding delayBinding(@Qualifier("delayQueue") Queue queue,
                                @Qualifier("delayExchange") CustomExchange delayExchange) {
        return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_KEY).noargs();
    }

}

  

消息生产者

  

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.example.producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

import static com.example.config.RabbitMQConfiguration.DELAY_EXCHANGE;
import static com.example.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY;

/**
 * @author 哈喽沃德先生
 * @微信公众号 哈喽沃德先生
 * @website https://mrhelloworld.com
 * @wechat 124059770
 */
@Slf4j
@Component
public class DelayMessageProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send(String message, Integer delayTime) {
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_ROUTING_KEY, message, msg -> {
            // 设置消息的延迟时间
            msg.getMessageProperties().setDelay(delayTime);
            return msg;
        });
    }

}

  

消息消费者

  

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.example.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

import static com.example.config.RabbitMQConfiguration.DELAY_QUEUE;

/**
 * @author 哈喽沃德先生
 * @微信公众号 哈喽沃德先生
 * @website https://mrhelloworld.com
 * @wechat 124059770
 */
@Slf4j
@Component
public class DelayQueueConsumer {

    // 监听延迟队列
    @RabbitListener(queues = DELAY_QUEUE)
    public void receive(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("当前时间:{},延迟队列收到消息:{}", LocalDateTime.now(), msg);
    }

}

  

控制层

  

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.example.controller;

import com.example.producer.DelayMessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.time.LocalDateTime;

/**
 * @author 哈喽沃德先生
 * @微信公众号 哈喽沃德先生
 * @website https://mrhelloworld.com
 * @wechat 124059770
 */
@Slf4j
@RequestMapping("rabbitmq")
@RestController
public class RabbitMQController {

    @Resource
    private DelayMessageProducer producer;

    @RequestMapping("send")
    public void send(String message, Integer delayTime) {
        log.info("当前时间:{},消息:{},延迟时间:{}", LocalDateTime.now(), message, delayTime);
        producer.send(message, delayTime);
    }

}

  

启动

  

  启动项目后,访问 RabbitMQ 控制台,交换机信息如下:

https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409154411867.png
https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409154438699.png

  

  队列信息如下:

https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409154520615.png

  

测试

  

  接下来发送几条消息:

  • http://localhost:8080/rabbitmq/send?message=测试60s延迟处理&delayTime=60000
  • http://localhost:8080/rabbitmq/send?message=测试10s延迟处理&delayTime=10000

  后台日志记录:

1
2
3
4
c.example.controller.RabbitMQController  : 当前时间:2021-03-19T15:57:18.934993500,消息:测试60s延迟处理,延迟时间:60000
c.example.controller.RabbitMQController  : 当前时间:2021-03-19T15:57:22.227431100,消息:测试10s延迟处理,延迟时间:10000
com.example.consumer.DelayQueueConsumer  : 当前时间:2021-03-19T15:57:32.232044300,延迟队列收到消息:测试10s延迟处理
com.example.consumer.DelayQueueConsumer  : 当前时间:2021-03-19T15:58:18.93793800,延迟队列收到消息:测试60s延迟处理

  10s 的延迟消息被消费,注意我画横线的时间部分。

https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409160033556.png

  60s 的延迟消息被消费,注意我画横线的时间部分。

https://mrhelloworld.com/resources/articles/why/rabbitmq/image-20210409160112609.png

  通过测试,发现第二个延迟较短的消息被先消费掉,符合预期。

  延迟队列在需要延迟处理的场景下非常有用,使用 RabbitMQ 来实现延迟队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃等。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延迟队列不可用或者消息丢失。

  至此,RabbitMQ 实现延迟队列就讲到这里。下一篇我们通过 Spring Boot + RabbitMQ + Vue 实现一个简易版的订单超时处理系统,方便大家理解其背后的原理。

温馨提示:本案例图文并茂,代码齐全,大家只需要跟着文章一步步学习即可实现。想要一步到位直接获取代码的同学,请关注微信公众号「哈喽沃德先生」回复 ordertimeout 即可

  

参考

  

https://mrhelloworld.com/resources/articles/articles_bottom/end02.gif

本文采用 知识共享「署名-非商业性使用-禁止演绎 4.0 国际」许可协议

大家可以通过 分类 查看更多关于 RabbitMQ 的文章。

  

🤗 您的点赞转发是对我最大的鼓励和支持。

📢 扫码关注 哈喽沃德先生「文档 + 视频」每篇文章都配有专门视频讲解,学习更轻松噢 ~

https://mrhelloworld.com/resources/mrhelloworld/qrcode/OfficialAccounts500-500.gif
「 感谢支持 」