Redis 实战篇 - 消息队列模块学习笔记
p072 - 实战篇-25.Redis消息队列-认识消息队列
为什么需要消息队列
基于JVM阻塞队列实现异步秒杀存在两个核心问题:
- JVM内存限制:高并发时订单量可能超出JVM阻塞队列上限
- 数据安全问题:JVM内存无持久化机制,服务重启或宕机时队列中的任务全部丢失;处理中的任务异常后也无法恢复
消息队列(Message Queue)概念
消息队列是存放消息的队列,至少包含三个角色:
| 角色 | 说明 |
|---|---|
| 生产者(Producer) | 消息的发送方 |
| 消息队列(Message Queue) | 存储和管理消息 |
| 消费者(Consumer) | 消息的接收方 |
消息队列也称为 消息代理(Message Broker),功能远不止存储,更像是快递公司:揽件 → 运输 → 保管 → 投递。
生活类比——快递柜: - 没有快递柜:快递员送货上门,收件人不在 → 两头都难受,效率低下 - 有快递柜:快递员10分钟投完一上午的货,收件人有空再去取 → 解耦,提高效率
消息队列的核心价值
- 解耦:生产者和消费者不需要相互等待
- 提高效率:异步处理模式,系统各模块按自身节奏工作
- 数据安全:消息持久化 + 消费确认机制,确保至少被消费一次
消息队列与JVM阻塞队列的区别
| 对比项 | JVM阻塞队列 | 消息队列 |
|---|---|---|
| 位置 | JVM内存内 | 独立服务,JVM之外 |
| 存储上限 | 受JVM内存限制 | 不受JVM限制 |
| 持久化 | 无 | 支持,数据不丢失 |
| 消费确认 | 无 | 要求消费者确认,未确认则保留消息 |
Redis实现消息队列的三种方式
- 基于List结构:利用链表模拟阻塞队列
- 基于PubSub:发布订阅模式,Redis 2.0引入
- 基于Stream:Redis 5.0引入的全新数据类型,功能最完善
p073 - 实战篇-26.Redis消息队列-基于List实现消息队列
基本原理
Redis的List是双向链表,要模拟队列效果(先进先出),需保证入口和出口不在同一侧:
- 方案一:
LPUSH+RPOP(左进右出) - 方案二:
RPUSH+LPOP(右进左出)
阻塞式读取
RPOP / LPOP 是非阻塞的,没有消息时直接返回空。要实现阻塞队列效果,必须使用:
BRPOP key timeout:阻塞式移除并获取最后一个元素BLPOP key timeout:阻塞式移除并获取第一个元素
B = Block(阻塞),timeout 为阻塞等待时长(秒),0 表示永久等待。
演示示例
# 消费者端:阻塞监听名为 myqueue 的队列,最多等20秒
BRPOP myqueue 20
# 生产者端:向队列左侧添加消息
LPUSH myqueue "message1" "message2"
# 消费者端立即收到:["myqueue", "message1"](先进先出)优缺点总结
优点: 1. 利用Redis存储,不受JVM内存上限限制 ✅ 2. Redis支持持久化,数据安全性有保障 ✅ 3. 基于List有序性,保证消息有序(先进先出) ✅
缺点: 1. 无法避免消息丢失:RPOP 是 remove and get,消费者取出消息后、处理完成前如果宕机,该消息丢失 ❌ 2. 仅支持单消费模式:一条消息被一个消费者取走后,其他消费者无法获取 ❌
p074 - 实战篇-27.Redis消息队列-PubSub实现消息队列
基本概念
PubSub = Publish / Subscribe(发布/订阅),Redis 2.0引入的消息传递模型。
消费者可以订阅一个或多个 channel(频道),生产者向频道发送消息时,所有订阅者都能收到。
核心命令
# 订阅频道(支持同时订阅多个)
SUBSCRIBE channel1 channel2
# 向频道发送消息
PUBLISH channel1 "hello"
# 模式订阅(支持通配符)
PSUBSCRIBE order.* # 订阅所有以 order. 开头的频道通配符规则
| 通配符 | 含义 | 示例 |
|---|---|---|
? |
匹配1个字符 | h?llo → hello, hxllo |
* |
匹配0或多个字符 | h*llo → hllo, hello, hxxxxllo |
[ab] |
匹配指定字符之一 | h[ab]llo → hallo, hbllo |
演示示例
# 消费者1:订阅 order.q1
SUBSCRIBE order.q1
# 消费者2:模式订阅 order.*
PSUBSCRIBE order.*
# 生产者:向 order.q1 发消息 → 两个消费者都收到
PUBLISH order.q1 "hello"
# 生产者:向 order.q2 发消息 → 只有消费者2收到(通配符匹配)
PUBLISH order.q2 "hello"优缺点总结
优点: - 支持发布订阅模式,一条消息可被多个消费者接收 ✅ - 多生产者多消费者,灵活度高 ✅ - 天生阻塞式(SUBSCRIBE 后自动阻塞等待消息) ✅
缺点: 1. 不支持数据持久化:消息发出后如果没有订阅者,消息直接丢失 ❌ 2. 消息丢失问题:同上,无持久化则无安全保障 ❌ 3. 消息堆积有上限:消息缓存在消费者端的缓冲区中,超出缓冲区上限则丢失 ❌
⚠️ PubSub 缺陷较多,不建议用于对可靠性要求较高的场景。如果需要可靠性,List 结构反而更好。
p075 - 实战篇-28.Redis消息队列-Stream的单消费模式
Stream 简介
Stream 是 Redis 5.0 引入的全新数据类型(与String、List、Hash、Set并列),专门为消息队列设计。
- 支持数据持久化(本质是数据存储结构)✅
- 功能最完善,是Redis提供的最佳消息队列方案 ✅
核心命令:XADD(发送消息)
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold] [LIMIT count] <*|id> field value [field value ...]参数说明:
| 参数 | 说明 |
|---|---|
key |
队列名称 |
NOMKSTREAM |
队列不存在时是否自动创建(默认自动创建) |
MAXLEN |
队列最大消息数量,超出后旧消息被淘汰 |
* |
ID,* 表示由Redis自动生成 |
field value |
消息体,一个键值对称为一个 entry(条目) |
ID格式: <毫秒时间戳>-<序列号> - 同一毫秒内:时间戳相同,序列号递增 - 不同毫秒:时间戳不同,序列号从0开始
# 最简用法:向 stream.orders 队列发送消息
XADD stream.orders * userId 1001 voucherId 20 orderId 10001
# 返回结果:自动生成的ID,如 "1711353600000-0"核心命令:XREAD(读取消息)
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]参数说明:
| 参数 | 说明 |
|---|---|
COUNT |
每次读取的最大消息数 |
BLOCK |
阻塞等待时长(毫秒),0 表示永久阻塞 |
STREAMS |
指定队列名称(可多个) |
ID |
起始读取位置,0 从第一条开始,$ 从最新一条开始 |
# 从 stream.orders 队列读取1条消息,无消息时阻塞等待2秒
XREAD COUNT 1 BLOCK 2000 STREAMS stream.orders $
# 从第一条消息开始读取
XREAD COUNT 2 STREAMS stream.orders 0单消费模式演示
# 消费者1:读取最新消息
XREAD COUNT 1 BLOCK 2000 STREAMS stream.orders $
# 消费者2:同时读取(不同消费者读同一队列)
XREAD COUNT 1 BLOCK 2000 STREAMS stream.orders $注意: 普通 XREAD 模式下,多个消费者读取同一队列时,每个消费者都能独立读到所有消息(类似广播)。要实现”一条消息只被一个消费者处理”,需要使用消费者组模式。
Stream 单消费模式特点
- 支持阻塞读取 ✅
- 支持持久化 ✅
- 消息有唯一ID,可追溯 ✅
- 但:默认是广播模式,非队列消费模式
p076 - 实战篇-29.Redis消息队列-Stream的消费者组模式
消费者组概念
消费者组(Consumer Group) 是Stream的核心特性,实现了真正的队列消费语义:
- 同组内的多个消费者竞争消费,一条消息只会被组内一个消费者处理
- 不同消费者组之间独立消费,互不影响
- 支持消息确认(ACK)机制,确保消息被成功处理
核心命令
1. 创建消费者组
XGROUP CREATE key groupname id|$
# 示例:创建队列 stream.orders 和消费者组 group1,从头开始消费
XGROUP CREATE stream.orders group1 0 MKSTREAM
MKSTREAM:队列不存在时自动创建
2. 消费者组读取消息
XREADGROUP GROUP groupname consumername [COUNT count] [BLOCK ms] [NOACK] STREAMS key [key ...] ID [ID ...]| 参数 | 说明 |
|---|---|
GROUP groupname consumername |
指定组名和消费者名 |
COUNT |
读取数量 |
BLOCK |
阻塞时长(毫秒) |
NOACK |
不需要确认(自动ACK) |
ID |
> 表示读取未被消费的新消息;0 表示读取 pending list 中的消息 |
# 消费者 c1 在 group1 中读取未消费消息
XREADGROUP GROUP group1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
# 读取 pending list 中的未确认消息
XREADGROUP GROUP group1 c1 COUNT 1 STREAMS stream.orders 03. 消息确认
XACK key groupname id [id ...]
# 示例:确认消息ID为 1711353600000-0 的消息已被处理
XACK stream.orders group1 1711353600000-0消息生命周期
生产者 XADD → 队列 → 消费者 XREADGROUP(>) 读取 → 处理业务 → XACK 确认 → 消息移除
↓ (处理异常,未ACK)
Pending List → XREADGROUP(0) 重新读取 → 再次处理优缺点总结
优点: 1. 消息可追溯:通过ID可以回溯历史消息 ✅ 2. 支持消费者组模式,一条消息只被组内一个消费者处理 ✅ 3. 消息确认机制:确保消息至少被消费一次 ✅ 4. Pending List:异常消息可重新消费 ✅ 5. 支持阻塞式读取 ✅
缺点: - 相比专业MQ(RabbitMQ、RocketMQ),在以下方面仍有不足: - 持久化依赖Redis自身持久化机制,有丢失风险 - 只支持消费者确认,不支持生产者确认 - 缺乏事务机制、多消费者下的严格有序性等高级特性
💡 适用场景:中小型企业,对消息队列要求不是极其严格时,Stream 完全能满足业务需要。
p077 - 实战篇-30.Redis消息队列-基于Stream消息队列实现异步秒杀
需求概述
用 Stream 消息队列替代之前的 JVM 阻塞队列,实现异步秒杀。
实现步骤
Step 1:创建 Stream 队列和消费者组
# 在命令行执行,项目启动前完成
XGROUP CREATE stream.orders g1 0 MKSTREAMStep 2:改造 Lua 脚本——在脚本中直接发送消息
Lua脚本改造要点:
- 新增第三个参数:
orderId - 判断有购买资格后,直接在Lua中调用 XADD 发送消息到 Stream 队列
- 减少一次 Java 与 Redis 的交互,效率更高
-- 参数:1=优惠券ID, 2=用户ID, 3=订单ID
-- 判断库存和一人一单...
-- 有资格:扣减库存 + 发送消息
redis.call('incrby', stockKey, -1)
redis.call('sadd', orderKey, userId)
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 1 -- 成功Step 3:改造业务逻辑——简化下单流程
// 秒杀下单业务(改造后)
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1. 执行Lua脚本(判断资格 + 发送消息)
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
// 2. 判断结果
if (result != 0) {
return Result.fail(result == 1 ? "库存不足" : "不能重复下单");
}
// 3. 资格通过,消息已发送,直接返回订单ID
return Result.ok(orderId);
}Step 4:独立线程消费消息——完成下单
// 线程任务:循环读取消息并处理
private class StreamOrderHandler implements Runnable {
private static final String QUEUE_NAME = "stream.orders";
private static final String GROUP_NAME = "g1";
private static final String CONSUMER_NAME = "c1";
@Override
public void run() {
while (true) {
try {
// 1. 读取消息(阻塞2秒)
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from(GROUP_NAME, CONSUMER_NAME),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed())
);
// 2. 判断是否获取到消息
if (list == null || list.isEmpty()) {
continue; // 无消息,继续下一次循环
}
// 3. 解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> valueMap = record.getValue();
VoucherOrder order = BeanUtil.fillBeanWithMap(valueMap, new VoucherOrder(), true);
// 4. 创建订单
handleVoucherOrder(order);
// 5. ACK确认
stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
} catch (Exception e) {
// 处理异常:从 pending list 中重新读取
handlePendingList();
}
}
}
// 处理 pending list 中的异常消息
private void handlePendingList() {
while (true) {
try {
// 读取 pending list(ID=0,无需BLOCK)
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from(GROUP_NAME, CONSUMER_NAME),
StreamReadOptions.empty().count(1),
StreamOffset.create(QUEUE_NAME, ReadOffset.from("0"))
);
// 无异常消息,结束处理
if (list == null || list.isEmpty()) {
break;
}
// 有异常消息,处理并确认
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> valueMap = record.getValue();
VoucherOrder order = BeanUtil.fillBeanWithMap(valueMap, new VoucherOrder(), true);
handleVoucherOrder(order);
stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
} catch (Exception e) {
log.error("处理pending list异常", e);
Thread.sleep(20); // 防止频繁重试
}
}
}
}整体流程图
┌──────────────────────────────────────────────────────────┐
│ 正常消费流程 │
│ │
│ 用户请求 → Lua脚本(资格判断+XADD) → 返回订单ID │
│ ↓ │
│ Stream队列(stream.orders) │
│ ↓ │
│ 独立线程 XREADGROUP(>) → 解析消息 → 创建订单 → XACK │
└──────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────┐
│ 异常处理流程 │
│ │
│ 处理消息时异常 → 未ACK → 进入Pending List │
│ ↓ │
│ handlePendingList → XREADGROUP(0) → 重新处理 → XACK │
│ ↓ (再次异常) │
│ 短暂休眠(20ms) → 继续循环直到处理完毕 │
└──────────────────────────────────────────────────────────┘关键注意事项
- 队列名称拼写:
stream.orders不要写错,否则找不到队列报错 - 消费者名建议配置化:多节点部署时,消费者名应不同,避免冲突
>vs0:XREADGROUP 中 ID 用>表示未消费新消息,用0表示 pending list- pending list 不需要 BLOCK:读取异常消息时无需阻塞
- pending list 不要递归:异常处理中再次异常只需短暂休眠后继续循环
- 订单ID参数名:Lua脚本中建议直接用
id(与 VoucherOrder 实体类字段一致),方便 BeanUtil 映射
测试结果
- 平均响应时间:~110ms(相比阻塞队列方案有大幅提升)
- 库存无超卖 ✅
- 订单数量准确 ✅
消息队列三种方式对比总结
| 特性 | List | PubSub | Stream |
|---|---|---|---|
| 数据结构 | 链表 | 发布订阅 | 全新数据类型 |
| 持久化 | ✅ 支持 | ❌ 不支持 | ✅ 支持 |
| 消息有序性 | ✅ 先进先出 | ⚠️ 不保证 | ✅ 有ID保证 |
| 多消费者 | ❌ 单消费 | ✅ 多消费 | ✅ 消费者组模式 |
| 消息确认 | ❌ 无 | ❌ 无 | ✅ XACK |
| 阻塞读取 | ✅ BRPOP/BLPOP | ✅ 天生阻塞 | ✅ BLOCK参数 |
| 消息追溯 | ❌ | ❌ | ✅ 通过ID |
| 异常恢复 | ❌ | ❌ | ✅ Pending List |
| 适用场景 | 简单队列 | 广播通知 | 完整消息队列 |
推荐选择: - 简单场景、需要持久化 → List - 广播通知、不要求可靠性 → PubSub - 完整消息队列需求 → Stream(首选) - 大型项目、要求极高可靠性 → 专业MQ(RabbitMQ、RocketMQ、Kafka)