当多个请求同时修改同一份数据时,如果没有并发控制机制,就会出现 Race Condition(竞态条件)。锁是解决并发问题的核心机制。

锁是什么

锁(Lock)的本质是协调机制,用来保证多个操作者在访问共享资源时不会互相干扰。

想象一个场景:100 个人同时修改一个文档,没有任何规则,最后文档会变成什么样?锁就是这个”规则”:

  • 互斥(Mutual Exclusion):同一时刻,只有一个人能修改数据
  • 有序化(Serialization):把并发请求变成串行执行
  • 原子性保证(Atomicity):复合操作要么全部成功,要么全部失败

在编程中,锁的核心目标是让 Read-Check-Write 这种复合操作变成原子的

Race Condition 问题

Node.js 虽然是单线程,但 await 会让出控制权,让事件循环处理其他请求:

async function processData(id: string) {
  const data = await db.findOne({ id });
  // ⬆️ await 让出控制权,其他请求可以执行到这里
  
  if (data.count > 0) {
    // ⬆️ 100 个并发请求都读到 count=100,全部通过检查
    await db.update({ id }, { count: data.count - 1 });
    // ⬆️ 100 个请求都执行 update,最终 count 变成负数
  }
}

核心问题:Read-Check-Write 操作不是原子性的

几种锁方案

速览

方案复杂度吞吐量多服务实例多数据库实例适用场景
数据库原子操作简单最高简单扣减(库存 -1)、单表,只能处理简单的场景
乐观锁(Version)中等高(低冲突)读多写少(文章编辑)、冲突 < 10%
应用层内存锁简单单机(定时任务)、低并发 < 100 QPS
悲观锁(SELECT FOR UPDATE)中等高冲突(订单支付)、单数据库
Redis 分布式锁跨实例协调(秒杀活动)、跨服务

下面按从简单到复杂的顺序介绍几种常见的锁方案,从最简单的数据库原子操作开始,逐步到需要外部依赖的分布式锁。

1. 数据库原子操作

严格来说,这不是”锁”,而是利用数据库的**原子性(Atomicity)**来避免并发问题。这是最简单的方案,如果能用,优先用这个。

数据库保证:单条 SQL 语句的执行是原子的,不会被打断。如果能把”检查条件 + 更新数据”合并到一条 SQL,就不需要显式加锁了。

async function processData(id: string) {
  // 一条 SQL 完成检查和更新
  const result = await db.query(`
    UPDATE data 
    SET count = count - 1 
    WHERE id = $1 AND count > 0
    RETURNING *
  `, [id]);
  
  if (result.rowCount === 0) {
    throw new Error('数量不足');
  }
}

关键点

  • 数据库保证 WHERE count > 0 的检查和 count - 1 的更新是原子的
  • 不会出现两个请求同时通过检查的情况
  • 性能最高(无需额外的锁操作)

2. 乐观锁(Optimistic Lock)

乐观锁的哲学是”先做再说”,假设大部分时候不会有冲突。就像多人协作编辑文档,大家先各自改,提交时系统检查有没有冲突,有冲突就重新编辑。

核心机制:版本号(Version)。每条数据都有一个版本号,每次更新版本号 +1。更新时必须带上读取时的版本号,如果版本号对不上,说明被别人改过了,更新失败。

interface Data {
  id: string;
  count: number;
  version: number;  // 每次更新 +1
}
 
async function processData(id: string) {
  const data = await db.findOne({ id });
  
  if (data.count <= 0) {
    throw new Error('数量不足');
  }
  
  // 更新时带上版本号条件
  const updated = await db.updateOne(
    { 
      id, 
      version: data.version,  // WHERE version = 旧版本号
      count: { $gt: 0 }
    },
    { 
      $inc: { count: -1, version: 1 }  // count-1, version+1
    }
  );
  
  if (updated.modifiedCount === 0) {
    throw new Error('操作失败,请重试');
  }
}

3. 应用层内存锁

3. 应用层内存锁

应用层内存锁是在单个应用实例内部使用内存变量来实现的锁,适用于单机部署或低并发场景。

核心思想:用内存锁保证同一时刻只有一个请求在操作共享资源

实现

import AsyncLock from 'async-lock';
 
const lock = new AsyncLock();
 
async function processData(id: string) {
  // 按资源 ID 加锁,不同资源可以并发
  return await lock.acquire(`resource:${id}`, async () => {
    const data = await db.findOne({ id });
    if (data.count > 0) {
      await db.update({ id }, { count: data.count - 1 });
    }
  });
}

