一、前言

基于 Redis 的 Redisson 分布式联锁 RedissonMultiLock 对象可以将多个 RLock 对象关联为一个联锁,每个 RLock 对象实例可以来自于不同的 Redisson 实例。当然,这是官网的介绍,具体是什么?一起看看联锁 MultiLock 使用以及源码吧!

二、MultiLock 使用

在这里插入图片描述

按照官方文档的说法,这里 Redisson 客户端可以不是同一个。当然,一般工作中也不会说不用一个客户端吧。

三、加锁

源码入口:org.redisson.RedissonMultiLock#lock(),默认超时时间 leaseTime 没有设置,所以为 -1。

  public void lock(long leaseTime, TimeUnit unit) {
        try {
            lockInterruptibly(leaseTime, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

在这里插入图片描述
这块方法太长,咱们拆分进行阅读。

  1. 基础等待时间 baseWaitTime = 锁数量 * 1500,在这里就是 4500 毫秒;
  2. leaseTime == -1 所以 waitTime = baseWaitTime,也就是 4500;
  3. while (true) 调用 tryLock 加锁,直到成功。

调用 tryLock 方法,其中参数 waitTime = 4500,leaseTime = -1,unit = MILLISECONDS

下面看一下 tryLock 里面有什么逻辑?

在这里插入图片描述
leaseTime != -1 不满足,这部分直接跳过。

在这里插入图片描述
waitTime != -1 条件满足,remainTime = 4500,lockWaitTime = 4500。

在这里插入图片描述
所以,failedLocksLimit() 这个方法直接返回 0,就是必须全部加锁成功。

在这里插入图片描述

这里才是重点:遍历所有的锁,依次加锁。加锁逻辑就和可重入锁加锁并无区别了。所以 Lua 脚本就不进行分析了。

在这里插入图片描述

上面就是 tryLock 加锁之后的结果。

加锁成功,则将成功的锁放进 acquiredLocks 集合中;加锁失败,需要判断 failedLocksLimit,因为这里是 0,所以会直接对成功加锁集合 acquiredLocks 中的所有锁执行锁释放,同时清空成功集合,恢复迭代器。
在这里插入图片描述

每次加锁之后,会更新锁剩余时间 remainTime,如果 remainTime 小于等于 0 了,则说明加锁超时,直接返回 false。这样就会执行外部的 while (true) 逻辑,然后重新再走一遍 RedissonMultiLock#tryLock

  • 总结
    根据理解,画图如下:总体而言,就是将 key1、key2、key3 …… keyN 放到一个 List 集合中,然后迭代循环加锁,直到所有的都成功。

在这里插入图片描述

  • lock和tryLock的区别
  1. tryLock() 它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false
  2. tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true
  3. tryLock(long waitTime, long leaseTime, TimeUnit unit)在2的基础上,如果获取到锁,锁的最长持有时间为leaseTime

四、锁释放

看完加锁逻辑,锁释放就更容易理解了。
在这里插入图片描述
直接遍历释放锁即可,lock.unlockAsync() 是调用的 RedissonBaseLock#unlockAsync() 方法。

五、使用MultiLock实现分布式锁

  • 建立一个三主三从的redis集群,参考文章
    在这里插入图片描述

  • 创建springboot项目

  • redissonCluster.yml

clusterServersConfig:
  # 连接空闲超时,单位:毫秒 默认10000
  idleConnectionTimeout: 10000
  pingConnectionInterval: 1000
  # 同任何节点建立连接时的等待超时。时间单位是毫秒 默认10000
  connectTimeout: 10000
  # 等待节点回复命令的时间。该时间从命令发送成功时开始计时。默认3000
  timeout: 3000
  # 命令失败重试次数
  retryAttempts: 3
  # 命令重试发送时间间隔,单位:毫秒
  retryInterval: 1500
  # 重新连接时间间隔,单位:毫秒
  failedSlaveReconnectionInterval: 3000
  # 执行失败最大次数
  #failedAttempts: 3
  # 密码
  password:
  # 单个连接最大订阅数量
  subscriptionsPerConnection: 5
  clientName: null
  # loadBalancer 负载均衡算法类的选择
  loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
  # 主节点最小空闲连接数 默认32
  masterConnectionMinimumIdleSize: 32
  # 主节点连接池大小 默认64
  masterConnectionPoolSize: 64
  # 订阅操作的负载均衡模式
  subscriptionMode: SLAVE
  # 只在从服务器读取
  readMode: SLAVE
  # 集群地址
  nodeAddresses:
    - "redis://xxx.xxx.xxx.xxx:9001"
    - "redis://xxx.xxx.xxx.xxx:9002"
    - "redis://xxx.xxx.xxx.xxx:9003"
  # 对Redis集群节点状态扫描的时间间隔。单位是毫秒。默认1000
  scanInterval: 1000
  #这个线程池数量被所有RTopic对象监听器,RRemoteService调用者和RExecutorService任务共同共享。默认2
threads: 0
#这个线程池数量是在一个Redisson实例内,被其创建的所有分布式数据类型和服务,以及底层客户端所一同共享的线程池里保存的线程数量。默认2
nettyThreads: 0
# 编码方式 默认org.redisson.codec.JsonJacksonCodec
codec: !<org.redisson.codec.JsonJacksonCodec> {}
#传输模式
transportMode: NIO
# 分布式锁自动过期时间,防止死锁,单位毫秒,默认30s,每1/3的lockWatchdogTimeout时间,如果没执行玩业务,会自动给锁续约
lockWatchdogTimeout: 30000
# 通过该参数来修改是否按订阅发布消息的接收顺序出来消息,如果选否将对消息实行并行处理,该参数只适用于订阅发布消息的情况, 默认true
keepPubSubOrder: true
# 用来指定高性能引擎的行为。由于该变量值的选用与使用场景息息相关(NORMAL除外)我们建议对每个参数值都进行尝试。
#
#该参数仅限于Redisson PRO版本。
#performanceMode: HIGHER_THROUGHPUT
@Configuration
public class RedissonHttpSessionConfig  {
    //服务停用后调用shutdown方法
    @Bean(destroyMethod="shutdown")
    public RedissonClient getRedissonClient() throws IOException {
        ResourceLoader loader = new DefaultResourceLoader();
        Resource resource = loader.getResource("redissonCluster.yml");
        Config config = Config.fromYAML(resource.getInputStream());
        return Redisson.create(config);
    }
}
@Component
public class RedissonMultiLockInit {
    private final ArrayList<RLock> rLockList=new ArrayList<>();
    @Autowired
    RedissonClient redissonClient;
    public  RedissonMultiLock initLock(String... locksName){
        for (String lockName : locksName) {
           rLockList.add(redissonClient.getLock(lockName));
        }
        RLock[] rLocks = rLockList.toArray(new RLock[0]);
        return new RedissonMultiLock(rLocks);
    }
    public List<RLock> getRLocks(){
        return rLockList;
    }
}
@Controller
@RequestMapping("/lock")
public class LockController {
    @Autowired
    RedissonMultiLockInit redissonMultiLockInit;
    @Autowired
    UserMapper userMapper;
    @Autowired
    PlatformTransactionManager transactionManager;
    
    @GetMapping("/get/{waitTime}/{leaseTime}")
    @ResponseBody
    public String getLock(@PathVariable long waitTime, @PathVariable long leaseTime) throws InterruptedException {
        String[] strings={"test1","test2","test3"};
        RedissonMultiLock lock = redissonMultiLockInit.initLock(strings);
        //手动开启事务管理,@Transitional无法控制redis的分布式锁
        //创建事务定义对象
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        //设置是否只读,false支持事务
        def.setReadOnly(false);
        //设置事务隔离级别,可以重复读mysql默认级别
        def.setIsolationLevel(TransactionDefinition.ISOLATION_REPEATABLE_READ);
        //设置事务传播行为
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        //配置事务管理器
        TransactionStatus status = transactionManager.getTransaction(def);
        if (lock.tryLock(waitTime,leaseTime, TimeUnit.SECONDS)){
            System.out.println(Thread.currentThread().getName()+" waiting time is "+waitTime+"s " +
                    "leaseTime is "+leaseTime+"s "+
                    "execute time is "+(leaseTime+10)+" s" );
            try {
                userMapper.updateById(new User(1L,23,"beijing","myname2"));
                //模拟执行超时释放锁
                Thread.sleep((leaseTime+10)*1000);
                List<RLock> rLocks = redissonMultiLockInit.getRLocks();
                //判断是否仍然持有所有锁,防止锁过期
                if(rLocks.stream().allMatch(RLock::isLocked)){
                    //提交业务
                    transactionManager.commit(status);
                    //提交业务后再释放分布式锁
                    lock.unlock();
                    return "unlock success,transition success";
                }
                else {
                    //回滚业务
                    transactionManager.rollback(status);
                    return "lock is expired,transition fail";
                }
            } catch (Exception e) {
                e.printStackTrace();
                return "transition error";
            }
        }
        else {
            return Thread.currentThread().getName()+" can't get the lock,because the waiting time isn't enough. Waiting time is "+waitTime+"s, " +
                    "leaseTime is "+leaseTime+"s ";
        }
    }
}
  • 注意:这里有一个很经典的@Transitional和分布式锁同时使用的问题,所以为了解决该问题,我们手动开启事务,并确保在事务提交后,再释放分布式锁,关于这个问题,可以参考这篇文章
  • 测试
  1. http://localhost:8090/lock/get/6/-1,表示最多有6s的等待获取锁的时间,并且业务的执行时间可以无续约(启用看门狗机制),那么这次业务是一定会成功的
    在这里插入图片描述

  2. http://localhost:8090/lock/get/6/9,表示最多有6s的等待获取锁的时间,并且最多有9s的业务执行时间,超时就会释放分布式锁,业务失败,由于我在controller中写死了业务超过了9s,所以这次业务肯定失败。
    在这里插入图片描述

  3. http://localhost:8090/lock/get/6/-1http://localhost:8090/lock/get/2/-1

由于第一次业务要花费9s的业务执行时间,那么第二次业务无法在2s的时间内获取到分布式锁,会退出此次业务。
在这里插入图片描述

参考文章

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