最近项目用到 Kafka,从公司主系统同步数据到我们的采销系统。之前没接触过,边做边学,正好梳理一下。

为什么需要 Kafka

我们做的是采销系统。买手创建采购活动(商品、折扣、赠品),审批通过后发布到门店。门店对外预售,客户下单后扣可预定额度,我们把订单推给总部系统去采购。典型的拉式供应链:先收客单再采购,零库存。

商品基础信息(名称、图片、标准价格、门店库存)都来自主系统,需要实时同步过来。我们自己维护折扣、采购计划、可预定额度。

问题来了:主系统商品价格从 999 改成 899,我们系统怎么知道?

传统方案:主系统改完价格,调我们的 HTTP 接口推数据。但是:

  • 主系统要等我们处理完才能继续,太慢
  • 我们系统挂了,主系统也挂着等,还得重试
  • 价格变了要同步数据库,还要重算活动折扣价,两件事耦合在一个接口里

Kafka 方案:主系统把价格变动扔到 Kafka,不等我们处理就继续干活。我们启动两个服务:价格同步服务更新数据库,折扣重算服务重算活动。各干各的,谁挂了都不影响别人。

几个基本概念

Producer 和 Message

Producer(生产者)就是发消息的那一方。在我们系统里,主系统就是 Producer。

Message(消息)就是发的数据。比如茅台价格从 999 改成 899,主系统往 Kafka 扔一条消息:

{
  "productId": "SKU123",
  "eventType": "PRICE_UPDATED",
  "oldPrice": 999,
  "newPrice": 899
}

Topic - 给消息分个类

Topic(主题)就是消息的分类。

为什么要分类?因为不同数据的处理逻辑完全不一样。价格变了要重算折扣,库存变了要发预警,混一起根本没法处理。

我们按数据类型分了三个 Topic:

  • product_info:商品基础信息(上架、改名、换图、下架)
  • product_price:价格变动
  • store_inventory:门店库存变动

价格消息只进 product_price,库存消息只进 store_inventory

Consumer - 干活的

Consumer(消费者)就是接收并处理消息的程序。你在服务器上启动一个进程,这个进程就是一个 Consumer。

比如我们有个”价格同步服务”,启动后就成了一个 Consumer,从 product_price 这个 Topic 里读消息,然后更新数据库。

Consumer Group - 团队协作

Consumer Group(消费者组)是一组 Consumer 的集合。关键规则:一条消息在一个 Group 内只会被一个 Consumer 处理。

比如”价格同步服务”启动了 4 个进程(4 个 Consumer),它们组成一个 Consumer Group 叫 price-sync-group。这 4 个进程会分工合作,每条消息只被其中一个进程处理。

发布/订阅机制:同一条消息可以被多个 Consumer Group 消费,但在一个 Group 内只会被一个 Consumer 处理。

举个例子,product_price 这个 Topic 有两个 Group 在监听:

  • price-sync-group(价格同步服务):更新数据库
  • discount-recalc-group(折扣重算服务):重算活动折扣价

主系统推了条”茅台从 999 改成 899”的消息:

  1. price-sync-group 收到一份,Group 内 4 个 Consumer 中的某一个处理它,更新数据库
  2. discount-recalc-group 也收到一份,Group 内的某个 Consumer 处理它,重算折扣
  3. 两个 Group 互不影响。价格同步服务挂了,折扣重算服务照样工作

这就是发布/订阅。一条消息发出去,所有订阅的 Group 都能收到。

为什么需要 Consumer Group?

  1. 负载均衡:多个 Consumer 分摊压力
  2. 故障转移:一个 Consumer 挂了,其他 Consumer 接管
  3. 发布/订阅:多个 Group 各干各的事

Group ID 划分原则

命名建议:{服务名}-{功能}-group,例如 price-sync-groupdiscount-recalc-group

一个应用该用一个 Group 还是多个 Group?

两种方案:

方案 1:一个 Group,处理多个 Topic,多个不同类型的 Consumer — 小应用最爱。

// 一个应用,一个 Kafka 连接
app.connectMicroservice({
  consumer: { groupId: 'appointment-consumer-group' }
});
 
// 内部不同 Consumer 监听不同 Topic
@Controller()
export class PriceSyncConsumer {
  @MessagePattern('product_price')
  async handlePrice() { /* 处理价格 */ }
}
 
@Controller()
export class InventorySyncConsumer {
  @MessagePattern('store_inventory')
  async handleInventory() { /* 处理库存 */ }
}

简单、资源高效,适合消息处理逻辑简单、流量相近的场景。

