一、线程池的七个参数:
  • 1.corePoolSize:线程池中的常驻核心线程数
  • 2.maxinumPoolSize:线程池中能够容纳同时执行的最大线程数,此值必须大于等于1
  • 3.keepAliveTime:多余的空闲线程的存活时间,当前线程池数量超过corePoolSize时,当空闲时间达到keepAliveTime时,多余空闲线程会被销毁直到只剩下corePoolSize个线程为止
  • 4.unit:keepAliveTime的单位
  • 5.workQueue:任务队列,被提交但是尚未被执行的任务
  • 6.threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程一般用默认的即可
  • 7.handler:拒绝策略,表示当队列满了并且工作线程-大于等于线程池的数量最大线程数(maxinumPoolSize)时如何来拒绝请求执行的runnable的策略
二、ThreadPoolExecutor
2.1 线程池状态
  • 1.ThreadPoolExecutor使用int的高3位来表示线程池状态,低29位表示线程数量
  • 2.RUNNING:高3位为111,可以接收新任务,可以处理阻塞队列任务
  • 3.SHUTDOWN:高3位为000,不能接收新任务,可以处理阻塞队列任务;不会接收新任务,会处理阻塞队列剩余任务
  • 4.STOP:高3位为001,不能接收新任务,不可以处理阻塞队列任务;会中断正在执行的任务,并抛弃阻塞队列任务
  • 5.TIDYING:高3位为010,任务全执行完毕,活动线程为0,即将进入终结
  • 6.TERMINATED:终结状态
  • 7.从数字上比较,TERMININATED>TIDYING>STOP>SHUTDOWN>RUNNING
  • 8.这些信息存储在一个原子变量ctl中,目的是将线程池状态与线程个数合二为一,可以用一次cas原子操作进行赋值
// c为旧值,ctlOf返回结果为新值
ctl.compareAndSet(c,ctlOf(targetState,workerCountOf(c))));

// rs为高3位代表线程池状态,wc为低29位代表线程个数,ctl是合并它们
private static int ctlOf(int rs, int wc){return rs|wc;}
2.2 构造方法
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • 1.corePoolSize:核心线程数目,最多保留的线程数
  • 2.maximumPoolSize:最大线程数目
  • 3.keepAliveTime:生存时间-针对救急线程
  • 4.unit时间单位-针对救急线程
  • 5.workQueue:阻塞队列
  • 6.threadFactory:线程工厂-可以为线程创建时起个好名字
  • 7.handler:拒绝策略
三、原理
3.1 过程
  • 1.线程池中的线程分为核心线程和救急线程
  • 2.核心线程和救急线程都是懒加载的,有任务时才进行创建
  • 3.线程池初始时核心线程数为0,来一个任务则创建一个核心线程,直到达到最大核心线程数
  • 4.当核心线程数都被占用,再来任务则将任务放入阻塞队列
  • 5.当核心线程数都被占用,且阻塞队列满了,再来任务时,则会创建救急线程来处理该任务
  • 6.救急线程执行任务结束且空闲后,存活keepAliveTime+unit时间后会自动销毁
  • 7.核心线程执行任务结束且空闲后,也不会消失
  • 8.核心线程数都被占用,阻塞队列都满了,且救急线程也被占用,再来任务则会执行拒绝策略
3.2 说明
  • 1.线程池中一开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务
  • 2.当线程数达到corePoolSize并没有线程空闲,此时再加入任务,新加的任务会被加入workQueue队列排队,直到有空闲的线程
  • 3.如果队列选择了有界队列,任务超过了队列大小时,会创建(maxi mumPoolSize-corePoolSize)数目的救急线程来救急
  • 4.如果线程到达maximumPoolSize仍然有新任务,此时会执行拒绝策略
  • 5.jdk提供了4种拒绝策略的实现
  • 6.当任务高峰过去后,超过corePoolSize的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime和unit来控制
四、 拒绝策略
4.1 jdk提供的
  • 接口:RejectedExecutionHandler
  • 1.AbortPolicy:让调用者抛出RejectedExecutionException异常,默认策略
  • 2.CallerRunsPolicy:让调用者完成任务
  • 3.DiscardPolicy:放弃本次任务
  • 4.DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之
4.2 其它开源框架提供的
  • 1.Dubbo:在抛出RejectedExecutionException异常之前会记录日志,并dump线程栈信息,方便定位问题
  • 2.Netty:是创建一个新的线程来执行任务
  • 3.ActiveMQ:带超时等待(60s)尝试放入队列
  • 4.PinPoint:使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
