在 NestJS 中使用 Kafka 有两种方式:NestJS 官方微服务模块原生 kafkajs 封装。两者底层都依赖 kafkajs

核心依赖

需要的依赖有 @nestjs/microserviceskafkajs

方式一:Nest官方微服务模块(推荐)

消费者端

main.ts 配置微服务监听:

import { MicroserviceOptions, Transport } from '@nestjs/microservices';
 
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: { brokers: ['localhost:9092'] },
    consumer: { groupId: 'my-kafka-consumer-group' },
  },
});
await app.listen();

Controller 中接收消息:

@Controller()
export class AppController {
  @EventPattern('user.created')
  handleUserCreated(@Payload() message: any) {
    console.log('接收到消息:', message);
  }
}

生产者端

Module 中注册 Kafka 客户端:

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'KAFKA_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: { brokers: ['localhost:9092'] },
          // ...
        },
      },
    ]),
  ],
})
export class AppModule {}

Service 中发送消息:

@Injectable()
export class AppService implements OnModuleInit {
  constructor(
    @Inject('KAFKA_SERVICE') private readonly kafkaClient: ClientKafka,
  ) {}
 
  async onModuleInit() {
    await this.kafkaClient.connect(); // 内部同时连接 producer 和 consumer
  }
  // ✅ 框架自动处理断开连接, 不用实现onModuleDestroy,手动disconnect
 
  sendMessage() {
    this.kafkaClient.emit('user.created', { userId: 123 });
  }
}

方式二:原生 kafkajs 封装

适用于需要精细控制底层 API 的场景。

@Injectable()
export class KafkaCustomService implements OnModuleInit, OnModuleDestroy {
  private kafka: Kafka;
  private producer: Producer;
  private consumer: Consumer;
 
  constructor() {
    this.kafka = new Kafka({
      clientId: 'custom-client',
      brokers: ['localhost:9092'],
    });
    this.producer = this.kafka.producer();
    this.consumer = this.kafka.consumer({ groupId: 'custom-group' });
  }
 
  async onModuleInit() {
    await this.producer.connect(); // 连接生产者,用于发消息
    await this.consumer.connect(); // 连接消费者,用于收消息
    
    await this.consumer.subscribe({ topic: 'my-topic' });
    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        // 处理消息...
      },
    });
  }
 
  async sendMessage(topic: string, message: any) {
    await this.producer.send({
      topic,
      messages: [{ value: JSON.stringify(message) }],
    });
  }
  
  // ...
}

跨语言通信:序列化问题

NestJS 默认会在消息外包裹一层元数据,主要用于Nest框架,做事件响应:

{
  "pattern": "user.created", // 添加的字段,主要用于Nest框架,做事件响应
  "data": { "userId": 123 }
}

这会导致与 Java/Go 等其他语言系统对接时出现问题, 为了解决这个问题,通常需要自定义序列化器

自定义序列化器(发送端)

export class RawKafkaSerializer implements Serializer {
  serialize(value: any) {
    if (value && value.value !== undefined) {
      return value;
    }
    return value?.data ?? value; // 去除 NestJS 外壳
  }
}

自定义反序列化器(接收端)

export class RawKafkaDeserializer implements Deserializer {
  deserialize(value: any, options?: Record<string, any>) {
    let parsedValue = value;
    
    if (Buffer.isBuffer(value)) {
      parsedValue = JSON.parse(value.toString());
    }
 
    // 包上 NestJS 认识的外壳,用 Topic 作为 pattern
    // 因为需要让Nest框架的事件能够响应,Guard/Pipe等能够正常工作
    return {
      pattern: options?.channel, 
      data: parsedValue,
    };
  }
}

应用序列化器

消费者端(main.ts):

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: { brokers: ['localhost:9092'] },
    deserializer: new RawKafkaDeserializer(), // 这里自定义的反序列化
  },
});

生产者端(app.module.ts):

ClientsModule.register([
  {
    name: 'KAFKA_SERVICE',
    transport: Transport.KAFKA,
    options: {
      client: { brokers: ['localhost:9092'] },
      serializer: new RawKafkaSerializer(), // 这里自定义的序列化
    },
  },
])

消息头(Headers)处理

什么是消息头

Kafka 消息由三部分组成:

┌─────────────────┐
│ Key (可选)       │  用于分区路由
├─────────────────┤
│ Value (必需)     │  实际业务数据
├─────────────────┤
│ Headers (可选)   │  元数据(不是业务数据)
└─────────────────┘

Headers 典型用途

  • 链路追踪traceId 用于跟踪请求在微服务间的流转
  • 认证信息Authorization 传递用户身份
  • 消息属性contentTypetimestampsource
  • 业务标识tenantIdregion

为什么不放在 Value 里

  • 保持业务数据纯净,避免污染
  • 便于中间件统一处理(如日志、监控)
  • 不同消费者可能只需要 Headers,无需解析整个 Value