方案 2:多个独立应用,每个专职处理特定功能

// 应用 1:价格同步(部署 10 个实例)
consumer: { groupId: 'price-sync-group' }
 
// 应用 2:库存同步(部署 3 个实例)
consumer: { groupId: 'inventory-sync-group' }

适合流量差异大、需要独立扩展和故障隔离的场景。建议:先用方案 1,业务复杂后再拆分为方案 2

Partition - 并行处理

商品种类多了,单管道扛不住怎么办?Kafka 把一个 Topic 拆成多条并行的子管道,这就是 Partition(分区)。

比如把 product_price 拆成 4 个分区(0、1、2、3)。主系统推的价格消息同时在 4 条管道里跑,吞吐量翻 4 倍。

Partition 和 Consumer 的关系

price-sync-group 要消费 4 个分区的消息,可以启动几个 Consumer?

  • 启动 4 个 Consumer:一个盯一个分区,完美
  • 启动 2 个 Consumer:每个盯 2 个分区,能跑但压力大
  • 启动 5 个 Consumer:4 个干活,第 5 个没分区可盯,只能摸鱼

Kafka 的铁律:一个分区只能被一个 Group 内的一个 Consumer 消费,防止重复处理。

实践建议:Consumer 数量 = Partition 数量,完美负载均衡。

Partition 数量怎么定

创建 Topic 时指定分区数量,根据流量估算:

# 低流量(每秒 < 100 条):3-5 个 Partition
# 中等流量(每秒 100-1000 条):10-20 个 Partition
# 高流量(每秒 > 1000 条):50-100 个 Partition
 
kafka-topics --create \
  --topic product_price \
  --partitions 3 \
  --replication-factor 3

Kafka 支持动态扩缩容,自动 Rebalance:

# 低峰期:5 个 Consumer(每个处理 2 个 Partition)
kubectl scale deployment price-consumer --replicas=5
 
# 高峰期:扩容到 10 个(每个处理 1 个 Partition)
kubectl scale deployment price-consumer --replicas=10

Message Key - 保证顺序

Message Key 解决分区间的时序问题。

主系统对茅台(SKU123)先推”改成 899”,紧接着又推”改成 799”(搞大促)。

不带 Key 的后果

  • 第一条消息轮询进分区 0
  • 第二条消息轮询进分区 1
  • 分区 1 进程手快,先处理了 799,数据库改成 799
  • 分区 0 进程后处理 899,数据库又改成 899
  • 最终数据库里是 899,但应该是 799,错了!

带 Key(Key: SKU123

  • 两条消息都进分区 2(Kafka 根据 Key 哈希决定)
  • 分区 2 按顺序处理,先 899 再 799
  • 最终数据库里是 799,对了

Kafka 的铁律:同一 Key 的消息必须进同一分区。

这叫局部有序。不同商品的价格消息可能乱序(全局无序),但同一商品的价格消息严格有序(局部有序)。

Key 一般用业务主键。在我们系统里:

  • 商品价格/信息消息:Key: SKU123(商品 ID),保证同一商品的多次更新按顺序处理
  • 门店库存消息:Key: STORE_001:SKU123(门店 ID + 商品 ID),保证同一门店的同一商品库存变动按顺序处理
  • 订单消息:Key: ORD20260313001(订单号),保证同一订单的创建、取消、履约按顺序处理

串起来看

主系统(Producer)把带 Key 的价格消息(Message)推到 product_price(Topic)。Kafka 根据 Key 哈希决定进哪个 Partition。

每个 Partition 都是一个有序队列。多个 Consumer Group 可以同时消费这个 Topic,每个 Group 内的 Consumer 分摊 Partition。

我们系统怎么用的

数据流转两个方向:

  1. 接收:从主系统同步商品信息、价格、门店库存
  2. 发送:把门店订单推给总部系统

在 NestJS 里用 Kafka

在 NestJS 中使用 Kafka 有两种方式:官方微服务模块(装饰器风格,体验好)和原生 kafkajs(掌控力强)。详细对比和实现见 kafka-in-nestjs

下面的例子用 NestJS 官方微服务模块

Topic 和 Consumer Group 划分

Topic事件类型Consumer Group干什么
product_infoPRODUCT_CREATED/UPDATED/DELETEDproduct-sync-group同步商品信息到数据库
activity-refresh-group刷新活动商品展示信息
product_pricePRICE_UPDATEDprice-sync-group同步标准价格到数据库
discount-recalc-group重算活动折扣价
store_inventoryINVENTORY_UPDATED/ADJUSTEDinventory-sync-group同步门店库存到数据库
inventory-alert-group监控库存,低于阈值预警
order_eventsORDER_CREATED/CANCELLED/FULFILLEDorder-collect-group归集门店订单到总部
finance-settlement-group记录金额,准备财务结算

配置 Kafka Module

在使用前先配置好 Kafka Module:

// kafka/kafka.module.ts
@Module({
  imports: [
    // 注册 Kafka Client,用于 Producer 端发送消息
    ClientsModule.register([{
      name: 'KAFKA_CLIENT',  // 这个名字用于依赖注入
      transport: Transport.KAFKA,
      options: {
        client: {
          clientId: 'procurement-system',
          brokers: ['localhost:9092'],
        },
      },
    }]),
  ],
  exports: [ClientsModule],  // 导出,让其他模块可以注入使用
})
export class KafkaModule {}

Partition 数量怎么配

Partition 数量不在代码里配,而是创建 Topic 时在 Kafka 服务端指定。或者用 Kafka 管理工具(Kafka Manager、Confluent Control Center)创建时指定。

Consumer 数量建议

  • Consumer 实例数 = Partition 数量(完美负载均衡)
  • 例如:10 个 Partition,启动 10 个 Consumer 实例
  • 超过 Partition 数量的 Consumer 会摸鱼

配置 Consumer Group(启动微服务时指定)

// main.ts
async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  
  // 启动 Kafka 微服务,作为 Consumer
  app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.KAFKA,
    options: {
      client: { clientId: 'procurement-system', brokers: ['localhost:9092'] },
      consumer: { groupId: 'biglot-consumer-group' },  // 这里指定 Consumer Group
    },
  });
 
  await app.startAllMicroservices();
  await app.listen(3000);
}