关键限制

  • ❌ 只在单个实例内有效(多实例部署会失效)
  • ❌ 进程重启后锁丢失

4. 悲观锁(Pessimistic Lock)

悲观锁与乐观锁相反,假设冲突一定会发生,所以先把数据锁住,不让别人碰。类似公共厕所,进去先锁门,用完再开门,保证同一时间只有一个人使用。

核心思想:独占式访问。谁先拿到锁,谁就独占资源,其他人必须等待。数据库的行锁(Row Lock)就是典型的悲观锁。

SELECT ... FOR UPDATE 详解

数据库提供的 SELECT ... FOR UPDATE 是悲观锁的标准实现,PostgreSQL、MySQL、Oracle 都支持。

基础语法(PostgreSQL)

async function processData(id: string) {
  // 1. 开启事务
  await db.query('BEGIN');
  
  // 2. 锁住这一行(其他事务的 UPDATE/DELETE/SELECT FOR UPDATE 都会等待)
  const result = await db.query(
    'SELECT count FROM data WHERE id = $1 FOR UPDATE',
    [id]
  );
  
  // 3. 业务逻辑:检查和更新
  if (result.rows[0].count > 0) {
    await db.query('UPDATE data SET count = count - 1 WHERE id = $1', [id]);
  }
  
  // 4. 提交事务(锁在这里释放)
  await db.query('COMMIT');
}

锁的生命周期

时刻 1: BEGIN                      # 开启事务
时刻 2: SELECT ... FOR UPDATE      # 加锁(从这里开始,其他事务等待)
时刻 3: 业务逻辑处理                # 锁仍然存在
时刻 4: UPDATE ...                 # 锁仍然存在(不是这里释放)
时刻 5: COMMIT                     # 锁在这里释放

关键点

  • 锁从 SELECT FOR UPDATE 开始
  • 锁到 COMMITROLLBACK 结束
  • 即使不执行 UPDATE,锁也会持续到事务结束

PostgreSQL 增强语法

-- FOR UPDATE NOWAIT:如果有锁就立即报错,不等待
SELECT * FROM data WHERE id = 1 FOR UPDATE NOWAIT;
 
-- FOR UPDATE SKIP LOCKED:跳过被锁的行(秒杀场景)
SELECT * FROM products WHERE stock > 0 LIMIT 10 FOR UPDATE SKIP LOCKED;
 
-- FOR SHARE:共享锁,允许其他事务读但不能写
SELECT * FROM data WHERE id = 1 FOR SHARE;

FOR UPDATE SKIP LOCKED 秒杀场景优化

SKIP LOCKED 是 PostgreSQL(9.5+)和 MySQL(8.0+)提供的强大特性,专门为高并发场景设计。

问题场景:普通 FOR UPDATE 会排队等待

-- 秒杀场景:100 个用户同时抢 10 个商品
-- 用户 A
BEGIN;
SELECT * FROM products WHERE stock > 0 LIMIT 10 FOR UPDATE;
-- 锁住 10 行,处理中...
 
-- 用户 B(几乎同时)
BEGIN;
SELECT * FROM products WHERE stock > 0 LIMIT 10 FOR UPDATE;
-- 卡住!等待用户 A 释放锁,即使还有其他商品也在等待

SKIP LOCKED 解决方案

async function grabProducts(userId: number, count: number) {
  // 1. 开启事务
  await db.query('BEGIN');
  
  // 2. 关键:SKIP LOCKED 跳过被锁的行
  const result = await db.query(`
    SELECT id, name, stock 
    FROM products 
    WHERE stock > 0 
    LIMIT $1 
    FOR UPDATE SKIP LOCKED
  `, [count]);
  
  // 3. 如果没有可用商品(全被锁了)
  if (result.rows.length === 0) {
    await db.query('ROLLBACK');
    return { success: false };
  }
  
  // 4. 扣减库存 + 创建订单
  await db.query('UPDATE products SET stock = stock - 1 WHERE ...');
  await db.query('INSERT INTO orders ...');
  
  // 5. 提交事务
  await db.query('COMMIT');
  
  return { success: true, products: result.rows };
}

并发效果对比

假设 20 个商品,100 个用户同时抢购(每人抢 10 个):

