你有没有思考过一个支付问题:网站用户下单后,如果 30 分钟内没支付,订单就会自动取消。这个功能怎么做?绝大多数人觉得这个功能超级简单,但它其实是一面能照出你真实技术实力的照妖镜。

今天我们通过这个场景,从最直接的定时任务方案开始,逐步演进到使用 Message Queue(消息队列,简称 MQ)的优雅方案。如果你还不了解 MQ,或者想知道 MQ 在实际业务中该怎么用,这篇文章会给你一个清晰的答案。

为什么需要 MQ?

最直接的想法是用 Cron Job(定时任务),每隔一分钟扫描数据库,找出超时的订单并取消:

// ❌ 定时任务方案:每分钟扫描一次数据库
cron.schedule('*/1 * * * *', async () => {
  const expiredOrders = await db.query(`
    SELECT * FROM orders 
    WHERE status = 'PENDING' AND created_at < NOW() - INTERVAL '30 minutes'
  `);
  
  // ... 批量取消订单
});

这种方式在小网站还能对付,但在并发量上去了问题明显:每次全表扫描会让数据库 CPU 飙升,影响正常业务;批量取消订单时会产生锁竞争;而且每分钟扫一次,时效性也不够精确。

这时候,MQ 就派上用场了。它把定时任务的”一次性批量处理”变成”按订单逐个处理”,既削峰填谷,又能精确控制每个订单的超时时间点。

RocketMQ 基础

在开始用 MQ 改造之前,先了解一下 RocketMQ 的核心概念。

架构组成

RocketMQ 由四个核心角色组成:

NameServer(注册中心) 轻量级的服务发现组件,类似于 Zookeeper 但更轻量。它就像是 RocketMQ 的”通讯录”,负责管理 Broker 的地址信息。

需要注意的是,RocketMQ 不是单个服务,而是由多个独立组件组成的分布式系统。部署时需要先启动 NameServer(默认端口 9876),再启动 Broker(默认端口 10911)。Broker 启动后会向 NameServer 注册自己,并定期发送心跳。

你的业务代码连接的是 NameServer 地址,Producer 和 Consumer 通过 NameServer 获取 Broker 列表,然后真正收发消息时是直接和 Broker 通信,不再经过 NameServer。

// 你的业务代码配置 NameServer 地址
const producer = new Producer({ 
  nameServer: 'localhost:9876'  // 可以是多个:'host1:9876;host2:9876'
});
 
// Producer 启动时会从 NameServer 获取 Broker 地址
await producer.start();
 
// 发送消息时,直接和 Broker 通信
await producer.send({
  topic: 'order-events',
  body: JSON.stringify({ orderId: 123 })
});

Broker(消息存储) 真正存储和转发消息的服务器。Broker 会向 NameServer 注册自己,接收 Producer 发来的消息,存储到磁盘,并在 Consumer 请求时返回消息。一个 RocketMQ 集群可以有多个 Broker,每个 Broker 可以存储多个 Topic 的数据。

Producer(生产者) 发送消息的应用程序。你的业务代码通过 Producer 向指定 Topic 发送消息。

Consumer(消费者) 接收消息的应用程序。Consumer 订阅 Topic,从 Broker 拉取消息进行处理。

graph LR
    P(Producer) -->|1. 发送消息| B(Broker)
    B -->|2. 存储消息| D[(磁盘)]
    C(Consumer) -->|3. 拉取消息| B
    P -.->|注册/心跳| N[NameServer]
    B -.->|注册/心跳| N
    C -.->|获取路由| N

Topic 和 Message

Topic(主题) 消息的分类标签,就像快递的”类别”。比如 order-createdpayment-success 等。Producer 发消息时指定 Topic,Consumer 订阅 Topic 接收消息。

Message(消息) 实际传输的数据,包含 body(消息体)、tag(子标签)、key(业务标识)等。

await producer.send({
  topic: 'order-events',
  body: JSON.stringify({ orderId: 123, amount: 99.9 }),
  tag: 'ORDER_CREATED',    // 可选,用于过滤
  key: 'order-123'         // 可选,用于查询
});

Consumer Group

多个 Consumer 可以组成一个 Consumer Group(消费者组),同一个 Group 内的 Consumer 会负载均衡地消费消息。比如一个 Topic 有 4 个队列,2 个 Consumer,每个 Consumer 消费 2 个队列。

const consumer = new Consumer({
  nameServer: 'localhost:9876',
  groupId: 'order-consumer-group'  // 同一个 group 的 consumer 会分摊消息
});
 
consumer.subscribe('order-events', async (msg) => {
  // 处理消息
});

方案一:延迟消息

现在用 MQ 来改造。RocketMQ 支持延迟消息,可以指定消息在未来某个时间点被消费。

延迟消息的使用

RocketMQ 支持 18 个固定延迟级别,不支持任意时间延迟:

