注释阅读

Semaphore是计数信号量,允许N个线程同时访问资源。然而Semaphore并不会创建相应的许可证object,它内部仅仅是维护一个计数量。

  • acquire操作会阻塞线程,除非拿到许可证才能继续进行。
  • release操作会添加许可证,让线程去竞争这个许可证。
  • Semaphore默认计数为1,相当于一个二元信号量,可以当作Lock来使用。
  • 可以实现为公平和非公平模式

继承关系

/*
 * 信号量,适合多个线程一起工作,即在某个时间段,可以有多个线程同时持有锁(线程数量受许可证总量限制)
 *
 * 内部实现了两种锁:【共享-非公平锁】和【共享-公平锁】
 *
 * 初始化锁(同步队列)时,会生产一定数量的许可证
 * 申请锁的过程,可以看做是借出许可证,线程拿到锁的控制权时,许可证总量会减少
 * 释放锁的过程,可以看做是归还许可证,线程丧失锁的控制权时,许可证总量会增加
 */
public class Semaphore implements Serializable 

可以看到只是继承了Serializable接口

内部类

在这里插入图片描述
图来自ref

abstract static class Sync extends AbstractQueuedSynchronizer

static final class NonfairSync extends Sync

static final class FairSync extends Sync

一共有三个内部类,Sync作为抽象类是AQS的具体实现

而NonFairSync和FairSync作为两个公平锁和非公平锁的两个实现,分别都继承了Sync

Sync在这里插入图片描述

图来自ref,Sync抽象类主要提供了上述五种方法

  • 非公平模式的申请许可证

如果许可证数目充足,CAS操作更新当前可用的许可证数量。
如果许可证数目不充足,抢锁失败返回预期剩余的许可证数量

其中许可证数量本质上是AQS中的state变量

// 允许单个或多个线程多次申请锁(借出许可证)
final int nonfairTryAcquireShared(int acquires) {
       for(; ; ) {
           // 获取当前可用的许可证数量
           int available = getState();
           
           // 计算借出一定数量许可证之后,还剩余的许可证数量
           int remaining = available - acquires;
           
           // 如果许可证数量不足,说明本次抢锁失败
           if(remaining<0) {
               // 返回【预期剩余】的许可证数量,<0
               return remaining;
           }
           
           /* 至此,说明许可证数量充足,即成功借到了许可证 */
           
           // 更新许可证数量
           if(compareAndSetState(available, remaining)) {
               // 返回【实际剩余】的许可证数量,>=0
               return remaining;
           }
       }
}
  • 公平模式下申请许可证
final int fairTryAcquireShared(int acquires)

与上面代码一样,不过要判断同步队列的队头是否还有其他(非当前线程)的线程在排队。

if(hasQueuedPredecessors()) {
    return -1;
}

//Head.next的线程是否是当前线程,如果不是的话说明当前线程不是同步队列下一个要唤醒的线程
public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
}
  • 释放锁,归还许可证
// 释放锁,即归还许可证
protected final boolean tryReleaseShared(int releases) {
       for(; ; ) {
           // 计算原本剩余的许可证数量
           int current = getState();
           
           // 计算归还许可证之后的许可证数量
           int next = current + releases;
           if(next<current) {
               // 归还许可证之后许可证数量应该增加而不是减少
               throw new Error("Maximum permit count exceeded");
           }
           
           // 原子地更新许可证数量
           if(compareAndSetState(current, next)) {
               return true;
           }
       }
   }
FairSync

重写了AQS的tryAcquireShard因为这个方法父类没有实现

/*
* 公平锁
  *
  * 许可证数量充足时:(体现了公平性)
  * 如果线程T进入了抢锁状态,则需要先检查同步队列队头是否还有其他排队线程
  * 如果同步队列中有其他排队线程,则本次抢锁失败,否则,抢锁成功
  *
  * 许可证数量不足时:
  * 不管谁来抢锁,都会失败
  */
 static final class FairSync extends Sync {
     private static final long serialVersionUID = 2014338818796000944L;
     
     FairSync(int permits) {
         super(permits);
     }
     
     // 申请一次公平锁
     protected int tryAcquireShared(int acquires) {
         return fairTryAcquireShared(acquires);
     }
 }
