导航: 

【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析

目录

一、什么是线程池?

1.1 基本介绍

1.2 创建线程的两种方式

1.2.1 方式1:自定义线程池(推荐)

1.2.2 方式2:线程池工具类

二、ThreadPoolExecutor源码解析

2.1 基本介绍 

2.2 扩展:ExecutorService接口

2.3 线程池参数

2.3.1 ThreadPoolExecutor的七个参数

2.3.2 如何为线程池设置合适的线程数

2.3.3 代码示例

2.3.4 线程池的初始化

2.3.4.1 不指定线程工厂的构造方法

2.3.4.2 全参数核心构造方法

2.3.5 拒绝策略详解

2.3.5.0 概述

2.3.5.1 AbortPolicy:抛异常

2.3.5.2 CallerRunsPolicy:调用线程run()方法

2.3.5.3 DiscardOldestPolicy:丢弃最老任务

2.3.5.4 DiscardPolicy:丢弃新任务

2.3.5.5 自定义拒绝策略

2.4 核心字段和内部类

2.4.1 基本介绍

2.4.2 ctl:线程池当前状态和数量

2.4.3 COUNT_BITS和CAPACITY:线程数上限

2.4.4 五个状态字段:线程池的生命周期 

2.4.5 扩展:线程的生命周期  ​

2.4.6 workers和mainLock:维护所有工作线程

2.4.7 Worker类:控制中断线程、执行runWorker()方法

2.4.8 allowCoreThreadTimeOut:是否允许核心线程超时

2.5 execute():启动线程

2.5.1 工作原理

2.5.2 源码解析

2.6 addWorker():添加工作线程

2.7 runWorker ():执行线程

2.8 getTask():从队列获取任务

2.9 关闭

2.9.1 shutdown():关闭线程

2.9.1.1 源码解析

2.9.1.2 校验关闭权限:checkShutdownAccess()

2.9.1.3 修改线程池状态:advanceRunState()

2.9.1.4 中断空闲线程:interruptIdleWorkers()

2.9.1.5 尝试终止线程池:tryTerminate()

2.9.2 shutdownNow():停止线程

2.9.2.1 源码解析

2.9.2.2 清空阻塞队列:drainQueue()

三、Executors源码解析

3.1 Executors工具类

3.2 newFixedThreadPool:固定大小的线程池

3.3 newCachedThreadPool:缓存线程池(无限大)

3.4 newScheduledThreadPool:定时任务线程池

3.5 newSingleThreadExecutor:单线程化的线程池


一、什么是线程池?

1.1 基本介绍

为了对多线程进行统一的管理,Java引入了线程池,它通过限制并发线程的数量、将待执行的线程放入队列、销毁空闲线程,来控制资源消耗,使线程更合理地运行,避免系统因为创建过多线程而崩溃。

线程池作用: 

  • 管理线程数量:它可以管理线程的数量,可以避免无节制的销毁、创建线程,导致额外的性格损耗、或者线程数超出系统负荷直至崩溃。
  • 提高性能:当有新的任务到来时,可以直接从线程池中取出一个空闲线程来执行任务,而不需要等待创建新线程,从而减少了响应时间。
  • 让线程复用:它还可以让线程复用,可以大大地减少创建和销毁线程所带来的开销。
  • 合理的拒绝策略:线程池提供了多种拒绝策略,当线程池队列满了时,可以采用不同的策略进行处理,如抛出异常、丢弃任务或调用者运行等。

1.2 创建线程的两种方式

线程池一共有两种,分别是:

  • 自定义线程池ThreadPoolExecutor(推荐)
  • 线程池工具类Executors.newXxxThreadPool

1.2.1 方式1:自定义线程池(推荐)

线程池执行器ThreadPoolExecutor创建自定义线程池:

        ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor( 
    			 5,    //核心线程数
                 200,    //最大线程数量,控制资源并发
                 10,    //存活时间
                TimeUnit.SECONDS,    //时间单位
                new LinkedBlockingDeque<>(  100000),    //任务队列,大小100000个
        Executors.defaultThreadFactory(),    //线程的创建工厂
        new ThreadPoolExecutor.AbortPolicy());    //拒绝策略

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {    //开启异步编排,有返回值
            return 1;
        }, threadPoolExecutor).thenApplyAsync(res -> {    //串行化,接收参数并有返回值
            return res+1;
        }, threadPoolExecutor);
        Integer integer = future.get();    //获取返回值

 七个参数:

  • corePoolSize:核心线程数。创建以后,会一直存活到线程池销毁,空闲时也不销毁。
  • maximumPoolSize:最大线程数量。阻塞队列满了
  • keepAliveTime: 存活时间。释放空闲时间超过“存活时间”的线程,仅留核心线程数量的线程。
  • TimeUnitunit:时间单位
  • workQueue: 任务队列。如果线程数超过核心数量,就把剩余的任务放到队列里。只要有线程空闲,就会去队列取出新的任务执行。new LinkedBlockingDeque()队列大小默认是Integer的最大值,内存不够,所以建议指定队列大小。
    • SynchronousQueue是一个同步队列,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
    • LinkedBlockingQueue是一个无界队列,可以缓存无限多的任务。由于其无界特性,因此需要合理地处理好任务的生产速率和线程池中线程的数量,以避免内存溢出等异常问题。无限缓存,拒绝策略就能随意了。
    • ArrayBlockingQueue是一个有界(容量固定)队列,只能缓存固定数量的任务。通过固定队列容量,可以避免任务过多导致线程阻塞,保证线程池资源的可控性和稳定性。推荐,有界队列能增加系统的稳定性和预警能力。可以根据需要设大一点,比如几千,新任务丢弃后未来重新入队。
    • PriorityBlockingQueue是一个优先级队列,能够对任务按照优先级进行排序,当任务数量超过队列容量时,会根据元素的Comparable或Comparator排序规则进行丢弃或抛异常。
      new PriorityBlockingQueue<>((o1, o2) -> o1.length() - o2.length());

  • threadFactory:线程的创建工厂。可以使用默认的线程工厂Executors.defaultThreadFactory(),也可以自定义线程工厂(实现ThreadFactory接口)
  • RejectedExecutionHandler handler:拒绝策略。如果任务队列和最大线程数量满了,按照指定的拒绝策略执行任务。
    • Abort(默认):直接抛异常(拒绝执行异常RejectedExecutionException)
    • CallerRuns:直接同步调用线程run()方法,不创建线程了
    • DiscardOldest:丢弃最老任务
    • Discard:直接丢弃新任务
    • 实现拒绝执行处理器接口(RejectedExecutionHandler),自定义拒绝策略。

1.2.2 方式2:线程池工具类

执行器工具类Executors创建线程池: 底层都是return new ThreadPoolExecutor(...)。一般不使用这种方式,参数配置死了不可控。

  • newFixedThreadPool:固定大小的线程池。
    • 核心线程数:所有线程都是核心线程(通过构造参数指定),最大线程数=核心线程数。
    • 存活时间0s:因为所有线程都是核心线程,所以用不到存活时间,线程都会一直存活。keepAliveTime为0S。
    • 链表阻塞队列:超出的线程会在LinkedBlockingQueue队列中等待。
  • newCachedThreadPool:缓存线程池(无限大)。 
    • 核心线程数是0,最大线程数无限大:最大线程数Integer.MAX_VALUE。线程数量可以无限扩大,所有线程都是非核心线程。
    • 空闲线程存活时间60s:keepAliveTime为60S,空闲线程超过60s会被杀死。
    • 同步队列:因为最大线程数无限大,所以也用不到阻塞队列,所以设为没有存储空间的SynchronousQueue同步队列。这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
  • newScheduledThreadPool:定时任务线程池。创建一个定长线程池, 支持定时及周期性任务执行。可指定核心线程数,最大线程数。
  • newSingleThreadExecutor:单线程化的线程池。核心线程数与最大线程数都只有一个,不回收。后台从LinkedBlockingQueue队列中获取任务​。创建一个单线程化的线程池, 它只会用唯一的工作线程来执行任务, 保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。 