四、 jdk提供的线程池
4.1 newFixedThreadPool
  • 1.固定大小的线程池
  • 2.核心线程数等于最大线程数,不会创建救急线程,无需超时时间以及时间单位
  • 3.阻塞队列是无界的,相对耗时的任务
  • 4.适用于任务量已知,相对耗时的任务
public static ExecutorService newFixedThreadPool(int nThreads) {
   return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
4.2 newCachedThreadPool
  • 1.带缓存的线程池
  • 2.核心线程数是0,最大线程数是Integer.MAX_VALUE,救急线程的空闲时间是60s,
  • 3.全部都是救急线程,救急线程空闲后,60s后可以回收
  • 4.救急线程可以无线创建
  • 5.队列采用了SynchronousQueue实现,没有容量,没有线程消费的话是存放不了的
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
4.3 newSingleThreadExecutor
  • 1.单线程线程池
  • 2.希望多个任务排队执行,线程数固定为1,任务数多于1时,会放入无界队列排队
  • 3.任务执行完毕,这唯一的线程也不会被释放
  • 4.自己创建一个单线程串行执行任务,如果任务执行失败而终止没有任何补救措施,单线程线程池可以新建一个线程,保证线程池的正常工作
  • 5.Executors.newSingleThreadExecutor()线程个数始终为1,不能修改
  • 6.FinalizableDelegatedExecutorService应用的是装饰器模式,只对外暴露了ExecutorService接口,不能调用ThreadPoolExecutor中特有的方法
  • 7.Executors.newFixedThreadPool(1)初始为1,可以修改,对外暴露的是ThreaddPoolExecutor对象,可以强转后调用setCorePoolSize等方法进行修改
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
五、 提交任务
5.1 execute
  • 1.执行任务
void execute(Runnable command)
5.2 submit
  • 2.提交任务task,用返回值Future获得任务执行结果
<T> Future<T> submit(Callable<T> task)

示例

package com.learning.threadpool;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

/**
 * @Description 提交任务
 */
@Slf4j
public class SubmitLearning {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Future<String> future = pool.submit(() -> {
            log.debug("running");
            Thread.sleep(1000);
            return "ok";
        });
        log.debug("{}", future.get());
    }
}

5.3 invokeAll
  • 3.提交tasks中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException
  • 4.提交tasks中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException

示例

package com.learning.threadpool;

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * @Description 提交任务
 */
@Slf4j
public class invokeAllLearning {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Future<String>> futures = pool.invokeAll(Arrays.asList(
                () -> {
                    log.debug("begin");
                    Thread.sleep(1000);
                    return "1";
                },
                () -> {
                    log.debug("begin");
                    Thread.sleep(500);
                    return "2";
                },
                () -> {
                    log.debug("begin");
                    Thread.sleep(2000);
                    return "3";
                }
        ));
        futures.forEach(future -> {
            try {
                log.debug("{}", future.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

5.3 invokeAny
  • 5.提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks) 
			throws InterruptedException, ExecutionException
  • 6.提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException

示例

package com.learning.threadpool;

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @Description invokeAny
 **/
@Slf4j
public class InvokeAnyLearning {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        String result = pool.invokeAny(Arrays.asList(
                () -> {
                    log.debug("begin");
                    Thread.sleep(1000);
                    return "1";
                },
                () -> {
                    log.debug("begin");
                    Thread.sleep(500);
                    return "2";
                },
                () -> {
                    log.debug("begin");
                    Thread.sleep(2000);
                    return "3";
                }
        ));
        log.debug("{}", result);
    }
}

六、关闭线程池
6.1 shutdown
  • 1.线程池状态变为shutdown
  • 2.不会接收新任务
  • 3.已提交任务会执行完
  • 4.此方法不会阻塞调用线程的执行
6.2 shutdownNow
  • 1.线程池状态变为stop
  • 2.不会接收新任务
  • 3.会将队列中的任务返回
  • 4.并用interrupt的方式中断正在执行的任务
七、其它方法
7.1 isShutdown
  • 1.不在running状态的线程池,此方法返回true
7.2 isTerminated
  • 2.线程池状态是否是terminated
7.2 awatiTermination
  • 3.调用shutdown后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池Terminated后做些事情,可以利用此方法等待
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