在其他模块中注入使用

// app.module.ts
@Module({
  imports: [KafkaModule],  // 导入 Kafka Module
  controllers: [PriceSyncConsumer],
})
export class AppModule {}

HTTP 服务和 Consumer 要不要分开部署

问题:多个 Consumer 实例会不会影响 HTTP 服务?

按照上面的配置,app.connectMicroservice() 在同一个 Node.js 进程里启动 Kafka Consumer,和 HTTP 服务共享资源。如果要启动 10 个 Consumer 实例(匹配 10 个 Partition),就得启动 10 个应用实例,HTTP 服务也启动了 10 次,浪费资源。

生产环境建议:拆开部署

HTTP 服务应用(只负责接收请求):

// main-http.ts
async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  await app.listen(3000);  // 只启动 HTTP
}

Consumer 应用(只负责消费消息):

// main-consumer.ts
async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    transport: Transport.KAFKA,
    options: {
      client: { 
        clientId: 'procurement-system-consumer',
        brokers: ['localhost:9092']
      },
      consumer: { groupId: 'price-sync-group' },
    },
  });
  await app.listen();
}

部署:

  • HTTP 服务:启动 2-3 个实例(根据流量)
  • Consumer 服务:启动 N 个实例(N = Partition 数量,实现完美负载均衡)

何时拆开?

  • ✅ 消息量大(每秒上千条)
  • ✅ CPU 密集(复杂计算、图片处理)
  • ✅ 需要独立扩展(HTTP 要 3 个实例,Consumer 要 10 个)
  • ❌ 消息量小、处理简单,放一起就行

代码示例:价格调整

Producer 端:主系统推送价格变动消息

// main-system/product-price.service.ts
@Injectable()
export class ProductPriceService {
  constructor(@Inject('KAFKA_CLIENT') private kafkaClient: ClientKafka) {}
 
  async updatePrice(productId: string, newPrice: number) {
    // 1. 更新主系统数据库价格
    await this.prisma.product.update({ ... });
 
    // 2. 构造消息,Key 是商品 ID,保证同一商品的消息进同一分区
    const message = {
      key: productId,  // Message Key: 商品 ID
      value: JSON.stringify({
        productId,
        eventType: 'PRICE_UPDATED',
        oldPrice: 2999,
        newPrice: 2599,
        ...
      }),
    };
 
    // 3. 推送消息到 product_price Topic
    this.kafkaClient.emit('product_price', message);
  }
}

Consumer 端:价格同步服务(price-sync-group)

// procurement-system/price-sync.consumer.ts
@Controller()
export class PriceSyncConsumer {
  // 监听 product_price Topic,Consumer Group 是 price-sync-group
  @MessagePattern('product_price')
  async handlePriceUpdate(@Payload() message: any) {
    const { productId, newPrice } = message.value;
 
    // 同步价格到采销系统数据库
    await this.prisma.product.update({
      where: { productId },
      data: { standardPrice: newPrice },
    });
  }
}

