redisson中Semaphore的信号量介绍及其原理
Semaphore通常叫信号量,可以用来同时控制访问特定资源的线程数量,通过协调各个线程,保证合理的使用资源。// 获得一个permit//获得var1个permit//尝试获得permit//尝试获得var1个permit//尝试获得permit, 等待时间var1//尝试获得var1个permit, 等待时间var2//释放1个permit//释放var1个permit//信号量的permit
目录
2.1 Semaphore设置许可数量(trySetPermits(int permits))
2.2 尝试获取许可(boolean tryAcquire())
1 基本介绍
Semaphore通常叫信号量,可以用来同时控制访问特定资源的线程数量,通过协调各个线程,保证合理的使用资源。
1.1API介绍
public interface RSemaphore extends RExpirable, RSemaphoreAsync {
// 获得一个permit
void acquire() throws InterruptedException;
//获得var1个permit
void acquire(int var1) throws InterruptedException;
//尝试获得permit
boolean tryAcquire();
//尝试获得var1个permit
boolean tryAcquire(int var1);
//尝试获得permit, 等待时间var1
boolean tryAcquire(long var1, TimeUnit var3) throws InterruptedException;
//尝试获得var1个permit, 等待时间var2
boolean tryAcquire(int var1, long var2, TimeUnit var4) throws InterruptedException;
//释放1个permit
void release();
//释放var1个permit
void release(int var1);
//信号量的permits数
int availablePermits();
//清空permits
int drainPermits();
//设置permits数
boolean trySetPermits(int var1);
//添加permits数
void addPermits(int var1);
}
1.2 示例
@RestController
@RequestMapping("/rsemaphone")
public class TestRsemaphoreController {
@Resource
private RedissonClient redissonClient;
private ExecutorService executorService= Executors.newFixedThreadPool(5);
/**
* redission信号量
*/
@RequestMapping("/rseTrySetPermits")
public void rseTrySetPermits() throws InterruptedException {
RSemaphore semaphore = redissonClient.getSemaphore("123");
semaphore.trySetPermits(5);
for (int i = 0; i < 10; i++) {
executorService.submit(()->{
try {
semaphore.acquire();
System.out.println("线程:"+Thread.currentThread().getName()+"获得permit");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("线程:"+Thread.currentThread().getName()+"释放permit");
semaphore.release();
}
});
}
}
}
运行结果:
2 源码解析
2.1 Semaphore设置许可数量(trySetPermits(int permits))
public boolean trySetPermits(int permits) {
return (Boolean)this.get(this.trySetPermitsAsync(permits));
}
public RFuture<Boolean> trySetPermitsAsync(int permits) {
return this.commandExecutor.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//判断分布式信号量是否存在,如果不存在才设置
"local value = redis.call('get', KEYS[1]); if (value == false or value == 0)
//使用string数据结构设置信号量许可数
then redis.call('set', KEYS[1], ARGV[1]);
//发布一条消息到redisson_sc:{semaphore}通道
redis.call('publish', KEYS[2], ARGV[1]);
//设置成功返回1
return 1;end;
//否则返回0
return 0;", Arrays.asList(this.getRawName(), this.getChannelName()), new Object[]{permits});
}
可以发现只有设置许可其实就是利用lua将值设置到redis中
RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
semaphore.trySetPermits(5);
2.2 尝试获取许可(boolean tryAcquire())
尝试获取许可:可以看到获取许可的底层还是通过lua来实现的,如果能够成功获取返回true,否则返回false。
public boolean tryAcquire(int permits) {
return (Boolean)this.get(this.tryAcquireAsync(permits));
}
public RFuture<Boolean> tryAcquireAsync() {
return this.tryAcquireAsync(1);
}
public RFuture<Boolean> tryAcquireAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
} else {
return permits == 0 ? RedissonPromise.newSucceededFuture(true) : this.commandExecutor.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//获取当前剩余的许可数量
"local value = redis.call('get', KEYS[1]);
//许可数量不为空 并且当前许可数量大于等于剩余的许可数量
if (value ~= false and tonumber(value) >= tonumber(ARGV[1]))
//通过decrby减少剩余可用许可
then local val = redis.call('decrby', KEYS[1], ARGV[1]);
//返回1
return 1; end;
//其他情况返回0
return 0;", Collections.singletonList(this.getRawName()), new Object[]{permits});
}
}
从源码中可以看出获取许可就是通过操作redis中的数据,首先获取到剩余的许可数量,当只有剩余的许可数量大于想要获取的许可数量时返回1否则返回0.
3 Lua脚本
3.1 加锁lua脚本
参数 | 示例值 | 含义 |
KEY个数 | 1 | KEY个数 |
KEYS[1] | my_first_lock_name | 锁名 |
ARGV[1] | 60000 | 持有锁的有效时间:毫秒 |
ARGV[2] | 58c62432-bb74-4d14-8a00-9908cc8b828f:1 | 唯一标识:获取锁时set的唯一值,实现上为redisson客户端ID(UUID)+线程ID |
- 脚本内容
-- 若锁不存在:则新增锁,并设置锁重入计数为1、设置锁过期时间
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 若锁存在,且唯一标识也匹配:则表明当前加锁请求为锁重入请求,故锁重入计数+1,并再次设置锁过期时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 若锁存在,但唯一标识不匹配:表明锁是被其他线程占用,当前线程无权解他人的锁,直接返回锁剩余过期时间
return redis.call('pttl', KEYS[1]);
- 脚本解读
问:返回nil、返回剩余过期时间有什么目的?
答:当且仅当返回nil,才表示加锁成功;客户端需要感知加锁是否成功的结果
3.2 解锁lua脚本
- 脚本入参
参数 | 示例值 | 含义 |
KEY个数 | 2 | KEY个数 |
KEYS[1] | my_first_lock_name | 锁名 |
KEYS[2] | redisson_lock__channel:{my_first_lock_name} | 解锁消息PubSub频道 |
ARGV[1] | 0 | redisson定义0表示解锁消息 |
ARGV[2] | 300000 | 设置锁的过期时间;默认值30秒 |
ARGV[3] | 58c62432-bb74-4d14-8a00-9908cc8b828f:1 | 唯一标识;同加锁流程 |
- 脚本内容
-- 若锁不存在:则直接广播解锁消息,并返回1
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
-- 若锁存在,但唯一标识不匹配:则表明锁被其他线程占用,当前线程不允许解锁其他线程持有的锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
-- 若锁存在,且唯一标识匹配:则先将锁重入计数减1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
-- 锁重入计数减1后还大于0:表明当前线程持有的锁还有重入,不能进行锁删除操作,但可以友好地帮忙设置下过期时期
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
-- 锁重入计数已为0:间接表明锁已释放了。直接删除掉锁,并广播解锁消息,去唤醒那些争抢过锁但还处于阻塞中的线程
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
- 脚本解读
问:广播解锁消息有什么用?
答:是为了通知其他争抢锁阻塞住的线程,从阻塞中解除,并再次去争抢锁。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)