// 源码
// FixedThreadPool: 固定大小线程池
// CachedTheadPool: 缓存线程池
new ThreadExcutor(0, max_valuem, 60L, s, new SynchronousQueue<Runnable>());
new ThreadExcutor(n, n, 0L, ms, new LinkedBlockingQueue<Runable>()

// ScheduledThreadPoolExcutor: 定时任务线程池
ScheduledThreadPool, SingleThreadScheduledExecutor
// SingleThreadExecutor: 单线程化线程池
new ThreadExcutor(1, 1, 0L, ms, new LinkedBlockingQueue<Runable>())

代码示例:

/**
 * @Author: vince
 * @CreateTime: 2024/09/10
 * @Description: 测试类
 * @Version: 1.0
 */
public class Test {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> System.out.println("Hello World"));
        }
    }
}

二、ThreadPoolExecutor源码解析

2.1 基本介绍 

ThreadPoolExecutor是线程池最核心的类,在JDK1.5引入,位于java.util.concurrent包下,是ExecutorService接口的实现类。 

package java.util.concurrent;
/**
 * 自定义线程池
 * @since 1.5
 * @author Doug Lea
 */
public class ThreadPoolExecutor extends AbstractExecutorService {
    // ...
}

public abstract class AbstractExecutorService implements ExecutorService {
    //  ...
}

2.2 扩展:ExecutorService接口

ExecutorService是Java提供的线程池接口,所有线程池都直接或间接实现ExecutorService接口。它有两个最核心的方法,即submit或execute方法,用于线程池提交任务。

submit或execute方法:

  • execute:参数只能是Runnable,没有返回值
  • submit:参数可以是Runnable、Callable,返回值是FutureTask 

Executor接口:

只有一个execute方法,用来替代通常创建或启动线程的方法。 

代码示例:

execute:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolExample {

    public static void main(String[] args) {
        // 创建一个具有固定线程数的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        // 提交任务给线程池执行
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executorService.execute(() -> {
                System.out.println("Task ID : " + taskId + " 执行中...");

                // 模拟任务执行耗时
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("Task ID : " + taskId + " 完成");
            });
        }

        // 关闭线程池
        executorService.shutdown();
    }
}

submit:

public class ThreadPoolExample {

    public static void main(String[] args) {
        // 创建一个具有固定线程数的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        // 提交任务给线程池执行
        for (int i = 0; i < 10; i++) {
            int taskId = i;
            Future<String> future = executorService.submit(() -> {
                System.out.println("Task ID : " + taskId + " 执行中...");

                // 模拟任务执行耗时
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                return "Task ID : " + taskId + " 完成";
            });

            // 获取任务的执行结果
            try {
                String result = future.get();
                System.out.println(result);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // 关闭线程池
        executorService.shutdown();
    }
}

2.3 线程池参数

2.3.1 ThreadPoolExecutor的七个参数

        // 创建线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5, // 核心线程数
                200, // 最大线程数量
                10, // 空闲线程存活时间
                TimeUnit.SECONDS, // 时间单位
                new LinkedBlockingDeque<>(100000), // 任务队列,大小100000
                Executors.defaultThreadFactory(), // 线程工厂
                new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
        );

 ThreadPoolExecutor 的构造方法有7个重要的参数,分别是下面几个:

  • corePoolSize:核心线程数。创建以后,会一直存活到线程池销毁,空闲时也不销毁。
  • maximumPoolSize:最大线程数量。阻塞队列满了
  • keepAliveTime: 存活时间。释放空闲时间超过“存活时间”的线程,仅留核心线程数量的线程。
  • TimeUnitunit:时间单位
  • workQueue: 任务队列。如果线程数超过核心数量,就把剩余的任务放到队列里。只要有线程空闲,就会去队列取出新的任务执行。new LinkedBlockingDeque()队列大小默认是Integer的最大值,内存不够,所以建议指定队列大小。
    • LinkedBlockingQueue:链表阻塞队列。是一个无界队列,可以缓存无限多的任务。由于其无界特性,因此需要合理地处理好任务的生产速率和线程池中线程的数量,以避免内存溢出等异常问题。无限缓存,拒绝策略就能随意了。
    • ArrayBlockingQueue:数组阻塞队列。是一个有界(容量固定)队列,只能缓存固定数量的任务。通过固定队列容量,可以避免任务过多导致线程阻塞,保证线程池资源的可控性和稳定性。推荐,有界队列能增加系统的稳定性和预警能力。可以根据需要设大一点,比如几千,新任务丢弃后未来重新入队。
    • SynchronousQueue:同步队列。这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
    • PriorityBlockingQueue:优先级队列。能够对任务按照优先级进行排序,当任务数量超过队列容量时,会根据元素的Comparable或Comparator排序规则进行丢弃或抛异常。
      new PriorityBlockingQueue<>((o1, o2) -> o1.length() - o2.length());

    • DelayedWorkQueue:延迟阻塞队列。无界的延迟+优先级队列,每个任务有个延时时间,按时长逆序排列在队列中。延时队列越短,则优先级最高,在队列中越靠前,优先被消费。

  • threadFactory:线程的创建工厂。可以使用默认的线程工厂Executors.defaultThreadFactory(),也可以自定义线程工厂(实现ThreadFactory接口)
  • RejectedExecutionHandler handler:拒绝策略。如果任务队列和最大线程数量满了,按照指定的拒绝策略执行任务。
    • Abort(默认):直接抛异常(拒绝执行异常RejectedExecutionException)
    • CallerRuns:直接同步调用线程run()方法,不创建线程了
    • DiscardOldest:丢弃最老任务
    • Discard:直接丢弃新任务
    • 实现拒绝执行处理器接口(RejectedExecutionHandler),自定义拒绝策略。

2.3.2 如何为线程池设置合适的线程数

下面的参数只是一个预估值,适合初步设置,具体的线程数需要经过压测确定,压榨(更好的利用)CPU的性能。

CPU核心数为N;

核心线程数:

  • CPU密集型:N+1。数量与CPU核数相近是为了不浪费CPU,并防止频繁的上下文切换,加1是为了有线程被阻塞后还能不浪费CPU的算力。
  • I/O密集型:2N,或N/(1-阻塞系数)。I/O密集型任务CPU使用率并不是很高,可以让CPU在等待I/O操作的时去处理别的任务,充分利用CPU,所以数量就比CPU核心数高一倍。有些公司会考虑阻塞系数,阻塞系数是任务线程被阻塞的比例,一般是0.8~0.9。
  • 实际开发中更适合的公式:N*((线程等待时间+线程计算时间)/线程计算时间)

最大线程数:设成核心线程数的2-4倍。数量主要由CPU和IO的密集性、处理的数据量等因素决定。

需要增加线程的情况:jstack打印线程快照,如果发现线程池中大部分线程都等待获取任务、则说明线程够用。如果大部分线程都处于运行状态,可以继续适当调高线程数量。

jstack:打印指定进程此刻的线程快照。定位线程长时间停顿的原因,例如死锁、等待资源、阻塞。如果有死锁会打印线程的互相占用资源情况。线程快照:该进程内每条线程正在执行的方法堆栈的集合。

2.3.3 代码示例

创建一个线程池,再创建10个线程打印文字:

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {
    public static void main(String[] args) {
        // 创建线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5, // 核心线程数
                200, // 最大线程数量
                10, // 空闲线程存活时间
                TimeUnit.SECONDS, // 时间单位
                new LinkedBlockingDeque<>(100000), // 任务队列,大小100000
                Executors.defaultThreadFactory(), // 线程工厂
                new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
        );

        // 提交任务到线程池
        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " 正在执行任务");
            });
        }

        // 关闭线程池
        threadPoolExecutor.shutdown();
    }
}

