目录

1 基本介绍

1.1API介绍

1.2 示例

 2 源码解析

2.1  Semaphore设置许可数量(trySetPermits(int permits))

2.2 尝试获取许可(boolean tryAcquire())

3 Lua脚本

        3.1 加锁lua脚本

         3.2 解锁lua脚本 


1 基本介绍

Semaphore通常叫信号量,可以用来同时控制访问特定资源的线程数量,通过协调各个线程,保证合理的使用资源。

1.1API介绍

public interface RSemaphore extends RExpirable, RSemaphoreAsync {

    // 获得一个permit
    void acquire() throws InterruptedException; 

    //获得var1个permit
    void acquire(int var1) throws InterruptedException; 
 
    //尝试获得permit
    boolean tryAcquire(); 

   //尝试获得var1个permit
    boolean tryAcquire(int var1); 

   //尝试获得permit, 等待时间var1
    boolean tryAcquire(long var1, TimeUnit var3) throws InterruptedException;

   //尝试获得var1个permit, 等待时间var2
    boolean tryAcquire(int var1, long var2, TimeUnit var4) throws InterruptedException;

   //释放1个permit
    void release();

   //释放var1个permit
    void release(int var1);
 
   //信号量的permits数
    int availablePermits();

   //清空permits
    int drainPermits();

  //设置permits数
    boolean trySetPermits(int var1);

//添加permits数
    void addPermits(int var1);
}

1.2 示例

@RestController
@RequestMapping("/rsemaphone")
public class TestRsemaphoreController {

    @Resource
    private RedissonClient redissonClient;

    private ExecutorService executorService= Executors.newFixedThreadPool(5);

    /**
     * redission信号量
     */
    @RequestMapping("/rseTrySetPermits")
    public void rseTrySetPermits() throws InterruptedException {
        RSemaphore semaphore = redissonClient.getSemaphore("123");
        semaphore.trySetPermits(5);
        for (int i = 0; i < 10; i++) {
            executorService.submit(()->{
                try {
                    semaphore.acquire();
                    System.out.println("线程:"+Thread.currentThread().getName()+"获得permit");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("线程:"+Thread.currentThread().getName()+"释放permit");
                    semaphore.release();

                }
            });
        }

    }

}

运行结果:

 2 源码解析

2.1  Semaphore设置许可数量(trySetPermits(int permits))

    public boolean trySetPermits(int permits) {
        return (Boolean)this.get(this.trySetPermitsAsync(permits));
    }

    public RFuture<Boolean> trySetPermitsAsync(int permits) {
        return this.commandExecutor.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, 
//判断分布式信号量是否存在,如果不存在才设置
"local value = redis.call('get', KEYS[1]); if (value == false or value == 0) 
//使用string数据结构设置信号量许可数
then redis.call('set', KEYS[1], ARGV[1]); 
//发布一条消息到redisson_sc:{semaphore}通道
redis.call('publish', KEYS[2], ARGV[1]); 
//设置成功返回1
return 1;end;
//否则返回0
return 0;", Arrays.asList(this.getRawName(), this.getChannelName()), new Object[]{permits});
    }

 可以发现只有设置许可其实就是利用lua将值设置到redis中

RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
semaphore.trySetPermits(5);

2.2 尝试获取许可(boolean tryAcquire())

尝试获取许可:可以看到获取许可的底层还是通过lua来实现的,如果能够成功获取返回true,否则返回false。

    public boolean tryAcquire(int permits) {
        return (Boolean)this.get(this.tryAcquireAsync(permits));
    }

    public RFuture<Boolean> tryAcquireAsync() {
        return this.tryAcquireAsync(1);
    }

    public RFuture<Boolean> tryAcquireAsync(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException("Permits amount can't be negative");
        } else {
            return permits == 0 ? RedissonPromise.newSucceededFuture(true) : this.commandExecutor.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, 
//获取当前剩余的许可数量
"local value = redis.call('get', KEYS[1]); 
//许可数量不为空 并且当前许可数量大于等于剩余的许可数量
if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) 
//通过decrby减少剩余可用许可
then local val = redis.call('decrby', KEYS[1], ARGV[1]);
//返回1
 return 1; end; 
//其他情况返回0
return 0;", Collections.singletonList(this.getRawName()), new Object[]{permits});
        }
    }

从源码中可以看出获取许可就是通过操作redis中的数据,首先获取到剩余的许可数量,当只有剩余的许可数量大于想要获取的许可数量时返回1否则返回0.

3 Lua脚本

        3.1 加锁lua脚本

参数示例值含义
KEY个数1KEY个数
KEYS[1]my_first_lock_name        锁名
ARGV[1]60000持有锁的有效时间:毫秒
ARGV[2]58c62432-bb74-4d14-8a00-9908cc8b828f:1唯一标识:获取锁时set的唯一值,实现上为redisson客户端ID(UUID)+线程ID
  • 脚本内容

-- 若锁不存在:则新增锁,并设置锁重入计数为1、设置锁过期时间
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hset', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
 
-- 若锁存在,且唯一标识也匹配:则表明当前加锁请求为锁重入请求,故锁重入计数+1,并再次设置锁过期时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
 
-- 若锁存在,但唯一标识不匹配:表明锁是被其他线程占用,当前线程无权解他人的锁,直接返回锁剩余过期时间
return redis.call('pttl', KEYS[1]);

  • 脚本解读

问:返回nil、返回剩余过期时间有什么目的?

答:当且仅当返回nil,才表示加锁成功;客户端需要感知加锁是否成功的结果

         3.2 解锁lua脚本 

  • 脚本入参
参数示例值含义
KEY个数2KEY个数
KEYS[1]my_first_lock_name锁名
KEYS[2]redisson_lock__channel:{my_first_lock_name}解锁消息PubSub频道
ARGV[1]0redisson定义0表示解锁消息
ARGV[2]300000设置锁的过期时间;默认值30秒
ARGV[3]58c62432-bb74-4d14-8a00-9908cc8b828f:1唯一标识;同加锁流程
  • 脚本内容

-- 若锁不存在:则直接广播解锁消息,并返回1
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1; 
end;
 
-- 若锁存在,但唯一标识不匹配:则表明锁被其他线程占用,当前线程不允许解锁其他线程持有的锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end; 
 
-- 若锁存在,且唯一标识匹配:则先将锁重入计数减1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then 
    -- 锁重入计数减1后还大于0:表明当前线程持有的锁还有重入,不能进行锁删除操作,但可以友好地帮忙设置下过期时期
    redis.call('pexpire', KEYS[1], ARGV[2]); 
    return 0; 
else 
    -- 锁重入计数已为0:间接表明锁已释放了。直接删除掉锁,并广播解锁消息,去唤醒那些争抢过锁但还处于阻塞中的线程
    redis.call('del', KEYS[1]); 
    redis.call('publish', KEYS[2], ARGV[1]); 
    return 1;
end;
 
return nil;

  • 脚本解读

 问:广播解锁消息有什么用? 

答:是为了通知其他争抢锁阻塞住的线程,从阻塞中解除,并再次去争抢锁。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