JDK源码阅读计划(Day16) j.u.c之Semaphore
注释阅读Semaphore是计数信号量,允许N个线程同时访问资源。然而Semaphore并不会创建相应的许可证object,它内部仅仅是维护一个计数量。acquire操作会阻塞线程,除非拿到许可证才能继续进行。release操作会添加许可证,让线程去竞争这个许可证。Semaphore默认计数为1,相当于一个二元信号量,可以当作Lock来使用。可以实现为公平和非公平模式继承关系/** 信号量,适合多
注释阅读
Semaphore是计数信号量,允许N个线程同时访问资源。然而Semaphore并不会创建相应的许可证object,它内部仅仅是维护一个计数量。
- acquire操作会阻塞线程,除非拿到许可证才能继续进行。
- release操作会添加许可证,让线程去竞争这个许可证。
- Semaphore默认计数为1,相当于一个二元信号量,可以当作Lock来使用。
- 可以实现为公平和非公平模式
继承关系
/*
* 信号量,适合多个线程一起工作,即在某个时间段,可以有多个线程同时持有锁(线程数量受许可证总量限制)
*
* 内部实现了两种锁:【共享-非公平锁】和【共享-公平锁】
*
* 初始化锁(同步队列)时,会生产一定数量的许可证
* 申请锁的过程,可以看做是借出许可证,线程拿到锁的控制权时,许可证总量会减少
* 释放锁的过程,可以看做是归还许可证,线程丧失锁的控制权时,许可证总量会增加
*/
public class Semaphore implements Serializable
可以看到只是继承了Serializable接口
内部类
图来自ref
abstract static class Sync extends AbstractQueuedSynchronizer
static final class NonfairSync extends Sync
static final class FairSync extends Sync
一共有三个内部类,Sync作为抽象类是AQS的具体实现
而NonFairSync和FairSync作为两个公平锁和非公平锁的两个实现,分别都继承了Sync
Sync
图来自ref,Sync抽象类主要提供了上述五种方法
- 非公平模式的申请许可证
如果许可证数目充足,CAS操作更新当前可用的许可证数量。
如果许可证数目不充足,抢锁失败返回预期剩余的许可证数量
其中许可证数量本质上是AQS中的state
变量
// 允许单个或多个线程多次申请锁(借出许可证)
final int nonfairTryAcquireShared(int acquires) {
for(; ; ) {
// 获取当前可用的许可证数量
int available = getState();
// 计算借出一定数量许可证之后,还剩余的许可证数量
int remaining = available - acquires;
// 如果许可证数量不足,说明本次抢锁失败
if(remaining<0) {
// 返回【预期剩余】的许可证数量,<0
return remaining;
}
/* 至此,说明许可证数量充足,即成功借到了许可证 */
// 更新许可证数量
if(compareAndSetState(available, remaining)) {
// 返回【实际剩余】的许可证数量,>=0
return remaining;
}
}
}
- 公平模式下申请许可证
final int fairTryAcquireShared(int acquires)
与上面代码一样,不过要判断同步队列的队头是否还有其他(非当前线程)的线程在排队。
if(hasQueuedPredecessors()) {
return -1;
}
//Head.next的线程是否是当前线程,如果不是的话说明当前线程不是同步队列下一个要唤醒的线程
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
- 释放锁,归还许可证
// 释放锁,即归还许可证
protected final boolean tryReleaseShared(int releases) {
for(; ; ) {
// 计算原本剩余的许可证数量
int current = getState();
// 计算归还许可证之后的许可证数量
int next = current + releases;
if(next<current) {
// 归还许可证之后许可证数量应该增加而不是减少
throw new Error("Maximum permit count exceeded");
}
// 原子地更新许可证数量
if(compareAndSetState(current, next)) {
return true;
}
}
}
FairSync
重写了AQS的tryAcquireShard因为这个方法父类没有实现
/*
* 公平锁
*
* 许可证数量充足时:(体现了公平性)
* 如果线程T进入了抢锁状态,则需要先检查同步队列队头是否还有其他排队线程
* 如果同步队列中有其他排队线程,则本次抢锁失败,否则,抢锁成功
*
* 许可证数量不足时:
* 不管谁来抢锁,都会失败
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
// 申请一次公平锁
protected int tryAcquireShared(int acquires) {
return fairTryAcquireShared(acquires);
}
}
UnFairSync
重写了AQS的tryAcquireShard因为这个方法父类没有实现
/*
* 非公平锁
*
* 许可证数量充足时:(体现了非公平性)
* 如果线程T进入了抢锁状态,则不管同步队列中有没有其他排队线程,都会抢锁成功
*
* 许可证数量不足时:
* 不管谁来抢锁,都会失败
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
// 申请一次非公平锁
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
接下来我们分析下acquire和release函数的引用链
- acquire
// 申请共享锁,不允许阻塞带有中断标记的线程
public void acquire() throws InterruptedException {
// 借出一张许可证
sync.acquireSharedInterruptibly(1);
}
acquire->AQS:acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
Semaphore:tryAcquireShard -> AQS:doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
//只有在前驱节点1为头节点才能被唤醒
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//设置node结点为头节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
//如果抢锁失败就调用park阻塞线程,线程会响应中断,如果发生中断会抛出异常
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
- release
release->AQS:releaseShared->Semaphore:tryReleaseShared->AQS:doReleaseShared
// 释放锁,并唤醒排队的结点
public void release() {
// 归还一张许可证
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for(; ; ) {
Node h = head;
// 如果队列中已经没有排队者,则到下面直接退出
if(h != null && h != tail) {
int ws = h.waitStatus;
// 需要唤醒后续结点(如果当前节点为SIGNAL,表明需要顺手唤醒该节点后续的结点)
if(ws == Node.SIGNAL) {
if(!h.compareAndSetWaitStatus(Node.SIGNAL, 0)){
continue; // loop to recheck cases
}
// 唤醒node后面陷入阻塞的“后继”
unparkSuccessor(h);
} else {
// 尝试设置Node.PROPAGATE标记,说明该节点没有经过阻塞就可以直接获得这个锁,会把该节点status设置为PROPAGATE
if(ws == 0) {
if(!h.compareAndSetWaitStatus(0, Node.PROPAGATE)){
continue; // loop on failed CAS
}
}
}
}
// loop if head changed
if(h == head){
break;
}
}
}
下面用Semaphore写个生产者消费者demo
import java.util.concurrent.Semaphore;
public class SemaphoreUserCase {
private static int count = 0;
private static Semaphore mutex = new Semaphore(1);
private static Semaphore notFull = new Semaphore(10);
private static Semaphore notNull = new Semaphore(0);
static class Producer implements Runnable {
@Override
public void run(){
while(true) {
try{
notFull.acquire();
mutex.acquire();
System.out.println("Produce: " + ++count);
}catch (InterruptedException e){
e.printStackTrace();
} finally {
mutex.release();
notNull.release();
}
}
}
}
static class Consumer implements Runnable{
@Override
public void run(){
while(true) {
try{
notNull.acquire();
mutex.acquire();
System.out.println("Consume: " + --count);
}catch (InterruptedException e){
e.printStackTrace();
} finally {
mutex.release();
notFull.release();
}
}
}
}
public static void main(String[] args) {
Thread producer = new Thread(new Producer());
Thread consumer1 = new Thread(new Consumer());
Thread consumer2 = new Thread(new Consumer());
producer.start();
consumer1.start();
consumer2.start();
}
}
ref
https://www.cnblogs.com/leesf456/p/5414778.html
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)