|

Redis 实战篇 - 消息队列模块学习笔记

p072 - 实战篇-25.Redis消息队列-认识消息队列

为什么需要消息队列

基于JVM阻塞队列实现异步秒杀存在两个核心问题:

  1. JVM内存限制:高并发时订单量可能超出JVM阻塞队列上限
  2. 数据安全问题:JVM内存无持久化机制,服务重启或宕机时队列中的任务全部丢失;处理中的任务异常后也无法恢复

消息队列(Message Queue)概念

消息队列是存放消息的队列,至少包含三个角色:

角色 说明
生产者(Producer) 消息的发送方
消息队列(Message Queue) 存储和管理消息
消费者(Consumer) 消息的接收方

消息队列也称为 消息代理(Message Broker),功能远不止存储,更像是快递公司:揽件 → 运输 → 保管 → 投递。

生活类比——快递柜: - 没有快递柜:快递员送货上门,收件人不在 → 两头都难受,效率低下 - 有快递柜:快递员10分钟投完一上午的货,收件人有空再去取 → 解耦,提高效率

消息队列的核心价值

  1. 解耦:生产者和消费者不需要相互等待
  2. 提高效率:异步处理模式,系统各模块按自身节奏工作
  3. 数据安全:消息持久化 + 消费确认机制,确保至少被消费一次

消息队列与JVM阻塞队列的区别

对比项 JVM阻塞队列 消息队列
位置 JVM内存内 独立服务,JVM之外
存储上限 受JVM内存限制 不受JVM限制
持久化 支持,数据不丢失
消费确认 要求消费者确认,未确认则保留消息

Redis实现消息队列的三种方式

  1. 基于List结构:利用链表模拟阻塞队列
  2. 基于PubSub:发布订阅模式,Redis 2.0引入
  3. 基于Stream:Redis 5.0引入的全新数据类型,功能最完善

p073 - 实战篇-26.Redis消息队列-基于List实现消息队列

基本原理

Redis的List是双向链表,要模拟队列效果(先进先出),需保证入口和出口不在同一侧

  • 方案一LPUSH + RPOP(左进右出)
  • 方案二RPUSH + LPOP(右进左出)

阻塞式读取

RPOP / LPOP非阻塞的,没有消息时直接返回空。要实现阻塞队列效果,必须使用:

  • BRPOP key timeout:阻塞式移除并获取最后一个元素
  • BLPOP key timeout:阻塞式移除并获取第一个元素

B = Block(阻塞),timeout 为阻塞等待时长(秒),0 表示永久等待。

演示示例

# 消费者端:阻塞监听名为 myqueue 的队列,最多等20秒
BRPOP myqueue 20

# 生产者端:向队列左侧添加消息
LPUSH myqueue "message1" "message2"

# 消费者端立即收到:["myqueue", "message1"](先进先出)

优缺点总结

优点: 1. 利用Redis存储,不受JVM内存上限限制 ✅ 2. Redis支持持久化,数据安全性有保障 ✅ 3. 基于List有序性,保证消息有序(先进先出) ✅

缺点: 1. 无法避免消息丢失RPOP 是 remove and get,消费者取出消息后、处理完成前如果宕机,该消息丢失 ❌ 2. 仅支持单消费模式:一条消息被一个消费者取走后,其他消费者无法获取 ❌


p074 - 实战篇-27.Redis消息队列-PubSub实现消息队列

基本概念

PubSub = Publish / Subscribe(发布/订阅),Redis 2.0引入的消息传递模型。

消费者可以订阅一个或多个 channel(频道),生产者向频道发送消息时,所有订阅者都能收到

核心命令

# 订阅频道(支持同时订阅多个)
SUBSCRIBE channel1 channel2

# 向频道发送消息
PUBLISH channel1 "hello"

# 模式订阅(支持通配符)
PSUBSCRIBE order.*    # 订阅所有以 order. 开头的频道

通配符规则

通配符 含义 示例
? 匹配1个字符 h?llo → hello, hxllo
* 匹配0或多个字符 h*llo → hllo, hello, hxxxxllo
[ab] 匹配指定字符之一 h[ab]llo → hallo, hbllo