接收端读取 Headers

使用 @Ctx() 装饰器获取 Kafka 上下文,从中提取 Headers:

@EventPattern('nestjs.service.topic')
handleMessage(
  @Payload() message: any,
  @Ctx() context: KafkaContext
) {
  const headers = context.getMessage().headers;
  const traceId = headers['traceId']?.toString();
  const auth = headers['Authorization']?.toString();
  // ...
}

注意:Headers 的值是 Buffer 类型,需要 .toString() 转换。

发送端带上 Headers

步骤 1:升级序列化器支持 Headers

修改前面的 RawKafkaSerializer,让它能处理 Headers:

export class RawKafkaSerializer implements Serializer {
  serialize(value: any) {
    const data = value?.data ?? value;
 
    // 如果已经是完整的 Kafka 消息格式(包含 key/value/headers)
    if (data && data.value !== undefined) {
      return {
        key: data.key,
        value: typeof data.value === 'object' ? JSON.stringify(data.value) : data.value,
        headers: data.headers, // 保留 headers
      };
    }
 
    // 简单数据,只包含 value
    return {
      value: typeof data === 'object' ? JSON.stringify(data) : data,
    };
  }
}

步骤 2:发送消息时携带 Headers

this.kafkaClient.emit('java.service.topic', {
  key: 'user-1001',              // 分区键
  value: { 
    userId: 1001, 
    action: 'LOGIN' 
  },
  headers: {                      // 元数据
    'traceId': uuidv4(),          // 链路追踪 ID
    'Authorization': 'Bearer xxx', // 认证令牌
    'contentType': 'application/json',
    'source': 'nestjs-service',
  }
});

实际发送到 Kafka 的消息结构

{
  "key": "user-1001",
  "value": "{\"userId\":1001,\"action\":\"LOGIN\"}",
  "headers": {
    "traceId": "550e8400-e29b-41d4-a716-446655440000",
    "Authorization": "Bearer xxx",
    "contentType": "application/json",
    "source": "nestjs-service"
  }
}

NestJS 生态集成

官方微服务模块最大优势是完美融入 NestJS 生态,支持 Guards、Interceptors、Pipes。保证只有满足要求的数据,才会被handler 接收处理

数据校验(Pipes)

export class CreateUserDto {
  @IsEmail()
  readonly email: string;
 
  @MinLength(6)
  readonly password: string;
}
 
@EventPattern('user.create')
@UsePipes(new ValidationPipe())
handleUserCreate(@Payload() data: CreateUserDto) {
  // data 已通过校验
}

权限控制(Guards)

@Injectable()
export class RolesGuard implements CanActivate {
  canActivate(context: ExecutionContext): boolean {
    const rpcContext = context.switchToRpc().getContext<KafkaContext>();
    const headers = rpcContext.getMessage().headers;
    
    const role = headers['x-role']?.toString();
    return role === 'admin';
  }
}
 
@EventPattern('user.create')
@UseGuards(RolesGuard)
handleUserCreate(@Payload() data: CreateUserDto) {
  // 只有 admin 角色才能到达这里
  // 校验不过会这条消息相当于没有处理
}

关键行为 当 Pipe 或 Guard 抛出异常时:

  • ❌ 消息不会被标记为已处理(Offset 不会提交)
  • 🔄 Kafka 会重发这条消息(因为 Consumer 认为处理失败了)
  • ⚠️ 可能导致”毒药消息”问题(反复重试同一条错误消息,阻塞队列)

毒药消息的问题,需要参考下面的 异常过滤器 + 死信队列 来解决这个问题。

日志拦截(Interceptors)

@Injectable()
export class KafkaLoggingInterces withmplements NestInterceptor {
  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    // 注意这里是 switchToRpc, 不是 switchToHttp
    const rpcContext = context.switchToRpc().getContext<KafkaContext>();
    const topic = rpcContext.getTopic();
    const now = Date.now();
    
    return next.handle().pipe(
      tap(() => console.log(`处理 ${topic} 耗时: ${Date.now() - now}ms`))
    );
  }
}

错误处理与死信队列

Offset 机制:Kafka 怎么知道消息被处理了

Offset(偏移量) 是 Kafka 的”书签”,用于标记每个 Consumer 读到了哪条消息。

NestJS 的 Offset 提交时机

@EventPattern('user.created')
async handleMessage(@Payload() data: any) {
  // 1. 收到消息 (Offset = 2)
  console.log(data);
  
  // 2. 处理逻辑
  await saveToDatabase(data);
  
  // 3. ✅ 函数正常返回 → Kafka 自动提交 Offset = 3
  //    下次从 Offset 3 开始读
}

如果抛出异常会怎样