可以看见只有5个核心线程执行任务,原因阻塞队列有100000的大小,所以用不到新增非核心线程:

2.3.4 线程池的初始化

2.3.4.1 不指定线程工厂的构造方法

ThreadPoolExecutor有多个构造方法,这些构造方法内部实际都调用了全参数的构造方法

例如不指定线程的创建工厂,将使用默认的线程创建工厂:

    /**
     * 不指定线程工厂的构造方法
     * @param corePoolSize 核心线程数
     * @param maximumPoolSize 最大线程数
     * @param keepAliveTime 线程存活时间
     * @param unit 线程存活时间单位
     * @param workQueue 任务队列
     * @param handler 拒绝策略
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        // 不指定线程工厂,将使用默认工厂,调用7参数的构造方法
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), handler);
    }

2.3.4.2 全参数核心构造方法

核心流程:

  1. 参数校验;非空校验、大于0校验、最大线程数大于核心线程数;
  2. 参数赋值:7个参数分别赋值给对应的成员变量

具体代码: 

    /**
     * 构造方法初始化
     * @param corePoolSize 核心线程数
     * @param maximumPoolSize 最大线程数
     * @param keepAliveTime 非核心空闲线程存活时间
     * @param unit 非核心空闲线程存活时间单位
     * @param workQueue 任务队列
     * @param threadFactory 线程工厂(生产线程的地方)
     * @param handler 拒绝策略
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        // 1.参数校验:
        // 核心线程数不能为负
        // 最大线程数不能为0且必须大于核心线程数
        // 非核心空闲线程存活时间不能为负
        if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
            throw new IllegalArgumentException();

        // 任务队列、线程工厂和拒绝策略不能为空
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();

        // 2.将传入的参数赋值给线程池的成员变量
        // 核心线程数
        this.corePoolSize = corePoolSize;
        // 最大线程数
        this.maximumPoolSize = maximumPoolSize;
        // 任务队列
        this.workQueue = workQueue;
        // 将存活时间转换为纳秒存储
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        // 线程工厂,用于创建新线程
        this.threadFactory = threadFactory;
        // 拒绝策略,当线程池无法执行任务时使用
        this.handler = handler; 
    }

2.3.5 拒绝策略详解

2.3.5.0 概述

四个拒绝策略:

  • Abort(默认):直接抛异常(拒绝执行异常RejectedExecutionException)
  • CallerRuns:直接同步调用线程run()方法,不创建线程了
  • DiscardOldest:丢弃最老任务
  • Discard:直接丢弃新任务

线程池共有以上4个拒绝策略,他们都属于ThreadPoolExecutor的内部类:

2.3.5.1 AbortPolicy:抛异常

核心流程:

  • 直接抛出拒绝执行异常(RejectedExecutionException)

具体代码:

    /**
     * 直接抛异常拒绝策略
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * 构造方法
         */
        public AbortPolicy() {
        }

        /**
         * 拒绝执行方法:总是抛出RejectedExecutionException异常
         *
         * @param r 线程
         * @param e 线程池
         * @throws RejectedExecutionException 总是抛出该异常
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 代码只有一行,抛出异常
            throw new RejectedExecutionException("Task " + r.toString() +
                    " rejected from " +
                    e.toString());
        }
    }
2.3.5.2 CallerRunsPolicy:调用线程run()方法

核心流程:

  • 如果线程池还活着,直接同步调用线程的run()方法

具体代码:

    /**
     * 同步调用拒绝策略
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {

        /**
         * 创建一个 {@code CallerRunsPolicy} 实例。
         */
        public CallerRunsPolicy() {
        }

        /**
         * 拒绝执行方法
         * 此时任务将被丢弃。
         *
         * @param r 线程
         * @param e 线程池
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 如果线程池还活着,直接同步调用线程的run()方法
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
2.3.5.3 DiscardOldestPolicy:丢弃最老任务

核心流程:

  • 如果当前的阻塞队列满了,弹出时间最久的

具体代码:

    /**
     * 丢弃最老任务策略
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {

        /**
         * 为给定的执行器创建一个 {@code DiscardOldestPolicy} 实例。
         */
        public DiscardOldestPolicy() {
        }

        /**
         * 丢弃最老任务
         *
         * @param r 线程
         * @param e 线程池
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 如果线程池还活着,就让最老任务出队,执行最新任务
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
2.3.5.4 DiscardPolicy:丢弃新任务

核心流程:

  • 丢弃新任务:方法内一行代码也没有,不执行任何操作,相当于丢弃任务

具体代码:

    /**
     * 丢弃新任务策略
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {

        /**
         * 创建一个 {@code DiscardPolicy} 实例。
         */
        public DiscardPolicy() {
        }

        /**
         * 不执行任何操作,相当于丢弃任务 r
         *
         * @param r 线程
         * @param e 线程池
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
2.3.5.5 自定义拒绝策略

通过实现拒绝执行处理器接口(RejectedExecutionHandler),重写rejectedExection()方法,可以创建一个自定义的拒绝策略。

例如:创建一个丢弃策略,丢弃最老任务,并且把让新任务优先执行

/**
 * @Author: vince
 * @CreateTime: 2024/08/20
 * @Description: 丢最老和新任务拒绝策略
 * @Version: 1.0
 */
public class DiscardOldestAndNewestPolicy implements RejectedExecutionHandler {

    /**
     * 自定义拒绝策略,丢弃最老的和最新的任务,并打印日志。
     */
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        BlockingQueue<Runnable> queue = executor.getQueue();

        // 检查队列是否为空,不为空则移除最老的任务
        if (!queue.isEmpty()) {
            Runnable oldestTask = queue.poll();
            System.out.println("丢弃最老的任务: " + oldestTask.toString());
        }

        // 打印日志并丢弃最新的任务:不对线程对象执行操作,就相当于丢弃了它
        System.out.println("丢弃最新的任务: " + r.toString());
    }
}

2.4 核心字段和内部类

2.4.1 基本介绍

ThreadPoolExecutor有以下一些核心字段和内部类:

  /*
     * 状态和线程数:用来标记线程池状态(高3位),线程个数(低29位,所以线程个数最多是2^29-1)
     * 默认是RUNNING状态,线程个数为0
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    /*
     * 线程个数掩码位数:线程池中最大可以容纳的线程数量。
     *  int:4字节(32位),数据范围是 `-2^31 ~ 2^31-1`。
     * 所以准确说是具体平台下Integer的二进制位数-3后的剩余位数才是线程的个数,
     * 32-3=29
     * 为什么COUNT_BITS是29而不是32?因为int类型是32位,我们需要将高3位保留用于表示线程池的状态。剩下的29位则用来表示线程个数。
     */
    private static final int COUNT_BITS = Integer.SIZE - 3;

    /*
     * 线程最大个数(低29位)00011111111111111111111111111111
     * 默认容量:2^29-1
     * 位运算1左移32位,相当于2的32次方
     * (左移<<:是将二进制数的位左移若干位,低位补0,相当于乘以2的n次方。)
     */
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

    /*
     * 线程池有5种状态,按大小排序如下:
     * RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
     * RUNNING:运行。线程池处于正常状态,可以接受新的任务,同时会按照预设的策略来处理已有任务的执行。
     * SHUTDOWN:关闭。线程池处于关闭状态,不再接受新的任务,但是会继续执行已有任务直到执行完成。执行线程池对象的shutdown()时进入该状态。
     * STOP:停止。线程池处于关闭状态,不再接受新的任务,同时会中断正在执行的任务,清空线程队列。执行shutdownNow()时进入该状态。
     * TIDYING:整理。所有任务已经执行完毕,线程池进入该状态会开始进行一些结尾工作,比如及时清理线程池的一些资源。
     * TERMINATED:终止。线程池已经完全停止,所有的状态都已经结束了,线程池处于最终的状态。
     */
    private static final int RUNNING = -1 << COUNT_BITS;
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    private static final int STOP = 1 << COUNT_BITS;
    private static final int TIDYING = 2 << COUNT_BITS;
    private static final int TERMINATED = 3 << COUNT_BITS;
    /**
     * 可重入锁,用于加锁往workers里加线程
     */
    private final ReentrantLock mainLock = new ReentrantLock();
    /**
     * 一个HashSet,存有所有工作线程
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * 是否允许核心线程超时:默认是false,即核心线程永不超时
     * 如果是true,核心线程将根据参数的超时时间存活
     */
    private volatile boolean allowCoreThreadTimeOut;