// 延迟级别对应时间
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
await producer.send({
  topic: 'delayed-task',
  body: JSON.stringify({ taskId: 456 }),
  delayLevel: 16  // 对应 30 分钟
});

实现方案

下单成功后,发一个 30 分钟的延迟消息。时间一到,Consumer 拿到消息再去检查状态,未支付就取消:

const producer = new Producer({ nameServer: 'localhost:9876' });
 
app.post('/orders', async (req, res) => {
  const client = await db.connect();
  try {
    await client.query('BEGIN');
    
    // 创建订单
    const result = await client.query(
      'INSERT INTO orders (...) VALUES (...) RETURNING id',
      [req.body.userId, req.body.amount, 'PENDING']
    );
    
    // 发送 30 分钟延迟消息
    await producer.send({
      topic: 'order-timeout-check',
      body: JSON.stringify({ orderId: result.rows[0].id }),
      delayLevel: 16  // 30 分钟
    });
    
    await client.query('COMMIT');
    res.json({ orderId: result.rows[0].id });
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  } finally {
    client.release();
  }
});

这个方案解放了数据库,每次只处理单个订单,避免了锁竞争。但有个业务痛点:绝大部分订单在下单后很快就支付了,死等 30 分钟才去处理消息,浪费了系统资源。

优化:阶梯式检查

可以改成阶梯式检查:先发一个 30 秒的短周期消息,没支付?再发 1 分钟的,还没支付?发 5 分钟的…

// 阶梯检查:快速感知支付状态
const DELAY_LEVELS = [
  { level: 3, minutes: 0.5 },   // 30秒
  { level: 4, minutes: 1 },     // 1分钟
  // ... 更多级别
];
 
async function sendNextCheck(orderId, currentIndex = 0) {
  const order = await db.query('SELECT status FROM orders WHERE id = $1', [orderId]);
  
  if (order.rows[0].status === 'PAID') return;
  
  if (currentIndex >= DELAY_LEVELS.length) {
    await cancelOrder(orderId);
    return;
  }
  
  // 发送下一个延迟消息
  await producer.send({
    topic: 'order-timeout-check',
    body: JSON.stringify({ orderId, nextIndex: currentIndex + 1 }),
    delayLevel: DELAY_LEVELS[currentIndex].level
  });
}

这个方案能快速感知支付状态,但代价是消息量暴增(原本一个订单一条消息,现在变成多条),MQ 的存储压力成倍增加。

方案二:事务消息

既想用 MQ 削峰,又要灵活控制检查频率,还得避免消息爆炸,怎么办?这时候,RocketMQ 的事务消息就派上用场了。

事务消息的工作原理

事务消息用于保证本地事务和消息发送的一致性。它的工作流程是:

  1. 发送半消息:Producer 发送一个”准备”状态的消息到 Broker,Consumer 此时看不到
  2. 执行本地事务:Producer 执行本地数据库操作
  3. 返回事务状态
    • COMMIT:本地事务成功,Broker 将消息标记为可消费
    • ROLLBACK:本地事务失败,Broker 丢弃消息
    • UNKNOWN:状态不确定,触发回查机制

回查机制(重点) 如果 Producer 返回 UNKNOWN,Broker 会定期(默认 60 秒)回调 Producer 的 setTransactionCheckListener 方法,询问本地事务的最终状态。这个机制最多回查 15 次。

const producer = new TransactionProducer({
  nameServer: 'localhost:9876',
  groupId: 'my-group'
});
 
// 设置本地事务执行器
producer.setLocalTransactionExecutor(async (msg) => {
  try {
    // 执行本地事务(如创建订单)
    await db.query('INSERT INTO orders ...');
    return TransactionStatus.UNKNOWN;  // 故意返回 UNKNOWN 触发回查
  } catch (err) {
    return TransactionStatus.ROLLBACK;
  }
});
 
// 设置回查监听器:Broker 会定期调用这个方法
producer.setTransactionCheckListener(async (msg) => {
  const order = await db.query('SELECT status FROM orders WHERE id = ?');
  
  if (order.status === 'PAID') {
    return TransactionStatus.ROLLBACK;  // 已支付,丢弃消息
  }
  
  if (isTimeout(order)) {
    return TransactionStatus.COMMIT;    // 超时,提交消息让消费者处理
  }
  
  return TransactionStatus.UNKNOWN;     // 继续等待,下次再回查
});

巧用回查机制实现定时轮询

我们可以”套用”事务消息的回查机制来做定时轮询:

完整实现

核心思路:故意返回 UNKNOWN 状态,让 Broker 定期回查订单状态,一条消息就搞定了阶梯检查的需求。

graph TD
    A(用户下单) --> B(发送事务半消息)
    B --> C(创建订单)
    C --> D(返回 UNKNOWN 状态)
    D --> E(Broker 定期回查)
    E --> F{检查订单状态}
    F -->|已支付| G(返回 ROLLBACK, 丢弃消息)
    F -->|未支付且未满30分| H(返回 UNKNOWN, 继续回查)
    F -->|未支付且已满30分| I(返回 COMMIT, 消费者取消订单)