Consumer 端:折扣重算服务(discount-recalc-group)

// procurement-system/discount-recalc.consumer.ts
@Controller()
export class DiscountRecalcConsumer {
  // 同样监听 product_price Topic,但是不同的 Consumer Group
  @MessagePattern('product_price')
  async handlePriceUpdate(@Payload() message: any) {
    const { productId, newPrice } = message.value;
 
    // 1. 查询所有进行中且包含该商品的采购活动
    const activities = await this.activityService.findActiveByProduct(productId);
 
    // 2. 遍历活动,重新计算折扣价
    for (const activity of activities) {
       const discountRate = activity.discountRate; // 比如 0.8 (8折)
       // 更新活动中的商品折扣价
       ...
    }
  }
}

代码示例:门店下单

Producer 端:门店系统推送订单消息

// store-system/order.service.ts
@Injectable()
export class OrderService {
  constructor(@Inject('KAFKA_CLIENT') private kafkaClient: ClientKafka) {}
 
  async createOrder(orderData: any) {
    // 1. 扣减可预定额度
    await this.activityService.decreaseQuota(items);
 
    // 2. 生成订单(包含价格 Snapshot)
    const order = await this.prisma.order.create({
      data: { 
        orderId,
        items: [...],  // 包含 standardPrice、discountPrice 等 Snapshot
        totalAmount,
      },
    });
 
    // 3. 构造订单消息,Key 是订单号,value 包含订单详情和价格 Snapshot
    const message = {
      key: orderId,  // Message Key: 订单号
      value: JSON.stringify({
        eventType: 'ORDER_CREATED',
        orderId,
        items: [{ productId: 'SKU123', standardPrice: 2599, discountPrice: 2079, quantity: 2 }],  // Snapshot
        ...
      }),
    };
 
    // 4. 推送到 order_events Topic
    this.kafkaClient.emit('order_events', message);
  }
}

2. Consumer 端:订单归集服务(总部)

// headquarters-system/order-collect.consumer.ts
@Controller()
export class OrderCollectConsumer {
  // 监听 order_events Topic,Consumer Group 是 order-collect-group
  @MessagePattern('order_events')
  async handleOrderCreated(@Payload() message: any) {
    const { orderId, items } = message.value;
 
    // 1. 在总部系统创建订单记录
    await this.prisma.order.create({ data: message.value });
 
    // 2. 累加商品总预定数量,准备采购
    for (const item of items) {
      await this.productService.increaseTotalQuantity(item.productId, item.quantity);
    }
  }
}

3. Consumer 端:财务结算服务(总部)

// headquarters-system/finance-settlement.consumer.ts
@Controller()
export class FinanceSettlementConsumer {
  // 同样监听 order_events Topic,但是不同的 Consumer Group
  @MessagePattern('order_events')
  async handleOrderCreated(@Payload() message: any) {
    const { orderId, storeId, totalAmount } = message.value;
 
    // 1. 记录订单金额
    await this.financeService.recordRevenue({ orderId, amount: totalAmount });
 
    // 2. 计算门店分成(假设 70%)
    const commission = totalAmount * 0.7;
    await this.financeService.recordCommission({ storeId, orderId, commission });
  }
}

流程说明

价格调整

  1. 主系统调用 ProductPriceService.updatePrice()
  2. 主系统推送消息到 product_price Topic,Key 是 SKU123
  3. price-sync-group 的某个 Consumer 收到消息,更新数据库:2999 → 2599
  4. discount-recalc-group 的某个 Consumer 收到消息,重算折扣价:2399 → 2079

门店下单

  1. 门店调用 OrderService.createOrder()
  2. 门店扣减可预定额度,生成订单(包含价格 Snapshot)
  3. 门店推送消息到 order_events Topic,Key 是订单号
  4. order-collect-group 的某个 Consumer 收到消息,在总部创建订单
  5. finance-settlement-group 的某个 Consumer 收到消息,记录财务数据

Snapshot 的作用

消息里包含下单时的价格 Snapshot(标准价 2599、折扣价 2079)。后续价格怎么变,这个订单的 Snapshot 都不会变,财务对账时能追溯回当时的价格。

解耦和容错

各模块完全解耦。折扣重算服务挂了,价格同步服务照样工作。重算服务修好后,从 Kafka 读未消费的消息,自己追上进度。

我们系统和主系统也解耦。主系统推完消息就走,不等我们处理完。我们挂了主系统不受影响,修好后从 Kafka 消费堆积的消息,慢慢追回来。