redisson 联锁MultiLock原理及分布式锁的应用
一、前言基于 Redis 的 Redisson 分布式联锁 RedissonMultiLock 对象可以将多个 RLock 对象关联为一个联锁,每个 RLock 对象实例可以来自于不同的 Redisson 实例。当然,这是官网的介绍,具体是什么?一起看看联锁 MultiLock 使用以及源码吧!二、MultiLock 使用按照官方文档的说法,这里 Redisson 客户端可以不是同一个。当然,一般
一、前言
基于 Redis 的 Redisson 分布式联锁 RedissonMultiLock 对象可以将多个 RLock 对象关联为一个联锁,每个 RLock 对象实例可以来自于不同的 Redisson 实例。
当然,这是官网的介绍,具体是什么?一起看看联锁 MultiLock 使用以及源码吧!
二、MultiLock 使用
按照官方文档的说法,这里 Redisson 客户端可以不是同一个。当然,一般工作中也不会说不用一个客户端吧。
三、加锁
源码入口:org.redisson.RedissonMultiLock#lock()
,默认超时时间 leaseTime 没有设置,所以为 -1。
public void lock(long leaseTime, TimeUnit unit) {
try {
lockInterruptibly(leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
这块方法太长,咱们拆分进行阅读。
- 基础等待时间 baseWaitTime = 锁数量 * 1500,在这里就是 4500 毫秒;
- leaseTime == -1 所以 waitTime = baseWaitTime,也就是 4500;
- while (true) 调用 tryLock 加锁,直到成功。
调用 tryLock 方法,其中参数 waitTime = 4500,leaseTime = -1,unit = MILLISECONDS
。
下面看一下 tryLock 里面有什么逻辑?
leaseTime != -1 不满足,这部分直接跳过。
waitTime != -1 条件满足,remainTime = 4500,lockWaitTime = 4500。
所以,failedLocksLimit() 这个方法直接返回 0,就是必须全部加锁成功。
这里才是重点:遍历所有的锁,依次加锁。
加锁逻辑就和可重入锁加锁并无区别了。所以 Lua 脚本就不进行分析了。
上面就是 tryLock 加锁之后的结果。
加锁成功,则将成功的锁放进 acquiredLocks 集合中;加锁失败,需要判断 failedLocksLimit
,因为这里是 0,所以会直接对成功加锁集合 acquiredLocks 中的所有锁执行锁释放,同时清空成功集合,恢复迭代器。
每次加锁之后,会更新锁剩余时间 remainTime,如果 remainTime 小于等于 0 了,则说明加锁超时,直接返回 false。这样就会执行外部的 while (true) 逻辑,然后重新再走一遍 RedissonMultiLock#tryLock
。
- 总结
根据理解,画图如下:总体而言,就是将 key1、key2、key3 …… keyN 放到一个 List 集合中,然后迭代循环加锁,直到所有的都成功。
- lock和tryLock的区别
- tryLock() 它表示用来尝试获取锁,
如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false
- tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,
只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false。
如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true- tryLock(long waitTime, long leaseTime, TimeUnit unit)
在2的基础上,如果获取到锁,锁的最长持有时间为leaseTime
四、锁释放
看完加锁逻辑,锁释放就更容易理解了。
直接遍历释放锁即可,lock.unlockAsync() 是调用的 RedissonBaseLock#unlockAsync() 方法。
五、使用MultiLock实现分布式锁
-
建立一个三主三从的redis集群,参考文章
-
创建springboot项目
-
redissonCluster.yml
clusterServersConfig:
# 连接空闲超时,单位:毫秒 默认10000
idleConnectionTimeout: 10000
pingConnectionInterval: 1000
# 同任何节点建立连接时的等待超时。时间单位是毫秒 默认10000
connectTimeout: 10000
# 等待节点回复命令的时间。该时间从命令发送成功时开始计时。默认3000
timeout: 3000
# 命令失败重试次数
retryAttempts: 3
# 命令重试发送时间间隔,单位:毫秒
retryInterval: 1500
# 重新连接时间间隔,单位:毫秒
failedSlaveReconnectionInterval: 3000
# 执行失败最大次数
#failedAttempts: 3
# 密码
password:
# 单个连接最大订阅数量
subscriptionsPerConnection: 5
clientName: null
# loadBalancer 负载均衡算法类的选择
loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
# 主节点最小空闲连接数 默认32
masterConnectionMinimumIdleSize: 32
# 主节点连接池大小 默认64
masterConnectionPoolSize: 64
# 订阅操作的负载均衡模式
subscriptionMode: SLAVE
# 只在从服务器读取
readMode: SLAVE
# 集群地址
nodeAddresses:
- "redis://xxx.xxx.xxx.xxx:9001"
- "redis://xxx.xxx.xxx.xxx:9002"
- "redis://xxx.xxx.xxx.xxx:9003"
# 对Redis集群节点状态扫描的时间间隔。单位是毫秒。默认1000
scanInterval: 1000
#这个线程池数量被所有RTopic对象监听器,RRemoteService调用者和RExecutorService任务共同共享。默认2
threads: 0
#这个线程池数量是在一个Redisson实例内,被其创建的所有分布式数据类型和服务,以及底层客户端所一同共享的线程池里保存的线程数量。默认2
nettyThreads: 0
# 编码方式 默认org.redisson.codec.JsonJacksonCodec
codec: !<org.redisson.codec.JsonJacksonCodec> {}
#传输模式
transportMode: NIO
# 分布式锁自动过期时间,防止死锁,单位毫秒,默认30s,每1/3的lockWatchdogTimeout时间,如果没执行玩业务,会自动给锁续约
lockWatchdogTimeout: 30000
# 通过该参数来修改是否按订阅发布消息的接收顺序出来消息,如果选否将对消息实行并行处理,该参数只适用于订阅发布消息的情况, 默认true
keepPubSubOrder: true
# 用来指定高性能引擎的行为。由于该变量值的选用与使用场景息息相关(NORMAL除外)我们建议对每个参数值都进行尝试。
#
#该参数仅限于Redisson PRO版本。
#performanceMode: HIGHER_THROUGHPUT
@Configuration
public class RedissonHttpSessionConfig {
//服务停用后调用shutdown方法
@Bean(destroyMethod="shutdown")
public RedissonClient getRedissonClient() throws IOException {
ResourceLoader loader = new DefaultResourceLoader();
Resource resource = loader.getResource("redissonCluster.yml");
Config config = Config.fromYAML(resource.getInputStream());
return Redisson.create(config);
}
}
@Component
public class RedissonMultiLockInit {
private final ArrayList<RLock> rLockList=new ArrayList<>();
@Autowired
RedissonClient redissonClient;
public RedissonMultiLock initLock(String... locksName){
for (String lockName : locksName) {
rLockList.add(redissonClient.getLock(lockName));
}
RLock[] rLocks = rLockList.toArray(new RLock[0]);
return new RedissonMultiLock(rLocks);
}
public List<RLock> getRLocks(){
return rLockList;
}
}
@Controller
@RequestMapping("/lock")
public class LockController {
@Autowired
RedissonMultiLockInit redissonMultiLockInit;
@Autowired
UserMapper userMapper;
@Autowired
PlatformTransactionManager transactionManager;
@GetMapping("/get/{waitTime}/{leaseTime}")
@ResponseBody
public String getLock(@PathVariable long waitTime, @PathVariable long leaseTime) throws InterruptedException {
String[] strings={"test1","test2","test3"};
RedissonMultiLock lock = redissonMultiLockInit.initLock(strings);
//手动开启事务管理,@Transitional无法控制redis的分布式锁
//创建事务定义对象
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
//设置是否只读,false支持事务
def.setReadOnly(false);
//设置事务隔离级别,可以重复读mysql默认级别
def.setIsolationLevel(TransactionDefinition.ISOLATION_REPEATABLE_READ);
//设置事务传播行为
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
//配置事务管理器
TransactionStatus status = transactionManager.getTransaction(def);
if (lock.tryLock(waitTime,leaseTime, TimeUnit.SECONDS)){
System.out.println(Thread.currentThread().getName()+" waiting time is "+waitTime+"s " +
"leaseTime is "+leaseTime+"s "+
"execute time is "+(leaseTime+10)+" s" );
try {
userMapper.updateById(new User(1L,23,"beijing","myname2"));
//模拟执行超时释放锁
Thread.sleep((leaseTime+10)*1000);
List<RLock> rLocks = redissonMultiLockInit.getRLocks();
//判断是否仍然持有所有锁,防止锁过期
if(rLocks.stream().allMatch(RLock::isLocked)){
//提交业务
transactionManager.commit(status);
//提交业务后再释放分布式锁
lock.unlock();
return "unlock success,transition success";
}
else {
//回滚业务
transactionManager.rollback(status);
return "lock is expired,transition fail";
}
} catch (Exception e) {
e.printStackTrace();
return "transition error";
}
}
else {
return Thread.currentThread().getName()+" can't get the lock,because the waiting time isn't enough. Waiting time is "+waitTime+"s, " +
"leaseTime is "+leaseTime+"s ";
}
}
}
- 注意:这里有一个很经典的@Transitional和分布式锁同时使用的问题,所以为了解决该问题,我们手动开启事务,并确保在事务提交后,再释放分布式锁,关于这个问题,可以参考这篇文章
- 测试
-
http://localhost:8090/lock/get/6/-1
,表示最多有6s的等待获取锁的时间,并且业务的执行时间可以无续约(启用看门狗机制),那么这次业务是一定会成功的
-
http://localhost:8090/lock/get/6/9
,表示最多有6s的等待获取锁的时间,并且最多有9s的业务执行时间,超时就会释放分布式锁,业务失败,由于我在controller中写死了业务超过了9s,所以这次业务肯定失败。
-
http://localhost:8090/lock/get/6/-1
和http://localhost:8090/lock/get/2/-1
由于第一次业务要花费9s的业务执行时间,那么第二次业务无法在2s的时间内获取到分布式锁
,会退出此次业务。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)