const producer = new TransactionProducer({
  nameServer: 'localhost:9876',
  groupId: 'order-transaction-group'
});
 
// 1. 本地事务执行器:创建订单
producer.setLocalTransactionExecutor(async (msg) => {
  const { userId, amount } = JSON.parse(msg.body);
  
  try {
    await db.query(
      'INSERT INTO orders (...) VALUES (...) RETURNING id',
      [userId, amount, 'PENDING']
    );
    
    // 故意返回 UNKNOWN,让 MQ 定期回查
    return TransactionStatus.UNKNOWN;
  } catch (err) {
    return TransactionStatus.ROLLBACK;
  }
});
 
// 2. 事务状态回查:MQ 定期调用,检查订单状态
producer.setTransactionCheckListener(async (msg) => {
  const { orderId } = JSON.parse(msg.body);
  
  const result = await db.query(
    'SELECT status, created_at FROM orders WHERE id = $1',
    [orderId]
  );
  
  if (result.rows.length === 0) {
    return TransactionStatus.ROLLBACK;
  }
  
  const order = result.rows[0];
  
  // 已支付,终止流程
  if (order.status === 'PAID') {
    return TransactionStatus.ROLLBACK;
  }
  
  const elapsed = Date.now() - new Date(order.created_at).getTime();
  const isTimeout = elapsed >= 30 * 60 * 1000;
  
  // 未支付但还没到 30 分钟,返回 UNKNOWN 让 MQ 继续回查
  if (!isTimeout) {
    return TransactionStatus.UNKNOWN;
  }
  
  // 超时未支付,提交消息,放行给下游消费者执行取消
  return TransactionStatus.COMMIT;
});
 
// 3. 下单接口
app.post('/orders', async (req, res) => {
  await producer.sendTransaction({
    topic: 'order-transaction',
    body: JSON.stringify(req.body)
  });
  
  res.json({ success: true });
});
 
// 4. 消费者:真正执行取消订单
consumer.subscribe('order-transaction', async (msg) => {
  const { orderId } = JSON.parse(msg.body);
  await cancelOrderWithVerification(orderId);
});

这个方案的优势在于,把定时轮询的压力转嫁给了 MQ 的回查机制,代码层面只需关注核心业务逻辑。

生产环境的必备环节:第三方对账

不管用哪种 MQ 方案,在真实的生产环境中,还有一个关键环节不能忽略:第三方对账。

在支付场景下,永远不要只相信本地数据库。支付动作在微信、支付宝等第三方平台完成,用户网络可能卡顿,支付平台的回调请求也可能延迟或丢失。

如果 MQ 到点触发了取消订单,但用户其实已经付过钱了,这就是严重的资损事故。

// 完整的取消订单流程
async function cancelOrderWithVerification(orderId) {
  const client = await db.connect();
  
  try {
    await client.query('BEGIN');
    
    const result = await db.query(
      'SELECT status, payment_transaction_id FROM orders WHERE id = $1 FOR UPDATE',
      [orderId]
    );
    
    const order = result.rows[0];
    
    // 本地已支付,直接返回
    if (order.status === 'PAID') {
      await client.query('ROLLBACK');
      return;
    }
    
    // 必须主动调用第三方支付平台确认
    const paymentStatus = await axios.get(
      `https://payment-api.example.com/query/${order.payment_transaction_id}`
    );
    
    // 第三方显示已支付,更新本地状态
    if (paymentStatus.data.status === 'SUCCESS') {
      await db.query('UPDATE orders SET status = $1 WHERE id = $2', ['PAID', orderId]);
      await client.query('COMMIT');
      return;
    }
    
    // 确认第三方也未支付,才真正取消订单
    await db.query('UPDATE orders SET status = $1 WHERE id = $2', ['CANCELLED', orderId]);
    
    // 回滚库存等操作...
    
    await client.query('COMMIT');
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  } finally {
    client.release();
  }
}

在取消订单前,必须主动调用支付平台的 API 查询真实状态,跨网络对账后才能真正取消。

总结

通过订单超时取消这个场景,我们看到了 RocketMQ 在实际业务中的应用:

技术选型

  • 定时任务:适合小规模场景,但高并发下会拖垮数据库
  • 延迟消息:削峰填谷,解放数据库,但时效性受限
  • 阶梯检查:快速感知状态变化,但消息量暴增
  • 事务消息:巧用回查机制,一条消息搞定定时轮询

使用场景 MQ 不只是用来传消息,它的延迟消息和事务消息特性,能优雅地解决定时任务、异步处理、削峰填谷等场景问题。关键是理解 MQ 的机制,选择合适的方案。

生产实践 真实的生产环境中,除了 MQ 方案,还需要考虑第三方对账、幂等性、消息重试等细节,这才是完整的解决方案。