2.4.2 ctl:线程池当前状态和数量

  • ctl:标记线程池状态(高3位)和数量(低29位,所以线程个数最多是2^29-1)。
    /*
     * 状态和线程数:用来标记线程池状态(高3位),线程个数(低29位,所以线程个数最多是2^29-1)
     * 默认是RUNNING状态,线程个数为0
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

2.4.3 COUNT_BITS和CAPACITY:线程数上限

  • COUNT_BITS:线程数上限的掩码,值是29
  • CAPACITY:线程数上限,值是2^29-1

为什么COUNT_BITS是29而不是32?

因为int类型是32位,并且我们需要将高3位保留用于表示线程池的状态。剩下的29位则用来表示线程个数。 

源码: 

    private static final int COUNT_BITS = Integer.SIZE - 3;

    /*
     * 线程最大个数(低29位)00011111111111111111111111111111
     * 默认容量:2^29-1
     * 位运算1左移32位,相当于2的32次方
     * (左移<<:是将二进制数的位左移若干位,低位补0,相当于乘以2的n次方。)
     */
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

2.4.4 五个状态字段:线程池的生命周期 

通常线程池的生命周期包含5个状态,对应状态值分别是:-1、0、1、2、3,这些状态只能由小到大迁移,不可逆。

  1. RUNNING:运行。线程池处于正常状态,可以接受新的任务,同时会按照预设的策略来处理已有任务的执行。
  2. SHUTDOWN:关闭。线程池处于关闭状态,不再接受新的任务,但是会继续执行已有任务直到执行完成。执行线程池对象的shutdown()时进入该状态。
  3. STOP:停止。线程池处于关闭状态,不再接受新的任务,同时会中断正在执行的任务,清空线程队列。执行shutdownNow()时进入该状态。
  4. TIDYING:整理。所有任务已经执行完毕,线程池进入该状态会开始进行一些结尾工作,比如及时清理线程池的一些资源。
  5. TERMINATED:终止。线程池已经完全停止,所有的状态都已经结束了,线程池处于最终的状态。 

    /*
     * 线程池有5种状态,按大小排序如下:
     * RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
     * RUNNING:运行。线程池处于正常状态,可以接受新的任务,同时会按照预设的策略来处理已有任务的执行。
     * SHUTDOWN:关闭。线程池处于关闭状态,不再接受新的任务,但是会继续执行已有任务直到执行完成。执行线程池对象的shutdown()时进入该状态。
     * STOP:停止。线程池处于关闭状态,不再接受新的任务,同时会中断正在执行的任务,清空线程队列。执行shutdownNow()时进入该状态。
     * TIDYING:整理。所有任务已经执行完毕,线程池进入该状态会开始进行一些结尾工作,比如及时清理线程池的一些资源。
     * TERMINATED:终止。线程池已经完全停止,所有的状态都已经结束了,线程池处于最终的状态。
     */
    private static final int RUNNING = -1 << COUNT_BITS;
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    private static final int STOP = 1 << COUNT_BITS;
    private static final int TIDYING = 2 << COUNT_BITS;
    private static final int TERMINATED = 3 << COUNT_BITS;

2.4.5 扩展:线程的生命周期  ​

Java线程在运行的生命周期中,在任意给定的时刻,只能处于下列6种状态之一:

  • NEW :初始状态,线程被创建,但是还没有调用start方法。
  • RUNNABLE:可运行状态,等待调度或运行。线程正在JVM中执行,但是有可能在等待操作系统的调度。
  • BLOCKED :阻塞状态,线程正在等待获取监视器锁。
  • WAITING :等待状态,线程正在等待其他线程的通知或中断。线程等待状态不占用 CPU 资源,被唤醒后进入可运行状态(等待调度或运行)。
  • TIMED_WAITING:超时等待状态,在WAITING的基础上增加了超时时间,即超出时间自动返回。Thread.sleep(1000);让线程超时等待1s。
  • TERMINATED:终止状态,线程已经执行完毕。

线程的运行过程:

线程在创建之后默认为NEW(初始状态),在调用start方法之后进入RUNNABLE(可运行状态)。

注意:

可运行状态不代表线程正在运行,它有可能正在等待操作系统的调度。

WAITING (等待状态)的线程需要其他线程的通知才能返回到可运行状态,而TIMED_WAITING(超时等待状态)相当于在等待状态的基础上增加了超时限制,除了他线程的唤醒,在超时时间到达时也会返回运行状态。

此外,线程在执行同步方法时,在没有获取到锁的情况下,会进入到BLOCKED(阻塞状态)。线程在执行完run方法之后,会进入到TERMINATED(终止状态)。

等待状态如何被唤醒?

Object类:

  • wait()方法让线程进入等待状态
  • notify()唤醒该对象上的随机一个线程
  • notifyAll()唤醒该对象上的所有线程。

这3个方法必须处于synchronized代码块或方法中,否则会抛出IllegalMonitorStateException异常。因为调用这三个方法之前必须拿要到当前锁对象的监视器(Monitor对象),synchronized基于对象头和Monitor对象。

也可以通过Condition类的 await/signal/signalAll方法实现线程的等待和唤醒,从而实现线程的通信,令线程之间协作处理任务。这两个方法依赖于Lock对象。

2.4.6 workers和mainLock:维护所有工作线程

