Redisson 分布式锁原理
<>1. 工具类
package com.meta.mall.common.utils; import lombok.extern.slf4j.Slf4j; import
org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.
springframework.stereotype.Component; import javax.annotation.Resource; import
java.util.concurrent.TimeUnit; /** * redisson 分布式工具类 * * @author gaoyang *
@date 2022-05-14 08:58 */ @Slf4j @Component public class RedissonUtils {
@Resource private RedissonClient redissonClient; /** * 加锁 * * @param lockKey */
public void lock(String lockKey) { RLock lock = redissonClient.getLock(lockKey);
lock.lock(); } /** * 带过期时间的锁 * * @param lockKey key * @param leaseTime
上锁后自动释放锁时间 */ public void lock(String lockKey, long leaseTime) { RLock lock =
redissonClient.getLock(lockKey); lock.lock(leaseTime, TimeUnit.SECONDS); } /**
* 带超时时间的锁 * * @param lockKey key * @param leaseTime 上锁后自动释放锁时间 * @param unit
时间单位 */ public void lock(String lockKey, long leaseTime, TimeUnit unit) { RLock
lock= redissonClient.getLock(lockKey); lock.lock(leaseTime, unit); } /** *
尝试获取锁 * * @param lockKey key * @return */ public boolean tryLock(String lockKey)
{ RLock lock = redissonClient.getLock(lockKey); return lock.tryLock(); } /** *
尝试获取锁 * * @param lockKey key * @param waitTime 最多等待时间 * @param leaseTime
上锁后自动释放锁时间 * @return boolean */ public boolean tryLock(String lockKey, long
waitTime, long leaseTime) { RLock lock = redissonClient.getLock(lockKey); try {
return lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS); } catch (
InterruptedException e) { log.error("RedissonUtils - tryLock异常", e); } return
false; } /** * 尝试获取锁 * * @param lockKey key * @param waitTime 最多等待时间 * @param
leaseTime 上锁后自动释放锁时间 * @param unit 时间单位 * @return boolean */ public boolean
tryLock(String lockKey, long waitTime, long leaseTime, TimeUnit unit) { RLock
lock= redissonClient.getLock(lockKey); try { return lock.tryLock(waitTime,
leaseTime, unit); } catch (InterruptedException e) { log.error("RedissonUtils -
tryLock异常", e); } return false; } /** * 释放锁 * * @param lockKey key */ public
void unlock(String lockKey) { RLock lock = redissonClient.getLock(lockKey); lock
.unlock(); } /** * 是否存在锁 * * @param lockKey key * @return */ public boolean
isLocked(String lockKey) { RLock lock = redissonClient.getLock(lockKey); return
lock.isLocked(); } }
<>2. lock和tryLock的区别
*
返回值
lock 是 void;
tryLock 是 boolean。
*
时机
lock 一直等锁释放;
tryLock 获取到锁返回true,获取不到锁并直接返回false。
lock拿不到锁会一直等待。tryLock是去尝试,拿不到就返回false,拿到返回true。
tryLock是可以被打断的,被中断的,lock是不可以。
<>3. 源码分析
<>3.1 lock
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws
InterruptedException { // 获取当前线程 ID long threadId = Thread.currentThread().getId
(); // 获取锁,正常获取锁则ttl为null,竞争锁时返回锁的过期时间 Long ttl = tryAcquire(-1, leaseTime, unit
, threadId); // lock acquired if (ttl == null) { return; } // 订阅锁释放事件 //
如果当前线程通过 Redis 的 channel 订阅锁的释放事件获取得知已经被释放,则会发消息通知待等待的线程进行竞争 RFuture<
RedissonLockEntry> future = subscribe(threadId); if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.
syncSubscription(future); } try { while (true) { // 循环重试获取锁,直至重新获取锁成功才跳出循环 //
此种做法阻塞进程,一直处于等待锁手动释放或者超时才继续线程 ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0
) { try { future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); }
catch (InterruptedException e) { if (interruptibly) { throw e; } future.getNow()
.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly
) { future.getNow().getLatch().acquire(); } else { future.getNow().getLatch().
acquireUninterruptibly(); } } } } finally { // 最后释放订阅事件 unsubscribe(future,
threadId); } // get(lockAsync(leaseTime, unit)); } <T> RFuture<T>
tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId,
RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.
INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire',
KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists',
KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); "
+ "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " +
"return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()),
unit.toMillis(leaseTime), getLockName(threadId)); }
此段脚本为一段lua脚本:
KEY[1]: 为你加锁的lock值
ARGV[2]: 为线程id
ARGV[1]: 为设置的过期时间
第一个if:
判断是否存在设置lock的key是否存在,不存在则利用redis的hash结构设置一个hash,值为1,并设置过期时间,后续返回锁。
第二个if:
判断是否存在设置lock的key是否存在,存在此线程的hash,则为这个锁的重入次数加1(将hash值+1),并重新设置过期时间,后续返回锁。
最后返回:
这个最后返回不是说最后结果返回,是代表以上两个if都没有进入,则代表处于竞争锁的情况,后续返回竞争锁的过期时间。
<>3.2 tryLock
tryLock具有返回值,true或者false,表示是否成功获取锁。tryLock前期获取锁逻辑基本与lock一致,主要是后续获取锁失败的处理逻辑与lock不一致。
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws
InterruptedException { long time = unit.toMillis(waitTime); long current =
System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long
ttl= tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl
== null) { return true; } //
获取锁失败后,中途tryLock会一直判断中间操作耗时是否已经消耗锁的过期时间,如果消耗完则返回false time -= System.
currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit,
threadId); return false; } current = System.currentTimeMillis(); // 订阅锁释放事件 //
如果当前线程通过 Redis 的 channel 订阅锁的释放事件获取得知已经被释放,则会发消息通知待等待的线程进行竞争. RFuture<
RedissonLockEntry> subscribeFuture = subscribe(threadId); //
将订阅阻塞,阻塞时间设置为我们调用tryLock设置的最大等待时间,超过时间则返回false if (!subscribeFuture.await(time,
TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.
onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId)
; } }); } acquireFailed(waitTime, unit, threadId); return false; } try { time -=
System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime,
unit, threadId); return false; } // 循环获取锁,但由于上面有最大等待时间限制,基本会在上面返回false while (
true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime
, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; }
time-= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed
(waitTime, unit, threadId); return false; } // waiting for message currentTime =
System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { subscribeFuture.getNow
().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.
getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.
currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit
, threadId); return false; } } } finally { unsubscribe(subscribeFuture, threadId
); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
应尽量使用tryLock,且携带参数,因为可设置最大等待时间以及可及时获取加锁返回值,后续可做一些其他加锁失败的业务