阻塞队列 — DelayedWorkQueue源码分析
前言线程池运行时,会不断从任务队列中获取任务,然后执行任务。如果我们想实现延时或者定时执行任务,重要一点就是任务队列会根据任务延时时间的不同进行排序,延时时间越短地就排在队列的前面,先被获取执行。队列是先进先出的数据结构,就是先进入队列的数据,先被获取。但是有一种特殊的队列叫做优先级队列,它会对插入的数据进行优先级排序,保证优先级越高的数据首先被获取,与数据的插入顺序无关。实现优先级队列高效常用的
前言
线程池运行时,会不断从任务队列中获取任务,然后执行任务。如果我们想实现延时或者定时执行任务,重要一点就是任务队列会根据任务延时时间的不同进行排序,延时时间越短地就排在队列的前面,先被获取执行。
队列是先进先出的数据结构,就是先进入队列的数据,先被获取。但是有一种特殊的队列叫做优先级队列,它会对插入的数据进行优先级排序,保证优先级越高的数据首先被获取,与数据的插入顺序无关。
实现优先级队列高效常用的一种方式就是使用堆。关于堆的实现可以查看《堆和二叉堆的实现和特性》
ScheduledThreadPoolExecutor线程池
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,所以其内部的数据结构和ThreadPoolExecutor基本一样,并在其基础上增加了按时间调度执行任务的功能,分为延迟执行任务和周期性执行任务。
ScheduledThreadPoolExecutor的构造函数只能传3个参数corePoolSize、ThreadFactory、RejectedExecutionHandler,默认maximumPoolSize为Integer.MAX_VALUE。
工作队列是高度定制化的延迟阻塞队列DelayedWorkQueue
,其实现原理和DelayQueue
基本一样,核心数据结构是二叉最小堆的优先队列,队列满时会自动扩容,所以offer
操作永远不会阻塞,maximumPoolSize
也就用不上了,所以线程池中永远会保持至多有corePoolSize
个工作线程正在运行。
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
DelayedWorkQueue延迟阻塞队列
DelayedWorkQueue 也是一种设计为定时任务的延迟队列,它的实现和DelayQueue一样,不过是将优先级队列和DelayQueue的实现过程迁移到本身方法体中,从而可以在该过程当中灵活的加入定时任务特有的方法调用。
工作原理
ScheduledThreadPoolExecutor之所以要自己实现阻塞的工作队列,是因为 ScheduleThreadPoolExecutor 要求的工作队列有些特殊。
DelayedWorkQueue是一个基于堆的数据结构,类似于DelayQueue和PriorityQueue。在执行定时任务的时候,每个任务的执行时间都不同,所以DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面(注意:这里的顺序并不是绝对的,堆中的排序只保证了子节点的下次执行时间要比父节点的下次执行时间要大,而叶子节点之间并不一定是顺序的)。
堆结构如下图:可见,DelayedWorkQueue是一个基于最小堆结构的队列。堆结构可以使用数组表示,可以转换成如下的数组:在这种结构中,可以发现有如下特性: 假设“第一个元素” 在数组中的索引为 0 的话,则父结点和子结点的位置关系如下:
- 索引为 的左孩子的索引是 ;
- 索引为 的右孩子的索引是 ;
- 索引为 的父结点的索引是 ;
为什么要使用DelayedWorkQueue呢?
- 定时任务执行时需要取出最近要执行的任务,所以任务在队列中每次出队时一定要是当前队列中执行时间最靠前的,所以自然要使用优先级队列。
- DelayedWorkQueue是一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的,由于它是基于堆结构的队列,堆结构在执行插入和删除操作时的最坏时间复杂度是 O(logN)。
源码分析
定义
DelayedWorkQueue 的类继承关系如下:其包含的方法定义如下:
成员属性
// 初始时,数组长度大小。
private static final int INITIAL_CAPACITY = 16;
// 使用数组来储存队列中的元素,根据初始容量创建RunnableScheduledFuture类型的数组
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
// 使用lock来保证多线程并发安全问题。
private final ReentrantLock lock = new ReentrantLock();
// 队列中储存元素的大小
private int size = 0;
//特指队列头任务所在leader线程
private Thread leader = null;
// 当队列头的任务延时时间到了,或者新线程可能需要成为leader,用来唤醒等待线程
private final Condition available = lock.newCondition();
DelayedWorkQueue是用数组来储存队列中的元素,核心数据结构是二叉最小堆的优先队列,队列满时会自动扩容。
注意这里的leader,它是Leader-Follower
模式的变体,用于减少不必要的定时等待。什么意思呢?
对于多线程的网络模型来说:所有线程会有三种身份中的一种:leader和follower,以及一个干活中的状态:proccesser。它的基本原则就是,永远最多只有一个leader。而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。
构造函数
DelayedWorkQueue 是 ScheduledThreadPoolExecutor 的静态类部类,默认只有一个无参构造方法。
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
// ...
}
入队方法
DelayedWorkQueue 提供了 put/add/offer(带时间) 三个插入元素方法。我们发现与普通阻塞队列相比,这三个添加方法都是调用offer方法。那是因为它没有队列已满的条件,也就是说可以不断地向DelayedWorkQueue添加元素,当元素个数超过数组长度时,会进行数组扩容。
public void put(Runnable e) {
offer(e);
}
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
return offer(e);
}
offer添加元素
ScheduledThreadPoolExecutor
提交任务时调用的是DelayedWorkQueue.add
,而add
、put
等一些对外提供的添加元素的方法都调用了offer
。
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
// 使用lock保证并发操作安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 如果要超过数组长度,就要进行数组扩容
if (i >= queue.length)
// 数组扩容
grow();
// 将队列中元素个数加一
size = i + 1;
// 如果是第一个元素,那么就不需要排序,直接赋值就行了
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// 调用siftUp方法,使插入的元素变得有序。
siftUp(i, e);
}
// 表示新插入的元素是队列头,更换了队列头,
// 那么就要唤醒正在等待获取任务的线程。
if (queue[0] == e) {
leader = null;
// 唤醒正在等待等待获取任务的线程
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
其基本流程如下:
- 其作为生产者的入口,首先获取锁。
- 判断队列是否要满了(
size >= queue.length
),满了就扩容grow()
。 - 队列未满,size+1。
- 判断添加的元素是否是第一个,是则不需要堆化。
- 添加的元素不是第一个,则需要堆化
siftUp
。 - 如果堆顶元素刚好是此时被添加的元素,则唤醒take线程消费。
- 最终释放锁。
offer基本流程图如下:
扩容grow()
可以看到,当队列满时,不会阻塞等待,而是继续扩容。新容量newCapacity
在旧容量oldCapacity
的基础上扩容50%(oldCapacity >> 1
相当于oldCapacity /2
)。最后Arrays.copyOf
,先根据newCapacity
创建一个新的空数组,然后将旧数组的数据复制到新数组中。
private void grow() {
int oldCapacity = queue.length;
// 每次扩容增加原来数组的一半数量。
// grow 50%
int newCapacity = oldCapacity + (oldCapacity >> 1);
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
// 使用Arrays.copyOf来复制一个新数组
queue = Arrays.copyOf(queue, newCapacity);
}
向上堆化siftUp
新添加的元素先会加到堆底,然后一步步和上面的父亲节点比较,若小于父亲节点则和父亲节点互换位置,循环比较直至大于父亲节点才结束循环。通过循环,来查找元素key应该插入在堆二叉树那个节点位置,并交互父节点的位置。
向上堆化siftUp的详细过程可以查看《堆和二叉堆的实现和特性》
private void siftUp(int k, RunnableScheduledFuture<?> key) {
// 当k==0时,就到了堆二叉树的根节点了,跳出循环
while (k > 0) {
// 父节点位置坐标, 相当于(k - 1) / 2
int parent = (k - 1) >>> 1;
// 获取父节点位置元素
RunnableScheduledFuture<?> e = queue[parent];
// 如果key元素大于父节点位置元素,满足条件,那么跳出循环
// 因为是从小到大排序的。
if (key.compareTo(e) >= 0)
break;
// 否则就将父节点元素存放到k位置
queue[k] = e;
// 这个只有当元素是ScheduledFutureTask对象实例才有用,用来快速取消任务。
setIndex(e, k);
// 重新赋值k,寻找元素key应该插入到堆二叉树的那个节点
k = parent;
}
// 循环结束,k就是元素key应该插入的节点位置
queue[k] = key;
setIndex(key, k);
}
代码很好理解,就是循环的根据key节点与它的父节点来判断,如果key节点的执行时间小于父节点,则将两个节点交换,使执行时间靠前的节点排列在队列的前面。
假设新入队的节点的延迟时间(调用getDelay()方法获得)是 5 ,执行过程如下:
- 先将新的节点添加到数组的尾部,这时新节点的索引k为7
- 计算新父节点的索引:parent = (k - 1) >>> 1,parent = 3,那么queue[3]的时间间隔值为8,因为 5 < 8 ,将执行queue[7] = queue[3]
- 这时将k设置为3,继续循环,再次计算parent为1,queue[1]的时间间隔为3,因为 5 > 3 ,这时退出循环,最终k为3
可见,每次新增节点时,只是根据父节点来判断,而不会影响兄弟节点。
出队方法
DelayedWorkQueue 提供了以下几个出队方法
- take(),等待获取队列头元素
- poll() ,立即获取队列头元素
- poll(long timeout, TimeUnit unit) ,超时等待获取队列头元素
take消费元素
Worker
工作线程启动后就会循环消费工作队列中的元素,因为ScheduledThreadPoolExecutor
的keepAliveTime=0
,所以消费任务其只调用了DelayedWorkQueue.take
。take基本流程如下:
- 首先获取可中断锁,判断堆顶元素是否是空,空的则阻塞等待
available.await()
。 - 堆顶元素不为空,则获取其延迟执行时间
delay
,delay <= 0
说明到了执行时间,出队列finishPoll
。 delay > 0
还没到执行时间,判断leader
线程是否为空,不为空则说明有其他take线程也在等待,当前take将无限期阻塞等待。leader
线程为空,当前take线程设置为leader
,并阻塞等待delay
时长。- 当前leader线程等待delay时长自动唤醒护着被其他take线程唤醒,则最终将
leader
设置为null
。 - 再循环一次判断
delay <= 0
出队列。 - 跳出循环后判断leader为空并且堆顶元素不为空,则唤醒其他take线程,最后是否锁。
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// 如果没有任务,就让线程在available条件下等待。
if (first == null)
available.await();
else {
// 获取任务的剩余延时时间
long delay = first.getDelay(NANOSECONDS);
// 如果延时时间到了,就返回这个任务,用来执行。
if (delay <= 0)
return finishPoll(first);
// 将first设置为null,当线程等待时,不持有first的引用
first = null; // don't retain ref while waiting
// 如果还是原来那个等待队列头任务的线程,
// 说明队列头任务的延时时间还没有到,继续等待。
if (leader != null)
available.await();
else {
// 记录一下当前等待队列头任务的线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 当任务的延时时间到了时,能够自动超时唤醒。
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null) // 唤醒等待任务的线程
available.signal();
ock.unlock();
}
}
take基本流程图如下:
take线程阻塞等待
可以看出这个生产者take线程会在两种情况下阻塞等待:
- 堆顶元素为空。
- 堆顶元素的delay > 0 。
take方法是什么时候调用的呢?
在ThreadPoolExecutor中,getTask方法,工作线程会循环地从workQueue中取任务。但定时任务却不同,因为如果一旦getTask方法取出了任务就开始执行了,而这时可能还没有到执行的时间,所以在take方法中,要保证只有在到指定的执行时间的时候任务才可以被取走。
leader线程
再来说一下leader的作用,这里的leader是为了减少不必要的定时等待。leader
线程的设计,是Leader-Follower
模式的变种,旨在于为了不必要的时间等待。当一个take
线程变成leader
线程时,只需要等待下一次的延迟时间,而不是leader
线程的其他take
线程则需要等leader
线程出队列了才唤醒其他take
线程。
举例来说,如果没有leader,那么在执行take时,都要执行available.awaitNanos(delay),假设当前线程执行了该段代码,这时还没有signal,第二个线程也执行了该段代码,则第二个线程也要被阻塞。多个这时执行该段代码是没有作用的,因为只能有一个线程会从take中返回queue[0](因为有lock),其他线程这时再返回for循环执行时取的queue[0],已经不是之前的queue[0]了,然后又要继续阻塞。
所以,为了不让多个线程频繁的做无用的定时等待,这里增加了leader,如果leader不为空,则说明队列中第一个节点已经在等待出队,这时其它的线程会一直阻塞,减少了无用的阻塞(注意,在finally中调用了signal()来唤醒一个线程,而不是signalAll())。
finishPoll出队列
堆顶元素delay<=0
,执行时间到,出队列就是一个向下堆化的过程siftDown
。
// 移除队列头元素
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
// 将队列中元素个数减一
int s = --size;
// 获取队列末尾元素x
RunnableScheduledFuture<?> x = queue[s];
// 原队列末尾元素设置为null
queue[s] = null;
if (s != 0)
// 因为移除了队列头元素,所以进行重新排序。
siftDown(0, x);
setIndex(f, -1);
return f;
}
堆的删除方法主要分为三步:
- 先将队列中元素个数减一;
- 将原队列末尾元素设置成为队列头元素,再将队列末尾元素设置为null;
- 调用setDown(O,x)方法,保证按照元素的优先级排序。
向下堆化siftDown
由于堆顶元素出队列后,就破坏了堆的结构,需要组织整理下,将堆尾元素移到堆顶,然后向下堆化:
- 从堆顶开始,父亲节点与左右子节点中较小的孩子节点比较(左孩子不一定小于右孩子)。
- 父亲节点小于等于较小孩子节点,则结束循环,不需要交换位置。
- 若父亲节点大于较小孩子节点,则交换位置。
- 继续向下循环判断父亲节点和孩子节点的关系,直到父亲节点小于等于较小孩子节点才结束循环。
向下堆化siftDown的详细过程可以查看《堆和二叉堆的实现和特性》
private void siftDown(int k, RunnableScheduledFuture<?> key) {
// 无符号右移,相当于size/2
int half = size >>> 1;
// 通过循环,保证父节点的值不能大于子节点。
while (k < half) {
// 左子节点, 相当于 (k * 2) + 1
int child = (k << 1) + 1;
// 左子节点位置元素
RunnableScheduledFuture<?> c = queue[child];
// 右子节点, 相当于 (k * 2) + 2
int right = child + 1;
// 如果左子节点元素值大于右子节点元素值,那么右子节点才是较小值的子节点。
// 就要将c与child值重新赋值
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
// 如果父节点元素值小于较小的子节点元素值,那么就跳出循环
if (key.compareTo(c) <= 0)
break;
// 否则,父节点元素就要和子节点进行交换
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
siftDown方法执行时包含两种情况,一种是没有子节点,一种是有子节点(根据half判断)。
例如:没有子节点的情况:
假设初始的堆如下:
- 假设 k = 3 ,那么 k = half ,没有子节点,在执行siftDown方法时直接把索引为3的节点设置为数组的最后一个节点:
有子节点的情况:
假设 k = 0 ,那么执行以下步骤:
- 获取左子节点,child = 1 ,获取右子节点, right = 2 :
- 由于 right < size ,这时比较左子节点和右子节点时间间隔的大小,这里 3 < 7 ,所以 c = queue[child] ;
- 比较key的时间间隔是否小于c的时间间隔,这里不满足,继续执行,把索引为k的节点设置为c,然后将k设置为child;
- 因为 half = 3 ,k = 1 ,继续执行循环,这时的索引变为:
- 这时再经过如上判断后,将k的值为3,最终的结果如下:
- 最后,如果在finishPoll方法中调用的话,会把索引为0的节点的索引设置为-1,表示已经删除了该节点,并且size也减了1,最后的结果如下:
可见,siftdown方法在执行完并不是有序的,但可以发现,子节点的下次执行时间一定比父节点的下次执行时间要大,由于每次都会取左子节点和右子节点中下次执行时间最小的节点,所以还是可以保证在take和poll时出队是有序的。
poll()
立即获取队列头元素,当队列头任务是null,或者任务延时时间没有到,表示这个任务还不能返回,因此直接返回null。否则调用finishPoll方法,移除队列头元素并返回。
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first = queue[0];
// 队列头任务是null,或者任务延时时间没有到,都返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
// 移除队列头元素
return finishPoll(first);
} finally {
lock.unlock();
}
}
poll(long timeout, TimeUnit unit)
超时等待获取队列头元素,与take方法相比较,就要考虑设置的超时时间,如果超时时间到了,还没有获取到有用任务,那么就返回null。其他的与take方法中逻辑一样。
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// 如果没有任务。
if (first == null) {
// 超时时间已到,那么就直接返回null
if (nanos <= 0)
return null;
else
// 否则就让线程在available条件下等待nanos时间
nanos = available.awaitNanos(nanos);
} else {
// 获取任务的剩余延时时间
long delay = first.getDelay(NANOSECONDS);
// 如果延时时间到了,就返回这个任务,用来执行。
if (delay <= 0)
return finishPoll(first);
// 如果超时时间已到,那么就直接返回null
if (nanos <= 0)
return null;
// 将first设置为null,当线程等待时,不持有first的引用
first = null; // don't retain ref while waiting
// 如果超时时间小于任务的剩余延时时间,那么就有可能获取不到任务。
// 在这里让线程等待超时时间nanos
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 当任务的延时时间到了时,能够自动超时唤醒。
long timeLeft = available.awaitNanos(delay);
// 计算剩余的超时时间
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
// 唤醒等待任务的线程
available.signal();
lock.unlock();
}
}
remove删除指定元素
删除指定元素一般用于取消任务时,任务还在阻塞队列中,则需要将其删除。当删除的元素不是堆尾元素时,需要做堆化处理。
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(x);
if (i < 0)
return false;
//维护heapIndex
setIndex(queue[i], -1);
int s = --size;
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
if (s != i) {
//删除的不是堆尾元素,则需要堆化处理
//先向下堆化
siftDown(i, replacement);
if (queue[i] == replacement)
//若向下堆化后,i位置的元素还是replacement,说明四无需向下堆化的,
//则需要向上堆化
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
假设初始的堆结构如下:[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SqhXOn3E-1653913362599)(data:image/gif;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVQImWNgYGBgAAAABQABh6FO1AAAAABJRU5ErkJggg==)]这时要删除8的节点,那么这时 k = 1,key为最后一个节点:这时通过上文对siftDown方法的分析,siftDown方法执行后的结果如下:这时会发现,最后一个节点的值比父节点还要小,所以这里要执行一次siftUp方法来保证子节点的下次执行时间要比父节点的大,所以最终结果如下:
总结
使用优先级队列DelayedWorkQueue,保证添加到队列中的任务,会按照任务的延时时间进行排序,延时时间少的任务首先被获取。
- DelayedWorkQueue的数据结构是基于堆实现的;
- DelayedWorkQueue采用数组实现堆,根节点出队,用最后叶子节点替换,然后下推至满足堆成立条件;最后叶子节点入队,然后向上推至满足堆成立条件;
- DelayedWorkQueue添加元素满了之后会自动扩容原来容量的1/2,即永远不会阻塞,最大扩容可达Integer.MAX_VALUE,所以线程池中至多有corePoolSize个工作线程正在运行;
- DelayedWorkQueue 消费元素take,在堆顶元素为空和delay >0 时,阻塞等待;
- DelayedWorkQueue 是一个生产永远不会阻塞,消费可以阻塞的生产者消费者模式;
- DelayedWorkQueue 有一个leader线程的变量,是Leader-Follower模式的变种。当一个take线程变成leader线程时,只需要等待下一次的延迟时间,而不是leader线程的其他take线程则需要等leader线程出队列了才唤醒其他take线程。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)