![cover](https://img-home.csdnimg.cn/images/20230309035550.png)
20.AQS
1.概念AQS(AbstractQueuedSynchronizer),是一个用于构建锁、同步器、协作工具类的框架。2.三要素(1).state根据具体实现类的不同显示不同的含义,如CountDownLatch类表示还需要倒数的数量,Semaphore类表示剩余许可证,ReentrantLock类中表示重入次数。(2).控制线程抢锁和配合的FIFO队列队列是用来存放等待的线程,当多个线程竞争同一把
·
1.概念
AQS(AbstractQueuedSynchronizer),是一个用于构建锁、同步器、协作工具类的框架。
2.三要素
(1).state
根据具体实现类的不同显示不同的含义,如CountDownLatch类表示还需要倒数的数量,Semaphore类表示剩余许可证,ReentrantLock类中表示重入次数。
(2).控制线程抢锁和配合的FIFO队列
队列是用来存放等待的线程,当多个线程竞争同一把锁时,只有一个线程获取锁,其它线程就会被封装node结点加入到队列中,当锁被释放时,AQS就需要从队列中挑选一个合适的线程来占有锁。
(3).期望协作工具类去实现的重要方法
- 获取锁方法
- 释放锁方法
3.接口概览
(1).Lock
public interface Lock {
//如果锁可用就获得锁,如果锁不可用就阻塞直到锁释放
void lock();
//和lock()方法相似, 但阻塞的线程可中断,抛出java.lang.InterruptedException异常
void lockInterruptibly() throws InterruptedException;
//非阻塞方式获取锁
boolean tryLock();
//带有超时时间非阻塞方式获取锁
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//释放锁
void unlock();
Condition newCondition();
}
(2).ReadWriteLock
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
4.ReentrantLock源码解析
(1).类关系图
(2).非公平锁的lock()方法源码解析
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
- 先通过CAS尝试获取锁,如果获取成功,就将锁的拥有者设置为自己
- 否则调用AbstractQueuedSynchronizer.acquire()
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
}
- 通过tryAcquire尝试获取独占锁,如果成功返回true,失败返回false
- 如果tryAcquire失败,则会通过addWaiter方法将当前线程封装成Node添加到AQS队列尾部
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
//获得当前执行的线程
final Thread current = Thread.currentThread();
//获得state的值
int c = getState();
//state=0说明当前是无锁状态
if (c == 0) {
//通过cas操作将state的值改为1
//使用cas操作是为了避免多线程环境下存在的线程不安全问题
if (compareAndSetState(0, acquires)) {
//设置锁的拥有者为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
//如果是同一个线程来获得锁,则直接增加重入次数
else if (current == getExclusiveOwnerThread()) {
//增加重入次数
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
- 获取当前线程,判断当前的锁的状态
- 如果state=0表示当前是无锁状态,通过cas更新state状态的值
- 如果当前线程是属于重入,则增加重入次数
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
//tail是AQS的中表示同步队列队尾的属性,刚开始为null,所以进行enq(node)方法
Node pred = tail;
//tail不为空,说明队列中存在被阻塞的线程
if (pred != null) {
//当前线程Node的prev节点指向tail
node.prev = pred;
//通过CAS将node添加到AQS队列
if (compareAndSetTail(pred, node)) {
//CAS成功,把旧tail的next指针指向新的tail
pred.next = node;
return node;
}
}
enq(node);
return node;
}
}
- 将当前线程封装成Node
- 判断当前链表中的tail节点是否为空,如果不为空,则通过cas操作把当前线程的node添加到AQS队列
- 如果为空或者cas失败,调用enq()方法通过自旋将节点添加到AQS队列
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
private Node enq(final Node node) {
for (;;) {
//如果是第一次添加到队列,那么tail=null
Node t = tail;
if (t == null) {
//CAS的方式创建一个空的Node作为头结点
if (compareAndSetHead(new Node()))
//此时队列中只一个头结点,所以tail也指向它
tail = head;
} else {
//进行第二次循环时,tail不为null,进入else区域。
//将当前线程的Node结点的prev指向tail,然后使用CAS将tail指向Node
node.prev = t;
if (compareAndSetTail(t, node)) {
//t此时指向tail,所以可以CAS成功,将tail重新指向Node,此时t为更新前tail的值,即指向空的头结点,t.next=node,就将头结点的后续结点指向Node,返回头结点
t.next = node;
return t;
}
}
}
}
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果前驱为head才有资格进行锁的抢夺
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
//如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
- 获取当前节点的prev节点
- 如果prev节点为head节点,那么它就有资格去争抢锁,调用tryAcquire抢占锁
- 抢占锁成功以后,把获得锁的节点设置为head,并且移除原来的初始化head节点
- 如果获得锁失败,则根据waitStatus决定是否需要挂起线程
- 最后,通过cancelAcquire取消获得锁的操作
(3).unlock()方法源码解析
public class ReentrantLock implements Lock, java.io.Serializable {
public void unlock() {
sync.release(1);
}
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final boolean release(int arg) {
//释放锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒队首线程
unparkSuccessor(h);
return true;
}
return false;
}
}
- 释放锁
- 唤醒队首线程
public class ReentrantLock implements Lock, java.io.Serializable {
protected final boolean tryRelease(int releases) {
//将锁的数量减1
int c = getState() - releases;
//如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//直到最后一次释放锁时,才会把当前线程释放
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
- 将状态减掉传入的参数值(参数是1)
- 如果结果状态为0,就将排它锁的Owner设置为null,使得其它线程有机会获取锁
- 如果结果状态不为0,就设置state
5.CountDownLatch源码解析
public class CountDownLatch {
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
//加锁
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//释放
public void countDown() {
sync.releaseShared(1);
}
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
//加锁
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//释放
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
}
6.Semaphore源码解析
public class Semaphore implements java.io.Serializable {
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
//非公平锁加锁
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
//释放
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current)
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
}
//非公平
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
//公平
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
//公平锁加锁
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
更多推荐
所有评论(0)