你有没有思考过一个支付问题:网站用户下单后,如果 30 分钟内没支付,订单就会自动取消。这个功能怎么做?绝大多数人觉得这个功能超级简单,但它其实是一面能照出你真实技术实力的照妖镜。
今天我们通过这个场景,从最直接的定时任务方案开始,逐步演进到使用 Message Queue(消息队列,简称 MQ)的优雅方案。如果你还不了解 MQ,或者想知道 MQ 在实际业务中该怎么用,这篇文章会给你一个清晰的答案。
为什么需要 MQ?
最直接的想法是用 Cron Job(定时任务),每隔一分钟扫描数据库,找出超时的订单并取消:
// ❌ 定时任务方案:每分钟扫描一次数据库
cron.schedule('*/1 * * * *', async () => {
const expiredOrders = await db.query(`
SELECT * FROM orders
WHERE status = 'PENDING' AND created_at < NOW() - INTERVAL '30 minutes'
`);
// ... 批量取消订单
});这种方式在小网站还能对付,但在并发量上去了问题明显:每次全表扫描会让数据库 CPU 飙升,影响正常业务;批量取消订单时会产生锁竞争;而且每分钟扫一次,时效性也不够精确。
这时候,MQ 就派上用场了。它把定时任务的”一次性批量处理”变成”按订单逐个处理”,既削峰填谷,又能精确控制每个订单的超时时间点。
RocketMQ 基础
在开始用 MQ 改造之前,先了解一下 RocketMQ 的核心概念。
架构组成
RocketMQ 由四个核心角色组成:
NameServer(注册中心) 轻量级的服务发现组件,类似于 Zookeeper 但更轻量。它就像是 RocketMQ 的”通讯录”,负责管理 Broker 的地址信息。
需要注意的是,RocketMQ 不是单个服务,而是由多个独立组件组成的分布式系统。部署时需要先启动 NameServer(默认端口 9876),再启动 Broker(默认端口 10911)。Broker 启动后会向 NameServer 注册自己,并定期发送心跳。
你的业务代码连接的是 NameServer 地址,Producer 和 Consumer 通过 NameServer 获取 Broker 列表,然后真正收发消息时是直接和 Broker 通信,不再经过 NameServer。
// 你的业务代码配置 NameServer 地址
const producer = new Producer({
nameServer: 'localhost:9876' // 可以是多个:'host1:9876;host2:9876'
});
// Producer 启动时会从 NameServer 获取 Broker 地址
await producer.start();
// 发送消息时,直接和 Broker 通信
await producer.send({
topic: 'order-events',
body: JSON.stringify({ orderId: 123 })
});Broker(消息存储) 真正存储和转发消息的服务器。Broker 会向 NameServer 注册自己,接收 Producer 发来的消息,存储到磁盘,并在 Consumer 请求时返回消息。一个 RocketMQ 集群可以有多个 Broker,每个 Broker 可以存储多个 Topic 的数据。
Producer(生产者) 发送消息的应用程序。你的业务代码通过 Producer 向指定 Topic 发送消息。
Consumer(消费者) 接收消息的应用程序。Consumer 订阅 Topic,从 Broker 拉取消息进行处理。
graph LR P(Producer) -->|1. 发送消息| B(Broker) B -->|2. 存储消息| D[(磁盘)] C(Consumer) -->|3. 拉取消息| B P -.->|注册/心跳| N[NameServer] B -.->|注册/心跳| N C -.->|获取路由| N
Topic 和 Message
Topic(主题)
消息的分类标签,就像快递的”类别”。比如 order-created、payment-success 等。Producer 发消息时指定 Topic,Consumer 订阅 Topic 接收消息。
Message(消息) 实际传输的数据,包含 body(消息体)、tag(子标签)、key(业务标识)等。
await producer.send({
topic: 'order-events',
body: JSON.stringify({ orderId: 123, amount: 99.9 }),
tag: 'ORDER_CREATED', // 可选,用于过滤
key: 'order-123' // 可选,用于查询
});Consumer Group
多个 Consumer 可以组成一个 Consumer Group(消费者组),同一个 Group 内的 Consumer 会负载均衡地消费消息。比如一个 Topic 有 4 个队列,2 个 Consumer,每个 Consumer 消费 2 个队列。
const consumer = new Consumer({
nameServer: 'localhost:9876',
groupId: 'order-consumer-group' // 同一个 group 的 consumer 会分摊消息
});
consumer.subscribe('order-events', async (msg) => {
// 处理消息
});方案一:延迟消息
现在用 MQ 来改造。RocketMQ 支持延迟消息,可以指定消息在未来某个时间点被消费。
延迟消息的使用
RocketMQ 支持 18 个固定延迟级别,不支持任意时间延迟:
// 延迟级别对应时间
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
await producer.send({
topic: 'delayed-task',
body: JSON.stringify({ taskId: 456 }),
delayLevel: 16 // 对应 30 分钟
});实现方案
下单成功后,发一个 30 分钟的延迟消息。时间一到,Consumer 拿到消息再去检查状态,未支付就取消:
const producer = new Producer({ nameServer: 'localhost:9876' });
app.post('/orders', async (req, res) => {
const client = await db.connect();
try {
await client.query('BEGIN');
// 创建订单
const result = await client.query(
'INSERT INTO orders (...) VALUES (...) RETURNING id',
[req.body.userId, req.body.amount, 'PENDING']
);
// 发送 30 分钟延迟消息
await producer.send({
topic: 'order-timeout-check',
body: JSON.stringify({ orderId: result.rows[0].id }),
delayLevel: 16 // 30 分钟
});
await client.query('COMMIT');
res.json({ orderId: result.rows[0].id });
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
});这个方案解放了数据库,每次只处理单个订单,避免了锁竞争。但有个业务痛点:绝大部分订单在下单后很快就支付了,死等 30 分钟才去处理消息,浪费了系统资源。
优化:阶梯式检查
可以改成阶梯式检查:先发一个 30 秒的短周期消息,没支付?再发 1 分钟的,还没支付?发 5 分钟的…
// 阶梯检查:快速感知支付状态
const DELAY_LEVELS = [
{ level: 3, minutes: 0.5 }, // 30秒
{ level: 4, minutes: 1 }, // 1分钟
// ... 更多级别
];
async function sendNextCheck(orderId, currentIndex = 0) {
const order = await db.query('SELECT status FROM orders WHERE id = $1', [orderId]);
if (order.rows[0].status === 'PAID') return;
if (currentIndex >= DELAY_LEVELS.length) {
await cancelOrder(orderId);
return;
}
// 发送下一个延迟消息
await producer.send({
topic: 'order-timeout-check',
body: JSON.stringify({ orderId, nextIndex: currentIndex + 1 }),
delayLevel: DELAY_LEVELS[currentIndex].level
});
}这个方案能快速感知支付状态,但代价是消息量暴增(原本一个订单一条消息,现在变成多条),MQ 的存储压力成倍增加。
方案二:事务消息
既想用 MQ 削峰,又要灵活控制检查频率,还得避免消息爆炸,怎么办?这时候,RocketMQ 的事务消息就派上用场了。
事务消息的工作原理
事务消息用于保证本地事务和消息发送的一致性。它的工作流程是:
- 发送半消息:Producer 发送一个”准备”状态的消息到 Broker,Consumer 此时看不到
- 执行本地事务:Producer 执行本地数据库操作
- 返回事务状态:
COMMIT:本地事务成功,Broker 将消息标记为可消费ROLLBACK:本地事务失败,Broker 丢弃消息UNKNOWN:状态不确定,触发回查机制
回查机制(重点)
如果 Producer 返回 UNKNOWN,Broker 会定期(默认 60 秒)回调 Producer 的 setTransactionCheckListener 方法,询问本地事务的最终状态。这个机制最多回查 15 次。
const producer = new TransactionProducer({
nameServer: 'localhost:9876',
groupId: 'my-group'
});
// 设置本地事务执行器
producer.setLocalTransactionExecutor(async (msg) => {
try {
// 执行本地事务(如创建订单)
await db.query('INSERT INTO orders ...');
return TransactionStatus.UNKNOWN; // 故意返回 UNKNOWN 触发回查
} catch (err) {
return TransactionStatus.ROLLBACK;
}
});
// 设置回查监听器:Broker 会定期调用这个方法
producer.setTransactionCheckListener(async (msg) => {
const order = await db.query('SELECT status FROM orders WHERE id = ?');
if (order.status === 'PAID') {
return TransactionStatus.ROLLBACK; // 已支付,丢弃消息
}
if (isTimeout(order)) {
return TransactionStatus.COMMIT; // 超时,提交消息让消费者处理
}
return TransactionStatus.UNKNOWN; // 继续等待,下次再回查
});巧用回查机制实现定时轮询
我们可以”套用”事务消息的回查机制来做定时轮询:
完整实现
核心思路:故意返回 UNKNOWN 状态,让 Broker 定期回查订单状态,一条消息就搞定了阶梯检查的需求。
graph TD A(用户下单) --> B(发送事务半消息) B --> C(创建订单) C --> D(返回 UNKNOWN 状态) D --> E(Broker 定期回查) E --> F{检查订单状态} F -->|已支付| G(返回 ROLLBACK, 丢弃消息) F -->|未支付且未满30分| H(返回 UNKNOWN, 继续回查) F -->|未支付且已满30分| I(返回 COMMIT, 消费者取消订单)
const producer = new TransactionProducer({
nameServer: 'localhost:9876',
groupId: 'order-transaction-group'
});
// 1. 本地事务执行器:创建订单
producer.setLocalTransactionExecutor(async (msg) => {
const { userId, amount } = JSON.parse(msg.body);
try {
await db.query(
'INSERT INTO orders (...) VALUES (...) RETURNING id',
[userId, amount, 'PENDING']
);
// 故意返回 UNKNOWN,让 MQ 定期回查
return TransactionStatus.UNKNOWN;
} catch (err) {
return TransactionStatus.ROLLBACK;
}
});
// 2. 事务状态回查:MQ 定期调用,检查订单状态
producer.setTransactionCheckListener(async (msg) => {
const { orderId } = JSON.parse(msg.body);
const result = await db.query(
'SELECT status, created_at FROM orders WHERE id = $1',
[orderId]
);
if (result.rows.length === 0) {
return TransactionStatus.ROLLBACK;
}
const order = result.rows[0];
// 已支付,终止流程
if (order.status === 'PAID') {
return TransactionStatus.ROLLBACK;
}
const elapsed = Date.now() - new Date(order.created_at).getTime();
const isTimeout = elapsed >= 30 * 60 * 1000;
// 未支付但还没到 30 分钟,返回 UNKNOWN 让 MQ 继续回查
if (!isTimeout) {
return TransactionStatus.UNKNOWN;
}
// 超时未支付,提交消息,放行给下游消费者执行取消
return TransactionStatus.COMMIT;
});
// 3. 下单接口
app.post('/orders', async (req, res) => {
await producer.sendTransaction({
topic: 'order-transaction',
body: JSON.stringify(req.body)
});
res.json({ success: true });
});
// 4. 消费者:真正执行取消订单
consumer.subscribe('order-transaction', async (msg) => {
const { orderId } = JSON.parse(msg.body);
await cancelOrderWithVerification(orderId);
});这个方案的优势在于,把定时轮询的压力转嫁给了 MQ 的回查机制,代码层面只需关注核心业务逻辑。
生产环境的必备环节:第三方对账
不管用哪种 MQ 方案,在真实的生产环境中,还有一个关键环节不能忽略:第三方对账。
在支付场景下,永远不要只相信本地数据库。支付动作在微信、支付宝等第三方平台完成,用户网络可能卡顿,支付平台的回调请求也可能延迟或丢失。
如果 MQ 到点触发了取消订单,但用户其实已经付过钱了,这就是严重的资损事故。
// 完整的取消订单流程
async function cancelOrderWithVerification(orderId) {
const client = await db.connect();
try {
await client.query('BEGIN');
const result = await db.query(
'SELECT status, payment_transaction_id FROM orders WHERE id = $1 FOR UPDATE',
[orderId]
);
const order = result.rows[0];
// 本地已支付,直接返回
if (order.status === 'PAID') {
await client.query('ROLLBACK');
return;
}
// 必须主动调用第三方支付平台确认
const paymentStatus = await axios.get(
`https://payment-api.example.com/query/${order.payment_transaction_id}`
);
// 第三方显示已支付,更新本地状态
if (paymentStatus.data.status === 'SUCCESS') {
await db.query('UPDATE orders SET status = $1 WHERE id = $2', ['PAID', orderId]);
await client.query('COMMIT');
return;
}
// 确认第三方也未支付,才真正取消订单
await db.query('UPDATE orders SET status = $1 WHERE id = $2', ['CANCELLED', orderId]);
// 回滚库存等操作...
await client.query('COMMIT');
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
}在取消订单前,必须主动调用支付平台的 API 查询真实状态,跨网络对账后才能真正取消。
总结
通过订单超时取消这个场景,我们看到了 RocketMQ 在实际业务中的应用:
技术选型
- 定时任务:适合小规模场景,但高并发下会拖垮数据库
- 延迟消息:削峰填谷,解放数据库,但时效性受限
- 阶梯检查:快速感知状态变化,但消息量暴增
- 事务消息:巧用回查机制,一条消息搞定定时轮询
使用场景 MQ 不只是用来传消息,它的延迟消息和事务消息特性,能优雅地解决定时任务、异步处理、削峰填谷等场景问题。关键是理解 MQ 的机制,选择合适的方案。
生产实践 真实的生产环境中,除了 MQ 方案,还需要考虑第三方对账、幂等性、消息重试等细节,这才是完整的解决方案。