在公司某个人才匹配系统 Code Review 时,看到同事写的一个 EmployeeScoreCalculator 类disable lint。打开一看,3800 行,所有算分逻辑全在一个类里:先算职级匹配,再算技能,再算地域,再算可用性,中间还有一堆 if 判断要不要提前退出。改一个算分维度,得在几千行里找代码;写单测只能端到端跑,根本不知道哪个环节出问题。
新需求要修改「技能匹配度」的逻辑,同事说要改核心类,可能影响现有逻辑,不敢动。
不想搞复杂设计
工作多年后,很反对过度设计。见过太多项目为了「优雅」硬套设计模式,结果代码绕来绕去,新人看半天看不懂。团队 Code Review 我一般只强调两点:代码简洁、逻辑清晰,不会去要求用什么模式。
但 3800 行真必须拆。这里算分逻辑就是顺序执行几个步骤,中间可能提前退出。明显适合 Pipeline 或者 责任链模式
Pipeline Pattern 是什么
Pipeline Pattern(管道模式)把复杂流程拆成多个独立的 Stage(阶段),数据按顺序流过这些 Stage,每个 Stage 处理完传给下一个。像工厂流水线:原料 → 切割 → 打磨 → 组装 → 包装,每个工位只管自己的活。
核心思路:
- 数据流转:通过
context对象在各 Stage 间传递 - 职责独立:每个 Stage 只做一件事
- 顺序执行:Stage 按添加顺序依次处理
- 可提前退出:某个 Stage 可以标记退出,跳过后续步骤
为什么我们这适合Pipeline模式
责任链模式太重:要定义抽象基类、实现 setNext()、处理链路断开,写完代码量翻倍。不需要责任链那种「每个节点决定要不要往下传」的灵活性,直接用 Pipeline 就够了 —— 写几个类,.addStage() 串起来,清楚直接。
看两个实际执行例子就明白了:
提前退出(节省 70% 计算)
{
"totalScore": 0.0,
"details": {
"role": { "score": 0.0 },
"grade": { "score": 0.0 },
"location": { "score": 0.0 },
"source": { "score": 0.0 },
// ✅ Stage 2 检测到 basicSum = 0,提前退出
// 后续维度直接设为 null,不执行计算
"skills": null,
"availability": null,
"archetype": null,
"industry": null
}
}
// 省了技能匹配(查数据库)、可用性计算(查日历)完整计算
{
"totalScore": 0.88,
"details": {
// Stage 1: 基础维度
"role": { "score": 1.0 },
"grade": { "score": 0.75 },
"location": { "score": 1.0 },
"source": { "score": 0.85 },
// Stage 2: basicSum > 0,继续执行
// Stage 3: 扩展维度
"skills": { "score": 0.82 },
"availability": { "score": 0.9 },
"archetype": { "score": 1.0 },
"industry": { "score": 0.5 }
}
}实际运行时,约 30% 的候选人在 Stage 2 就提前退出了,省了技能匹配(查数据库、遍历技能关系图)和可用性计算(查日历、算工作量)。
实际运行时,约 30% 的候选人在 Stage 2 就提前退出了,省了技能匹配(查数据库、遍历技能关系图)和可用性计算(查日历、算工作量)。
拆成 Pipeline
算分逻辑本身就是顺序的:基础维度(职级、地域)→ 判断要不要提前退出 → 扩展维度(技能、项目)→ 聚合分数。拆成独立的 Stage,Pipeline 负责串起来。
代码实现
Stage 接口(定义统一规范):
interface Stage {
execute(ctx: Context): Promise<Context>;
}Pipeline 编排器(核心就 20 行):
class ScoringPipeline {
private stages: Stage[] = [];
addStage(stage: Stage): this {
this.stages.push(stage);
return this; // 链式调用
}
async execute(context: Context): Promise<Context> {
for (const stage of this.stages) {
// 关键:context 在各 Stage 间传递
// 每个 Stage 拿到 context,处理完返回新的 context
context = await stage.execute(context);
// 关键:检查提前退出标记
// 任何 Stage 都可以设置 earlyExit,跳过后续所有 Stage
if (context.earlyExit) break;
}
return context;
}
}主计算器(链式调用串起 Stage):
class EmployeeScoreCalculator {
private pipeline: ScoringPipeline;
constructor() {
this.pipeline = new ScoringPipeline()
.addStage(new CalculateBasicScoresStage())
.addStage(new CheckEarlyExitStage())
.addStage(new CalculateExtendedScoresStage())
.addStage(new AggregateScoresStage());
}
async calculate(employee: Employee, request: Request) {
const context: Context = { employee, request };
const result = await this.pipeline.execute(context);
return { totalScore: result.totalScore, details: result.scoreDetails };
}
}Stage 定义(每个 Stage 是独立的类):
// Stage 1: 基础维度
class CalculateBasicScoresStage implements Stage {
async execute(ctx: Context): Promise<Context> {
const basicScores = {
level: calcLevelScore(ctx.employee, ctx.request),
location: calcLocationScore(ctx.employee, ctx.request),
// ... 其他基础维度
};
const basicSum = Object.values(basicScores)
.reduce((sum, s) => sum + s.score, 0);
// 返回新的 context,包含计算结果
// 下一个 Stage 会拿到这些数据
return { ...ctx, basicScores, basicSum };
}
}
// Stage 2: 检查提前退出
class CheckEarlyExitStage implements Stage {
async execute(ctx: Context): Promise<Context> {
if (ctx.basicSum === 0) {
// 设置 earlyExit 标记,Pipeline 会跳过后续 Stage
return { ...ctx, earlyExit: true, totalScore: 0 };
}
// 不退出,继续执行下一个 Stage
return ctx;
}
}
// Stage 3: 扩展维度
class CalculateExtendedScoresStage implements Stage {
async execute(ctx: Context): Promise<Context> {
// 如果已经提前退出,直接透传 context
if (ctx.earlyExit) return ctx;
const extendedScores = {
skills: calcSkillScore(ctx.employee, ctx.request),
// ... 其他扩展维度
};
return { ...ctx, extendedScores };
}
}
// Stage 4: 聚合
class AggregateScoresStage implements Stage {
async execute(ctx: Context): Promise<Context> {
if (ctx.earlyExit) return ctx;
// 使用前面 Stage 传过来的数据
const allScores = { ...ctx.basicScores, ...ctx.extendedScores };
const totalScore = Object.values(allScores)
.reduce((sum, s) => sum + s.score * s.weight, 0);
return { ...ctx, totalScore };
}
}改完的效果
| 改动 | 之前 | 之后 |
|---|---|---|
| 加新算分维度 | 在 3800 行方法里插代码,怕影响其他逻辑 | 写新 Stage,addStage() 加进去 |
| 写单测 | 只能端到端测,mock 一堆数据 | 每个 Stage 单独测,mock context 就行 |
| 调试 | console.log 满天飞找哪步出错 | 看是哪个 Stage 返回的结果不对 |
| 可读性 | 一个类 3800 行 | 每个 Stage 30-50 行 |
什么时候用 Pipeline
不是所有代码都该拆 Pipeline,但符合这几点就很合适:
- 顺序执行多步骤:数据要经过好几个处理阶段
- 各步逻辑独立:Stage A 不需要知道 Stage C 怎么实现
- 可能提前退出:某些条件满足就不用跑后面的步骤
- 经常加新步骤:需求变化频繁,老是要插新逻辑
我们的算分场景完美匹配:基础算分 → 判断退出 → 扩展算分 → 聚合,每步职责清楚,还能提前退出省计算。拆成 Pipeline 后,每个 Stage 测试覆盖率都能到 90% 以上,改代码也不怕影响其他维度。
类似的场景还有数据处理管道、HTTP 中间件链、编译器 Pass —— 都是「数据流过多个处理阶段」的结构。