演示示例

# 消费者1:订阅 order.q1
SUBSCRIBE order.q1

# 消费者2:模式订阅 order.*
PSUBSCRIBE order.*

# 生产者:向 order.q1 发消息 → 两个消费者都收到
PUBLISH order.q1 "hello"

# 生产者:向 order.q2 发消息 → 只有消费者2收到(通配符匹配)
PUBLISH order.q2 "hello"

优缺点总结

优点: - 支持发布订阅模式,一条消息可被多个消费者接收 ✅ - 多生产者多消费者,灵活度高 ✅ - 天生阻塞式(SUBSCRIBE 后自动阻塞等待消息) ✅

缺点: 1. 不支持数据持久化:消息发出后如果没有订阅者,消息直接丢失 ❌ 2. 消息丢失问题:同上,无持久化则无安全保障 ❌ 3. 消息堆积有上限:消息缓存在消费者端的缓冲区中,超出缓冲区上限则丢失 ❌

⚠️ PubSub 缺陷较多,不建议用于对可靠性要求较高的场景。如果需要可靠性,List 结构反而更好。


p075 - 实战篇-28.Redis消息队列-Stream的单消费模式

Stream 简介

Stream 是 Redis 5.0 引入的全新数据类型(与String、List、Hash、Set并列),专门为消息队列设计。

  • 支持数据持久化(本质是数据存储结构)✅
  • 功能最完善,是Redis提供的最佳消息队列方案 ✅

核心命令:XADD(发送消息)

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold] [LIMIT count] <*|id> field value [field value ...]

参数说明:

参数 说明
key 队列名称
NOMKSTREAM 队列不存在时是否自动创建(默认自动创建)
MAXLEN 队列最大消息数量,超出后旧消息被淘汰
* ID,* 表示由Redis自动生成
field value 消息体,一个键值对称为一个 entry(条目)

ID格式: <毫秒时间戳>-<序列号> - 同一毫秒内:时间戳相同,序列号递增 - 不同毫秒:时间戳不同,序列号从0开始

# 最简用法:向 stream.orders 队列发送消息
XADD stream.orders * userId 1001 voucherId 20 orderId 10001

# 返回结果:自动生成的ID,如 "1711353600000-0"

核心命令:XREAD(读取消息)

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

参数说明:

参数 说明
COUNT 每次读取的最大消息数
BLOCK 阻塞等待时长(毫秒),0 表示永久阻塞
STREAMS 指定队列名称(可多个)
ID 起始读取位置,0 从第一条开始,$ 从最新一条开始
# 从 stream.orders 队列读取1条消息,无消息时阻塞等待2秒
XREAD COUNT 1 BLOCK 2000 STREAMS stream.orders $

# 从第一条消息开始读取
XREAD COUNT 2 STREAMS stream.orders 0

单消费模式演示

# 消费者1:读取最新消息
XREAD COUNT 1 BLOCK 2000 STREAMS stream.orders $

# 消费者2:同时读取(不同消费者读同一队列)
XREAD COUNT 1 BLOCK 2000 STREAMS stream.orders $

注意: 普通 XREAD 模式下,多个消费者读取同一队列时,每个消费者都能独立读到所有消息(类似广播)。要实现”一条消息只被一个消费者处理”,需要使用消费者组模式

Stream 单消费模式特点

  • 支持阻塞读取 ✅
  • 支持持久化 ✅
  • 消息有唯一ID,可追溯 ✅
  • 但:默认是广播模式,非队列消费模式

p076 - 实战篇-29.Redis消息队列-Stream的消费者组模式

消费者组概念

消费者组(Consumer Group) 是Stream的核心特性,实现了真正的队列消费语义:

  • 同组内的多个消费者竞争消费,一条消息只会被组内一个消费者处理
  • 不同消费者组之间独立消费,互不影响
  • 支持消息确认(ACK)机制,确保消息被成功处理

核心命令

1. 创建消费者组

XGROUP CREATE key groupname id|$
# 示例:创建队列 stream.orders 和消费者组 group1,从头开始消费
XGROUP CREATE stream.orders group1 0 MKSTREAM