@EventPattern('user.created')
async handleMessage(@Payload() data: any) {
  // 1. 收到消息 (Offset = 2)
  
  // 2. 校验失败,抛出异常
  throw new Error('数据格式错误');
  
  // 3. ❌ Offset 不会提交,仍然停在 2
  // 4. 🔄 Kafka 重新推送 Offset = 2 的消息
  // 5. ♾️ 无限循环,队列被"毒药消息"阻塞
}

“毒药消息”问题

当一条消息因为格式错误、业务逻辑 Bug 等原因无法被处理时:

消息到达 → 处理失败 → 不提交 Offset
    ↑                         ↓
    └────────── Kafka 重发 ────┘
    
结果:后续所有消息都被阻塞 🚫

真实场景示例

// 第 100 条消息的 email 字段格式错误
@EventPattern('user.created')
@UsePipes(new ValidationPipe())  // 校验失败抛异常
handleMessage(@Payload() data: CreateUserDto) {
  // 如果上面的 Pipe 或 Guard 失败:永远卡在第 100 条消息, 第 101、102...条消息都无法处理  
  // 1. 这个 handler 不会执行
  // 2. Offset 不会提交
  // 3. Kafka 会重新推送这条消息
  // 4. 陷入死循环 ♾️

生产者 vs 消费者的错误策略

角色错误场景处理策略
生产者网络抖动、Broker 宕机Kafka 客户端自动重试(默认重试几次),失败后向上层抛错
消费者数据格式错误、业务异常必须捕获异常,发送到死信队列,提交 Offset 继续处理后续消息

为什么消费者必须处理异常

  • 生产者失败可以重试或通知用户
  • 消费者失败会阻塞整个队列,影响所有后续消息

死信队列(DLQ):无法处理的消息去哪里

死信队列是一个普通的 Kafka Topic,专门存储”处理失败”的消息,供后续排查和重新处理。

正常流程:
user.created (主 Topic)
    ↓
Consumer 处理成功
    ↓
提交 Offset

异常流程:
user.created (主 Topic)
    ↓
Consumer 处理失败
    ↓
发送到 user.created.DLQ (死信 Topic)
    ↓
提交原消息的 Offset (重要!)
    ↓
继续处理下一条消息

DLQ 命名规范:通常是 <原Topic名称>.DLQ,例如:

  • user.createduser.created.DLQ
  • order.paidorder.paid.DLQ

配置底层重试

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
      retry: {
        initialRetryTime: 100,
        retries: 5,
      },
    },
  },
});

实现 DLQ 过滤器

@Catch()
export class KafkaDlqFilter implements ExceptionFilter {
  constructor(
    @Inject('KAFKA_SERVICE') private readonly kafkaClient: ClientKafka,
  ) {}
 
  catch(exception: any, host: ArgumentsHost) {
    const ctx = host.switchToRpc().getContext<KafkaContext>();
    const originalTopic = ctx.getTopic();
    const originalMessage = ctx.getMessage();
    const dlqTopic = `${originalTopic}.DLQ`;
 
    // 发送到死信队列
    this.kafkaClient.emit(dlqTopic, {
      key: originalMessage.key,
      value: {
        originalPayload: originalMessage.value,
        errorReason: exception.message,
        failedAt: new Date().toISOString(),
      },
      headers: originalMessage.headers,
    });
 
    // 返回空 Observable,告诉 Kafka 已妥善处理,提交 Offset
    return of(null); 
  }
}

应用过滤器:

@Controller()
@UseFilters(KafkaDlqFilter) // 应用了Filter
export class AppController {
  @EventPattern('user.create')
  @UsePipes(new ValidationPipe())
  handleUserCreate(@Payload() data: CreateUserDto) {
    // 校验失败或业务异常会被 KafkaDlqFilter 捕获
  }
}

消息发送到 DLQ 后,后续处理流程是啥?

  1. 报警:监控系统检测到 DLQ 有新消息,发送报警
  2. 排查:开发人员通过 Kafka UI 查看消息内容和错误原因
  3. 修复:修复代码 Bug 或恢复下游服务
  4. 重播(Replay):通过工具将 DLQ 消息重新发送回原 Topic
  5. 推进 Offset:处理完毕后,推进 DLQ 的 Offset

Kafka 的不可变日志设计

不像RabbitMQ,Kafka 不支持删除特定消息(类似录音带,只能追加)。处理过的死信通过推进 Offset 标记为已处理,物理数据保留在磁盘,直到达到保留策略(如 14 天)后自动清理。

这种设计保留了审计和追溯能力,允许后续重新查看历史死信。

方案选择建议

官方微服务模块 vs 原生 kafkajs:官方模块开发体验好(装饰器、自动路由),但跨语言通信需自定义序列化器;原生 kafkajs 完全掌控但代码繁琐。

推荐方案:90% 场景使用官方模块 + 自定义序列化器,享受 NestJS 生态优势的同时抹平框架差异。只在数据密集型或需要精细控制 Offset 的场景使用原生封装。