Appearance
Redis 分布式锁
1. SETNX
1.1. SETNX 命令介绍
SETNX 是 Redis 提供的 “SET if Not eXists” 命令,用于在键不存在时设置键值,如果键已存在则操作失败。这个特性使其成为实现分布式锁的基础。
- 命令格式:
SETNX key value; - 返回值:成功返回
1,失败返回0。
1.2. 代码示例
Java
public void processWithLock() {
String lockKey = "business:lock";
String FIXED_VALUE = "locked";
Boolean acquired = redisTmpl.setIfAbsent(lockKey, FIXED_VALUE);
if (Boolean.FALSE.equals(acquired)) throw new RuntimeException("获取锁失败");
try {
doBusinessLogic();
} finally {
redisTmpl.delete(lockKey);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
1.3. 存在的问题
try-finally 期间宕机,将产生死锁。如果在执行业务逻辑期间服务宕机或进程崩溃,finally 块中的删除操作无法执行,导致锁永远无法释放,其他线程永远无法获取锁,形成死锁。
2. 锁过期
2.1. 增加设置锁的过期时间
为了避免死锁问题,我们为锁增加过期时间,即使服务宕机,锁也会在过期后自动释放。
2.1.1. 代码示例
Java
public void processWithLock() {
String lockKey = "business:lock";
String FIXED_VALUE = "locked";
Boolean acquired = redisTmpl.setIfAbsent(lockKey, FIXED_VALUE);
if (Boolean.FALSE.equals(acquired)) throw new RuntimeException("获取锁失败");
try {
redisTmpl.expire(lockKey, 30, TimeUnit.SECONDS);
doBusinessLogic();
} finally {
redisTmpl.delete(lockKey);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2.1.2. 存在的问题
setIfAbsent 和 expire 之间宕机,仍会产生死锁。虽然添加了过期时间,但 setIfAbsent 和 expire 是两个独立的操作,如果在这两个操作之间服务宕机,锁仍然不会过期,依然会造成死锁。
2.2. 原子性设置锁的过期时间
使用 Redis 的原子操作,在获取锁的同时设置过期时间,确保操作的原子性。
2.2.1. 代码示例
Java
public void processWithLock() {
String lockKey = "business:lock";
String FIXED_VALUE = "locked";
Boolean acquired = redisTmpl.setIfAbsent(lockKey, FIXED_VALUE, 30, TimeUnit.SECONDS);
if (Boolean.FALSE.equals(acquired)) throw new RuntimeException("获取锁失败");
try {
doBusinessLogic();
} finally {
redisTmpl.delete(lockKey);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
2.2.2. 存在的问题
问题 1:业务执行时间超过锁过期时间,导致并发问题。
如果业务逻辑执行时间超过 30 秒,锁会自动过期,其他线程可以获取锁,导致多个线程同时执行业务逻辑,破坏了互斥性。
问题 2:误删其他线程的锁,进一步加剧并发问题。
- 线程 A 获取锁,开始执行业务;
- 线程 A 的锁过期(业务未执行完);
- 线程 B 成功获取锁,开始执行业务;
- 线程 A 业务执行完毕,在
finally中删除锁(实际删除的是线程 B 的锁); - 线程 C 获取锁,与线程 B 并发执行;
- 并发问题越来越严重...
3. 锁标识
为每个锁设置唯一标识(通常使用 UUID),释放锁时先判断标识是否匹配,避免误删其他线程的锁。
3.1. 代码示例
Java
public void processWithLock() {
String lockKey = "business:lock";
String lockToken = UUID.randomUUID().toString();
Boolean acquired = redisTmpl.setIfAbsent(lockKey, lockToken, 30, TimeUnit.SECONDS);
if (Boolean.FALSE.equals(acquired)) throw new RuntimeException("获取锁失败");
try {
doBusinessLogic();
} finally {
if (lockToken.equals(redisTmpl.get(lockKey))) {
redisTmpl.delete(lockKey);
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
3.2. 存在的问题
问题 1:业务执行时间超过锁过期时间,导致并发问题。
这个问题依然存在,如果业务执行时间过长,锁会自动过期,无法保证互斥性。
问题 2:判断和删除操作不是原子性的。
极端情况下的时序问题:
- 线程 A 执行
lockToken.equals(currentToken)判断,结果为true; - 此时线程 A 的锁刚好过期;
- 线程 B 成功获取锁;
- 线程 A 继续执行
redisTmpl.delete(lockKey),删除了线程 B 的锁。
解决方案:使用 Lua 脚本确保判断和删除的原子性:
Java
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
redisTmpl.execute(new DefaultRedisScript<>(script, Long.class),
Collections.singletonList(lockKey),
lockToken);1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
4. 锁续期
为了解决业务执行时间超过锁过期时间的问题,我们需要实现锁的自动续期机制。
4.1. Redisson 示例
Redisson 是一个 Redis Java 客户端,提供了开箱即用的分布式锁实现,支持自动续期功能。
4.1.1. Maven 依赖
xml
<!-- Redisson Spring Boot Starter -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.52.0</version>
<exclusions>
<exclusion>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-data-35</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Redisson Spring Data -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-data-27</artifactId>
<version>3.52.0</version>
</dependency>4.1.2. 代码示例
Java
@Autowired
private RedissonClient redissonClient;
public void processWithLock() {
String lockKey = "business:lock";
RLock lock = redissonClient.getLock(lockKey);
// 尝试获取锁
// waitTime:等待获取锁的最长时间(10 秒)
// leaseTime:不传则使用看门狗自动续期,传了则在指定时间后自动释放
boolean acquired = lock.tryLock(10, TimeUnit.SECONDS);
if (!acquired) throw new RuntimeException("获取锁失败");
try {
// 执行业务逻辑(即使超过 30 秒,锁也会自动续期)
doBusinessLogic();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("获取锁被中断", e);
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Redisson 自动续期机制:
- 默认锁过期时间:30 秒(
lockWatchdogTimeout,可通过配置修改); - 续期时间间隔:每 10 秒续期一次(过期时间的 1/3);
- 触发条件:只有在不指定
leaseTime参数时才会启动看门狗自动续期; - 只要业务未执行完且线程未释放锁,看门狗(WatchDog)会持续为锁续期。
4.2. Redisson 源码解读
4.2.1. 加锁流程
Java
public class RedissonLock extends RedissonBaseLock {
// ...
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
ttlRemainingFuture = new CompletableFutureWrapper<>(s);
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
if (ttlRemaining == null) {
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
// ...
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, command,
"if ((redis.call('exists', KEYS[1]) == 0) " +
"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
// ...
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
关键点:
第 6 行:如果指定了
leaseTime > 0,直接加锁,不会启动看门狗自动续期;第 8-9 行:如果
leaseTime <= 0,使用默认值(30 秒),并准备启动看门狗;第 20 行:获取锁成功后(
ttlRemaining == null),如果未指定leaseTime,则调度自动续期任务;第 29-37 行:使用 Lua 脚本原子性地加锁并设置过期时间:
使用 Hash 结构存储锁,支持可重入(
key为锁名,field为线程标识,value为重入次数);hexists判断当前线程是否已持有锁;hincrby增加重入次数;pexpire设置过期时间(毫秒级)。
4.2.2. 启动续期任务
将当前锁加入到续期队列。
Java
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
// ...
protected void scheduleExpirationRenewal(long threadId) {
renewalScheduler.renewLock(getRawName(), threadId, getLockName(threadId));
}
// ...
}1
2
3
4
5
6
7
2
3
4
5
6
7
Java
public final class LockRenewalScheduler {
// ...
public void renewLock(String name, Long threadId, String lockName) {
reference.compareAndSet(null, new LockTask(internalLockLeaseTime, executor, batchSize));
LockTask task = reference.get();
task.add(name, lockName, threadId);
}
// ...
}1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
4.2.3. 续期任务执行
Java
public class LockTask extends RenewalTask {
// ...
@Override
CompletionStage<Void> renew(Iterator<String> iter, int chunkSize) {
if (!iter.hasNext()) return CompletableFuture.completedFuture(null);
Map<String, Long> name2threadId = new HashMap<>(chunkSize);
List<Object> args = new ArrayList<>(chunkSize + 1);
args.add(internalLockLeaseTime);
List<String> keys = new ArrayList<>(chunkSize);
while (iter.hasNext()) {
String key = iter.next();
LockEntry entry = name2entry.get(key);
if (entry == null) {
continue;
}
Long threadId = entry.getFirstThreadId();
if (threadId == null) {
continue;
}
keys.add(key);
args.add(entry.getLockName(threadId));
name2threadId.put(key, threadId);
if (keys.size() == chunkSize) {
break;
}
}
if (keys.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
String firstName = keys.get(0);
CompletionStage<List<String>> f = executor.syncedEval(firstName, LongCodec.INSTANCE,
new RedisCommand<>("EVAL", new ContainsDecoder<>(keys)),
"local result = {} " +
"for i = 1, #KEYS, 1 do " +
"if (redis.call('hexists', KEYS[i], ARGV[i + 1]) == 1) then " +
"redis.call('pexpire', KEYS[i], ARGV[1]); " +
"table.insert(result, 1); " +
"else " +
"table.insert(result, 0); " +
"end; " +
"end; " +
"return result;",
new ArrayList<>(keys),
args.toArray());
return f.thenCompose(existingNames -> {
keys.removeAll(existingNames);
for (String key : keys) {
cancelExpirationRenewal(key, name2threadId.get(key));
}
return renew(iter, chunkSize);
});
}
public void add(String rawName, String lockName, long threadId) {
LockEntry entry = new LockEntry();
entry.addThreadId(threadId, lockName);
add(rawName, lockName, threadId, entry);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
关键点:
- 第 40-53 行:使用 Lua 脚本批量为锁续期,检查锁是否存在,存在则重置过期时间;
- 第 56-59 行:对于不存在的锁(已释放或过期),取消其续期任务;
- 第 60 行:递归处理下一批。
4.2.4. 定时续期调度
Java
abstract class RenewalTask implements TimerTask {
final CommandAsyncExecutor executor;
AtomicBoolean running = new AtomicBoolean();
final Map<Integer, Set<String>> slot2names = new ConcurrentHashMap<>();
final Map<String, LockEntry> name2entry = new ConcurrentHashMap<>();
// ...
boolean tryRun() {
return running.compareAndSet(false, true);
}
public void schedule() {
if (!running.get()) return;
long internalLockLeaseTime = executor.getServiceManager().getCfg().getLockWatchdogTimeout();
executor.getServiceManager().newTimeout(this, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
}
final CompletionStage<Void> execute() {
if (name2entry.isEmpty()) return CompletableFuture.completedFuture(null);
if (!executor.getServiceManager().getCfg().isClusterConfig()) {
return renew(name2entry.keySet().iterator(), chunkSize);
}
return renewSlots(slot2names.values().iterator(), chunkSize);
}
private CompletionStage<Void> renewSlots(Iterator<Set<String>> iter, int chunkSize) {
if (!iter.hasNext()) return CompletableFuture.completedFuture(null);
CompletionStage<Void> c = renew(iter.next().iterator(), chunkSize);
return c.thenCompose(r -> renewSlots(iter, chunkSize));
}
abstract CompletionStage<Void> renew(Iterator<String> iter, int chunkSize);
final void add(String rawName, String lockName, long threadId, LockEntry entry) {
addSlotName(rawName);
LockEntry oldEntry = name2entry.putIfAbsent(rawName, entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId, lockName);
} else {
if (tryRun()) {
schedule();
}
}
}
@Override
public void run(Timeout timeout) {
if (executor.getServiceManager().isShuttingDown()) return;
CompletionStage<Void> future = execute();
future.whenComplete((result, e) -> {
if (e != null) {
log.error("Can't update locks {} expiration", name2entry.keySet(), e);
schedule();
return;
}
schedule();
});
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
关键点:
- 第 17 行:续期间隔为锁过期时间的 1/3(默认 30 秒过期,每 10 秒续期一次);
- 第 30-35 行:集群模式下按 slot 分组续期;
- 第 37 行:抽象方法,由子类实现具体续期逻辑;
- 第 39 行:添加锁到续期队列;
- 第 46-48 行:首次添加锁时启动续期调度;
- 第 53 行:定时任务入口,执行续期操作;
- 第 64 行:续期完成后,继续调度下一次续期。
5. 架构方面的细节
5.1. Redis 主从切换时锁丢失问题
Redis 通常采用主从架构实现高可用,但 Redis 主从复制是异步的,这会导致以下问题:
- 客户端在主节点上成功获取锁;
- 主节点在将锁数据同步到从节点之前宕机;
- 从节点被提升为新的主节点,但没有锁数据;
- 其他客户端在新主节点上成功获取同一把锁;
- 多个客户端同时持有锁,互斥性被破坏。
5.2. 解决方案 1:使用 Zookeeper
特点:
- Zookeeper 是 CP 架构(一致性 + 分区容错性);
- 基于 Paxos/ZAB 协议,写入操作必须在过半节点成功后才返回;
- 保证强一致性,不会出现锁丢失问题。
缺点:
- 性能较 Redis 差;
- 过半节点故障时服务不可用。
5.3. 解决方案 2:Redis Red Lock
原理:使用多个独立的 Redis 主节点(通常 5 个),客户端在过半数节点成功获取锁才算成功。
实现步骤:
- 获取当前时间戳 T1;
- 依次在所有 Redis 节点上尝试获取锁(使用相同的 Key 和随机 Value);
- 只有在过半数节点成功获取锁,且总耗时小于锁的有效时间时,才认为加锁成功;
- 如果加锁失败,在所有节点上释放锁。
注意事项 1:不需要从节点
- Red Lock 本身通过多个主节点的过半写入已经满足高可用
- 引入主从复制后,反而会回到原来的主从切换问题
- 如果主节点宕机,从节点被提升为主节点,可能没有锁数据
正确做法:每个 Redis 实例都是独立的主节点,不配置从节点。
注意事项 2:持久化配置
如果 Redis 使用非同步持久化(如 RDB 或
appendfsync everysec),节点重启后可能丢失锁数据。正确做法:
- 使用 AOF 持久化,配置
appendfsync always(每次写入都同步刷盘),但性能显著下降; - 或者在节点重启后延迟一段时间(大于锁的最大过期时间)再加入集群。
- 使用 AOF 持久化,配置
6. 性能优化
6.1. 分段锁
在高并发场景下,单一分布式锁会成为系统瓶颈。借鉴 ConcurrentHashMap 的分段锁思想,可以有效降低锁粒度,提高并发性能。
6.1.1. 共享资源分段
将共享资源划分为 N 个段,每个段使用独立的锁,不同段之间的操作可以并发执行。
示例:库存扣减场景。假设商品总库存为 10000,将其分为 10 个段:
Text
段 0:库存 1000 (lock:inventory:0)
段 1:库存 1000 (lock:inventory:1)
...
段 9:库存 1000 (lock:inventory:9)6.1.2. 负载均衡策略
关键点:必须确保同一订单始终访问同一分段,避免同一订单的多次操作分散到不同分段,导致库存统计错误。此外需要考虑,当分段库存售罄时需要自动切换至其它分段,以及分段数量动态调整策略等。
常用策略:
基于订单 Id 的 Hash 取模
Javaint segmentIndex = Math.abs(orderId.hashCode()) % SEGMENT_COUNT;1基于用户 Id 的 Hash 取模(适用于用户维度的资源限制)
Javaint segmentIndex = Math.abs(userId.hashCode()) % SEGMENT_COUNT;1一致性 Hash(适用于分段数量动态调整的场景)
Javaint segmentIndex = consistentHash.getSegment(orderId);1
6.2. 其他优化思路
- 异步削峰:使用消息队列削峰,将扣库存操作异步化;
- 预分配令牌:提前生成库存令牌,用户抢到令牌即可下单。