Java 创建线程池的几种方式
java线程池创建的几种方式和运用
·
一、创建线程池四种方式
- 使用 Executors 类,Executors 类是 Java 中用于创建线程池的工厂类,它提供了多种静态方法来创建不同类型的线程池
- 使用 ThreadPoolExecutor 类,ThreadPoolExecutor 是 Java 中线程池的一个核心类,它提供了更细粒度的控制来创建和管理线程池
- 使用 Future 和 Callable,Future 和 Callable 是并发编程中非常重要的两个接口,它们通常与 ExecutorService 一起使用来执行异步任务。
- 使用 Spring 的 ThreadPooltaskExecutor,ThreadPoolTaskExecutor 是一个基于 java.util.concurrent.ThreadPoolExecutor 的扩展,提供了更丰富的配置选项和与Spring集成的特性
二、线程池重要参数
- corePoolSize (int): 线程池的基本大小,即在没有任务执行时线程池的大小。当新任务提交时,线程池会优先使用已有的空闲线程。
- maximumPoolSize (int): 线程池能够容纳同时执行的最大线程数。这个参数用于控制线程池的最大规模,防止因任务过多而导致资源耗尽。
- keepAliveTime (long): 当线程池中的线程数量超过 corePoolSize 时,多余的空闲线程能等待新任务的最长时间。超过这个时间后,多余的线程将被终止。
- unit (TimeUnit): keepAliveTime 参数的时间单位,常见的时间单位有 TimeUnit.SECONDS、TimeUnit.MINUTES 等。
- workQueue (BlockingQueue): 一个阻塞队列,用于存储等待执行的任务。常用的阻塞队列有 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue 等。
- threadFactory (ThreadFactory): 用于创建新线程的工厂。可以通过实现 ThreadFactory 接口来自定义线程的创建过程。
- handler (RejectedExecutionHandler): 当任务太多而线程池无法处理时,用于定义拒绝任务的策略。常见的拒绝策略有 ThreadPoolExecutor.AbortPolicy、ThreadPoolExecutor.CallerRunsPolicy 和 ThreadPoolExecutor.DiscardPolicy 等。
package com.demo.threadPool;
import java.util.concurrent.*;
public class MainDemo1 {
public static void main(String[] args) {
int corePoolSize = 5; // 核心线程数
int maximumPoolSize = 10; // 最大线程数
long keepAliveTime = 1; // 非核心线程空闲存活时间
/**
* 存活时间单位
* TimeUnit.DAYS:天
* TimeUnit.HOURS:小时
* TimeUnit.MINUTES:分
* TimeUnit.SECONDS:秒
* TimeUnit.MILLISECONDS:毫秒
* TimeUnit.MICROSECONDS:微妙
* TimeUnit.NANOSECONDS:纳秒
*/
TimeUnit unit = TimeUnit.MINUTES;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(); // 工作队列
ThreadFactory threadFactory = Executors.defaultThreadFactory(); // 线程工厂
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); // 拒绝策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
}
}
三、线程池5种状态
- RUNNING:正常运行状态,可接收新任务,可处理阻塞队列中的任务
- SHUTDOWN:不会接收新任务,但会处理阻塞队列剩余任务
- STOP:会中断正在执行的任务,并抛弃阻塞队列任务
- TIDYING:任务全执行完毕,活动线程为 0,即将进入终结
- TERMINATED:终结状态
四、Executors 类创建线程池
- new newCachedThreadPool():创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。线程池的规模不存在限制。(数量不固定的线程池)
- new newFixedThreadPool():创建一个固定长度线程池,可控制线程最大并发数,超出的线程会在队列中等待。(固定数量的线程池)
- new newScheduledThreadPool():创建一个固定长度线程池,支持定时及周期性任务执行。(定时线程池)
- new newSingleThreadExecutor():创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。(单线程的线程池)
- 固定线程池创建 ( Executors.newFixedThreadPool(5) ):创建一个固定大小的线程池。线程池中的线程数量是固定的,即使有些线程处于空闲状态,它们也不会被回收。
package com.demo.threadPool;
import java.util.List;
import java.util.concurrent.*;
public class MainThreadPool {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//初始化固定大小线程池
ExecutorService executor1 = Executors.newFixedThreadPool(5);
//使用 execute(Runnable command) 方法提交一个不需要返回结果的任务,
// 或者使用submit(Callable<T> task) 方法提交一个需要返回结果的任务。
for (int i = 0; i < 10; i++) {
executor1.execute(new TaskR(i));
}
//使用 submit(Callable<T> task) 任务并获取 Future
//使用 Future.get() 方法等待任务完成并获取结果。这个方法会阻塞调用线程直到任务完成。
for (int i = 0; i < 10; i++) {
Future<String> future = executor1.submit(new TaskC(i));
System.out.println("线程返回结果 "+future.get());
}
// 当所有任务都执行完毕,或者需要关闭线程池时,调用 shutdown() 方法。
// 这将等待正在执行的任务完成,但不接收新任务。
executor1.shutdown();
//使用 shutdownNow() 方法尝试立即停止所有正在执行的任务,并返回等待执行的任务列表
List<Runnable> notExecutedTasks = executor1.shutdownNow();
for(Runnable ls : notExecutedTasks){
System.out.println(ls);
}
//使用 awaitTermination() 方法等待线程池关闭,直到所有任务完成或超时。
boolean res = executor1.awaitTermination(60, TimeUnit.SECONDS);
System.out.println("执行结果:"+res);
}
}
/**
* 实现 Runnable 接口
*/
class TaskR implements Runnable {
private int id;
public TaskR(int id) {
this.id = id;
}
public void run() {
System.out.println("TaskR " + id + " is running...");
}
}
/**
* 实现 Callable 接口
* 有返回值
*/
class TaskC implements Callable {
private int id;
public TaskC(int id) {
this.id = id;
}
@Override
public Object call(){
System.out.println("TaskC " + id + " is running...");
return id+"--TaskC";
}
}
- 单线程池 (newSingleThreadExecutor):创建一个只有一个线程的线程池。即使有多个任务提交,它们也会被排队,逐个由单个线程执行
package com.demo.threadPool;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* 单线程池 (newSingleThreadExecutor):
* 创建一个只有一个线程的线程池。即使有多个任务提交,它们也会被排队,逐个由单个线程执行。
*/
public class MainOne {
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* 单线程:创建的执行服务内部有一个线程。所有提交给它的任务将会序列化执行,也就是说,它会在单个线程上依次执行任务,不会有并发执行的情况发生
* 任务队列:如果有多个任务提交给这个执行器,除了当前正在执行的任务外,其他任务将会在一个无界队列中等待,直到线程可用
* 处理任务失败:如果执行中的线程由于任务抛出异常而终止,执行服务会安排一个新的线程来替换它,以继续执行后续的任务
* 使用场景: newSingleThreadExecutor 非常适合需要顺序执行的任务,并且要求任务之间不受并发问题影响的场景
*/
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
executor.execute(new TaskR(i));
}
//使用 submit(Callable<T> task) 任务并获取 Future
//使用 Future.get() 方法等待任务完成并获取结果。这个方法会阻塞调用线程直到任务完成。
for (int i = 0; i < 10; i++) {
Future<String> future = executor.submit(new TaskC(i));
System.out.println("线程返回结果 "+future.get());
}
// 当所有任务都执行完毕,或者需要关闭线程池时,调用 shutdown() 方法。
// 这将等待正在执行的任务完成,但不接收新任务。
executor.shutdown();
}
}
- 缓存线程池 (newCachedThreadPool):创建一个可根据需要创建新线程的线程池。如果线程空闲超过60秒,它们将被终止并从池中移除
package com.demo.threadPool;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 缓存线程池 (newCachedThreadPool):
* 创建一个可根据需要创建新线程的线程池。如果线程空闲超过60秒,它们将被终止并从池中移除
*/
public class MainCacheThreadPool {
public static void main(String[] args) throws InterruptedException {
System.out.println(Thread.currentThread().getName() + "线程: Start at: " + new Date());
//初始化缓存线程池
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 1; i < 10; i++) {
System.out.println("添加了第" + i + "个任务类");
Thread.sleep(2000);
exec.execute(new TaskR(i));
}
//所有任务结束后关闭线程池
exec.shutdown();
System.out.println(Thread.currentThread().getName() + " 线程: Finished all threads at:" + new Date());
}
}
- 调度线程池 (newScheduledThreadPool):创建一个支持定时任务和周期性任务的线程池
package com.demo.threadPool;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 固定频率执行
* 调度线程池 (newScheduledThreadPool):
* 创建一个支持定时任务和周期性任务的线程池
*/
public class MainScheduledThreadPool {
public static void main(String[] args) {
/**
* 场景描述
* 假设你需要一个应用程序,该程序能够每10秒执行一次任务,并在启动后1分钟开始执行。此外,
* 你还需要能够安排一次性任务在未来的某个时间点执行
*/
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10);
// 安排定期任务
// 初始延迟1分钟,之后每10秒执行一次
threadPool.scheduleAtFixedRate(new TaskR(2), 60, 10, TimeUnit.SECONDS);
// 安排一次性任务
// 使用 schedule 方法安排一个任务,在指定的延迟后执行一次
// 延迟5分钟后执行
threadPool.schedule(new TaskR(3), 5, TimeUnit.MINUTES);
// 关闭线程池
// 当不再需要线程池时,调用 shutdown 方法来关闭线程池。这将等待正在执行的任务完成,但不接收新任务
threadPool.shutdown();
// 等待线程池关闭
// 使用 awaitTermination 方法等待线程池关闭,直到所有任务完成或超时。
try {
threadPool.awaitTermination(1, TimeUnit.HOURS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 使用给定的线程工厂创建线程池:可以提供一个自定义的 ThreadFactory 来创建线程池中的线程
package com.demo.threadPool;
import java.util.concurrent.*;
/**
* 使用给定的线程工厂创建线程池
*/
public class MainFactory {
public static void main(String[] args) {
//自定义线程工厂创建
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r);
}
};
//使用给定的线程工厂创建线程池
ExecutorService executor = Executors.newFixedThreadPool(5, threadFactory);
executor.execute(new TaskR(2));
}
}
- 自定义线程工厂创建:自定义线程工厂可以设置自己的线程名,设置守护线程,设置线程优先级,处理未捕获的异常等
package com.demo.threadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 自定义线程工厂:设置线程名,守护线程,优先级以及UncaughtExceptionHandler
*/
public class MainFactory implements ThreadFactory {
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public MainFactory(String namePrefix) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.namePrefix = namePrefix + "-thread-";
}
public MainFactory(ThreadGroup group, String namePrefix) {
this.group = group;
this.namePrefix = namePrefix;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);
//守护线程
if (t.isDaemon())
t.setDaemon(true);
//线程优先级
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
/**
* 处理未捕捉的异常
*/
t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("处理未捕获的异常");
}
});
return t;
}
//测试方法
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(5, new MainFactory("测试线程"));
for (int i = 0; i < 10; i++) {
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("线程处理");
//未捕获的异常,走自定义的UncaughtExceptionHandler逻辑
int i = 1 / 0;
}
});
}
pool.shutdown();
}
}
五、ThreadPoolExecutor 类创建线程池
ThreadPoolExecutor 是 java.util.concurrent 包中用来创建线程池的一个类。它提供了一种灵活的方式来管理线程池,允许你控制线程的创建和销毁。
ThreadPoolExecutor 类中的几个重要方法
- execute():向线程池提交一个任务,交由线程池去执行
- submit():也是向线程池提交任务,但是和execute()方法不同,它能够返回任务执行的结果它实际上还是调用的 execute() 方法,只不过它利用了 Future 来获取任务执行结果
- invokeAll():提交一个任务集合
- invokeAny(): 提交一个任务集合,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
- shutdown():关闭线程池,再也不会接受新的任务不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止
- shutdownNow():关闭线程池,再也不会接受新的任务立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
- isShutdown():不在 RUNNING 状态的线程池,此方法就返回 true
- isTerminated():线程池状态是否是 TERMINATED
package com.demo.threadPool;
import java.util.Random;
import java.util.concurrent.*;
/**
* ThreadPoolExecutor 是 java.util.concurrent 包中用来创建线程池的一个类
* 它提供了一种灵活的方式来管理线程池,允许你控制线程的创建和销毁。
* 以下是几种常见的创建 ThreadPoolExecutor 线程池的方式
* 实际上 Executors 类也是调用 ThreadPoolExecutor 类创建的线程
*/
public class MainThreadPoolExecutor {
//测试方法
public static void main(String[] args) {
/**
* 核心线程数,核心线程就是一直存在的线程
*/
int corePoolSize = 5;
/**
* 最大线程数,表示线程池中最多能创建多少个线程
* 非核心线程数 = 最大线程数 - 核心线程数
*/
int maximumPoolSize = 10;
/**
* 默认情况下,只有当线程池中的线程数大于corePoolSize时,
* keepAliveTime才会起作用,则会终止,直到线程池中的线程数不超过corePoolSize
* 则会终止,直到线程池中的线程数不超过corePoolSize
* 但是如果调用了 allowCoreThreadTimeOut(boolean) 方法
* 在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为 0
* 针对非核心线程而言,表示线程没有任务执行时最多保持多久时间会终止
*/
long keepAliveTime = 60;
/**
* 时间单位
* 与 keepAliveTime 配合使用,针对非核心线程
*/
TimeUnit unit = TimeUnit.SECONDS;
/**
* 存放任务的阻塞队列
*/
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(5);
/**
* 创建线程的工厂,可以为线程创建时起个好名字
*/
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r);
}
};
/**
* 拒绝策略
* 任务太多的时候会进行拒绝操作
* 核心线程,非核心线程,任务队列都放不下时
*/
// 自定义拒绝策略
RejectedExecutionHandler defaultHandler1 = new MyRejectedExecutionHandler();
// 默认策略,在需要拒绝任务时抛出RejectedExecutionException
RejectedExecutionHandler defaultHandler3 = new ThreadPoolExecutor.AbortPolicy();
// 直接在 execute 方法的调用线程中运行被拒绝的任务,如果线程池已经关闭,任务将被丢弃;
RejectedExecutionHandler defaultHandler2 = new ThreadPoolExecutor.CallerRunsPolicy();
// 直接丢弃任务
RejectedExecutionHandler defaultHandler4 = new ThreadPoolExecutor.DiscardPolicy();
// 丢弃队列中等待时间最长的任务,并执行当前提交的任务,如果线程池已经关闭,任务将被丢弃
RejectedExecutionHandler defaultHandler5 = new ThreadPoolExecutor.DiscardOldestPolicy();
/**
* 创建线程池
*/
ExecutorService service1 = new ThreadPoolExecutor( corePoolSize, maximumPoolSize,keepAliveTime,
unit,workQueue,threadFactory,defaultHandler1);
for (int i = 0; i < 10; i++) {
System.out.println("添加第"+i+"个任务");
service1.execute(new MyThread("线程"+i));
}
service1.shutdown();
}
}
/**
* 自定义拒绝策略
*/
class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
new Thread(r,"新线程"+new Random().nextInt(10)).start();
}
}
/**
* 线程类
*/
class MyThread implements Runnable {
String name;
public MyThread(String name) {
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程:"+Thread.currentThread().getName() +" 执行:"+name +" run");
}
}
六、Future 和 Callable 类使用创建线程池
- Callable 是一个函数式接口,它允许你定义一个任务,该任务可以返回一个结果并抛出异常。它是 Runnable 接口的扩展,增加了返回值和抛出异常的能力。
- 返回值:与 Runnable 接口不同,Callable 任务可以返回一个值,返回值通过 Future 对象获取。
- 异常:Callable 任务可以抛出异常,这些异常可以通过 Future 对象处理。
- Future 接口代表异步计算的结果。它提供了检查计算是否完成的方法,以及获取计算结果的方法。
- get():获取计算结果。如果计算尚未完成,此方法会阻塞,直到计算完成或抛出异常。
- isDone():检查计算是否完成。
- cancel():尝试取消任务。
- isCancelled():检查任务是否被取消
package com.demo.threadPool;
import java.util.concurrent.*;
/**
* Future 使用
*/
public class MainFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
System.out.println("开始时间戳为:" + System.currentTimeMillis());
Future<String> future = executorService.submit(new Test01());
String result = future.get(); //获取计算结果。如果计算尚未完成,此方法会阻塞,直到计算完成或抛出异常
boolean isdone = future.isDone(); //检查计算是否完成
boolean cancel = future.cancel(true); //尝试取消任务
boolean iscancelled = future.isCancelled(); //检查任务是否被取消
System.out.println("result:"+result);
System.out.println("isdone:"+isdone);
System.out.println("cancel:"+cancel);
System.out.println("iscancelled:"+iscancelled);
System.out.println("结束时间戳为:" + System.currentTimeMillis());
executorService.shutdown();
}
}
class Test01 implements Callable {
@Override
public Object call() throws Exception {
return "你好";
}
}
七、Spring 的 ThreadPoolTaskExecutor 类创建线程池
ThreadPoolTaskExecutor 是 Spring 框架提供的一个线程池实现,它扩展了 Java 的 ThreadPoolExecutor 并提供了一些额外的配置和功能
- 添加依赖: 如果你的项目是一个 Maven 项目,确保你的 pom.xml 文件中包含了 Spring Boot 的依赖
- 配置线程池: 在 Spring Boot 应用程序中,你可以通过 Java 配置类来配置 ThreadPoolTaskExecutor
package com.cnpc.epai.assetcatalog.dmp.controller;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程池配置类
*/
@Configuration
public class ConfigPoolConfiguration {
@Bean("TaskExecutorDemo")
public ThreadPoolTaskExecutor taskExecutorDemo(){
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(10); // 核心线程数
threadPoolTaskExecutor.setMaxPoolSize(20);// 最大线程数
threadPoolTaskExecutor.setQueueCapacity(100); //工作队列
threadPoolTaskExecutor.setKeepAliveSeconds(60); // 非核心线程的空闲存活时间
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);//指定是否允许核心线程超时。这允许动态增长和收缩,即使与非零队列结合使用也是如此(因为最大池大小只有在队列已满时才会增长)
threadPoolTaskExecutor.setThreadNamePrefix("monitor-thread-pool-");// 设置线程名前缀
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());// 拒绝策略
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);// 设置线程池关闭时需要等待子任务执行完毕,才销毁对应的bean
threadPoolTaskExecutor.initialize();//初始化线程池
return threadPoolTaskExecutor;
}
}
测试类
package com.cnpc.epai.assetcatalog.dmp.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
@Service
public class TestService {
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Async("taskExecutor")
public void executeTask() {
taskExecutor.execute(() -> {
System.out.println("Executing task in thread: " + Thread.currentThread().getName());
});
}
}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
已为社区贡献10条内容
所有评论(0)