在公司某个人才匹配系统 Code Review 时,看到同事写的一个 EmployeeScoreCalculator 类disable lint。打开一看,3800 行,所有算分逻辑全在一个类里:先算职级匹配,再算技能,再算地域,再算可用性,中间还有一堆 if 判断要不要提前退出。改一个算分维度,得在几千行里找代码;写单测只能端到端跑,根本不知道哪个环节出问题。

新需求要修改「技能匹配度」的逻辑,同事说要改核心类,可能影响现有逻辑,不敢动。

不想搞复杂设计

工作多年后,很反对过度设计。见过太多项目为了「优雅」硬套设计模式,结果代码绕来绕去,新人看半天看不懂。团队 Code Review 我一般只强调两点:代码简洁、逻辑清晰,不会去要求用什么模式。

但 3800 行真必须拆。这里算分逻辑就是顺序执行几个步骤,中间可能提前退出。明显适合 Pipeline 或者 责任链模式

Pipeline Pattern 是什么

Pipeline Pattern(管道模式)把复杂流程拆成多个独立的 Stage(阶段),数据按顺序流过这些 Stage,每个 Stage 处理完传给下一个。像工厂流水线:原料 → 切割 → 打磨 → 组装 → 包装,每个工位只管自己的活。

核心思路:

  • 数据流转:通过 context 对象在各 Stage 间传递
  • 职责独立:每个 Stage 只做一件事
  • 顺序执行:Stage 按添加顺序依次处理
  • 可提前退出:某个 Stage 可以标记退出,跳过后续步骤

为什么我们这适合Pipeline模式

责任链模式太重:要定义抽象基类、实现 setNext()、处理链路断开,写完代码量翻倍。不需要责任链那种「每个节点决定要不要往下传」的灵活性,直接用 Pipeline 就够了 —— 写几个类,.addStage() 串起来,清楚直接。

看两个实际执行例子就明白了:

提前退出(节省 70% 计算)

{
  "totalScore": 0.0,
  "details": {
    "role": { "score": 0.0 },
    "grade": { "score": 0.0 },
    "location": { "score": 0.0 },
    "source": { "score": 0.0 },
    // ✅ Stage 2 检测到 basicSum = 0,提前退出
    // 后续维度直接设为 null,不执行计算
    "skills": null,
    "availability": null,
    "archetype": null,
    "industry": null
  }
}
// 省了技能匹配(查数据库)、可用性计算(查日历)

完整计算

{
  "totalScore": 0.88,
  "details": {
    // Stage 1: 基础维度
    "role": { "score": 1.0 },
    "grade": { "score": 0.75 },
    "location": { "score": 1.0 },
    "source": { "score": 0.85 },
    // Stage 2: basicSum > 0,继续执行
    // Stage 3: 扩展维度
    "skills": { "score": 0.82 },
    "availability": { "score": 0.9 },
    "archetype": { "score": 1.0 },
    "industry": { "score": 0.5 }
  }
}

实际运行时,约 30% 的候选人在 Stage 2 就提前退出了,省了技能匹配(查数据库、遍历技能关系图)和可用性计算(查日历、算工作量)。

实际运行时,约 30% 的候选人在 Stage 2 就提前退出了,省了技能匹配(查数据库、遍历技能关系图)和可用性计算(查日历、算工作量)。

拆成 Pipeline

算分逻辑本身就是顺序的:基础维度(职级、地域)→ 判断要不要提前退出 → 扩展维度(技能、项目)→ 聚合分数。拆成独立的 Stage,Pipeline 负责串起来。

代码实现

Stage 接口(定义统一规范):

interface Stage {
  execute(ctx: Context): Promise<Context>;
}

Pipeline 编排器(核心就 20 行):

class ScoringPipeline {
  private stages: Stage[] = [];
 
  addStage(stage: Stage): this {
    this.stages.push(stage);
    return this;  // 链式调用
  }
 