# 普通 FOR UPDATE:排队等待
用户 A: 0-100ms   (锁住 id 1-10)
用户 B: 100-200ms (锁住 id 11-20,等了 100ms)
用户 C: 200ms+    (没商品了)
总耗时:200ms,成功 2
 
# FOR UPDATE SKIP LOCKED:并行抢购
用户 A: 0-100ms (锁住 id 1-10)
用户 B: 0-100ms (锁住 id 11-20,并行!)
用户 C: 0-100ms (返回 0 行,但不阻塞)
总耗时:100ms,成功 2 人,响应快 50%

适用场景

  • ✅ 秒杀抢购(商品多,用户多)
  • ✅ 批量任务分配(多个 Worker 处理队列)
  • ✅ 座位选择(电影院、火车票)
  • ❌ 单个资源抢占(只有 1 个商品,SKIP 没意义)
  • ❌ 需要严格顺序(FIFO 队列)

5. Redis 分布式锁

分布式锁是为了解决”多个应用实例”之间的协调问题。当你的服务部署了多个实例(比如 3 个 Node.js 进程),数据库锁和应用层内存锁都只能管住单个实例内部的并发,实例之间还是会冲突。

这时需要一个外部的、所有实例都认可的”裁判”,Redis 就是这个裁判。所有实例通过 Redis 来”抢锁”,谁抢到谁执行,其他人等待。

单 Redis 实例:SET NX/PX

SET 命令的原子性参数

Redis 的 SET 命令支持两个关键参数:

  • NX(Not eXists):只在 key 不存在时设置成功
  • PX milliseconds:设置过期时间(毫秒)
SET lock:resource:123 "owner-uuid" NX PX 5000

这条命令的含义:

  1. 尝试设置 key lock:resource:123
  2. 只有 key 不存在时才设置成功(NX = 抢锁)
  3. 同时设置 5 秒过期(PX = 防止死锁)
  4. 返回值:成功返回 OK,失败返回 null

为什么这就是锁?

  • 互斥性:同一时刻只有一个客户端能 SET NX 成功
  • 防死锁PX 保证锁会自动过期
  • 原子性SET NX PX 是单条命令,不会被打断

基础实现

import Redis from 'ioredis';
 
const redis = new Redis();
 
async function processData(id: string) {
  const lockKey = `lock:data:${id}`;
  const lockValue = `${Date.now()}-${Math.random()}`; // 唯一标识
  
  // 尝试加锁(5 秒过期)
  const acquired = await redis.set(lockKey, lockValue, 'PX', 5000, 'NX');
  
  if (!acquired) {
    throw new Error('获取锁失败,请重试');
  }
  
  try {
    // 业务逻辑
    const data = await db.findOne({ id });
    if (data.count > 0) {
      await db.update({ id }, { count: data.count - 1 });
    }
  } finally {
    // 释放锁:用 Lua 确保只删除自己的锁
    await redis.eval(
      `if redis.call("get", KEYS[1]) == ARGV[1] then
         return redis.call("del", KEYS[1])
       else
         return 0
       end`,
      1, lockKey, lockValue
    );
  }
}

为什么释放锁要用 Lua 脚本?

如果直接用 redis.del(lockKey),会有以下问题:

// ❌ 错误示例
async function unlock(lockKey: string) {
  await redis.del(lockKey);  // 可能删除别人的锁!
}
 
// 时序图:
// 时刻 1: 客户端 A 获取锁
// 时刻 2: 客户端 A 业务超时,锁自动过期
// 时刻 3: 客户端 B 获取锁(同一个 key)
// 时刻 4: 客户端 A 执行 del(删除了 B 的锁!)