UnFairSync

重写了AQS的tryAcquireShard因为这个方法父类没有实现

/*
     * 非公平锁
     *
     * 许可证数量充足时:(体现了非公平性)
     * 如果线程T进入了抢锁状态,则不管同步队列中有没有其他排队线程,都会抢锁成功
     *
     * 许可证数量不足时:
     * 不管谁来抢锁,都会失败
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
        
        NonfairSync(int permits) {
            super(permits);
        }
        
        // 申请一次非公平锁
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

接下来我们分析下acquire和release函数的引用链

  • acquire
// 申请共享锁,不允许阻塞带有中断标记的线程
public void acquire() throws InterruptedException {
    // 借出一张许可证
    sync.acquireSharedInterruptibly(1);
}

acquire->AQS:acquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg)
           throws InterruptedException {
       if (Thread.interrupted())
           throw new InterruptedException();
       if (tryAcquireShared(arg) < 0)
           doAcquireSharedInterruptibly(arg);
   }

Semaphore:tryAcquireShard -> AQS:doAcquireSharedInterruptibly

 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                //只有在前驱节点1为头节点才能被唤醒
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //设置node结点为头节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                //如果抢锁失败就调用park阻塞线程,线程会响应中断,如果发生中断会抛出异常
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }
  • release

release->AQS:releaseShared->Semaphore:tryReleaseShared->AQS:doReleaseShared

// 释放锁,并唤醒排队的结点
public void release() {
     // 归还一张许可证
     sync.releaseShared(1);
 }
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for(; ; ) {
            Node h = head;
            
            // 如果队列中已经没有排队者,则到下面直接退出
            if(h != null && h != tail) {
                int ws = h.waitStatus;
                
                // 需要唤醒后续结点(如果当前节点为SIGNAL,表明需要顺手唤醒该节点后续的结点)
                if(ws == Node.SIGNAL) {
                    if(!h.compareAndSetWaitStatus(Node.SIGNAL, 0)){
                        continue;            // loop to recheck cases
                    }
                    
                    // 唤醒node后面陷入阻塞的“后继”
                    unparkSuccessor(h);
                } else {
                    // 尝试设置Node.PROPAGATE标记,说明该节点没有经过阻塞就可以直接获得这个锁,会把该节点status设置为PROPAGATE
                    if(ws == 0) {
                        if(!h.compareAndSetWaitStatus(0, Node.PROPAGATE)){
                            continue;                // loop on failed CAS
                        }
                    }
                }
            }
            
            // loop if head changed
            if(h == head){
                break;
            }
        }
    }

下面用Semaphore写个生产者消费者demo

import java.util.concurrent.Semaphore;

public class SemaphoreUserCase {

    private static int count = 0;
    private static Semaphore mutex = new Semaphore(1);
    private static Semaphore notFull = new Semaphore(10);
    private static Semaphore notNull = new Semaphore(0);

    static class Producer implements Runnable {

        @Override
        public void run(){
            while(true) {
                try{
                    notFull.acquire();
                    mutex.acquire();
                    System.out.println("Produce: " + ++count);
                }catch (InterruptedException e){
                    e.printStackTrace();
                } finally {
                    mutex.release();
                    notNull.release();
                }
            }

        }
    }

    static class Consumer implements Runnable{
        @Override
        public void run(){

            while(true) {
                try{
                    notNull.acquire();
                    mutex.acquire();
                    System.out.println("Consume: " + --count);
                }catch (InterruptedException e){
                    e.printStackTrace();
                } finally {
                    mutex.release();
                    notFull.release();
                }
            }

        }
    }

    public static void main(String[] args) {
        Thread producer = new Thread(new Producer());
        Thread consumer1 = new Thread(new Consumer());
        Thread consumer2 = new Thread(new Consumer());
        producer.start();
        consumer1.start();
        consumer2.start();
    }

}

ref

https://www.cnblogs.com/leesf456/p/5414778.html

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