谈谈Redis分布式锁

## 怎么加锁

当我们要给一段代码加锁,要经历如下三步:

  1. 首先要保证要加锁的部分没有被其他人所占领
  2. 在没有被其他人占领的同时要占领下来
  3. 通常我们要给锁设置一个过期时间,防止未能正确释放锁从而导致一直占领锁

上面三步一定要保证的一点是一定要是一个原子操作,任何一步出错都有可能导致出问题。Redis给我们提供了一个命令setnx,能够一口气实现上面三个步骤。另外如果使用lua脚本不使用setnx也是可以保证上面三步的一个原子性的。

Redis分布式锁需要解决的问题及解决方案

  • 处理时间过长导致锁失效

    上面也提到了当我们加锁时是需要给锁设置一个过期时间的,但是我们的业务处理的时间有可能会超过这个时间,尤其是如果有调用其他服务时,例如:在加锁的过程中,A服务去调用B服务,此时B服务压力比较大,响应时间比较长,比我们设置的锁过期时间还要长,这样就会导致当前锁失效,其他请求拿到锁。这里就有小伙伴说了,我设置一个时间很长的锁不久可以了嘛,这样是可以解决大部分问题,但是一旦不能正常解锁,就会这个锁被占用时间很长,这样加过期时间的意义就小了不少。

    解决办法:给加锁操作添加一个续约机制,当到处理时间超过当前加锁时间的2/3时,就给当前的锁续约一个时间,只有当去释放这个锁,才算结束。

  • 一个请求的锁被另外的请求给处理了

    在不能保证第一个问题时:当A请求的锁过期,此时B请求获取了一把同样key的锁,此时A请求处理完任务要去释放锁,直接把B请求的锁给释放掉了,就出现了问题。

    在能保证第一个问题时:这个续约机制续约的时候需要保证只给自己请求的那个锁进行续约,不然如果A请求的锁的续约线程没有被正确终止,其实任务已经完成,这个key不需要被续约了,但是请求A还是会一直去续约这个key,不管续约的这个锁是不是当前请求的。

    解决办法:基于上面两种情况,我们需要做的是在key一样的前提下区分不同的请求的锁,即给相同key不同请求的value设置不一样的值,比如uuid时间戳都可以。

  • Redis Master节点宕机导致锁丢失

    在主从复制过程中,当一个锁加到了master节点上,在此时还没有同步到slave节点的时候master宕机了,此时这个锁就没了。

    解决办法:单机情况下没办法。当为Redis Cluster情况下,可以看一下官方给出的RedLock方案,大概就是给多个master加锁成功才算加锁成功。

  • 不可重入

    当一个请求已经加锁了,然后又要去请求一个需要相同锁的方法,此时就出现了问题,怎么也获取不到自己持有的锁。。这就是锁不可重入。

    解决办法:需要让锁支持可重入这个特性,那么应该怎么去做呢,大体思想是需要记录当前线程加锁的次数,然后没解锁一次减去一次,直到这个计数归零才算正确解锁。具体做法我了解的有两个,一个是使用threadlocal去记录当前线程的锁的次数。另外一种是加锁使用hash类型,把线程id当作hash的key,value为计数,这个在最后我们会分析。

我们的方案及实现

通过上面的分析,以及我们自己业务的需求,我们解决了上面的四个问题中的前两个,这两个已经能满足我们的业务的需要,算是在复杂和稳定中找了个我们需求的平衡。

我们使用的Lettuce Java客户端,主要是因为他支持响应式调用,下面来看一下Kotlin下的实现:

加锁操作,为了解决上述问题2中的问题,这里的value值我们用的是uuid:

1
2
3
4
5
6
7
8
9
10
11
suspend fun lock(leaseTime: Long, timeUnit: TimeUnit): Boolean {
value = UUID.randomUUID().toString()
return (
redis.reactive().set(key, value, SetArgs().nx().ex(timeUnit.toSeconds(leaseTime)))
.awaitFirstOrNull() != null
).also {
if (it) {
renewExpiration(leaseTime, timeUnit)
}
}
}

为了解决业务处理时间长锁失效问题进行了续约:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private fun renewExpiration(leaseTime: Long, timeUnit: TimeUnit) {
job = GlobalScope.launch(Dispatchers.IO) {
delay(timeUnit.toMillis(leaseTime * 2 / 3))
while (!released) {
redis.sync().eval<Long>(
"if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1],ARGV[2]) else return 0 end",
ScriptOutputType.INTEGER,
arrayOf(key),
value,
leaseTime.toString()
).takeIf { it == 1L } ?: kotlin.run {
released = true
}
delay(timeUnit.toMillis(leaseTime * 2 / 3))
}
}
}

这里的GlobalScope.launch在Java中可以用守护线程去实现,效果一样,这里使用GlobalScope.launch充分利用了协程的优势,也可以用netty中的timer去实现,redission就是用这种方式实现的,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private val timer = HashedWheelTimer()
private fun renewExpiration(key: String, leaseTime: Long, timeUnit: TimeUnit) {
val lockInfo = renewExpirationMap[key] ?: return
lockInfo.timeout = timer.newTimeout(
{
redis.sync().eval<Long>(
"if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1],ARGV[2]) else return 0 end",
ScriptOutputType.INTEGER,
arrayOf(key),
lockInfo.value,
leaseTime.toString()
).takeIf { it == 1L }?.let {
renewExpiration(key, leaseTime, timeUnit)
}
},
leaseTime * 2 / 3, timeUnit
)
}

解锁的时候会判断value值,通过lua脚本

1
2
3
4
5
6
7
8
9
10
suspend fun unlock() {
redis.reactive().eval<Long>(
"if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end",
ScriptOutputType.INTEGER,
arrayOf(key),
value
).awaitFirstOrNull().takeIf { it == 1L }?.let {
cancelRenewExpiration()
}
}

源码在这里可以看到sisyphus/DistributedLock.kt

再来看下可重入性

我们的分布式锁没有实现可重入性,梳理了下场景好像没哪里用的着,就没有去实现,不过倒是研究了下redission是如何实现可重入性的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

先来看下这个lua脚本的几个变量都代表了什么,KEYS[1]:getRawName()代表锁的key,ARGV[1]:unit.toMillis(leaseTime)代表了过期时间,ARGV[2]:getLockName(threadId)代表了当前锁的value值,来看下这个value是什么:

1
2
3
4
5
6
7
8
9
10
protected String getLockName(long threadId) {
return id + ":" + threadId;
}
public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();//id值的获取为当前连接的id
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
}

这下我们就可以知道了,锁的Value为当前值的连接id加上当前的线程id就是当前锁的value值。然后我们再来看下脚本的意思:

当不存在key的时候,这时候是可以进行加锁的,执行了hincrby key value 1 并且设置了过期时间。当这次请求需要重入的时候,即在key存在时,又要去加锁,先判断value是否是一致的(也就是说来确定是重入,而不是其他线程的非法加锁请求,若是其他线程按理说应该加锁失败)一致则进行计数hincrby key value 1 这样就可以记录当前重入的次数了。

当解锁的时候每次解锁就把重入次数减1,当减到0的时候则直接删除这个key。删除的脚本如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

以上就是redission中锁的重入性的实现。redission中的锁做的比较完整,如果对redis锁有比较高的要求的话可以直接使用redission。

上面就是这次redis分布式锁的分享,如果我上面有写的哪里不准确或者有错误,希望能够给我指出。