## 怎么加锁
当我们要给一段代码加锁,要经历如下三步:
首先要保证要加锁的部分没有被其他人所占领
在没有被其他人占领的同时要占领下来
通常我们要给锁设置一个过期时间,防止未能正确释放锁从而导致一直占领锁
上面三步一定要保证的一点是一定要是一个原子操作,任何一步出错都有可能导致出问题。Redis给我们提供了一个命令setnx,能够一口气实现上面三个步骤。另外如果使用lua脚本不使用setnx也是可以保证上面三步的一个原子性的。
Redis分布式锁需要解决的问题及解决方案
处理时间过长导致锁失效
上面也提到了当我们加锁时是需要给锁设置一个过期时间的,但是我们的业务处理的时间有可能会超过这个时间,尤其是如果有调用其他服务时,例如:在加锁的过程中,A服务去调用B服务,此时B服务压力比较大,响应时间比较长,比我们设置的锁过期时间还要长,这样就会导致当前锁失效,其他请求拿到锁。这里就有小伙伴说了,我设置一个时间很长的锁不久可以了嘛,这样是可以解决大部分问题,但是一旦不能正常解锁,就会这个锁被占用时间很长,这样加过期时间的意义就小了不少。
解决办法:给加锁操作添加一个续约机制,当到处理时间超过当前加锁时间的2/3时,就给当前的锁续约一个时间,只有当去释放这个锁,才算结束。
一个请求的锁被另外的请求给处理了
在不能保证第一个问题时:当A请求的锁过期,此时B请求获取了一把同样key的锁,此时A请求处理完任务要去释放锁,直接把B请求的锁给释放掉了,就出现了问题。
在能保证第一个问题时:这个续约机制续约的时候需要保证只给自己请求的那个锁进行续约,不然如果A请求的锁的续约线程没有被正确终止,其实任务已经完成,这个key不需要被续约了,但是请求A还是会一直去续约这个key,不管续约的这个锁是不是当前请求的。
解决办法:基于上面两种情况,我们需要做的是在key一样的前提下区分不同的请求的锁,即给相同key不同请求的value设置不一样的值,比如uuid时间戳都可以。
Redis Master节点宕机导致锁丢失
在主从复制过程中,当一个锁加到了master节点上,在此时还没有同步到slave节点的时候master宕机了,此时这个锁就没了。
解决办法:单机情况下没办法。当为Redis Cluster情况下,可以看一下官方给出的RedLock方案 ,大概就是给多个master加锁成功才算加锁成功。
不可重入
当一个请求已经加锁了,然后又要去请求一个需要相同锁的方法,此时就出现了问题,怎么也获取不到自己持有的锁。。这就是锁不可重入。
解决办法:需要让锁支持可重入这个特性,那么应该怎么去做呢,大体思想是需要记录当前线程加锁的次数,然后没解锁一次减去一次,直到这个计数归零才算正确解锁。具体做法我了解的有两个,一个是使用threadlocal去记录当前线程的锁的次数。另外一种是加锁使用hash类型,把线程id当作hash的key,value为计数,这个在最后我们会分析。
我们的方案及实现 通过上面的分析,以及我们自己业务的需求,我们解决了上面的四个问题中的前两个,这两个已经能满足我们的业务的需要,算是在复杂和稳定中找了个我们需求的平衡。
我们使用的Lettuce Java客户端,主要是因为他支持响应式调用,下面来看一下Kotlin下的实现:
加锁操作,为了解决上述问题2中的问题,这里的value值我们用的是uuid:
1 2 3 4 5 6 7 8 9 10 11 suspend fun lock (leaseTime: Long , timeUnit: TimeUnit ) : Boolean { value = UUID.randomUUID().toString() return ( redis.reactive().set (key, value, SetArgs().nx().ex(timeUnit.toSeconds(leaseTime))) .awaitFirstOrNull() != null ).also { if (it) { renewExpiration(leaseTime, timeUnit) } } }
为了解决业务处理时间长锁失效问题进行了续约:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private fun renewExpiration (leaseTime: Long , timeUnit: TimeUnit ) { job = GlobalScope.launch(Dispatchers.IO) { delay(timeUnit.toMillis(leaseTime * 2 / 3 )) while (!released) { redis.sync().eval<Long >( "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1],ARGV[2]) else return 0 end" , ScriptOutputType.INTEGER, arrayOf(key), value, leaseTime.toString() ).takeIf { it == 1L } ?: kotlin.run { released = true } delay(timeUnit.toMillis(leaseTime * 2 / 3 )) } } }
这里的GlobalScope.launch在Java中可以用守护线程去实现,效果一样,这里使用GlobalScope.launch充分利用了协程的优势,也可以用netty中的timer去实现,redission就是用这种方式实现的,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private val timer = HashedWheelTimer()private fun renewExpiration (key: String , leaseTime: Long , timeUnit: TimeUnit ) { val lockInfo = renewExpirationMap[key] ?: return lockInfo.timeout = timer.newTimeout( { redis.sync().eval<Long >( "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1],ARGV[2]) else return 0 end" , ScriptOutputType.INTEGER, arrayOf(key), lockInfo.value, leaseTime.toString() ).takeIf { it == 1L }?.let { renewExpiration(key, leaseTime, timeUnit) } }, leaseTime * 2 / 3 , timeUnit ) }
解锁的时候会判断value值,通过lua脚本
1 2 3 4 5 6 7 8 9 10 suspend fun unlock () { redis.reactive().eval<Long >( "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end" , ScriptOutputType.INTEGER, arrayOf(key), value ).awaitFirstOrNull().takeIf { it == 1L }?.let { cancelRenewExpiration() } }
源码在这里可以看到sisyphus/DistributedLock.kt
再来看下可重入性 我们的分布式锁没有实现可重入性,梳理了下场景好像没哪里用的着,就没有去实现,不过倒是研究了下redission是如何实现可重入性的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <T> RFuture<T> tryLockInnerAsync (long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);" , Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
先来看下这个lua脚本的几个变量都代表了什么,KEYS[1]:getRawName()代表锁的key,ARGV[1]:unit.toMillis(leaseTime)代表了过期时间,ARGV[2]:getLockName(threadId)代表了当前锁的value值,来看下这个value是什么:
1 2 3 4 5 6 7 8 9 10 protected String getLockName (long threadId) { return id + ":" + threadId; }public RedissonBaseLock (CommandAsyncExecutor commandExecutor, String name) { super (commandExecutor, name); this .commandExecutor = commandExecutor; this .id = commandExecutor.getConnectionManager().getId(); this .internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); this .entryName = id + ":" + name; }
这下我们就可以知道了,锁的Value为当前值的连接id加上当前的线程id就是当前锁的value值。然后我们再来看下脚本的意思:
当不存在key的时候,这时候是可以进行加锁的,执行了hincrby key value 1 并且设置了过期时间。当这次请求需要重入的时候,即在key存在时,又要去加锁,先判断value是否是一致的(也就是说来确定是重入,而不是其他线程的非法加锁请求,若是其他线程按理说应该加锁失败)一致则进行计数hincrby key value 1 这样就可以记录当前重入的次数了。
当解锁的时候每次解锁就把重入次数减1,当减到0的时候则直接删除这个key。删除的脚本如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;" , Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
以上就是redission中锁的重入性的实现。redission中的锁做的比较完整,如果对redis锁有比较高的要求的话可以直接使用redission。
上面就是这次redis分布式锁的分享,如果我上面有写的哪里不准确或者有错误,希望能够给我指出。