  async execute(context: Context): Promise<Context> {
    for (const stage of this.stages) {
      // 关键:context 在各 Stage 间传递
      // 每个 Stage 拿到 context,处理完返回新的 context
      context = await stage.execute(context);
      
      // 关键:检查提前退出标记
      // 任何 Stage 都可以设置 earlyExit,跳过后续所有 Stage
      if (context.earlyExit) break;
    }
    return context;
  }
}

主计算器(链式调用串起 Stage):

class EmployeeScoreCalculator {
  private pipeline: ScoringPipeline;
 
  constructor() {
    this.pipeline = new ScoringPipeline()
      .addStage(new CalculateBasicScoresStage())
      .addStage(new CheckEarlyExitStage())
      .addStage(new CalculateExtendedScoresStage())
      .addStage(new AggregateScoresStage());
  }
 
  async calculate(employee: Employee, request: Request) {
    const context: Context = { employee, request };
    const result = await this.pipeline.execute(context);
    return { totalScore: result.totalScore, details: result.scoreDetails };
  }
}

Stage 定义(每个 Stage 是独立的类):

// Stage 1: 基础维度
class CalculateBasicScoresStage implements Stage {
  async execute(ctx: Context): Promise<Context> {
    const basicScores = {
      level: calcLevelScore(ctx.employee, ctx.request),
      location: calcLocationScore(ctx.employee, ctx.request),
      // ... 其他基础维度
    };
    const basicSum = Object.values(basicScores)
      .reduce((sum, s) => sum + s.score, 0);
    
    // 返回新的 context,包含计算结果
    // 下一个 Stage 会拿到这些数据
    return { ...ctx, basicScores, basicSum };
  }
}
 
// Stage 2: 检查提前退出
class CheckEarlyExitStage implements Stage {
  async execute(ctx: Context): Promise<Context> {
    if (ctx.basicSum === 0) {
      // 设置 earlyExit 标记,Pipeline 会跳过后续 Stage
      return { ...ctx, earlyExit: true, totalScore: 0 };
    }
    // 不退出,继续执行下一个 Stage
    return ctx;
  }
}
 
// Stage 3: 扩展维度
class CalculateExtendedScoresStage implements Stage {
  async execute(ctx: Context): Promise<Context> {
    // 如果已经提前退出,直接透传 context
    if (ctx.earlyExit) return ctx;
    
    const extendedScores = {
      skills: calcSkillScore(ctx.employee, ctx.request),
      // ... 其他扩展维度
    };
    return { ...ctx, extendedScores };
  }
}
 
// Stage 4: 聚合
class AggregateScoresStage implements Stage {
  async execute(ctx: Context): Promise<Context> {
    if (ctx.earlyExit) return ctx;
    
    // 使用前面 Stage 传过来的数据
    const allScores = { ...ctx.basicScores, ...ctx.extendedScores };
    const totalScore = Object.values(allScores)
      .reduce((sum, s) => sum + s.score * s.weight, 0);
    
    return { ...ctx, totalScore };
  }
}

改完的效果

改动之前之后
加新算分维度在 3800 行方法里插代码,怕影响其他逻辑写新 Stage,addStage() 加进去
写单测只能端到端测,mock 一堆数据每个 Stage 单独测,mock context 就行
调试console.log 满天飞找哪步出错看是哪个 Stage 返回的结果不对
可读性一个类 3800 行每个 Stage 30-50 行

什么时候用 Pipeline

不是所有代码都该拆 Pipeline,但符合这几点就很合适:

  • 顺序执行多步骤:数据要经过好几个处理阶段
  • 各步逻辑独立:Stage A 不需要知道 Stage C 怎么实现
  • 可能提前退出:某些条件满足就不用跑后面的步骤
  • 经常加新步骤:需求变化频繁,老是要插新逻辑

我们的算分场景完美匹配:基础算分 → 判断退出 → 扩展算分 → 聚合,每步职责清楚,还能提前退出省计算。拆成 Pipeline 后,每个 Stage 测试覆盖率都能到 90% 以上,改代码也不怕影响其他维度。

类似的场景还有数据处理管道、HTTP 中间件链、编译器 Pass —— 都是「数据流过多个处理阶段」的结构。