最近项目用到 Kafka,从公司主系统同步数据到我们的采销系统。之前没接触过,边做边学,正好梳理一下。
为什么需要 Kafka
我们做的是采销系统。买手创建采购活动(商品、折扣、赠品),审批通过后发布到门店。门店对外预售,客户下单后扣可预定额度,我们把订单推给总部系统去采购。典型的拉式供应链:先收客单再采购,零库存。
商品基础信息(名称、图片、标准价格、门店库存)都来自主系统,需要实时同步过来。我们自己维护折扣、采购计划、可预定额度。
问题来了:主系统商品价格从 999 改成 899,我们系统怎么知道?
传统方案:主系统改完价格,调我们的 HTTP 接口推数据。但是:
- 主系统要等我们处理完才能继续,太慢
- 我们系统挂了,主系统也挂着等,还得重试
- 价格变了要同步数据库,还要重算活动折扣价,两件事耦合在一个接口里
Kafka 方案:主系统把价格变动扔到 Kafka,不等我们处理就继续干活。我们启动两个服务:价格同步服务更新数据库,折扣重算服务重算活动。各干各的,谁挂了都不影响别人。
几个基本概念
Producer 和 Message
Producer(生产者)就是发消息的那一方。在我们系统里,主系统就是 Producer。
Message(消息)就是发的数据。比如茅台价格从 999 改成 899,主系统往 Kafka 扔一条消息:
{
"productId": "SKU123",
"eventType": "PRICE_UPDATED",
"oldPrice": 999,
"newPrice": 899
}Topic - 给消息分个类
Topic(主题)就是消息的分类。
为什么要分类?因为不同数据的处理逻辑完全不一样。价格变了要重算折扣,库存变了要发预警,混一起根本没法处理。
我们按数据类型分了三个 Topic:
product_info:商品基础信息(上架、改名、换图、下架)product_price:价格变动store_inventory:门店库存变动
价格消息只进 product_price,库存消息只进 store_inventory。
Consumer - 干活的
Consumer(消费者)就是接收并处理消息的程序。你在服务器上启动一个进程,这个进程就是一个 Consumer。
比如我们有个”价格同步服务”,启动后就成了一个 Consumer,从 product_price 这个 Topic 里读消息,然后更新数据库。
Consumer Group - 团队协作
Consumer Group(消费者组)是一组 Consumer 的集合。关键规则:一条消息在一个 Group 内只会被一个 Consumer 处理。
比如”价格同步服务”启动了 4 个进程(4 个 Consumer),它们组成一个 Consumer Group 叫 price-sync-group。这 4 个进程会分工合作,每条消息只被其中一个进程处理。
发布/订阅机制:同一条消息可以被多个 Consumer Group 消费,但在一个 Group 内只会被一个 Consumer 处理。
举个例子,product_price 这个 Topic 有两个 Group 在监听:
price-sync-group(价格同步服务):更新数据库discount-recalc-group(折扣重算服务):重算活动折扣价
主系统推了条”茅台从 999 改成 899”的消息:
price-sync-group收到一份,Group 内 4 个 Consumer 中的某一个处理它,更新数据库discount-recalc-group也收到一份,Group 内的某个 Consumer 处理它,重算折扣- 两个 Group 互不影响。价格同步服务挂了,折扣重算服务照样工作
这就是发布/订阅。一条消息发出去,所有订阅的 Group 都能收到。
为什么需要 Consumer Group?
- 负载均衡:多个 Consumer 分摊压力
- 故障转移:一个 Consumer 挂了,其他 Consumer 接管
- 发布/订阅:多个 Group 各干各的事
Group ID 划分原则
命名建议:{服务名}-{功能}-group,例如 price-sync-group、discount-recalc-group。
一个应用该用一个 Group 还是多个 Group?
两种方案:
方案 1:一个 Group,处理多个 Topic,多个不同类型的 Consumer — 小应用最爱。
// 一个应用,一个 Kafka 连接
app.connectMicroservice({
consumer: { groupId: 'appointment-consumer-group' }
});
// 内部不同 Consumer 监听不同 Topic
@Controller()
export class PriceSyncConsumer {
@MessagePattern('product_price')
async handlePrice() { /* 处理价格 */ }
}
@Controller()
export class InventorySyncConsumer {
@MessagePattern('store_inventory')
async handleInventory() { /* 处理库存 */ }
}简单、资源高效,适合消息处理逻辑简单、流量相近的场景。
方案 2:多个独立应用,每个专职处理特定功能
// 应用 1:价格同步(部署 10 个实例)
consumer: { groupId: 'price-sync-group' }
// 应用 2:库存同步(部署 3 个实例)
consumer: { groupId: 'inventory-sync-group' }适合流量差异大、需要独立扩展和故障隔离的场景。建议:先用方案 1,业务复杂后再拆分为方案 2
Partition - 并行处理
商品种类多了,单管道扛不住怎么办?Kafka 把一个 Topic 拆成多条并行的子管道,这就是 Partition(分区)。
比如把 product_price 拆成 4 个分区(0、1、2、3)。主系统推的价格消息同时在 4 条管道里跑,吞吐量翻 4 倍。
Partition 和 Consumer 的关系
price-sync-group 要消费 4 个分区的消息,可以启动几个 Consumer?
- 启动 4 个 Consumer:一个盯一个分区,完美
- 启动 2 个 Consumer:每个盯 2 个分区,能跑但压力大
- 启动 5 个 Consumer:4 个干活,第 5 个没分区可盯,只能摸鱼
Kafka 的铁律:一个分区只能被一个 Group 内的一个 Consumer 消费,防止重复处理。
实践建议:Consumer 数量 = Partition 数量,完美负载均衡。
Partition 数量怎么定
创建 Topic 时指定分区数量,根据流量估算:
# 低流量(每秒 < 100 条):3-5 个 Partition
# 中等流量(每秒 100-1000 条):10-20 个 Partition
# 高流量(每秒 > 1000 条):50-100 个 Partition
kafka-topics --create \
--topic product_price \
--partitions 3 \
--replication-factor 3Kafka 支持动态扩缩容,自动 Rebalance:
# 低峰期:5 个 Consumer(每个处理 2 个 Partition)
kubectl scale deployment price-consumer --replicas=5
# 高峰期:扩容到 10 个(每个处理 1 个 Partition)
kubectl scale deployment price-consumer --replicas=10Message Key - 保证顺序
Message Key 解决分区间的时序问题。
主系统对茅台(SKU123)先推”改成 899”,紧接着又推”改成 799”(搞大促)。
不带 Key 的后果:
- 第一条消息轮询进分区 0
- 第二条消息轮询进分区 1
- 分区 1 进程手快,先处理了 799,数据库改成 799
- 分区 0 进程后处理 899,数据库又改成 899
- 最终数据库里是 899,但应该是 799,错了!
带 Key(Key: SKU123):
- 两条消息都进分区 2(Kafka 根据 Key 哈希决定)
- 分区 2 按顺序处理,先 899 再 799
- 最终数据库里是 799,对了
Kafka 的铁律:同一 Key 的消息必须进同一分区。
这叫局部有序。不同商品的价格消息可能乱序(全局无序),但同一商品的价格消息严格有序(局部有序)。
Key 一般用业务主键。在我们系统里:
- 商品价格/信息消息:
Key: SKU123(商品 ID),保证同一商品的多次更新按顺序处理 - 门店库存消息:
Key: STORE_001:SKU123(门店 ID + 商品 ID),保证同一门店的同一商品库存变动按顺序处理 - 订单消息:
Key: ORD20260313001(订单号),保证同一订单的创建、取消、履约按顺序处理
串起来看
主系统(Producer)把带 Key 的价格消息(Message)推到 product_price(Topic)。Kafka 根据 Key 哈希决定进哪个 Partition。
每个 Partition 都是一个有序队列。多个 Consumer Group 可以同时消费这个 Topic,每个 Group 内的 Consumer 分摊 Partition。
我们系统怎么用的
数据流转两个方向:
- 接收:从主系统同步商品信息、价格、门店库存
- 发送:把门店订单推给总部系统
在 NestJS 里用 Kafka
在 NestJS 中使用 Kafka 有两种方式:官方微服务模块(装饰器风格,体验好)和原生 kafkajs(掌控力强)。详细对比和实现见 kafka-in-nestjs。
下面的例子用 NestJS 官方微服务模块。
Topic 和 Consumer Group 划分
| Topic | 事件类型 | Consumer Group | 干什么 |
|---|---|---|---|
product_info | PRODUCT_CREATED/UPDATED/DELETED | product-sync-group | 同步商品信息到数据库 |
activity-refresh-group | 刷新活动商品展示信息 | ||
product_price | PRICE_UPDATED | price-sync-group | 同步标准价格到数据库 |
discount-recalc-group | 重算活动折扣价 | ||
store_inventory | INVENTORY_UPDATED/ADJUSTED | inventory-sync-group | 同步门店库存到数据库 |
inventory-alert-group | 监控库存,低于阈值预警 | ||
order_events | ORDER_CREATED/CANCELLED/FULFILLED | order-collect-group | 归集门店订单到总部 |
finance-settlement-group | 记录金额,准备财务结算 |
配置 Kafka Module
在使用前先配置好 Kafka Module:
// kafka/kafka.module.ts
@Module({
imports: [
// 注册 Kafka Client,用于 Producer 端发送消息
ClientsModule.register([{
name: 'KAFKA_CLIENT', // 这个名字用于依赖注入
transport: Transport.KAFKA,
options: {
client: {
clientId: 'procurement-system',
brokers: ['localhost:9092'],
},
},
}]),
],
exports: [ClientsModule], // 导出,让其他模块可以注入使用
})
export class KafkaModule {}Partition 数量怎么配
Partition 数量不在代码里配,而是创建 Topic 时在 Kafka 服务端指定。或者用 Kafka 管理工具(Kafka Manager、Confluent Control Center)创建时指定。
Consumer 数量建议:
- Consumer 实例数 = Partition 数量(完美负载均衡)
- 例如:10 个 Partition,启动 10 个 Consumer 实例
- 超过 Partition 数量的 Consumer 会摸鱼
配置 Consumer Group(启动微服务时指定)
// main.ts
async function bootstrap() {
const app = await NestFactory.create(AppModule);
// 启动 Kafka 微服务,作为 Consumer
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.KAFKA,
options: {
client: { clientId: 'procurement-system', brokers: ['localhost:9092'] },
consumer: { groupId: 'biglot-consumer-group' }, // 这里指定 Consumer Group
},
});
await app.startAllMicroservices();
await app.listen(3000);
}在其他模块中注入使用
// app.module.ts
@Module({
imports: [KafkaModule], // 导入 Kafka Module
controllers: [PriceSyncConsumer],
})
export class AppModule {}HTTP 服务和 Consumer 要不要分开部署
问题:多个 Consumer 实例会不会影响 HTTP 服务?
按照上面的配置,app.connectMicroservice() 在同一个 Node.js 进程里启动 Kafka Consumer,和 HTTP 服务共享资源。如果要启动 10 个 Consumer 实例(匹配 10 个 Partition),就得启动 10 个应用实例,HTTP 服务也启动了 10 次,浪费资源。
生产环境建议:拆开部署
HTTP 服务应用(只负责接收请求):
// main-http.ts
async function bootstrap() {
const app = await NestFactory.create(AppModule);
await app.listen(3000); // 只启动 HTTP
}Consumer 应用(只负责消费消息):
// main-consumer.ts
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
clientId: 'procurement-system-consumer',
brokers: ['localhost:9092']
},
consumer: { groupId: 'price-sync-group' },
},
});
await app.listen();
}部署:
- HTTP 服务:启动 2-3 个实例(根据流量)
- Consumer 服务:启动 N 个实例(N = Partition 数量,实现完美负载均衡)
何时拆开?
- ✅ 消息量大(每秒上千条)
- ✅ CPU 密集(复杂计算、图片处理)
- ✅ 需要独立扩展(HTTP 要 3 个实例,Consumer 要 10 个)
- ❌ 消息量小、处理简单,放一起就行
代码示例:价格调整
Producer 端:主系统推送价格变动消息
// main-system/product-price.service.ts
@Injectable()
export class ProductPriceService {
constructor(@Inject('KAFKA_CLIENT') private kafkaClient: ClientKafka) {}
async updatePrice(productId: string, newPrice: number) {
// 1. 更新主系统数据库价格
await this.prisma.product.update({ ... });
// 2. 构造消息,Key 是商品 ID,保证同一商品的消息进同一分区
const message = {
key: productId, // Message Key: 商品 ID
value: JSON.stringify({
productId,
eventType: 'PRICE_UPDATED',
oldPrice: 2999,
newPrice: 2599,
...
}),
};
// 3. 推送消息到 product_price Topic
this.kafkaClient.emit('product_price', message);
}
}Consumer 端:价格同步服务(price-sync-group)
// procurement-system/price-sync.consumer.ts
@Controller()
export class PriceSyncConsumer {
// 监听 product_price Topic,Consumer Group 是 price-sync-group
@MessagePattern('product_price')
async handlePriceUpdate(@Payload() message: any) {
const { productId, newPrice } = message.value;
// 同步价格到采销系统数据库
await this.prisma.product.update({
where: { productId },
data: { standardPrice: newPrice },
});
}
}Consumer 端:折扣重算服务(discount-recalc-group)
// procurement-system/discount-recalc.consumer.ts
@Controller()
export class DiscountRecalcConsumer {
// 同样监听 product_price Topic,但是不同的 Consumer Group
@MessagePattern('product_price')
async handlePriceUpdate(@Payload() message: any) {
const { productId, newPrice } = message.value;
// 1. 查询所有进行中且包含该商品的采购活动
const activities = await this.activityService.findActiveByProduct(productId);
// 2. 遍历活动,重新计算折扣价
for (const activity of activities) {
const discountRate = activity.discountRate; // 比如 0.8 (8折)
// 更新活动中的商品折扣价
...
}
}
}代码示例:门店下单
Producer 端:门店系统推送订单消息
// store-system/order.service.ts
@Injectable()
export class OrderService {
constructor(@Inject('KAFKA_CLIENT') private kafkaClient: ClientKafka) {}
async createOrder(orderData: any) {
// 1. 扣减可预定额度
await this.activityService.decreaseQuota(items);
// 2. 生成订单(包含价格 Snapshot)
const order = await this.prisma.order.create({
data: {
orderId,
items: [...], // 包含 standardPrice、discountPrice 等 Snapshot
totalAmount,
},
});
// 3. 构造订单消息,Key 是订单号,value 包含订单详情和价格 Snapshot
const message = {
key: orderId, // Message Key: 订单号
value: JSON.stringify({
eventType: 'ORDER_CREATED',
orderId,
items: [{ productId: 'SKU123', standardPrice: 2599, discountPrice: 2079, quantity: 2 }], // Snapshot
...
}),
};
// 4. 推送到 order_events Topic
this.kafkaClient.emit('order_events', message);
}
}2. Consumer 端:订单归集服务(总部)
// headquarters-system/order-collect.consumer.ts
@Controller()
export class OrderCollectConsumer {
// 监听 order_events Topic,Consumer Group 是 order-collect-group
@MessagePattern('order_events')
async handleOrderCreated(@Payload() message: any) {
const { orderId, items } = message.value;
// 1. 在总部系统创建订单记录
await this.prisma.order.create({ data: message.value });
// 2. 累加商品总预定数量,准备采购
for (const item of items) {
await this.productService.increaseTotalQuantity(item.productId, item.quantity);
}
}
}3. Consumer 端:财务结算服务(总部)
// headquarters-system/finance-settlement.consumer.ts
@Controller()
export class FinanceSettlementConsumer {
// 同样监听 order_events Topic,但是不同的 Consumer Group
@MessagePattern('order_events')
async handleOrderCreated(@Payload() message: any) {
const { orderId, storeId, totalAmount } = message.value;
// 1. 记录订单金额
await this.financeService.recordRevenue({ orderId, amount: totalAmount });
// 2. 计算门店分成(假设 70%)
const commission = totalAmount * 0.7;
await this.financeService.recordCommission({ storeId, orderId, commission });
}
}流程说明
价格调整
- 主系统调用
ProductPriceService.updatePrice() - 主系统推送消息到
product_priceTopic,Key 是SKU123 price-sync-group的某个 Consumer 收到消息,更新数据库:2999 → 2599discount-recalc-group的某个 Consumer 收到消息,重算折扣价:2399 → 2079
门店下单
- 门店调用
OrderService.createOrder() - 门店扣减可预定额度,生成订单(包含价格 Snapshot)
- 门店推送消息到
order_eventsTopic,Key 是订单号 order-collect-group的某个 Consumer 收到消息,在总部创建订单finance-settlement-group的某个 Consumer 收到消息,记录财务数据
Snapshot 的作用
消息里包含下单时的价格 Snapshot(标准价 2599、折扣价 2079)。后续价格怎么变,这个订单的 Snapshot 都不会变,财务对账时能追溯回当时的价格。
解耦和容错
各模块完全解耦。折扣重算服务挂了,价格同步服务照样工作。重算服务修好后,从 Kafka 读未消费的消息,自己追上进度。
我们系统和主系统也解耦。主系统推完消息就走,不等我们处理完。我们挂了主系统不受影响,修好后从 Kafka 消费堆积的消息,慢慢追回来。