MKSTREAM:队列不存在时自动创建

2. 消费者组读取消息

XREADGROUP GROUP groupname consumername [COUNT count] [BLOCK ms] [NOACK] STREAMS key [key ...] ID [ID ...]
参数 说明
GROUP groupname consumername 指定组名和消费者名
COUNT 读取数量
BLOCK 阻塞时长(毫秒)
NOACK 不需要确认(自动ACK)
ID > 表示读取未被消费的新消息;0 表示读取 pending list 中的消息
# 消费者 c1 在 group1 中读取未消费消息
XREADGROUP GROUP group1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >

# 读取 pending list 中的未确认消息
XREADGROUP GROUP group1 c1 COUNT 1 STREAMS stream.orders 0

3. 消息确认

XACK key groupname id [id ...]
# 示例:确认消息ID为 1711353600000-0 的消息已被处理
XACK stream.orders group1 1711353600000-0

消息生命周期

生产者 XADD → 队列 → 消费者 XREADGROUP(>) 读取 → 处理业务 → XACK 确认 → 消息移除
                                    ↓ (处理异常,未ACK)
                              Pending List → XREADGROUP(0) 重新读取 → 再次处理

优缺点总结

优点: 1. 消息可追溯:通过ID可以回溯历史消息 ✅ 2. 支持消费者组模式,一条消息只被组内一个消费者处理 ✅ 3. 消息确认机制:确保消息至少被消费一次 ✅ 4. Pending List:异常消息可重新消费 ✅ 5. 支持阻塞式读取 ✅

缺点: - 相比专业MQ(RabbitMQ、RocketMQ),在以下方面仍有不足: - 持久化依赖Redis自身持久化机制,有丢失风险 - 只支持消费者确认,不支持生产者确认 - 缺乏事务机制、多消费者下的严格有序性等高级特性

💡 适用场景:中小型企业,对消息队列要求不是极其严格时,Stream 完全能满足业务需要。


p077 - 实战篇-30.Redis消息队列-基于Stream消息队列实现异步秒杀

需求概述

用 Stream 消息队列替代之前的 JVM 阻塞队列,实现异步秒杀。

实现步骤

Step 1:创建 Stream 队列和消费者组

# 在命令行执行,项目启动前完成
XGROUP CREATE stream.orders g1 0 MKSTREAM

Step 2:改造 Lua 脚本——在脚本中直接发送消息

Lua脚本改造要点:

  • 新增第三个参数:orderId
  • 判断有购买资格后,直接在Lua中调用 XADD 发送消息到 Stream 队列
  • 减少一次 Java 与 Redis 的交互,效率更高
-- 参数:1=优惠券ID, 2=用户ID, 3=订单ID
-- 判断库存和一人一单...

-- 有资格:扣减库存 + 发送消息
redis.call('incrby', stockKey, -1)
redis.call('sadd', orderKey, userId)
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 1  -- 成功

Step 3:改造业务逻辑——简化下单流程

// 秒杀下单业务(改造后)
public Result seckillVoucher(Long voucherId) {
    Long userId = UserHolder.getUser().getId();
    long orderId = redisIdWorker.nextId("order");
    
    // 1. 执行Lua脚本(判断资格 + 发送消息)
    Long result = stringRedisTemplate.execute(
        SECKILL_SCRIPT,
        Collections.emptyList(),
        voucherId.toString(), userId.toString(), String.valueOf(orderId)
    );
    
    // 2. 判断结果
    if (result != 0) {
        return Result.fail(result == 1 ? "库存不足" : "不能重复下单");
    }
    
    // 3. 资格通过,消息已发送,直接返回订单ID
    return Result.ok(orderId);
}

Step 4:独立线程消费消息——完成下单

// 线程任务:循环读取消息并处理
private class StreamOrderHandler implements Runnable {
    private static final String QUEUE_NAME = "stream.orders";
    private static final String GROUP_NAME = "g1";
    private static final String CONSUMER_NAME = "c1";
    
