1.概念
AQS(AbstractQueuedSynchronizer),是一个用于构建锁、同步器、协作工具类的框架。

2.三要素
(1).state
根据具体实现类的不同显示不同的含义,如CountDownLatch类表示还需要倒数的数量,Semaphore类表示剩余许可证,ReentrantLock类中表示重入次数。

(2).控制线程抢锁和配合的FIFO队列
队列是用来存放等待的线程,当多个线程竞争同一把锁时,只有一个线程获取锁,其它线程就会被封装node结点加入到队列中,当锁被释放时,AQS就需要从队列中挑选一个合适的线程来占有锁。
在这里插入图片描述

(3).期望协作工具类去实现的重要方法

  • 获取锁方法
  • 释放锁方法

3.接口概览
(1).Lock

public interface Lock {
	//如果锁可用就获得锁,如果锁不可用就阻塞直到锁释放
	void lock();
	
	//和lock()方法相似, 但阻塞的线程可中断,抛出java.lang.InterruptedException异常
	void lockInterruptibly() throws InterruptedException;

	//非阻塞方式获取锁
	boolean tryLock();
	
	//带有超时时间非阻塞方式获取锁
	boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
	
	//释放锁
	void unlock();
	
	Condition newCondition();
}

在这里插入图片描述

(2).ReadWriteLock

public interface ReadWriteLock {
    Lock readLock();

    Lock writeLock();
}

在这里插入图片描述

4.ReentrantLock源码解析
(1).类关系图
在这里插入图片描述

(2).非公平锁的lock()方法源码解析
在这里插入图片描述

static final class NonfairSync extends Sync {
	final void lock() {
	    if (compareAndSetState(0, 1))
	        setExclusiveOwnerThread(Thread.currentThread());
	    else
	        acquire(1);
	}
}
  • 先通过CAS尝试获取锁,如果获取成功,就将锁的拥有者设置为自己
  • 否则调用AbstractQueuedSynchronizer.acquire()
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
	public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
}
  • 通过tryAcquire尝试获取独占锁,如果成功返回true,失败返回false
  • 如果tryAcquire失败,则会通过addWaiter方法将当前线程封装成Node添加到AQS队列尾部
static final class NonfairSync extends Sync {
	protected final boolean tryAcquire(int acquires) {
		return nonfairTryAcquire(acquires);
	}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
    final boolean nonfairTryAcquire(int acquires) {
    	//获得当前执行的线程
        final Thread current = Thread.currentThread();
        //获得state的值
        int c = getState();
        //state=0说明当前是无锁状态
        if (c == 0) {
        	//通过cas操作将state的值改为1
        	//使用cas操作是为了避免多线程环境下存在的线程不安全问题
            if (compareAndSetState(0, acquires)) {
            	//设置锁的拥有者为当前线程
                setExclusiveOwnerThread(current);
                return true;
            }
        } 
        //如果是同一个线程来获得锁,则直接增加重入次数
        else if (current == getExclusiveOwnerThread()) {
        	//增加重入次数
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
  • 获取当前线程,判断当前的锁的状态
  • 如果state=0表示当前是无锁状态,通过cas更新state状态的值
  • 如果当前线程是属于重入,则增加重入次数
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
	private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        //tail是AQS的中表示同步队列队尾的属性,刚开始为null,所以进行enq(node)方法
        Node pred = tail;
        //tail不为空,说明队列中存在被阻塞的线程
        if (pred != null) {
        	//当前线程Node的prev节点指向tail
            node.prev = pred;
            //通过CAS将node添加到AQS队列
            if (compareAndSetTail(pred, node)) {
            	//CAS成功,把旧tail的next指针指向新的tail
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
}
  • 将当前线程封装成Node
  • 判断当前链表中的tail节点是否为空,如果不为空,则通过cas操作把当前线程的node添加到AQS队列
  • 如果为空或者cas失败,调用enq()方法通过自旋将节点添加到AQS队列
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
	private Node enq(final Node node) {
        for (;;) {
			//如果是第一次添加到队列,那么tail=null
            Node t = tail;
            if (t == null) {
            	//CAS的方式创建一个空的Node作为头结点
                if (compareAndSetHead(new Node()))
                	//此时队列中只一个头结点,所以tail也指向它
                    tail = head;
            } else {
            	//进行第二次循环时,tail不为null,进入else区域。
            	//将当前线程的Node结点的prev指向tail,然后使用CAS将tail指向Node
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                	//t此时指向tail,所以可以CAS成功,将tail重新指向Node,此时t为更新前tail的值,即指向空的头结点,t.next=node,就将头结点的后续结点指向Node,返回头结点
                    t.next = node;
                    return t;
                }
            }
        }
    }
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
	final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //如果前驱为head才有资格进行锁的抢夺
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; 
                    failed = false;
                    return interrupted;
                }
                //如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}
  • 获取当前节点的prev节点
  • 如果prev节点为head节点,那么它就有资格去争抢锁,调用tryAcquire抢占锁
  • 抢占锁成功以后,把获得锁的节点设置为head,并且移除原来的初始化head节点
  • 如果获得锁失败,则根据waitStatus决定是否需要挂起线程
  • 最后,通过cancelAcquire取消获得锁的操作

(3).unlock()方法源码解析

public class ReentrantLock implements Lock, java.io.Serializable {
	public void unlock() {
        sync.release(1);
    }
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
	public final boolean release(int arg) {
		//释放锁
	    if (tryRelease(arg)) {
			Node h = head;
	   	    if (h != null && h.waitStatus != 0)
	   	    	//唤醒队首线程
				unparkSuccessor(h);
			return true;
	    }
		return false;
	}
}
  • 释放锁
  • 唤醒队首线程
public class ReentrantLock implements Lock, java.io.Serializable {
	protected final boolean tryRelease(int releases) {
		//将锁的数量减1
        int c = getState() - releases;
        //如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
        	//直到最后一次释放锁时,才会把当前线程释放
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
}
  • 将状态减掉传入的参数值(参数是1)
  • 如果结果状态为0,就将排它锁的Owner设置为null,使得其它线程有机会获取锁
  • 如果结果状态不为0,就设置state

5.CountDownLatch源码解析

public class CountDownLatch {
	public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

	//加锁
	public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

	//释放
	public void countDown() {
        sync.releaseShared(1);
    }

	private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
		
        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

		//加锁
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

		//释放
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
}

6.Semaphore源码解析

public class Semaphore implements java.io.Serializable {
	public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
	
	public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
	
	public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
	abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

		//非公平锁加锁
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }

		//释放
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current)
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
    }
	
	//非公平
	static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }
		
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

	//公平
	static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

		//公平锁加锁
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
}
Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