Lua 脚本的作用:

  1. 原子性GET + DEL 合并为一条命令
  2. 安全性:只删除自己加的锁(通过对比 lockValue

如何延长锁?

如果业务逻辑可能超过锁的过期时间,可以用 PEXPIRE 续期:

async function processWithLongTask(id: string) {
  const lockKey = `lock:data:${id}`;
  const lockValue = `${Date.now()}-${Math.random()}`;
  
  // 初始加锁 5 秒
  const acquired = await redis.set(lockKey, lockValue, 'PX', 5000, 'NX');
  if (!acquired) throw new Error('获取锁失败');
  
  // 每 2 秒续期 5 秒
  const interval = setInterval(async () => {
    // 用 Lua 确保是自己的锁才续期
    await redis.eval(
      `if redis.call("get", KEYS[1]) == ARGV[1] then
         return redis.call("pexpire", KEYS[1], ARGV[2])
       else
         return 0
       end`,
      1, lockKey, lockValue, 5000
    );
  }, 2000);
  
  try {
    await slowOperation();
  } finally {
    clearInterval(interval);
    // 释放锁(同样用 Lua)
    await redis.eval(
      `if redis.call("get", KEYS[1]) == ARGV[1] then
         return redis.call("del", KEYS[1])
       else
         return 0
       end`,
      1, lockKey, lockValue
    );
  }
}

关键点

  • 续期前检查 lockValue,避免续期别人的锁
  • 续期间隔应小于锁的过期时间(推荐 1/2 或 1/3)

多 Redis 实例:Redlock 算法

为什么需要 Redlock?

单 Redis 有单点故障风险:Redis 宕机 = 所有锁失效。Redlock 通过多个独立 Redis 节点提升可用性。

核心原理

  1. 向 N 个独立 Redis 节点申请锁(每个节点用 SET NX PX
  2. 只要**超过半数(N/2 + 1)**成功,就算获取锁成功
  3. 失败时删除所有已获取的锁(回滚)
  4. 释放时用 Lua 脚本删除所有节点的锁

为什么是”超过半数”?

  • 防止脑裂:两个客户端不可能同时获取超过半数的锁
  • 容错性:3 个节点可容忍 1 个宕机,5 个节点可容忍 2 个宕机

生产环境使用

直接用成熟库 redlock,无需自己实现:

import Redlock from 'redlock';
import Redis from 'ioredis';
 
const redlock = new Redlock(
  [new Redis(6379, 'redis1'), new Redis(6379, 'redis2'), new Redis(6379, 'redis3')],
  { retryCount: 3, retryDelay: 200 }
);
 
const lock = await redlock.acquire([`lock:${id}`], 5000);
try {
  // 业务逻辑
} finally {
  await lock.release();
}

最佳实践

1. 选择合适的锁粒度

// ❌ 粗粒度锁:所有资源共用一把锁
await redis.set('lock:global', value, 'PX', 5000, 'NX');
 
// ✅ 细粒度锁:按资源 ID 加锁,不同资源可并发
await redis.set(`lock:data:${id}`, value, 'PX', 5000, 'NX');

2. 缩短锁持有时间

// ❌ 锁住所有操作(包括慢操作)
const acquired = await redis.set(lockKey, value, 'PX', 10000, 'NX');
try {
  await db.update({ ... });
  await sendEmail();  // 耗时操作
} finally {
  await unlock();
}
 
// ✅ 只锁关键操作
const acquired = await redis.set(lockKey, value, 'PX', 1000, 'NX');
try {
  await db.update({ ... });
} finally {
  await unlock();
}
await sendEmail();  // 耗时操作放到锁外面

3. 避免死锁

// ❌ 可能死锁(两个请求反向加锁)
async function transfer(from: string, to: string) {
  await redis.set(`lock:${from}`, v1, 'PX', 1000, 'NX');
  await redis.set(`lock:${to}`, v2, 'PX', 1000, 'NX');
}
 
// ✅ 按顺序加锁
async function transfer(from: string, to: string) {
  const [id1, id2] = [from, to].sort();  // 统一顺序
  await redis.set(`lock:${id1}`, v1, 'PX', 1000, 'NX');
  await redis.set(`lock:${id2}`, v2, 'PX', 1000, 'NX');
}

决策指南

你的情况推荐方案理由
简单扣减场景(如库存 -1)数据库原子操作最简单,一条 SQL 搞定
单数据库实例 + 低并发(<500 QPS)乐观锁(Version)简单,无需事务
单数据库实例 + 中并发(500-1000 QPS)SELECT FOR UPDATE强一致性,无需重试
多实例部署 + 需要跨实例协调Redis 分布式锁 + 同步落库数据库锁只管单实例,Redis 管全局
极高并发(>1000 QPS)Redis 原子扣减 + MQ异步落库参见 three-level-cache-timeliness

实战建议

从简单方案开始,观察指标决定是否升级:

  • 能用原子操作就用原子操作(最简单)
  • 冲突率低(<10%) → 乐观锁
  • 冲突率高(>50%) → 悲观锁
  • 多实例部署 → Redis 分布式锁(应用层锁失效)

过早优化是万恶之源,先用简单方案跑起来,有问题再优化!