当多个请求同时修改同一份数据时,如果没有并发控制机制,就会出现 Race Condition(竞态条件)。锁是解决并发问题的核心机制。
锁是什么
锁(Lock)的本质是协调机制,用来保证多个操作者在访问共享资源时不会互相干扰。
想象一个场景:100 个人同时修改一个文档,没有任何规则,最后文档会变成什么样?锁就是这个”规则”:
- 互斥(Mutual Exclusion):同一时刻,只有一个人能修改数据
- 有序化(Serialization):把并发请求变成串行执行
- 原子性保证(Atomicity):复合操作要么全部成功,要么全部失败
在编程中,锁的核心目标是让 Read-Check-Write 这种复合操作变成原子的。
Race Condition 问题
Node.js 虽然是单线程,但 await 会让出控制权,让事件循环处理其他请求:
async function processData(id: string) {
const data = await db.findOne({ id });
// ⬆️ await 让出控制权,其他请求可以执行到这里
if (data.count > 0) {
// ⬆️ 100 个并发请求都读到 count=100,全部通过检查
await db.update({ id }, { count: data.count - 1 });
// ⬆️ 100 个请求都执行 update,最终 count 变成负数
}
}核心问题:Read-Check-Write 操作不是原子性的。
几种锁方案
速览
| 方案 | 复杂度 | 吞吐量 | 多服务实例 | 多数据库实例 | 适用场景 |
|---|---|---|---|---|---|
| 数据库原子操作 | 简单 | 最高 | ✅ | ❌ | 简单扣减(库存 -1)、单表,只能处理简单的场景 |
| 乐观锁(Version) | 中等 | 高(低冲突) | ✅ | ✅ | 读多写少(文章编辑)、冲突 < 10% |
| 应用层内存锁 | 简单 | 低 | ❌ | ❌ | 单机(定时任务)、低并发 < 100 QPS |
| 悲观锁(SELECT FOR UPDATE) | 中等 | 低 | ✅ | ❌ | 高冲突(订单支付)、单数据库 |
| Redis 分布式锁 | 高 | 高 | ✅ | ✅ | 跨实例协调(秒杀活动)、跨服务 |
下面按从简单到复杂的顺序介绍几种常见的锁方案,从最简单的数据库原子操作开始,逐步到需要外部依赖的分布式锁。
1. 数据库原子操作
严格来说,这不是”锁”,而是利用数据库的**原子性(Atomicity)**来避免并发问题。这是最简单的方案,如果能用,优先用这个。
数据库保证:单条 SQL 语句的执行是原子的,不会被打断。如果能把”检查条件 + 更新数据”合并到一条 SQL,就不需要显式加锁了。
async function processData(id: string) {
// 一条 SQL 完成检查和更新
const result = await db.query(`
UPDATE data
SET count = count - 1
WHERE id = $1 AND count > 0
RETURNING *
`, [id]);
if (result.rowCount === 0) {
throw new Error('数量不足');
}
}关键点:
- 数据库保证
WHERE count > 0的检查和count - 1的更新是原子的 - 不会出现两个请求同时通过检查的情况
- 性能最高(无需额外的锁操作)
2. 乐观锁(Optimistic Lock)
乐观锁的哲学是”先做再说”,假设大部分时候不会有冲突。就像多人协作编辑文档,大家先各自改,提交时系统检查有没有冲突,有冲突就重新编辑。
核心机制:版本号(Version)。每条数据都有一个版本号,每次更新版本号 +1。更新时必须带上读取时的版本号,如果版本号对不上,说明被别人改过了,更新失败。
interface Data {
id: string;
count: number;
version: number; // 每次更新 +1
}
async function processData(id: string) {
const data = await db.findOne({ id });
if (data.count <= 0) {
throw new Error('数量不足');
}
// 更新时带上版本号条件
const updated = await db.updateOne(
{
id,
version: data.version, // WHERE version = 旧版本号
count: { $gt: 0 }
},
{
$inc: { count: -1, version: 1 } // count-1, version+1
}
);
if (updated.modifiedCount === 0) {
throw new Error('操作失败,请重试');
}
}3. 应用层内存锁
3. 应用层内存锁
应用层内存锁是在单个应用实例内部使用内存变量来实现的锁,适用于单机部署或低并发场景。
核心思想:用内存锁保证同一时刻只有一个请求在操作共享资源。
实现
import AsyncLock from 'async-lock';
const lock = new AsyncLock();
async function processData(id: string) {
// 按资源 ID 加锁,不同资源可以并发
return await lock.acquire(`resource:${id}`, async () => {
const data = await db.findOne({ id });
if (data.count > 0) {
await db.update({ id }, { count: data.count - 1 });
}
});
}关键限制:
- ❌ 只在单个实例内有效(多实例部署会失效)
- ❌ 进程重启后锁丢失
4. 悲观锁(Pessimistic Lock)
悲观锁与乐观锁相反,假设冲突一定会发生,所以先把数据锁住,不让别人碰。类似公共厕所,进去先锁门,用完再开门,保证同一时间只有一个人使用。
核心思想:独占式访问。谁先拿到锁,谁就独占资源,其他人必须等待。数据库的行锁(Row Lock)就是典型的悲观锁。
SELECT ... FOR UPDATE 详解
数据库提供的 SELECT ... FOR UPDATE 是悲观锁的标准实现,PostgreSQL、MySQL、Oracle 都支持。
基础语法(PostgreSQL)
async function processData(id: string) {
// 1. 开启事务
await db.query('BEGIN');
// 2. 锁住这一行(其他事务的 UPDATE/DELETE/SELECT FOR UPDATE 都会等待)
const result = await db.query(
'SELECT count FROM data WHERE id = $1 FOR UPDATE',
[id]
);
// 3. 业务逻辑:检查和更新
if (result.rows[0].count > 0) {
await db.query('UPDATE data SET count = count - 1 WHERE id = $1', [id]);
}
// 4. 提交事务(锁在这里释放)
await db.query('COMMIT');
}锁的生命周期
时刻 1: BEGIN # 开启事务
时刻 2: SELECT ... FOR UPDATE # 加锁(从这里开始,其他事务等待)
时刻 3: 业务逻辑处理 # 锁仍然存在
时刻 4: UPDATE ... # 锁仍然存在(不是这里释放)
时刻 5: COMMIT # 锁在这里释放关键点:
- 锁从
SELECT FOR UPDATE开始 - 锁到
COMMIT或ROLLBACK结束 - 即使不执行
UPDATE,锁也会持续到事务结束
PostgreSQL 增强语法
-- FOR UPDATE NOWAIT:如果有锁就立即报错,不等待
SELECT * FROM data WHERE id = 1 FOR UPDATE NOWAIT;
-- FOR UPDATE SKIP LOCKED:跳过被锁的行(秒杀场景)
SELECT * FROM products WHERE stock > 0 LIMIT 10 FOR UPDATE SKIP LOCKED;
-- FOR SHARE:共享锁,允许其他事务读但不能写
SELECT * FROM data WHERE id = 1 FOR SHARE;FOR UPDATE SKIP LOCKED 秒杀场景优化
SKIP LOCKED 是 PostgreSQL(9.5+)和 MySQL(8.0+)提供的强大特性,专门为高并发场景设计。
问题场景:普通 FOR UPDATE 会排队等待
-- 秒杀场景:100 个用户同时抢 10 个商品
-- 用户 A
BEGIN;
SELECT * FROM products WHERE stock > 0 LIMIT 10 FOR UPDATE;
-- 锁住 10 行,处理中...
-- 用户 B(几乎同时)
BEGIN;
SELECT * FROM products WHERE stock > 0 LIMIT 10 FOR UPDATE;
-- 卡住!等待用户 A 释放锁,即使还有其他商品也在等待SKIP LOCKED 解决方案
async function grabProducts(userId: number, count: number) {
// 1. 开启事务
await db.query('BEGIN');
// 2. 关键:SKIP LOCKED 跳过被锁的行
const result = await db.query(`
SELECT id, name, stock
FROM products
WHERE stock > 0
LIMIT $1
FOR UPDATE SKIP LOCKED
`, [count]);
// 3. 如果没有可用商品(全被锁了)
if (result.rows.length === 0) {
await db.query('ROLLBACK');
return { success: false };
}
// 4. 扣减库存 + 创建订单
await db.query('UPDATE products SET stock = stock - 1 WHERE ...');
await db.query('INSERT INTO orders ...');
// 5. 提交事务
await db.query('COMMIT');
return { success: true, products: result.rows };
}并发效果对比
假设 20 个商品,100 个用户同时抢购(每人抢 10 个):
# 普通 FOR UPDATE:排队等待
用户 A: 0-100ms (锁住 id 1-10)
用户 B: 100-200ms (锁住 id 11-20,等了 100ms)
用户 C: 200ms+ (没商品了)
总耗时:200ms,成功 2 人
# FOR UPDATE SKIP LOCKED:并行抢购
用户 A: 0-100ms (锁住 id 1-10)
用户 B: 0-100ms (锁住 id 11-20,并行!)
用户 C: 0-100ms (返回 0 行,但不阻塞)
总耗时:100ms,成功 2 人,响应快 50%适用场景:
- ✅ 秒杀抢购(商品多,用户多)
- ✅ 批量任务分配(多个 Worker 处理队列)
- ✅ 座位选择(电影院、火车票)
- ❌ 单个资源抢占(只有 1 个商品,SKIP 没意义)
- ❌ 需要严格顺序(FIFO 队列)
5. Redis 分布式锁
分布式锁是为了解决”多个应用实例”之间的协调问题。当你的服务部署了多个实例(比如 3 个 Node.js 进程),数据库锁和应用层内存锁都只能管住单个实例内部的并发,实例之间还是会冲突。
这时需要一个外部的、所有实例都认可的”裁判”,Redis 就是这个裁判。所有实例通过 Redis 来”抢锁”,谁抢到谁执行,其他人等待。
单 Redis 实例:SET NX/PX
SET 命令的原子性参数
Redis 的 SET 命令支持两个关键参数:
- NX(Not eXists):只在 key 不存在时设置成功
- PX milliseconds:设置过期时间(毫秒)
SET lock:resource:123 "owner-uuid" NX PX 5000这条命令的含义:
- 尝试设置 key
lock:resource:123 - 只有 key 不存在时才设置成功(NX = 抢锁)
- 同时设置 5 秒过期(PX = 防止死锁)
- 返回值:成功返回
OK,失败返回null
为什么这就是锁?
- 互斥性:同一时刻只有一个客户端能
SET NX成功 - 防死锁:
PX保证锁会自动过期 - 原子性:
SET NX PX是单条命令,不会被打断
基础实现
import Redis from 'ioredis';
const redis = new Redis();
async function processData(id: string) {
const lockKey = `lock:data:${id}`;
const lockValue = `${Date.now()}-${Math.random()}`; // 唯一标识
// 尝试加锁(5 秒过期)
const acquired = await redis.set(lockKey, lockValue, 'PX', 5000, 'NX');
if (!acquired) {
throw new Error('获取锁失败,请重试');
}
try {
// 业务逻辑
const data = await db.findOne({ id });
if (data.count > 0) {
await db.update({ id }, { count: data.count - 1 });
}
} finally {
// 释放锁:用 Lua 确保只删除自己的锁
await redis.eval(
`if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end`,
1, lockKey, lockValue
);
}
}为什么释放锁要用 Lua 脚本?
如果直接用 redis.del(lockKey),会有以下问题:
// ❌ 错误示例
async function unlock(lockKey: string) {
await redis.del(lockKey); // 可能删除别人的锁!
}
// 时序图:
// 时刻 1: 客户端 A 获取锁
// 时刻 2: 客户端 A 业务超时,锁自动过期
// 时刻 3: 客户端 B 获取锁(同一个 key)
// 时刻 4: 客户端 A 执行 del(删除了 B 的锁!)Lua 脚本的作用:
- 原子性:
GET + DEL合并为一条命令 - 安全性:只删除自己加的锁(通过对比
lockValue)
如何延长锁?
如果业务逻辑可能超过锁的过期时间,可以用 PEXPIRE 续期:
async function processWithLongTask(id: string) {
const lockKey = `lock:data:${id}`;
const lockValue = `${Date.now()}-${Math.random()}`;
// 初始加锁 5 秒
const acquired = await redis.set(lockKey, lockValue, 'PX', 5000, 'NX');
if (!acquired) throw new Error('获取锁失败');
// 每 2 秒续期 5 秒
const interval = setInterval(async () => {
// 用 Lua 确保是自己的锁才续期
await redis.eval(
`if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("pexpire", KEYS[1], ARGV[2])
else
return 0
end`,
1, lockKey, lockValue, 5000
);
}, 2000);
try {
await slowOperation();
} finally {
clearInterval(interval);
// 释放锁(同样用 Lua)
await redis.eval(
`if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end`,
1, lockKey, lockValue
);
}
}关键点:
- 续期前检查
lockValue,避免续期别人的锁 - 续期间隔应小于锁的过期时间(推荐 1/2 或 1/3)
多 Redis 实例:Redlock 算法
为什么需要 Redlock?
单 Redis 有单点故障风险:Redis 宕机 = 所有锁失效。Redlock 通过多个独立 Redis 节点提升可用性。
核心原理
- 向 N 个独立 Redis 节点申请锁(每个节点用
SET NX PX) - 只要**超过半数(N/2 + 1)**成功,就算获取锁成功
- 失败时删除所有已获取的锁(回滚)
- 释放时用 Lua 脚本删除所有节点的锁
为什么是”超过半数”?
- 防止脑裂:两个客户端不可能同时获取超过半数的锁
- 容错性:3 个节点可容忍 1 个宕机,5 个节点可容忍 2 个宕机
生产环境使用
直接用成熟库 redlock,无需自己实现:
import Redlock from 'redlock';
import Redis from 'ioredis';
const redlock = new Redlock(
[new Redis(6379, 'redis1'), new Redis(6379, 'redis2'), new Redis(6379, 'redis3')],
{ retryCount: 3, retryDelay: 200 }
);
const lock = await redlock.acquire([`lock:${id}`], 5000);
try {
// 业务逻辑
} finally {
await lock.release();
}最佳实践
1. 选择合适的锁粒度
// ❌ 粗粒度锁:所有资源共用一把锁
await redis.set('lock:global', value, 'PX', 5000, 'NX');
// ✅ 细粒度锁:按资源 ID 加锁,不同资源可并发
await redis.set(`lock:data:${id}`, value, 'PX', 5000, 'NX');2. 缩短锁持有时间
// ❌ 锁住所有操作(包括慢操作)
const acquired = await redis.set(lockKey, value, 'PX', 10000, 'NX');
try {
await db.update({ ... });
await sendEmail(); // 耗时操作
} finally {
await unlock();
}
// ✅ 只锁关键操作
const acquired = await redis.set(lockKey, value, 'PX', 1000, 'NX');
try {
await db.update({ ... });
} finally {
await unlock();
}
await sendEmail(); // 耗时操作放到锁外面3. 避免死锁
// ❌ 可能死锁(两个请求反向加锁)
async function transfer(from: string, to: string) {
await redis.set(`lock:${from}`, v1, 'PX', 1000, 'NX');
await redis.set(`lock:${to}`, v2, 'PX', 1000, 'NX');
}
// ✅ 按顺序加锁
async function transfer(from: string, to: string) {
const [id1, id2] = [from, to].sort(); // 统一顺序
await redis.set(`lock:${id1}`, v1, 'PX', 1000, 'NX');
await redis.set(`lock:${id2}`, v2, 'PX', 1000, 'NX');
}决策指南
| 你的情况 | 推荐方案 | 理由 |
|---|---|---|
| 简单扣减场景(如库存 -1) | 数据库原子操作 | 最简单,一条 SQL 搞定 |
| 单数据库实例 + 低并发(<500 QPS) | 乐观锁(Version) | 简单,无需事务 |
| 单数据库实例 + 中并发(500-1000 QPS) | SELECT FOR UPDATE | 强一致性,无需重试 |
| 多实例部署 + 需要跨实例协调 | Redis 分布式锁 + 同步落库 | 数据库锁只管单实例,Redis 管全局 |
| 极高并发(>1000 QPS) | Redis 原子扣减 + MQ异步落库 | 参见 three-level-cache-timeliness |
实战建议
从简单方案开始,观察指标决定是否升级:
- 能用原子操作就用原子操作(最简单)
- 冲突率低(<10%) → 乐观锁
- 冲突率高(>50%) → 悲观锁
- 多实例部署 → Redis 分布式锁(应用层锁失效)
过早优化是万恶之源,先用简单方案跑起来,有问题再优化!