在 NestJS 中使用 Kafka 有两种方式:NestJS 官方微服务模块 和原生 kafkajs 封装。两者底层都依赖 kafkajs。
核心依赖
需要的依赖有 @nestjs/microservices 和 kafkajs
方式一:Nest官方微服务模块(推荐)
消费者端
在 main.ts 配置微服务监听:
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: { brokers: ['localhost:9092'] },
consumer: { groupId: 'my-kafka-consumer-group' },
},
});
await app.listen();Controller 中接收消息:
@Controller()
export class AppController {
@EventPattern('user.created')
handleUserCreated(@Payload() message: any) {
console.log('接收到消息:', message);
}
}生产者端
Module 中注册 Kafka 客户端:
@Module({
imports: [
ClientsModule.register([
{
name: 'KAFKA_SERVICE',
transport: Transport.KAFKA,
options: {
client: { brokers: ['localhost:9092'] },
// ...
},
},
]),
],
})
export class AppModule {}Service 中发送消息:
@Injectable()
export class AppService implements OnModuleInit {
constructor(
@Inject('KAFKA_SERVICE') private readonly kafkaClient: ClientKafka,
) {}
async onModuleInit() {
await this.kafkaClient.connect(); // 内部同时连接 producer 和 consumer
}
// ✅ 框架自动处理断开连接, 不用实现onModuleDestroy,手动disconnect
sendMessage() {
this.kafkaClient.emit('user.created', { userId: 123 });
}
}方式二:原生 kafkajs 封装
适用于需要精细控制底层 API 的场景。
@Injectable()
export class KafkaCustomService implements OnModuleInit, OnModuleDestroy {
private kafka: Kafka;
private producer: Producer;
private consumer: Consumer;
constructor() {
this.kafka = new Kafka({
clientId: 'custom-client',
brokers: ['localhost:9092'],
});
this.producer = this.kafka.producer();
this.consumer = this.kafka.consumer({ groupId: 'custom-group' });
}
async onModuleInit() {
await this.producer.connect(); // 连接生产者,用于发消息
await this.consumer.connect(); // 连接消费者,用于收消息
await this.consumer.subscribe({ topic: 'my-topic' });
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
// 处理消息...
},
});
}
async sendMessage(topic: string, message: any) {
await this.producer.send({
topic,
messages: [{ value: JSON.stringify(message) }],
});
}
// ...
}跨语言通信:序列化问题
NestJS 默认会在消息外包裹一层元数据,主要用于Nest框架,做事件响应:
{
"pattern": "user.created", // 添加的字段,主要用于Nest框架,做事件响应
"data": { "userId": 123 }
}这会导致与 Java/Go 等其他语言系统对接时出现问题, 为了解决这个问题,通常需要自定义序列化器。
自定义序列化器(发送端)
export class RawKafkaSerializer implements Serializer {
serialize(value: any) {
if (value && value.value !== undefined) {
return value;
}
return value?.data ?? value; // 去除 NestJS 外壳
}
}自定义反序列化器(接收端)
export class RawKafkaDeserializer implements Deserializer {
deserialize(value: any, options?: Record<string, any>) {
let parsedValue = value;
if (Buffer.isBuffer(value)) {
parsedValue = JSON.parse(value.toString());
}
// 包上 NestJS 认识的外壳,用 Topic 作为 pattern
// 因为需要让Nest框架的事件能够响应,Guard/Pipe等能够正常工作
return {
pattern: options?.channel,
data: parsedValue,
};
}
}应用序列化器
消费者端(main.ts):
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: { brokers: ['localhost:9092'] },
deserializer: new RawKafkaDeserializer(), // 这里自定义的反序列化
},
});生产者端(app.module.ts):
ClientsModule.register([
{
name: 'KAFKA_SERVICE',
transport: Transport.KAFKA,
options: {
client: { brokers: ['localhost:9092'] },
serializer: new RawKafkaSerializer(), // 这里自定义的序列化
},
},
])消息头(Headers)处理
什么是消息头
Kafka 消息由三部分组成:
┌─────────────────┐
│ Key (可选) │ 用于分区路由
├─────────────────┤
│ Value (必需) │ 实际业务数据
├─────────────────┤
│ Headers (可选) │ 元数据(不是业务数据)
└─────────────────┘
Headers 典型用途:
- 链路追踪:
traceId用于跟踪请求在微服务间的流转 - 认证信息:
Authorization传递用户身份 - 消息属性:
contentType、timestamp、source等 - 业务标识:
tenantId、region等
为什么不放在 Value 里?
- 保持业务数据纯净,避免污染
- 便于中间件统一处理(如日志、监控)
- 不同消费者可能只需要 Headers,无需解析整个 Value
接收端读取 Headers
使用 @Ctx() 装饰器获取 Kafka 上下文,从中提取 Headers:
@EventPattern('nestjs.service.topic')
handleMessage(
@Payload() message: any,
@Ctx() context: KafkaContext
) {
const headers = context.getMessage().headers;
const traceId = headers['traceId']?.toString();
const auth = headers['Authorization']?.toString();
// ...
}注意:Headers 的值是 Buffer 类型,需要 .toString() 转换。
发送端带上 Headers
步骤 1:升级序列化器支持 Headers
修改前面的 RawKafkaSerializer,让它能处理 Headers:
export class RawKafkaSerializer implements Serializer {
serialize(value: any) {
const data = value?.data ?? value;
// 如果已经是完整的 Kafka 消息格式(包含 key/value/headers)
if (data && data.value !== undefined) {
return {
key: data.key,
value: typeof data.value === 'object' ? JSON.stringify(data.value) : data.value,
headers: data.headers, // 保留 headers
};
}
// 简单数据,只包含 value
return {
value: typeof data === 'object' ? JSON.stringify(data) : data,
};
}
}步骤 2:发送消息时携带 Headers
this.kafkaClient.emit('java.service.topic', {
key: 'user-1001', // 分区键
value: {
userId: 1001,
action: 'LOGIN'
},
headers: { // 元数据
'traceId': uuidv4(), // 链路追踪 ID
'Authorization': 'Bearer xxx', // 认证令牌
'contentType': 'application/json',
'source': 'nestjs-service',
}
});实际发送到 Kafka 的消息结构:
{
"key": "user-1001",
"value": "{\"userId\":1001,\"action\":\"LOGIN\"}",
"headers": {
"traceId": "550e8400-e29b-41d4-a716-446655440000",
"Authorization": "Bearer xxx",
"contentType": "application/json",
"source": "nestjs-service"
}
}NestJS 生态集成
官方微服务模块最大优势是完美融入 NestJS 生态,支持 Guards、Interceptors、Pipes。保证只有满足要求的数据,才会被handler 接收处理
数据校验(Pipes)
export class CreateUserDto {
@IsEmail()
readonly email: string;
@MinLength(6)
readonly password: string;
}
@EventPattern('user.create')
@UsePipes(new ValidationPipe())
handleUserCreate(@Payload() data: CreateUserDto) {
// data 已通过校验
}权限控制(Guards)
@Injectable()
export class RolesGuard implements CanActivate {
canActivate(context: ExecutionContext): boolean {
const rpcContext = context.switchToRpc().getContext<KafkaContext>();
const headers = rpcContext.getMessage().headers;
const role = headers['x-role']?.toString();
return role === 'admin';
}
}
@EventPattern('user.create')
@UseGuards(RolesGuard)
handleUserCreate(@Payload() data: CreateUserDto) {
// 只有 admin 角色才能到达这里
// 校验不过会这条消息相当于没有处理
}关键行为 当 Pipe 或 Guard 抛出异常时:
- ❌ 消息不会被标记为已处理(Offset 不会提交)
- 🔄 Kafka 会重发这条消息(因为 Consumer 认为处理失败了)
- ⚠️ 可能导致”毒药消息”问题(反复重试同一条错误消息,阻塞队列)
毒药消息的问题,需要参考下面的 异常过滤器 + 死信队列 来解决这个问题。
日志拦截(Interceptors)
@Injectable()
export class KafkaLoggingInterces withmplements NestInterceptor {
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
// 注意这里是 switchToRpc, 不是 switchToHttp
const rpcContext = context.switchToRpc().getContext<KafkaContext>();
const topic = rpcContext.getTopic();
const now = Date.now();
return next.handle().pipe(
tap(() => console.log(`处理 ${topic} 耗时: ${Date.now() - now}ms`))
);
}
}错误处理与死信队列
Offset 机制:Kafka 怎么知道消息被处理了
Offset(偏移量) 是 Kafka 的”书签”,用于标记每个 Consumer 读到了哪条消息。
NestJS 的 Offset 提交时机:
@EventPattern('user.created')
async handleMessage(@Payload() data: any) {
// 1. 收到消息 (Offset = 2)
console.log(data);
// 2. 处理逻辑
await saveToDatabase(data);
// 3. ✅ 函数正常返回 → Kafka 自动提交 Offset = 3
// 下次从 Offset 3 开始读
}如果抛出异常会怎样:
@EventPattern('user.created')
async handleMessage(@Payload() data: any) {
// 1. 收到消息 (Offset = 2)
// 2. 校验失败,抛出异常
throw new Error('数据格式错误');
// 3. ❌ Offset 不会提交,仍然停在 2
// 4. 🔄 Kafka 重新推送 Offset = 2 的消息
// 5. ♾️ 无限循环,队列被"毒药消息"阻塞
}“毒药消息”问题
当一条消息因为格式错误、业务逻辑 Bug 等原因无法被处理时:
消息到达 → 处理失败 → 不提交 Offset
↑ ↓
└────────── Kafka 重发 ────┘
结果:后续所有消息都被阻塞 🚫
真实场景示例:
// 第 100 条消息的 email 字段格式错误
@EventPattern('user.created')
@UsePipes(new ValidationPipe()) // 校验失败抛异常
handleMessage(@Payload() data: CreateUserDto) {
// 如果上面的 Pipe 或 Guard 失败:永远卡在第 100 条消息, 第 101、102...条消息都无法处理
// 1. 这个 handler 不会执行
// 2. Offset 不会提交
// 3. Kafka 会重新推送这条消息
// 4. 陷入死循环 ♾️生产者 vs 消费者的错误策略
| 角色 | 错误场景 | 处理策略 |
|---|---|---|
| 生产者 | 网络抖动、Broker 宕机 | Kafka 客户端自动重试(默认重试几次),失败后向上层抛错 |
| 消费者 | 数据格式错误、业务异常 | 必须捕获异常,发送到死信队列,提交 Offset 继续处理后续消息 |
为什么消费者必须处理异常?
- 生产者失败可以重试或通知用户
- 消费者失败会阻塞整个队列,影响所有后续消息
死信队列(DLQ):无法处理的消息去哪里
死信队列是一个普通的 Kafka Topic,专门存储”处理失败”的消息,供后续排查和重新处理。
正常流程:
user.created (主 Topic)
↓
Consumer 处理成功
↓
提交 Offset
异常流程:
user.created (主 Topic)
↓
Consumer 处理失败
↓
发送到 user.created.DLQ (死信 Topic)
↓
提交原消息的 Offset (重要!)
↓
继续处理下一条消息
DLQ 命名规范:通常是 <原Topic名称>.DLQ,例如:
user.created→user.created.DLQorder.paid→order.paid.DLQ
配置底层重试
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
retry: {
initialRetryTime: 100,
retries: 5,
},
},
},
});实现 DLQ 过滤器
@Catch()
export class KafkaDlqFilter implements ExceptionFilter {
constructor(
@Inject('KAFKA_SERVICE') private readonly kafkaClient: ClientKafka,
) {}
catch(exception: any, host: ArgumentsHost) {
const ctx = host.switchToRpc().getContext<KafkaContext>();
const originalTopic = ctx.getTopic();
const originalMessage = ctx.getMessage();
const dlqTopic = `${originalTopic}.DLQ`;
// 发送到死信队列
this.kafkaClient.emit(dlqTopic, {
key: originalMessage.key,
value: {
originalPayload: originalMessage.value,
errorReason: exception.message,
failedAt: new Date().toISOString(),
},
headers: originalMessage.headers,
});
// 返回空 Observable,告诉 Kafka 已妥善处理,提交 Offset
return of(null);
}
}应用过滤器:
@Controller()
@UseFilters(KafkaDlqFilter) // 应用了Filter
export class AppController {
@EventPattern('user.create')
@UsePipes(new ValidationPipe())
handleUserCreate(@Payload() data: CreateUserDto) {
// 校验失败或业务异常会被 KafkaDlqFilter 捕获
}
}消息发送到 DLQ 后,后续处理流程是啥?
- 报警:监控系统检测到 DLQ 有新消息,发送报警
- 排查:开发人员通过 Kafka UI 查看消息内容和错误原因
- 修复:修复代码 Bug 或恢复下游服务
- 重播(Replay):通过工具将 DLQ 消息重新发送回原 Topic
- 推进 Offset:处理完毕后,推进 DLQ 的 Offset
Kafka 的不可变日志设计
不像RabbitMQ,Kafka 不支持删除特定消息(类似录音带,只能追加)。处理过的死信通过推进 Offset 标记为已处理,物理数据保留在磁盘,直到达到保留策略(如 14 天)后自动清理。
这种设计保留了审计和追溯能力,允许后续重新查看历史死信。
方案选择建议
官方微服务模块 vs 原生 kafkajs:官方模块开发体验好(装饰器、自动路由),但跨语言通信需自定义序列化器;原生 kafkajs 完全掌控但代码繁琐。
推荐方案:90% 场景使用官方模块 + 自定义序列化器,享受 NestJS 生态优势的同时抹平框架差异。只在数据密集型或需要精细控制 Offset 的场景使用原生封装。