这两个字段主要在addWorker()方法内用到,每次调用线程.start()方法启动线程前,都往wokers集合里放进一个线程,并给ctl加1

    /**
     * 可重入锁,用于加锁往workers里加线程
     */
    private final ReentrantLock mainLock = new ReentrantLock();
    /**
     * mainLock锁的Condition对象,用于线程通信
     * Condition对象:用于线程间通信,通过await()和signal(),signalAll()让线程等待或唤醒。通常用lock锁创建Condition对象,即lock.newCondition();
     */
    private final Condition termination = mainLock.newCondition();
    /**
     * 一个HashSet,存有所有工作线程
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

2.4.7 Worker类:控制中断线程、执行runWorker()方法

Worker类是线程池的内部类,它继承了AQS, 实现了Runnable接口,它

   /**
     * @Author: vince
     * @CreateTime: 2024/09/09
     * @Description: Worker继承了AQS,目的就是为了控制工作线程的中断。
     * Worker实现了Runnable,它的run()方法调用runWorker()启动线程
     * @Version: 1.0
     */
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable{

        private static final long serialVersionUID = 6138294804551838833L;
        // 当前线程工厂创建的线程(也是执行任务使用的线程)
        final Thread thread;

        // 当前的第一个任务
        Runnable firstTask;

        // 记录执行了多少个任务
        volatile long completedTasks;

        // 构造方法
        Worker(Runnable firstTask) {
            // 将State设置为-1,代表当前不允许中断线程
            setState(-1);
            // 设置任务
            this.firstTask = firstTask;
            // 设置线程
            this.thread = getThreadFactory().newThread(this);
        }

        /**
         * 线程启动执行的方法,调用了runWorker()方法
         */
        public void run() {
            runWorker(this);
        }

        /**
         * 中断工作线程
         */
        void interruptIfStarted() {
            Thread t;
            // 只有Worker中的state >= 0的时候,可以中断工作线程
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    // 如果状态正常,并且线程未中断,这边就中断线程
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

2.4.8 allowCoreThreadTimeOut:是否允许核心线程超时

allowCoreThreadTimeOut用于指定核心线程是否能超时,它在通过getTask()获取线程时调用,通过它判断是否允许核心线程超时。

    /**
     * 是否允许核心线程超时:默认是false,即核心线程永不超时
     * 如果是true,核心线程将根据参数的超时时间存活
     */
    private volatile boolean allowCoreThreadTimeOut;

volatile特性:

  • 有序性:被volatile声明的变量之前的代码一定会比它先执行,而之后的代码一定会比它慢执行。底层是在生成字节码文件时,在指令序列中插入内存屏障防止指令重排序。
  • 可见性:一旦修改变量则立即刷新到共享内存中,当其他线程要读取这个变量的时候,最终会去内存中读取,而不是从自己的工作空间中读取。每个线程自己的工作空间用于存放堆栈(存方法的参数和返回地址)和局部变量。
  • 原子性:volatile变量不能保证完全的原子性,只能保证单次的读/写操作具有原子性(在同一时刻只能被一个线程访问和修改),自增减、复合操作(+=,/=等)则不具有原子性。这也是和synchronized的区别。

读写内存语义:

  • 写内存语义:当写一个volatile变量时,JMM(Java内存模型)会把该线程本地内存中的共享变量的值刷新到主内存中。
  • 读内存语义:当读一个volatile变量时,JMM会把该线程本地内存置为无效,使其从主内存中读取共享变量。

有序性实现机制:

volatile有序性是通过内存屏障来实现的。内存屏障就是在编译器生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。

机器指令:JVM包括类加载子系统、运行时数据区、执行引擎。 执行引擎负责将字节码指令转为操作系统能识别的本地机器指令。

指令重排序:处理器为了提高运算速度会对指令重排序,重排序分三种类型:编译器优化重排序、处理器指令级并行重排序、内存系统重排序。 

  • 编译器优化的重排序:编译器在不改变单线程程序的语义前提下,可以重新安排语句的执行顺序。
  • 指令级并行的重排序:现在处理器采用了指令集并行技术,将多条指令重叠执行。如果不存在依赖性,处理器可以改变语句对应的机器指令的执行顺序。
  • 内存系统的重排序:由于处理器使用缓存和读写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。

2.5 execute():启动线程

2.5.1 工作原理

任务加入线程池时,判断的顺序:

  • 核心线程数
  • 阻塞队列
  • 最大线程数
  • 拒绝策略

线程池工作原理: 

  1. 判断核心线程数:新加入任务,判断corePoolSize是否到最大值;如果没到最大值就创建核心线程执行新任务,如果到最大值就判断是否有空闲的核心线程;
  2. 判断加入队列:如果有空闲的核心线程,则空闲核心线程执行新任务,如果没空闲的核心线程,则尝试加入FIFO阻塞队列;
  3. 判断队列容量:若加入成功,则等待空闲核心线程将队头任务取出并执行,若加入失败(例如队列满了),则判断是否到最大值;
  4. 判断丢弃策略:如果没到最大值就创建非核心线程执行新任务,如果到了最大值就执行丢弃策略,默认丢弃新任务;
  5. 非核心线程自动回收:线程数大于corePoolSize时,空闲线程将在keepAliveTime后回收,直到线程数等于核心线程数。这些核心线程也不会被回收。

注意:

实际上线程本身没有核心和非核心的概念,都是靠比较corePoolSize和当前线程数判断一个线程是不是能看作核心线程。

可能某个线程之前被看作是核心线程,等它空闲了,线程池又有corePoolSize个线程在执行任务,这个线程到keepAliveTime后还是会被回收。

验证:

 1.阻塞队列大时只有核心线程数的线程工作

 创建一个线程池,再创建10个线程打印文字:

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {
    public static void main(String[] args) {
        // 创建线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5, // 核心线程数
                200, // 最大线程数量
                10, // 空闲线程存活时间
                TimeUnit.SECONDS, // 时间单位
                new LinkedBlockingDeque<>(100000), // 任务队列,大小100000
                Executors.defaultThreadFactory(), // 线程工厂
                new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
        );

        // 提交任务到线程池
        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " 正在执行任务");
            });
        }

        // 关闭线程池
        threadPoolExecutor.shutdown();
    }
}

可以看见只有5个核心线程执行任务,原因阻塞队列有100000的大小,所以用不到新增非核心线程:

2.阻塞队列小时,会创建非核心线程工作:

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {
    public static void main(String[] args) {
        // 创建线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5, // 核心线程数
                200, // 最大线程数量
                10, // 空闲线程存活时间
                TimeUnit.SECONDS, // 时间单位
                new LinkedBlockingDeque<>(3), // 任务队列,大小100000
                Executors.defaultThreadFactory(), // 线程工厂
                new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
        );

        // 提交任务到线程池
        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " 正在执行任务");
            });
        }

        // 关闭线程池
        threadPoolExecutor.shutdown();
    }
}

可以看到,有7个线程在工作。流程如下:

  • 5个任务创建核心线程工作;
  • 3个任务因为核心线程数满了,所以进入阻塞队列,等待有空闲线程时执行;
  • 2个任务因为阻塞队列满了,所以创建非核心线程工作;

2.5.2 源码解析

核心流程:

  1. 启动核心线程数:如果corePoolSize没到最大值就创建核心线程直接执行新任务
  2. 任务放至阻塞队列:判断当前的状态是不是Running的状态(RUNNING可以处理任务,并且处理阻塞队列中的任务)
  3. 启动非核心线程数:尝试添加非核心工作线程,若添加非核心工作线程失败,证明已经到达maximumPoolSize的限制,执行拒绝策略

具体代码:

    /**
     * 执行任务
     * @param command Runnable对象
     */
    public void execute(Runnable command) {
        // 0.判空:如果当前传过来的任务是null,直接抛出异常即可
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        // 1.启动核心线程数:如果corePoolSize没到最大值就创建核心线程直接执行新任务
        if (workerCountOf(c) < corePoolSize) {
            // Step2:添加一个核心线程
            if (addWorker(command, true)){
                return;
            }
            // 更新一下当前值
            c = ctl.get();
        }
        // 如果走到下面会有两种情况:
        // a、核心线程数满了,需要往阻塞队列里面扔任务
        // b、核心线程数满了,阻塞队列也满了,执行拒绝策略

        // 2.任务放至阻塞队列:判断当前的状态是不是Running的状态(RUNNING可以处理任务,并且处理阻塞队列中的任务)
        // 如果是Running的状态,则尝试将任务放至阻塞队列。队列的offer(E e):用于非阻塞的插入操作,插入失败会返回false而不是等待
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次更新数值
            int recheck = ctl.get();
            // 再次校验当前的线程池状态是不是Running
            // 如果线程池状态不是Running的话,需要删除掉刚刚放的任务
            if (!isRunning(recheck) && remove(command)){
                // 执行拒绝策略
                reject(command);
            }
            // 如果到这里,说明上面阻塞队列中已经有数据了
            // 如果线程池的个数为0的话,需要创建一个非核心工作线程去执行该任务
            // 不能让人家堵塞着
            else if (workerCountOf(recheck) == 0){
                addWorker(null, false);
            }
        }
        // 此时会有两种情况:
        // a、任务成功放进阻塞队列,正在等待被执行;
        // b、放进阻塞队列失败,等待添加非核心工作线程

        // 3.启动非核心线程数
        //    尝试添加非核心工作线程,若添加非核心工作线程失败,证明已经到达maximumPoolSize的限制,执行拒绝策略
        // 添加一个非核心工作线程
        else if (!addWorker(command, false))
            // 工作队列中添加任务失败,执行拒绝策略
            reject(command);
    }