    @Override
    public void run() {
        while (true) {
            try {
                // 1. 读取消息(阻塞2秒)
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                    Consumer.from(GROUP_NAME, CONSUMER_NAME),
                    StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                    StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed())
                );
                
                // 2. 判断是否获取到消息
                if (list == null || list.isEmpty()) {
                    continue;  // 无消息,继续下一次循环
                }
                
                // 3. 解析消息中的订单信息
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> valueMap = record.getValue();
                VoucherOrder order = BeanUtil.fillBeanWithMap(valueMap, new VoucherOrder(), true);
                
                // 4. 创建订单
                handleVoucherOrder(order);
                
                // 5. ACK确认
                stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
                
            } catch (Exception e) {
                // 处理异常:从 pending list 中重新读取
                handlePendingList();
            }
        }
    }
    
    // 处理 pending list 中的异常消息
    private void handlePendingList() {
        while (true) {
            try {
                // 读取 pending list(ID=0,无需BLOCK)
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                    Consumer.from(GROUP_NAME, CONSUMER_NAME),
                    StreamReadOptions.empty().count(1),
                    StreamOffset.create(QUEUE_NAME, ReadOffset.from("0"))
                );
                
                // 无异常消息,结束处理
                if (list == null || list.isEmpty()) {
                    break;
                }
                
                // 有异常消息,处理并确认
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> valueMap = record.getValue();
                VoucherOrder order = BeanUtil.fillBeanWithMap(valueMap, new VoucherOrder(), true);
                handleVoucherOrder(order);
                stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
                
            } catch (Exception e) {
                log.error("处理pending list异常", e);
                Thread.sleep(20);  // 防止频繁重试
            }
        }
    }
}

整体流程图

┌──────────────────────────────────────────────────────────┐
│                    正常消费流程                            │
│                                                          │
│  用户请求 → Lua脚本(资格判断+XADD) → 返回订单ID           │
│                    ↓                                      │
│         Stream队列(stream.orders)                        │
│                    ↓                                      │
│     独立线程 XREADGROUP(>) → 解析消息 → 创建订单 → XACK   │
└──────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────┐
│                    异常处理流程                            │
│                                                          │
│  处理消息时异常 → 未ACK → 进入Pending List                │
│                    ↓                                      │
│     handlePendingList → XREADGROUP(0) → 重新处理 → XACK  │
│                    ↓ (再次异常)                           │
│     短暂休眠(20ms) → 继续循环直到处理完毕                  │
└──────────────────────────────────────────────────────────┘

关键注意事项

  1. 队列名称拼写stream.orders 不要写错,否则找不到队列报错
  2. 消费者名建议配置化:多节点部署时,消费者名应不同,避免冲突
  3. > vs 0:XREADGROUP 中 ID 用 > 表示未消费新消息,用 0 表示 pending list
  4. pending list 不需要 BLOCK:读取异常消息时无需阻塞
  5. pending list 不要递归:异常处理中再次异常只需短暂休眠后继续循环
  6. 订单ID参数名:Lua脚本中建议直接用 id(与 VoucherOrder 实体类字段一致),方便 BeanUtil 映射

测试结果

  • 平均响应时间:~110ms(相比阻塞队列方案有大幅提升)
  • 库存无超卖 ✅
  • 订单数量准确 ✅

消息队列三种方式对比总结

特性 List PubSub Stream
数据结构 链表 发布订阅 全新数据类型
持久化 ✅ 支持 ❌ 不支持 ✅ 支持
消息有序性 ✅ 先进先出 ⚠️ 不保证 ✅ 有ID保证
多消费者 ❌ 单消费 ✅ 多消费 ✅ 消费者组模式
消息确认 ❌ 无 ❌ 无 ✅ XACK
阻塞读取 ✅ BRPOP/BLPOP ✅ 天生阻塞 ✅ BLOCK参数
消息追溯 ✅ 通过ID
异常恢复 ✅ Pending List
适用场景 简单队列 广播通知 完整消息队列

推荐选择: - 简单场景、需要持久化 → List - 广播通知、不要求可靠性 → PubSub - 完整消息队列需求 → Stream(首选) - 大型项目、要求极高可靠性 → 专业MQ(RabbitMQ、RocketMQ、Kafka)

评论交流

文章目录