【多线程】线程池
3.keepAliveTime:多余的空闲线程的存活时间,当前线程池数量超过corePoolSize时,当空闲时间达到keepAliveTime时,多余空闲线程会被销毁直到只剩下corePoolSize个线程为止。7.handler:拒绝策略,表示当队列满了并且工作线程-大于等于线程池的数量最大线程数(maxinumPoolSize)时如何来拒绝请求执行的runnable的策略。6.threadF
·
一、线程池的七个参数:
- 1.corePoolSize:线程池中的常驻核心线程数
- 2.maxinumPoolSize:线程池中能够容纳同时执行的最大线程数,此值必须大于等于1
- 3.keepAliveTime:多余的空闲线程的存活时间,当前线程池数量超过corePoolSize时,当空闲时间达到keepAliveTime时,多余空闲线程会被销毁直到只剩下corePoolSize个线程为止
- 4.unit:keepAliveTime的单位
- 5.workQueue:任务队列,被提交但是尚未被执行的任务
- 6.threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程一般用默认的即可
- 7.handler:拒绝策略,表示当队列满了并且工作线程-大于等于线程池的数量最大线程数(maxinumPoolSize)时如何来拒绝请求执行的runnable的策略
二、ThreadPoolExecutor
2.1 线程池状态
- 1.ThreadPoolExecutor使用int的高3位来表示线程池状态,低29位表示线程数量
- 2.RUNNING:高3位为111,可以接收新任务,可以处理阻塞队列任务
- 3.SHUTDOWN:高3位为000,不能接收新任务,可以处理阻塞队列任务;不会接收新任务,会处理阻塞队列剩余任务
- 4.STOP:高3位为001,不能接收新任务,不可以处理阻塞队列任务;会中断正在执行的任务,并抛弃阻塞队列任务
- 5.TIDYING:高3位为010,任务全执行完毕,活动线程为0,即将进入终结
- 6.TERMINATED:终结状态
- 7.从数字上比较,TERMININATED>TIDYING>STOP>SHUTDOWN>RUNNING
- 8.这些信息存储在一个原子变量ctl中,目的是将线程池状态与线程个数合二为一,可以用一次cas原子操作进行赋值
// c为旧值,ctlOf返回结果为新值
ctl.compareAndSet(c,ctlOf(targetState,workerCountOf(c))));
// rs为高3位代表线程池状态,wc为低29位代表线程个数,ctl是合并它们
private static int ctlOf(int rs, int wc){return rs|wc;}
2.2 构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- 1.corePoolSize:核心线程数目,最多保留的线程数
- 2.maximumPoolSize:最大线程数目
- 3.keepAliveTime:生存时间-针对救急线程
- 4.unit时间单位-针对救急线程
- 5.workQueue:阻塞队列
- 6.threadFactory:线程工厂-可以为线程创建时起个好名字
- 7.handler:拒绝策略
三、原理
3.1 过程
- 1.线程池中的线程分为核心线程和救急线程
- 2.核心线程和救急线程都是懒加载的,有任务时才进行创建
- 3.线程池初始时核心线程数为0,来一个任务则创建一个核心线程,直到达到最大核心线程数
- 4.当核心线程数都被占用,再来任务则将任务放入阻塞队列
- 5.当核心线程数都被占用,且阻塞队列满了,再来任务时,则会创建救急线程来处理该任务
- 6.救急线程执行任务结束且空闲后,存活keepAliveTime+unit时间后会自动销毁
- 7.核心线程执行任务结束且空闲后,也不会消失
- 8.核心线程数都被占用,阻塞队列都满了,且救急线程也被占用,再来任务则会执行拒绝策略
3.2 说明
- 1.线程池中一开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务
- 2.当线程数达到corePoolSize并没有线程空闲,此时再加入任务,新加的任务会被加入workQueue队列排队,直到有空闲的线程
- 3.如果队列选择了有界队列,任务超过了队列大小时,会创建(maxi mumPoolSize-corePoolSize)数目的救急线程来救急
- 4.如果线程到达maximumPoolSize仍然有新任务,此时会执行拒绝策略
- 5.jdk提供了4种拒绝策略的实现
- 6.当任务高峰过去后,超过corePoolSize的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime和unit来控制
四、 拒绝策略
4.1 jdk提供的
- 接口:RejectedExecutionHandler
- 1.AbortPolicy:让调用者抛出RejectedExecutionException异常,默认策略
- 2.CallerRunsPolicy:让调用者完成任务
- 3.DiscardPolicy:放弃本次任务
- 4.DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之
4.2 其它开源框架提供的
- 1.Dubbo:在抛出RejectedExecutionException异常之前会记录日志,并dump线程栈信息,方便定位问题
- 2.Netty:是创建一个新的线程来执行任务
- 3.ActiveMQ:带超时等待(60s)尝试放入队列
- 4.PinPoint:使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
四、 jdk提供的线程池
4.1 newFixedThreadPool
- 1.固定大小的线程池
- 2.核心线程数等于最大线程数,不会创建救急线程,无需超时时间以及时间单位
- 3.阻塞队列是无界的,相对耗时的任务
- 4.适用于任务量已知,相对耗时的任务
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
4.2 newCachedThreadPool
- 1.带缓存的线程池
- 2.核心线程数是0,最大线程数是Integer.MAX_VALUE,救急线程的空闲时间是60s,
- 3.全部都是救急线程,救急线程空闲后,60s后可以回收
- 4.救急线程可以无线创建
- 5.队列采用了SynchronousQueue实现,没有容量,没有线程消费的话是存放不了的
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
4.3 newSingleThreadExecutor
- 1.单线程线程池
- 2.希望多个任务排队执行,线程数固定为1,任务数多于1时,会放入无界队列排队
- 3.任务执行完毕,这唯一的线程也不会被释放
- 4.自己创建一个单线程串行执行任务,如果任务执行失败而终止没有任何补救措施,单线程线程池可以新建一个线程,保证线程池的正常工作
- 5.Executors.newSingleThreadExecutor()线程个数始终为1,不能修改
- 6.FinalizableDelegatedExecutorService应用的是装饰器模式,只对外暴露了ExecutorService接口,不能调用ThreadPoolExecutor中特有的方法
- 7.Executors.newFixedThreadPool(1)初始为1,可以修改,对外暴露的是ThreaddPoolExecutor对象,可以强转后调用setCorePoolSize等方法进行修改
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
五、 提交任务
5.1 execute
- 1.执行任务
void execute(Runnable command)
5.2 submit
- 2.提交任务task,用返回值Future获得任务执行结果
<T> Future<T> submit(Callable<T> task)
示例
package com.learning.threadpool;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
/**
* @Description 提交任务
*/
@Slf4j
public class SubmitLearning {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<String> future = pool.submit(() -> {
log.debug("running");
Thread.sleep(1000);
return "ok";
});
log.debug("{}", future.get());
}
}
5.3 invokeAll
- 3.提交tasks中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException
- 4.提交tasks中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException
示例
package com.learning.threadpool;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* @Description 提交任务
*/
@Slf4j
public class invokeAllLearning {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
List<Future<String>> futures = pool.invokeAll(Arrays.asList(
() -> {
log.debug("begin");
Thread.sleep(1000);
return "1";
},
() -> {
log.debug("begin");
Thread.sleep(500);
return "2";
},
() -> {
log.debug("begin");
Thread.sleep(2000);
return "3";
}
));
futures.forEach(future -> {
try {
log.debug("{}", future.get());
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
5.3 invokeAny
- 5.提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException
- 6.提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException
示例
package com.learning.threadpool;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Description invokeAny
**/
@Slf4j
public class InvokeAnyLearning {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
String result = pool.invokeAny(Arrays.asList(
() -> {
log.debug("begin");
Thread.sleep(1000);
return "1";
},
() -> {
log.debug("begin");
Thread.sleep(500);
return "2";
},
() -> {
log.debug("begin");
Thread.sleep(2000);
return "3";
}
));
log.debug("{}", result);
}
}
六、关闭线程池
6.1 shutdown
- 1.线程池状态变为shutdown
- 2.不会接收新任务
- 3.已提交任务会执行完
- 4.此方法不会阻塞调用线程的执行
6.2 shutdownNow
- 1.线程池状态变为stop
- 2.不会接收新任务
- 3.会将队列中的任务返回
- 4.并用interrupt的方式中断正在执行的任务
七、其它方法
7.1 isShutdown
- 1.不在running状态的线程池,此方法返回true
7.2 isTerminated
- 2.线程池状态是否是terminated
7.2 awatiTermination
- 3.调用shutdown后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池Terminated后做些事情,可以利用此方法等待
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
已为社区贡献4条内容
所有评论(0)