2.6 addWorker():添加工作线程

调用关系:

可以看到,上面execute()方法,启动核心线程、非核心线程时都要执行addWorker()方法,这个方法是往线程池添加工作线程。

核心流程:

  1. ctl自旋加1:在一个retry循环里自旋给ctl字段加1。ctl存放线程池当前状态和数量
    1. 校验线程池状态和队列:如果不是RUNNING,直接退出循环
    2. 校验线程数:如果超过上限直接失败
    3. 尝试给ctl加1:如果成功则结束循环,如果失败则重新循环
  2. workers存线程:加锁把线程并放进workers,workers是一个HashSet,用于保存所有工作线程
  3. 启动线程:如果成功加进workers,调用线程的.start()方法,启动线程。
    1. 注意这里调用的是Worker对象的start()方法,而不是原线程的start()方法。Worker对象封装了原线程。Worker类是Runnable的子类,它的run()方法只有一行,调用了runWorker()方法

完整源码: 

    /**
     * 添加工作线程
     * @param firstTask Runnable对象
     * @param core 是否是核心线程
     * @return boolean
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        // retry主要是为了结束整个循环
        // retry 是一个标签(label),用来标记循环的某个位置。
        // 这个标签与 for (;;) 循环结合使用,调用continue retry时重新开始整个循环,调用break retry时结束整个循环
        retry:
        for (;;) {
            // 获取当前线程池的ctl:包含线程池状态和数量信息
            int c = ctl.get();

            // 获取线程池状态:根据ctl,获取线程池状态
            // runStateOf:基于&运算的特点,保证只会拿到ctl高三位的值
            int rs = runStateOf(c);

            // 1.校验线程池状态和队列:如果不是运行时,直接退出循环
            // rs >= SHUTDOWN:表示此时不再接收新任务。代表当前线程池状态为:SHUTDOWN、STOP、TIDYING、TERMINATED,线程池状态异常
            // RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
            // rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
            // 但这里SHUTDOWN状态稍许不同(不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完)
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())){
                return false;
            }

            for (;;) {
                // 2.校验工作线程数量:如果超过线程数上限直接失败
                // 获取线程数
                int wc = workerCountOf(c);
                 // CAPACITY:是ctl的低29位的最大值(二进制是29个1),返回false;
                 // core:addWorker()方法的第二个参数,如果为true表示根据corePoolSize来比较,
                 // 如果为false则根据maximumPoolSize来比较。
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 3.尝试给ctl加1:如果成功,则跳出第一个for循环;如果失败,则重新循环
                // 尝试CAS方式自增workCount  其实就是类似锁,同时只有一个线程能成功
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 3.1 如果增加workerCount失败,则重新获取ctl的值
                c = ctl.get();
                // 3.2 如果当前的运行状态不等于第一个循环刚开始时的状态,说明状态已被改变,返回第一个for循环继续执行
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // 4.线程放进workers:加锁把线程放进workers,workers是一个HashSet,用于保存所有工作线程
        // 代码执行到这里  只是通过不停的循环通过CAS获取到执行任务的“锁”

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 根据firstTask来创建Worker对象
            w = new Worker(firstTask);
            // 每一个Worker对象都会创建一个线程
            final Thread t = w.thread;
            if (t != null) {
                // 获取锁。mainLock是ThreadPoolExecutor的可重入锁字段,用于保证线程安全
                final ReentrantLock mainLock = this.mainLock;
                // 加锁
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    /*
                     * rs < SHUTDOWN表示是RUNNING状态;
                     * 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
                     * 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
                     */
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        // 校验线程是否存活,不存活直接抛出异常
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // workers是一个HashSet,用于保存所有工作线程
                        workers.add(w);
                        int s = workers.size();
                        // largestPoolSize记录着线程池中出现过的最大线程数量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    // 解锁
                    mainLock.unlock();
                }
                // 5.启动线程:如果成功加进workers,调用线程的.start()方法,启动线程
                if (workerAdded) {
                    // 启动线程  这个t.start()实际是调用的Worker类中的run方法,Worker本身实现了Runnable接口,所以一个Worker类型的对象也是一个线程。
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

2.7 runWorker ():执行线程

调用关系:

上面addWorker()开启线程时,调用的Worker对象的start()方法。Worker对象封装了原线程。Worker类是Runnable的子类,它的run()方法只有一行,调用了runWorker()方法。 

addWorker()调用Worker类的start()方法:

Worker类的run()方法

核心流程:

  1. 获取任务:调用getTask()方法,获取Worker中的Runnable任务,拿到后Worker置空、解锁
  2. 执行任务:如果任务不等于空,则加锁执行任务
    1. 加锁
    2. beforeExecute():这个方法为空,需要我们继承ThreadPoolExecutor重写这个方法
    3. 执行任务:调用Runnable的run()方法执行任务
    4. afterExecute():这个方法为空,需要我们继承ThreadPoolExecutor重写这个方法
    5. 解锁

代码解析:

    /**
     * 执行任务:由Worker类的run()调用的唯一一行方法
     * @param w Worker对象
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // 1.获取Worker中的runnable任务,拿到后Worker置空、解锁
        // 拿到当前Worker的第一个任务(如果携带的话)
        Runnable task = w.firstTask;
        // 置空
        w.firstTask = null;
        // 解锁
        w.unlock();
        boolean completedAbruptly = true;
        try {
            // 2.如果任务不等于空,则加锁、执行任务、解锁
            while (task != null || (task = getTask()) != null) {
                // 加锁
                w.lock();
                // 如果线程池状态 >= STOP,确保线程中断了
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 在任务执行前做一些操作:这个方法内为空,需要我们继承ThreadPoolExecutor重写这个方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 任务执行:调用线程的run方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 任务执行后一些操作:这个方法内为空,需要我们继承ThreadPoolExecutor重写这个方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 任务置空
                    task = null;
                    // 执行任务+1
                    w.completedTasks++;
                    // 解锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 删除线程的方法
            processWorkerExit(w, completedAbruptly);
        }
    }

2.8 getTask():从队列获取任务

调用关系:

上面runWorker()方法,调用了getTask()方法获取线程,返回值是Runnable对象

核心流程:

  1. 进入死循环
  2. 校验当前线程池状态:如果状态是停止,或关闭+阻塞队列是空,则返回空
  3. 校验超时:如果是非核心线程,或者允许超时的核心线程,则尝试销毁线程;
  4. 从阻塞队列取出线程:当队列没线程时,如果线程允许超时则返回空,如果不允许则阻塞获取

源码解析:

    /**
     * 获取任务
     * @return {@link Runnable }
     */
    private Runnable getTask() {
        // 声明超时变量:表示上次从阻塞队列中取任务时是否超时,先标记为未超时
        boolean timedOut = false;

        // 1.死循环拿线程对象
        for (;;) {
            // 1.1 校验当前线程池状态
            // 拿到当前的ctl
            int c = ctl.get();
            // 获取当前的线程池状态
            int rs = runStateOf(c);

            // 如果线程池状态是STOP,没有必要处理阻塞队列任务,直接返回null
            // 如果线程池状态是SHUTDOWN,并且阻塞队列是空的,直接返回null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 线程池中的线程个数减一
                decrementWorkerCount();
                return null;
            }

            // 当前线程池中线程个数
            int wc = workerCountOf(c);

            // 1.2 判断线程是否能超时:如果核心线程允许超时,或者当前线程池中的线程个数大于核心线程数,那么这个线程就有寿命
            // allowCoreThreadTimeOut:是否允许核心线程数超时(开启这个之后),核心线程数也会执行下面超时的逻辑
            // wc > corePoolSize:当前线程池中的线程个数大于核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 1.3 如果线程超时,则销毁线程
            // wc > maximumPoolSize:基本不存在
            // timed && timedOut:第一次肯定是失败的(超时标记为false)
            if ((wc > maximumPoolSize || (timed && timedOut))
                    // 1、线程个数为1
                    // 2、阻塞队列是空的
                    && (wc > 1 || workQueue.isEmpty())) {
                // 线程池的线程个数减一
                if (compareAndDecrementWorkerCount(c)){
                    return null;
                }
                continue;
            }
            // 运行到这里,说明线程没超时
            try {
                // 1.4 从阻塞队列取出线程:当队列没线程时,如果线程允许超时则返回空,如果不允许则阻塞获取
                // poll():从队列中获取并移除一个任务。如果队列为空,它会返回 null
                // take():从队列中获取并移除一个任务。如果队列为空,它会阻塞当前线程直到有任务可用为止。
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null){
                    return r;
                }
                // 到这里,说明没从阻塞队列拿出线程,线程超时了
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

2.9 关闭

  1. SHUTDOWN:关闭。线程池处于关闭状态,不再接受新的任务,但是会继续执行已有任务直到执行完成。执行线程池对象的shutdown()时进入该状态。
  2. STOP:停止。线程池处于关闭状态,不再接受新的任务,同时会中断正在执行的任务,清空线程队列。执行shutdownNow()时进入该状态。

2.9.1 shutdown():关闭线程

2.9.1.1 源码解析

shutdown()方法用于关闭线程,线程池关闭后,将不再接受新的任务,旧任务将执行完成

核心流程:

  1. 加锁
  2. 校验关闭权限:校验调用者是否有权限关闭线程池及其中的所有线程
  3. 修改线程池状态:将线程池状态修改为SHUTDOWN
  4. 中断空闲线程:遍历线程池workers中的工作线程,并尝试中断那些处于空闲状态的线程。
  5. 执行关闭时方法:钩子函数,实际内容为空,用户继承重写可以编写关闭线程池后要执行的逻辑
  6. 尝试终止:查看当前线程池是否可以变为TERMINATED状态

代码解析:

    /**
     * 关闭线程池
     */
    public void shutdown() {
        // 1.加锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 2.校验关闭权限:校验调用者是否有权限关闭线程池及其中的所有线程
            checkShutdownAccess();
            // 3.修改线程池状态:将线程池状态修改为SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 4.中断空闲线程:遍历线程池workers中的工作线程,并尝试中断那些处于空闲状态的线程。
            interruptIdleWorkers();
            // 5.钩子函数,实际内容为空,用户继承重写可以编写关闭线程池后要执行的逻辑
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 6.查看当前线程池是否可以变为TERMINATED状态
        // 从 SHUTDOWN 状态修改为 TIDYING,在修改为 TERMINATED
        tryTerminate();
    }
2.9.1.2 校验关闭权限:checkShutdownAccess()
    /**
     * 校验关闭权限:校验调用者是否有权限关闭线程池及其中的所有线程
     */
    private void checkShutdownAccess() {
        // 1. 获取系统的安全管理器
        SecurityManager security = System.getSecurityManager();

        // 2. 如果有安全管理器,需要进行权限检查
        if (security != null) {
            // 3. 首先检查调用者是否有权限关闭线程池(shutdownPerm 是一种特殊的权限)
            security.checkPermission(shutdownPerm);

            // 4. 获取线程池的锁,确保操作的线程安全
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();

            try {
                // 5. 遍历线程池中的每一个工作线程,检查调用者是否有权限中断每个线程
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            } finally {
                // 6. 最后解锁,确保锁的释放以防止死锁
                mainLock.unlock();
            }
        }
    }
2.9.1.3 修改线程池状态:advanceRunState()

    /**
     * 修改线程池状态
     * @param targetState 目标状态
     */
    private void advanceRunState(int targetState) {
        // 1.进入直接死循环
        for (;;) {
            // 2.拿到当前的ctl
            int c = ctl.get();
            // 3.修改状态:如果当前状态已经大于等于目标状态,则跳出循环,否则CAS修改线程池状态
            if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }
2.9.1.4 中断空闲线程:interruptIdleWorkers()
    /**
     * 中断空闲线程:遍历线程池workers中的工作线程,并尝试中断那些处于空闲状态的线程。
     * 不传参数时,默认参数是false,即中断所有线程,而不是只中断完一个线程就完事
     */
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
    /**
     * 中断空闲线程:遍历线程池workers中的工作线程,并尝试中断那些处于空闲状态的线程。
     * @param onlyOne 是否只中断一个:如果参数 onlyOne 为 true,中断一个线程后就直接跳出循环。如果为 false,则继续尝试中断所有空闲线程。
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        // 1.加锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 2.遍历线程池workers中的工作线程,并尝试中断那些处于空闲状态的线程。
            for (java.util.concurrent.ThreadPoolExecutor.Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                // 如果只中断一个,则中断后直接退出循环
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
2.9.1.5 尝试终止线程池:tryTerminate()
    /**
     * 尝试设置线程池为TERMINATED状态
     */
    final void tryTerminate() {
        // 1.开始死循环;
        for (;;) {
            // 拿到ctl
            int c = ctl.get();
            // 加锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 2.尝试CAS将线程池状态设为TIDYING,如果失败则解锁再次循环,如果成功则继续执行:
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 2.1 执行销毁前方法:根据用户自定义的逻辑,在线程池销毁之前做最后的处理
                        // 该方法是一个钩子函数,是个空方法,允许我们通过继承重写的方式,自己定义该方法逻辑
                        terminated();
                    } finally {
                        // 2.2 线程池状态设为TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 2.3 唤醒所有等待线程池终止的线程
                        // termination是mainLock的 Condition 对象 ,这是与 ReentrantLock 相关联的条件变量,用于线程间的通信。
                        // Condition对象:用于线程间通信,通过await()和signal(),signalAll()让线程等待或唤醒。通常用lock锁创建Condition对象,即lock.newCondition();
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                // 解锁
                mainLock.unlock();
            }
        }
    }

2.9.2 shutdownNow():停止线程

2.9.2.1 源码解析

shutdownNow()方法用于关闭线程,线程池关闭后,将不再接受新的任务,旧任务将直接中断

核心流程:

  1. 加锁
  2. 校验关闭权限:校验调用者是否有权限关闭线程池及其中的所有线程
  3. 修改线程池状态:将线程池状态修改为STOP
  4. 中断所有线程:将线程池中的线程全部中断
  5. 删除所有线程
    1. 删除当前所有的工作线程
    2. 删除阻塞队列中所有的工作线程
    3. 并将这些删除线程收集到tasks
  6. 解锁
  7. 尝试设置线程池为TERMINATED状态:
    1. CAS自旋修改状态
    2. 执行销毁前自定义方法
    3. 唤醒所有等待线程池终止的线程

代码解析: 

    /**
     * 终止线程池
     * @return {@link List }<{@link Runnable }> 被删除的所有线程
     */
    public List<Runnable> shutdownNow() {
        // 声明tasks,用于收集所有被删除的线程
        List<Runnable> tasks;
        // 1.加锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 2.校验关闭权限:校验调用者是否有权限关闭线程池及其中的所有线程
            checkShutdownAccess();
            // 3.修改线程池状态:将线程池状态修改为STOP
            advanceRunState(STOP);
            // 4.中断所有线程:将线程池中的线程全部中断
            interruptWorkers();
            // 5.删除所有线程:删除阻塞队列中所有的工作线程,并将这些删除线程收集到tasks
            // drain以为排空
            tasks = drainQueue();
        } finally {
            // 6.解锁
            mainLock.unlock();
        }
        // 7.尝试设置线程池为TERMINATED状态
        // 从 Stop 状态修改为 TIDYING,在修改为 TERMINATED
        tryTerminate();
        return tasks;
    }


2.9.2.2 清空阻塞队列:drainQueue()

    /**
     * 清空阻塞队列:删除阻塞队列中所有线程,并返回这些线程
     * @return {@link List }<{@link Runnable }>
     */
    private List<Runnable> drainQueue() {
        // 1.获取阻塞队列
        BlockingQueue<Runnable> q = workQueue;
        // 返回的结果
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        // 2.清空阻塞队列并将数据放入taskList中。
        // drainTo 方法是 BlockingQueue 的批量删除方法,它可以将队列中的所有元素批量转移到另一个集合中。
        q.drainTo(taskList);
        // 3.再次遍历清空:因为没加锁,所以需要再次遍历,防止依赖
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        // 最终返回即可
        return taskList;
    }

三、Executors源码解析

3.1 Executors工具类

执行器工具类Executors主要用于创建线程池,它主要有4个创建线程池的方法:

  • newFixedThreadPool:固定大小的线程池

  • newCachedThreadPool:缓存线程池(无限大)

  • newScheduledThreadPool:定时任务线程池

  • newSingleThreadExecutor:单线程化的线程池

一般不使用这种方式创建线程池,因为虽然它简单方便,但是它的参数配置写死了,不可控。

参数配置包括:

核心线程数、最大线程数、线程存活时间、阻塞队列、拒绝策略。详细看上文的“ThreadPoolExecutor”

这四个线程池底层都是return new ThreadPoolExecutor(...),返回自定义线程池,并根据各个线程池的特性,传入线程数等参数。

ExecutorService executorService = Executors.newFixedThreadPool(10);

// 源码
// 固定大小线程池: 
new ThreadExcutor(n, n, 0L, ms, new LinkedBlockingQueue<Runable>()
// 缓存线程池: 
new ThreadExcutor(0, max_valuem, 60L, s, new SynchronousQueue<Runnable>());
// 定时任务线程池: 
new ScheduledThreadPoolExecutor(corePoolSize);
// super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
// 单线程线程池: 
new ThreadExcutor(1, 1, 0L, ms, new LinkedBlockingQueue<Runable>())

以固定大小的线程池为例:

源码如下

可以看到:

  • 核心线程数和最大线程数都等于传入的参数

  • 空闲线程存活时间是0L。这里其实设置多少都一个效果,因为这个线程池最大线程数=核心线程数,也就是说不会有非核心线程,而核心线程是不会被回收的。

  • 阻塞队列是链表。无界,可以缓存无限多的任务,所以也用不到拒绝策略。

public class Executors {

    /**
     * 创建固定大小的线程池
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
...
}

3.2 newFixedThreadPool:固定大小的线程池

特点:

  • 核心线程数:所有线程都是核心线程(通过构造参数指定),最大线程数=核心线程数。
  • 存活时间0s:因为所有线程都是核心线程,所以用不到存活时间,线程都会一直存活。keepAliveTime为0S。
  • 链表阻塞队列:超出的线程会在LinkedBlockingQueue队列中等待。

源码:

public class Executors {

    /**
     * 创建固定大小的线程池
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    // ...
}

可以看到,底层是创建自定义线程池对象,并且传入参数进行了设置:

  • 核心线程数和最大线程数:都等于传入的参数
  • 空闲线程存活时间:是0L。这里其实设置多少都一个效果,因为这个线程池最大线程数=核心线程数,也就是说不会有非核心线程,而核心线程是不会被回收的。
  • 阻塞队列:是链表。无界,可以缓存无限多的任务,所以也用不到拒绝策略。

3.3 newCachedThreadPool:缓存线程池(无限大)

特点: 

  • 所有线程都是非核心线程:核心线程数是0,最大线程数无限大
  • 空闲线程一分钟后杀死:keepAliveTime为60S,空闲线程超过60s会被杀死。
  • 阻塞队列没有空间:因为最大线程数无限大,所以也用不到阻塞队列,所以设为没有存储空间的SynchronousQueue同步队列。

源码:

​
public class Executors {

    /**
     * 缓存线程池
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    // ...
}

​

可以看到,底层是创建自定义线程池对象,并且传入参数进行了设置:

  • 核心线程数是0,最大线程数无限大:最大线程数Integer.MAX_VALUE,即2^31-1。线程数量可以无限扩大,所有线程都是非核心线程。
  • 空闲线程存活时间60s:keepAliveTime为60S,空闲线程超过60s会被杀死。
  • 同步队列:因为最大线程数无限大,所以也用不到阻塞队列,所以设为没有存储空间的SynchronousQueue同步队列。这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。

3.4 newScheduledThreadPool:定时任务线程池

创建一个定长线程池, 支持定时及周期性任务执行。可指定核心线程数。

特点:

  • 核心线程数:由传入参数决定;
  • 最大线程数:无限大
  • 存活时间:0
  • 阻塞队列:延迟队列。无界的延迟+优先级队列,延迟时间短的任务排在队头,优先被执行。

源码: 

public class Executors {

    /**
     * 定时任务线程池:可以定期或设定多久后执行任务
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    // ...
}
public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    // ...
}

可以看到:

  • 核心线程数:由传入参数corePoolSize决定;
  • 最大线程数:无限大,2^31-1。这里其实设置无限大和设置corePoolSize是一个效果,因为它的阻塞队列是无界的,内存溢出前永远不会满,所以也不会有创建非核心线程的机会。至于为什么设置成无限大呢,可能是为了保证内存溢出等极端情况下,任务可以不被丢弃,也可能是心理安慰。
  • 存活时间:0。因为阻塞队列无界,所以不会有非核心线程,所以存活时间也没意义,内存溢出时创建了非核心线程,空闲后直接销毁。
  • 阻塞队列:延迟队列。无界的延迟+优先级队列,延迟时间短的任务排在队头,优先被执行。

3.5 newSingleThreadExecutor:单线程化的线程池

  • 核心线程数和最大线程数:核心线程数与最大线程数都只有一个,不回收。创建一个单线程化的线程池, 它只会用唯一的工作线程来执行任务, 保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。 
  • 阻塞队列:无界的链表队列。
  • 使用场景:需要稳定串行化执行一个个线程的场景;

思考:单线程化的线程池存在的意义是什么?直接创建一个线程不就好了?

主要是为了利用线程池的优势。

  • 让线程复用:如果是直接创建一个线程,这个线程创建后会直接销毁,而线程池的核心线程会一直存活,这就减少了创建、销毁的时间损耗。
  • 管理线程数量:它可以管理线程的数量,保证只有一个线程,减少了创建、销毁的时间损耗。

源码:

public class Executors {

    /**
     * 单线程线程池:仅有一个核心线程和无界队列
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    // ...
}

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