史上最强异步编程~CompletableFuture精读
是一个强大的类,它同时实现了Future和两个关键接口,集成了异步计算结果的表示与异步计算流程编排的双重功能。Future接口:该接口定义了异步操作的结果,提供了检查任务是否完成、获取计算结果以及尝试取消任务的能力。它是对未来可能完成的计算结果的一个抽象表示。CompletionStage接口:作为异步计算流程的核心,代表了异步操作中的一个阶段,允许将多个异步操作串联起来,形成复杂的异步执行流程。
为什么用CompletableFuture?
Future | CompletableFuture | RxJava | Reactor | |
---|---|---|---|---|
Composable(可组合) | ❌ | ✔️ | ✔️ | ✔️ |
Asynchronous(异步) | ✔️ | ✔️ | ✔️ | ✔️ |
Operator fusion(操作融合) | ❌ | ❌ | ✔️ | ✔️ |
Lazy(延迟执行) | ❌ | ❌ | ✔️ | ✔️ |
Backpressure(回压) | ❌ | ❌ | ✔️ | ✔️ |
-
可组合:可以将多个依赖操作通过不同的方式进行编排,例如CompletableFuture提供thenCompose、thenCombine等各种then开头的方法,这些方法就是对“可组合”特性的支持。
-
操作融合:将数据流中使用的多个操作符以某种方式结合起来,进而降低开销(时间、内存)。
-
延迟执行:操作不会立即执行,当收到明确指示时操作才会触发。例如Reactor只有当有订阅者订阅时,才会触发操作。
-
回压:某些异步阶段的处理速度跟不上,直接失败会导致大量数据的丢失,对业务来说是不能接受的,这时需要反馈上游生产者降低调用量。
摘抄:
简化异步编程:告别复杂的同步机制,CompletableFuture
凭借 thenApply
、thenAccept
、thenRun
等方法,直观编排异步操作,简化线程间通信与数据传递。
-
支持非阻塞操作:
CompletableFuture
引领非阻塞异步编程潮流,实现异步操作的无缝安排与结果通知,显著提升应用响应力与吞吐量。 -
丰富的组合功能:借助
thenCombine
、thenCompose
、allOf
、anyOf
等方法,CompletableFuture
轻松构建复杂异步流程,强化操作间的组合能力。 -
异常处理机制:通过
exceptionally
和handle
方法,CompletableFuture
提供了灵活的异常处理方案,有效避免异步操作异常导致的线程中断或程序崩溃。 -
支持操作取消:当异步任务不再需要时,
CompletableFuture
的cancel
方法允许开发者及时取消任务,优化资源利用。 -
无缝并发集成:与 Java 并发工具如
ExecutorService
紧密集成,CompletableFuture
为开发者提供了更为强大和灵活的并发控制能力。
在权衡RxJava与Reactor的广泛功能集与较高学习曲线后,我们聚焦于本次整合的核心需求——“异步处理”与“高度可组合性”。考虑到学习成本与实际需求,我们选择了相对轻量且直接满足需求的CompletableFuture
作为实现方案。这一选择不仅满足了项目需求,也确保了开发效率与后期维护的便捷性。
定义
CompletableFuture
是一个强大的类,它同时实现了 Future
和 CompletionStage
两个关键接口,集成了异步计算结果的表示与异步计算流程编排的双重功能。
-
Future接口:该接口定义了异步操作的结果,提供了检查任务是否完成、获取计算结果以及尝试取消任务的能力。它是对未来可能完成的计算结果的一个抽象表示。
-
CompletionStage接口:作为异步计算流程的核心,
CompletionStage
代表了异步操作中的一个阶段,允许将多个异步操作串联起来,形成复杂的异步执行流程。通过它,开发者可以灵活地编排和组合多个异步任务,实现高效且易于管理的异步编程模型。
使用场景
异步执行耗时操作
- 对于一些耗时的操作,如远程调用、数据库查询、文件读写等,使用
CompletableFuture
可以异步执行这些操作,避免阻塞主线程,从而提高系统的吞吐量和并发能力。例如,在电商系统中,当用户下单时,可以异步地调用支付接口、库存更新接口等,而不必等待这些操作完成后再继续执行后续逻辑。
并行处理多个独立任务
- 当一个任务可以被分解为多个独立的子任务时,可以使用
CompletableFuture
来并行执行这些子任务,并在所有子任务完成后进行结果合并。这种方式可以显著提高系统的性能和响应速度。例如,在查询用户信息时,可能需要同时从多个数据源(如数据库、缓存等)获取数据,使用CompletableFuture
可以并行地访问这些数据源,并在所有数据都获取到后进行整合。
组合多个异步任务的结果
- 有时候需要等待多个异步任务都完成后才能进行下一步处理,
CompletableFuture
提供了丰富的组合方法(如thenCombine
、thenAcceptBoth
、allOf
等)来等待多个异步任务的结果,并在所有任务完成后进行处理。例如,在生成报表时,可能需要从多个数据源获取数据,并等待所有数据都准备好后再进行报表的生成和渲染。
超时处理和异常处理
CompletableFuture
提供了超时处理和异常处理的方法,可以很方便地处理异步任务执行过程中出现的异常或者超时情况。例如,可以设置一个超时时间,如果异步任务在指定时间内没有完成,则抛出超时异常,并进行相应的处理。
实现异步回调
- 通过
CompletableFuture
的回调方法(如whenComplete
、exceptionally
等),可以在异步任务完成后执行特定的逻辑,如通知其他系统、记录日志等。这种方式使得异步任务的结果处理更加灵活和方便。
复杂异步逻辑的编排
CompletableFuture
支持复杂的异步逻辑编排,可以通过链式调用将多个异步操作组合在一起,形成一个复杂的异步流程。这种编排能力使得异步编程更加清晰和易于管理。
原理
CompletableFuture中包含两个字段:result和stack。result用于存储当前CF的结果,stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个(表示有多个依赖它的CF),以栈(Treiber stack)的形式存储,stack表示栈顶元素。
这种方式类似“观察者模式”,依赖动作(Dependency Action)都封装在一个单独Completion子类中。下面是Completion类关系结构图。CompletableFuture中的每个方法都对应了图中的一个Completion的子类,Completion本身是观察者的基类。
-
UniCompletion继承了Completion,是一元依赖的基类,例如thenApply的实现类UniApply就继承自UniCompletion。
-
BiCompletion继承了UniCompletion,是二元依赖的基类,同时也是多元依赖的基类。例如thenCombine的实现类BiRelay就继承自BiCompletion。
设计思想
按照类似“观察者模式”的设计思想,原理分析可以从“观察者”和“被观察者”两个方面着手。由于回调种类多,但结构差异不大,所以这里单以一元依赖中的thenApply为例,不再枚举全部回调类型。如下图所示:
被观察者
-
每个CompletableFuture都可以被看作一个被观察者,其内部有一个Completion类型的链表成员变量stack,用来存储注册到其中的所有观察者。当被观察者执行完成后会弹栈stack属性,依次通知注册到其中的观察者。上面例子中步骤fn2就是作为观察者被封装在UniApply中。
-
被观察者CF中的result属性,用来存储返回结果数据。这里可能是一次RPC调用的返回值,也可能是任意对象,在上面的例子中对应步骤fn1的执行结果。
观察者
CompletableFuture支持很多回调方法,例如thenAccept、thenApply、exceptionally等,这些方法接收一个函数类型的参数f,生成一个Completion类型的对象(即观察者),并将入参函数f赋值给Completion的成员变量fn,然后检查当前CF是否已处于完成状态(即result != null),如果已完成直接触发fn,否则将观察者Completion加入到CF的观察者链stack中,再次尝试触发,如果被观察者未执行完则其执行完毕之后通知触发。
-
观察者中的dep属性:指向其对应的CompletableFuture,在上面的例子中dep指向CF2。
-
观察者中的src属性:指向其依赖的CompletableFuture,在上面的例子中src指向CF1。
-
观察者Completion中的fn属性:用来存储具体的等待被回调的函数。这里需要注意的是不同的回调方法(thenAccept、thenApply、exceptionally等)接收的函数类型也不同,即fn的类型有很多种,在上面的例子中fn指向fn2。
整体流程
一元依赖
这里仍然以thenApply为例来说明一元依赖的流程:
-
将观察者Completion注册到CF1,此时CF1将Completion压栈。
-
当CF1的操作运行完成时,会将结果赋值给CF1中的result属性。
-
依次弹栈,通知观察者尝试运行。
初步流程设计如上图所示,这里有几个关于注册与通知的并发问题,大家可以思考下:
Q1:在观察者注册之前,如果CF已经执行完成,并且已经发出通知,那么这时观察者由于错过了通知是不是将永远不会被触发呢 ? A1:不会。在注册时检查依赖的CF是否已经完成。如果未完成(即result == null)则将观察者入栈,如果已完成(result != null)则直接触发观察者操作。
Q2:在”入栈“前会有”result == null“的判断,这两个操作为非原子操作,CompletableFufure的实现也没有对两个操作进行加锁,完成时间在这两个操作之间,观察者仍然得不到通知,是不是仍然无法触发?
A2:不会。入栈之后再次检查CF是否完成,如果完成则触发。
Q3:当依赖多个CF时,观察者会被压入所有依赖的CF的栈中,每个CF完成的时候都会进行,那么会不会导致一个操作被多次执行呢 ?如下图所示,即当CF1、CF2同时完成时,如何避免CF3被多次触发。
A3:CompletableFuture的实现是这样解决该问题的:观察者在执行之前会先通过CAS操作设置一个状态位,将status由0改为1。如果观察者已经执行过了,那么CAS操作将会失败,取消执行。
通过对以上3个问题的分析可以看出,CompletableFuture在处理并行问题时,全程无加锁操作,极大地提高了程序的执行效率。我们将并行问题考虑纳入之后,可以得到完善的整体流程图如下所示:
CompletableFuture支持的回调方法十分丰富,但是正如上一章节的整体流程图所述,他们的整体流程是一致的。所有回调复用同一套流程架构,不同的回调监听通过策略模式实现差异化。
二元依赖
我们以thenCombine为例来说明二元依赖:
thenCombine操作表示依赖两个CompletableFuture。其观察者实现类为BiApply,如上图所示,BiApply通过src和snd两个属性关联被依赖的两个CF,fn属性的类型为BiFunction。与单个依赖不同的是,在依赖的CF未完成的情况下,thenCombine会尝试将BiApply压入这两个被依赖的CF的栈中,每个被依赖的CF完成时都会尝试触发观察者BiApply,BiApply会检查两个依赖是否都完成,如果完成则开始执行。这里为了解决重复触发的问题,同样用的是上一章节提到的CAS操作,执行时会先通过CAS设置状态位,避免重复触发。
多元依赖
依赖多个CompletableFuture的回调方法包括allOf
、anyOf
,区别在于allOf
观察者实现类为BiRelay,需要所有被依赖的CF完成后才会执行回调;而anyOf
观察者实现类为OrRelay,任意一个被依赖的CF完成后就会触发。二者的实现方式都是将多个被依赖的CF构建成一棵平衡二叉树,执行结果层层通知,直到根节点,触发回调监听。
创建方式
方式一:new CompletableFuture()
- 当你需要手动设置CompletableFuture的结果时,你可以创建一个空的CompletableFuture实例,然后在异步任务完成时调用complete或completeExceptionally(如果发生异常)方法来设置其结果。
以下是一个使用CompletableFuture的示例,其中包括了如何创建一个空的CompletableFuture并在某个异步操作完成后设置其结果:
@Test
public void testCompletableFuture1() {
// 创建一个空的CompletableFuture
CompletableFuture<String> future = new CompletableFuture<>();
CountDownLatch cd = new CountDownLatch(1);
ExecutorService executorService = Executors.newFixedThreadPool(1);
// 模拟异步任务
executorService.execute(() -> {
try {
// 假设这里进行一些异步操作
Boolean result = Boolean.TRUE;
if (result) {
// 异步任务完成后,设置CompletableFuture的结果
future.complete("异步调用完成");
} else {
future.completeExceptionally(new RuntimeException("异步任务出错"));
}
} catch (Exception e) {
// 捕获异常并处理,此处仅打印异常信息,也可以去掉
e.printStackTrace();
// 并可以设置异常到CompletableFuture
future.completeExceptionally(e);
} finally {
cd.countDown();
}
});
try {
cd.await();
executorService.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 在主线程中等待异步结果
future.thenAccept(result -> {
System.out.println("异步结果: " + result);
}).exceptionally(e -> {
System.err.println("异步任务发生异常: " + e.getMessage());
throw new RuntimeException(e);
});
// 注意:实际场景中,主线程可能需要等待异步操作完成
// 例如,可以通过future.join()等待结果,但这里为了演示方便没有加
}
在这个示例中,我们创建了一个CompletableFuture实例future,然后启动了一个线程池+CountDownLatch来模拟异步任务。在异步任务完成后,我们使用complete方法将结果设置到future中。此外,如果异步任务中发生了异常,我们也可以使用completeExceptionally方法来设置异常。
然后,我们使用thenAccept方法来处理异步结果,它会在future完成时自动执行。如果future以异常结束,exceptionally方法会捕获该异常并进行处理。
注意:在实际应用中,你可能需要在主线程中等待异步操作完成,例如通过调用future.join()(这将会阻塞当前线程直到future完成)或使用其他同步机制。
方式二:使用completedFuture()方法
- 当你已经有一个结果并且想要立即提供一个已经完成的CompletableFuture实例时。这种方法避免了不必要的异步执行和等待,直接返回一个已经包含结果的CompletableFuture。
@Test
public void testCompletableFuture2() {
// 假设这是你的结果
String result = "这是已经准备好的结果";
// 使用completedFuture()方法立即创建一个已经完成的CompletableFuture
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture(result);
// 然后你可以像处理其他CompletableFuture一样处理它
completedFuture.thenAccept(r -> System.out.println("结果: " + r))
// 注意:thenAccept不处理返回值
.exceptionally(e -> {
// 注意:在大多数情况下,这里不会被调用,因为future已经完成且没有异常
System.err.println("异常: " + e.getMessage());
return null;
});
// 在这个例子中,因为future已经完成,所以thenAccept会立即执行
// 你不需要等待或调用任何同步方法
}
completedFuture()方法被用来创建一个已经包含结果的CompletableFuture。然后,我们使用了thenAccept方法来处理这个结果,但是请注意,由于CompletableFuture已经完成了,所以thenAccept会立即被调用,而不需要等待任何异步操作完成;exceptionally方法在这里大多数情况下都不会被调用,因为completedFuture()创建的CompletableFuture是已经完成的,并且没有异常的。
方式三:使用supplyAsync,有返回值
-
CompletableFuture 类中的一个静态方法,它允许你以异步方式执行一个 Supplier 函数式接口的实现,并返回一个新的 CompletableFuture的返回值,该 CompletableFuture 将在任务完成时包含结果。
-
注意supplyAsync的底层其实本质上也是通过new CompletableFuture()创建的。
supplyAsync() 方法有两个重载版本
-
一个使用系统默认的线程池
- 默认由系统本身默认的线程池控制,无法把控内部的参数,比如线程池的大小,名称,对排查问题会有很大的影响。
/**
* 使用默认的supplyAsync,不带线程池
*/
@Test
public void testCompletableFuture3() {
// 使用系统默认的线程池执行Supplier
CompletableFuture<String> futureWithDefaultPool = CompletableFuture.supplyAsync(() -> {
// 这里执行耗时的任务
return "默认线程池";
});
// 处理结果
futureWithDefaultPool.thenAccept(result -> System.out.println("结果: " + result))
.exceptionally(e -> {
System.err.println("发生异常: " + e.getMessage());
return null; // 注意:thenAccept不处理返回值
});
}
-
另一个允许你指定一个自定义的 Executor。
- 可以更细粒度地控制任务的执行环境(比如线程池的大小、线程的名称等)
/**
* 使用自定义线程池
*/
@Test
public void testCompletableFuture4() {
// 创建一个自定义的线程池
ExecutorService executor = Executors.newFixedThreadPool(4, r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("自定义线程池-" + t.getId());
return t;
});
// 使用自定义的Executor执行Supplier
CompletableFuture<String> futureWithCustomExecutor = CompletableFuture.supplyAsync(() -> {
// 这里执行耗时的任务
return "自定义线程池";
}, executor);
// 处理结果
futureWithCustomExecutor.thenAccept(result -> System.out.println("结果: " + result))
.exceptionally(e -> {
System.err.println("发生异常: " + e.getMessage());
return null; // 注意:thenAccept不处理返回值
});
// 注意:在实际应用中,你应该在适当的时候关闭线程池
executor.shutdown();
}
方式四:runAsync,无返回值
-
CompletableFuture类的一个静态方法,它允许你创建一个新的CompletableFuture,这个CompletableFuture会在后台线程中执行一个任务,但是这个任务不会有任何返回值。也就是说,runAsync()方法主要用于执行一些没有返回值的任务,比如发送邮件、下载文件等等。
-
一个使用系统默认的线程池
- 默认由系统本身默认的线程池控制,无法把控内部的参数,比如线程池的大小,名称,对排查问题会有很大的影响。
@Test
public void testCompletableFuture5() {
// 使用系统默认的线程池执行Runnable
CompletableFuture<Void> futureWithDefaultPool = CompletableFuture.runAsync(() -> {
// 这里执行没有返回值的异步任务
System.out.println("正在执行任务:默认线程池,无返回值...");
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务执行完成:默认线程池,无返回值");
});
futureWithDefaultPool.join();
// 处理完成或异常的情况(注意:runAsync的CompletableFuture类型是Void,所以thenAccept不会接收到任何值)
futureWithDefaultPool.thenRun(() -> System.out.println("执行完成后,添加操作"))
.exceptionally(e -> {
System.err.println("任务执行过程中发生异常: " + e.getMessage());
// 这里返回null是因为exceptionally处理的是异常,而不是正常的完成
return null; // 对于Void类型的CompletableFuture,exceptionally的返回值是无关紧要的
});
}
-
另一个允许你指定一个自定义的 Executor。
- 可以更细粒度地控制任务的执行环境(比如线程池的大小、线程的名称等)
@Test
public void testCompletableFuture6() {
// 创建一个自定义的线程池
ExecutorService executor = Executors.newFixedThreadPool(4, r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("自定义线程池-" + t.getId());
return t;
});
// 使用系统默认的线程池执行Runnable
CompletableFuture<Void> futureWithCustomExecutor = CompletableFuture.runAsync(() -> {
// 这里执行没有返回值的异步任务
System.out.println("正在执行任务:自定义线程池,无返回值...");
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务执行完成:自定义线程池,无返回值");
}, executor);
futureWithCustomExecutor.join();
// 处理完成或异常的情况(注意:runAsync的CompletableFuture类型是Void,所以thenAccept不会接收到任何值)
futureWithCustomExecutor.thenRun(() -> System.out.println("执行完成后,添加操作"))
.exceptionally(e -> {
System.err.println("任务执行过程中发生异常: " + e.getMessage());
// 这里返回null是因为exceptionally处理的是异常,而不是正常的完成
return null; // 对于Void类型的CompletableFuture,exceptionally的返回值是无关紧要的
});
// 注意:在实际应用中,你应该在适当的时候关闭线程池
executor.shutdown();
}
获取结果
get
当调用此方法时,如果异步操作尚未完成,当前线程将会被阻塞,直到异步操作完成并返回其结果。如果异步操作在执行过程中遇到了异常,get()
方法会抛出一个 ExecutionException
,该异常封装了原始的异常信息,以便调用者能够获取到导致异步操作失败的具体原因。此外,如果当前线程在等待异步操作完成的过程中被中断,get()
方法将不再等待并立即抛出一个 InterruptedException
,以通知调用者线程已被中断。
/**
* 获取结果之get
*/
@Test
public void testCfGet() {
CompletableFuture<String> cf7 = CompletableFuture.supplyAsync(() -> {
// 这里执行有返回值的异步任务
System.out.println("正在执行任务:默认线程池,有返回值...");
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务执行完成:默认线程池,有返回值");
return "Hello, World!"; // 返回结果
});
try {
cf7.get();
} catch (Exception e) {
System.out.println(e.getMessage());
Thread.currentThread().interrupt();
}
cf7.thenAccept(result -> {
System.out.println("异步任务的结果是: " + result);
}).exceptionally(e -> {
throw new RuntimeException(e);
});
}
join
join()
方法与 get()
方法功能相似,都用于阻塞当前线程以获取 CompletableFuture
的结果。然而,它们在处理线程中断时的行为上有所不同。具体来说,join()
方法不会抛出 InterruptedException
;如果当前线程在等待结果期间被中断,join()
会将中断状态重新设置到当前线程(即恢复中断状态),但不会立即抛出中断异常。这使得 join()
成为 get()
方法的一个非抛出中断异常的替代选择,对于那些希望在中断时能够优雅处理(而非立即退出)的场景更为适用。
/**
* 获取结果之Join
*/
@Test
public void testCfJoin() {
CompletableFuture<String> cf7 = CompletableFuture.supplyAsync(() -> {
// 这里执行有返回值的异步任务
System.out.println("正在执行任务:默认线程池,有返回值...");
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务执行完成:默认线程池,有返回值");
return "Hello, World!"; // 返回结果
});
cf7.join();
cf7.thenAccept(result -> {
System.out.println("异步任务的结果是: " + result);
}).exceptionally(e -> {
throw new RuntimeException(e);
});
}
isDeno
isDone()
方法提供了一种非阻塞的方式来检查 CompletableFuture
是否已经完成了其异步操作。该方法立即返回一个布尔值:如果异步操作已经完成(无论是正常完成还是异常完成),则返回 true
;如果异步操作仍在执行中,则返回 false
。由于 isDone()
方法不会阻塞调用线程,因此它非常适合用于轮询异步操作的完成状态,尤其是在需要基于完成状态来做出进一步决策的场景中
/**
* 获取结果之isDone
*/
@Test
public void testIsDone() {
CompletableFuture<Void> cf7 = CompletableFuture.runAsync(() -> {
// 这里执行有返回值的异步任务
System.out.println("正在执行任务:默认线程池,有返回值...");
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务执行完成:默认线程池,有返回值");
});
if (cf7.isDone()){
System.out.println("任务执行完成");
}else{
System.out.println("任务执行中");
}
}
使用方式的区别
-
get()
和join()
两者均用于从CompletableFuture
中获取异步操作的结果,但它们在处理线程中断时表现出不同的行为:get()
在等待异步操作完成的过程中,如果当前线程被中断,则会抛出InterruptedException
;而join()
在相同情况下则会将中断状态设置回线程,但不抛出异常,允许调用者自行决定如何处理中断。 -
isDone()
方法则完全不同,它仅用于检查异步操作是否已经完成,而不会阻塞调用线程等待结果。这一特性使得isDone()
非常适合于非阻塞的轮询场景,允许开发者在不阻塞执行流程的前提下,定期检查异步操作的完成状态。
在实际开发中,选择使用 get()
、join()
还是 isDone()
,应基于具体需求来决定,包括是否需要处理线程中断、是否需要立即获取结果、以及是否希望当前线程在执行过程中被阻塞等因素。
一元依赖:任务的回调
thenRun/thenRunAsync方法(与当前操作无关)
-
执行器差异:
thenRun
使用默认的ForkJoinPool.commonPool()
执行后续任务,而thenRunAsync
允许通过自定义Executor
来异步执行。 -
返回值:两者均返回
CompletableFuture<Void>
,表明它们不产生结果值。 -
自定义线程池:
thenRun
依赖于默认的线程池,而thenRunAsync
提供了自定义线程池的能力。 -
执行方式:两者都在 CompletableFuture 的异步操作完成后触发,但
thenRunAsync
通过自定义 Executor 提供了更灵活的异步执行控制。
使用场景
-
- 清理资源
当你执行一个异步操作,该操作可能会使用到一些资源(如文件句柄、数据库连接等),你可以使用 thenRun
来确保这些资源在操作完成后被正确释放或关闭。
-
- 发送通知
异步操作完成后,你可能需要发送电子邮件、短信或消息通知给相关方。thenRun
可以用来执行这些通知任务,确保它们只在操作真正完成后才进行。
-
- 更新UI
在GUI应用程序中,你可能需要在后台线程中执行耗时的操作,并在操作完成后更新用户界面。由于UI更新通常需要在主线程(或UI线程)上进行,你可以使用 thenRun
来安排一个任务,该任务在后台操作完成后在主线程上执行UI更新。
-
- 记录日志
记录日志是任何应用程序中都非常重要的一个方面。使用 thenRun
,你可以在异步操作完成后立即记录相关的日志信息,这有助于跟踪应用程序的行为和调试潜在的问题。
-
- 触发后续操作
有时,一个异步操作的完成是触发另一个操作的前提。使用 thenRun
,你可以确保在第一个操作完成后立即启动第二个操作,而无需编写复杂的同步代码。
-
- 执行不依赖于异步操作结果的代码
如果你的 CompletableFuture
链中的某个步骤不需要前一个步骤的结果,但需要在前一个步骤完成后执行,那么 thenRun
是一个很好的选择。它允许你插入一个不依赖于前面步骤结果的 Runnable
任务。
thenRun 的具体使用方法
"当 CompletableFuture
完成时(不论成功或失败),thenRun
方法允许你指定一个 Runnable
任务,该任务将在 Java 的默认 ForkJoinPool
的公共线程池(或者指定一个线程池)中异步执行。这样,你可以在异步操作完成后,轻松地在不阻塞原始线程的情况下执行额外的代码。
/**
* 测试thenRun
*/
@Test
public void testThenRun() {
// 创建一个自定义的线程池
ExecutorService executor = Executors.newFixedThreadPool(4, r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("自定义线程池-" + t.getId());
return t;
});
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
// 这里执行有返回值的异步任务
System.out.println("正在执行任务:默认线程池,无返回值...");
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, executor).thenRun(() -> {
System.out.println("执行完成后,添加操作");
});
cf.join();
}
注意事项
-
thenRun
中的任务会在CompletableFuture
链中的前一个任务完成后执行,但无论前一个任务是正常完成还是异常完成。 -
thenRun
返回一个新的CompletableFuture<Void>
,它将在Runnable
任务执行完成后完成。 -
如果
CompletableFuture
链中的任何步骤发生异常,并且你没有在链中显式地处理这些异常(例如,使用exceptionally
或handle
方法),那么这些异常将会传播到链的末尾,并最终可能导致整个链的失败。因此,请确保你理解了异常在CompletableFuture
链中的传播方式,并适当地处理它们。
thenRunAsync 的具体使用方法
thenRunAsync
方法允许在 CompletableFuture
链中的前一个任务完成后,利用指定的 Executor
在不同的线程中异步执行提供的 Runnable
任务。这样,你可以灵活地控制任务执行的线程,同时保持异步编程的便利性和非阻塞特性。
/**
* 测试thenRunAsync
*/
@Test
public void testThenRunAsync() {
// 创建一个自定义的线程池
ExecutorService executor = Executors.newFixedThreadPool(4, r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("自定义线程池-" + t.getId());
return t;
});
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
// 这里执行有返回值的异步任务
System.out.println("正在执行任务:默认线程池,无返回值...");
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, executor).thenRunAsync(() -> {
System.out.println("执行完成后,添加操作");
}, executor);
cf.join();
}
thenApply/thenApplyAsync方法(与当前操作紧密相连)
CompletableFuture
的 thenApply
和 thenApplyAsync
方法在异步操作成功完成后,对结果进行转换或处理,并将转换后的结果封装在一个新的 CompletableFuture
对象中。这两个方法的主要区别在于它们的执行方式和执行器(Executor)的使用上。
-
执行方式:
thenApply
方法采用同步方式执行提供的函数,即该函数将在调用它的线程(通常是CompletableFuture
链中前一个任务所在的线程)上执行。相反,thenApplyAsync
方法则明确以异步方式执行函数,这允许将函数的执行委托给不同的线程,以实现更好的并行性或避免阻塞。 -
执行器(Executor):
thenApply
默认使用CompletableFuture
内部的默认执行器,通常是ForkJoinPool.commonPool()
,来(尽管是同步地)执行提供的函数。而thenApplyAsync
允许你指定一个自定义的ExecutorService
,从而提供了对执行环境更细致的控制。如果没有明确指定执行器,thenApplyAsync
也会使用ForkJoinPool.commonPool()
,但关键在于它支持自定义执行器。 -
返回值:两者都返回一个新的
CompletableFuture
对象,其泛型类型反映了应用函数后的结果类型。这使得你可以继续链式调用其他CompletableFuture
方法,构建复杂的异步操作流。
综上所述,thenApply
和 thenApplyAsync
方法为异步编程提供了灵活的结果转换和处理机制,同时通过不同的执行方式和执行器选择,满足了不同场景下的性能和控制需求。
常见的使用场景如下:
-
- 结果转换
- 当需要对前一个异步操作的结果进行某种转换或计算时,可以使用 thenApply 方法。它接受一个函数作为参数,这个函数会以前一个操作的结果作为输入,并返回一个新的结果。这种方法类似于函数式编程中的 map 操作。
示例: 假设我们有一个异步操作返回一个用户ID,然后我们需要使用这个ID来查询用户的详细信息
CompletableFuture<String> userIdFuture = CompletableFuture.supplyAsync(() -> {
// 模拟异步获取用户ID
return "12345";
});
CompletableFuture<User> userFuture = userIdFuture.thenApply(userId -> {
// 根据用户ID查询用户信息(这里假设是同步操作)
return getUserById(userId);
});
// getUserById 是一个假设的同步方法,用于根据ID查询用户信息
User getUserById(String userId) {
// 模拟数据库查询
return new User(userId, "zhangliang");
}
-
- 链式调用
- thenApply 方法允许我们链式地调用多个异步操作,每个操作都以前一个操作的结果作为输入。这种链式调用可以构建出复杂的异步操作流,而无需显式地等待每个操作完成。
示例: 在电商系统中,可能先异步地保存订单到数据库,然后根据订单ID发送确认邮件,最后更新订单状态。
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> saveOrderToDatabase(order))
.thenApply(orderId -> sendConfirmationEmail(orderId))
.thenApply(emailSuccess -> updateOrderStatus(orderId, Status.CONFIRMED))
.thenAccept(status -> { /* 处理结果或什么都不做 */ });
// 假设的方法定义
String saveOrderToDatabase(Order order) { /* 保存订单 */ }
String sendConfirmationEmail(String orderId) { /* 发送邮件 */ }
void updateOrderStatus(String orderId, Status status) { /* 更新订单状态 */ }
-
- 轻量级操作
- 当需要对异步操作的结果执行一些轻量级的转换或计算时,thenApply 是一个很好的选择。由于这些操作通常不会阻塞线程太久,因此可以在相同的线程上执行,从而避免不必要的线程切换开销。
-
- 避免阻塞
- 虽然 thenApply 本身不保证异步执行(它可能在当前线程上执行,如果结果已经准备好),但通过使用它,我们可以避免在主线程或关键路径上执行耗时的计算或I/O操作,从而保持应用的响应性。
thenApply 的具体使用方法
thenApply
方法会在前一个 CompletableFuture
完成后,异步地执行提供的函数,并将该函数的返回值封装在一个新的 CompletableFuture
中
@Test
public void testThenApply() {
// 创建一个CompletableFuture,假设它代表了一个异步操作
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟一个耗时操作
try {
Thread.sleep(1000); // 假设这个操作需要1秒
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result of long-running operation";
});
// 使用thenApply方法,在前一个CompletableFuture完成后,异步地执行提供的函数
CompletableFuture<String> processedFuture = future.thenApply(result -> {
// 这里可以对result进行进一步的处理
return "Processed: " + result;
});
// 获取处理后的结果(这会阻塞,直到结果准备好)
try {
String finalResult = processedFuture.get(); // 调用get()会阻塞,直到结果准备好
System.out.println(finalResult);
} catch (Exception e) {
e.printStackTrace();
}
}
利用前一步的CompletableFuture的结果去通过thenApply去做处理
thenApplyAsync 的具体使用方法
thenApplyAsync
允许你在前一个 CompletableFuture
的计算完成之后,异步地(即在新线程中)执行一个给定的函数。这个函数会以前一个 CompletableFuture
的结果作为输入,并产生一个新的结果。thenApplyAsync 方法接受一个 ExecutorService
作为参数,用于控制新任务的执行环境,这提供了对任务执行并发性的精细控制。
/**
* 测试thenApplyAsync
*/
@Test
public void testThenApplyAsync() {
// 创建一个自定义的线程池
ExecutorService executor = Executors.newFixedThreadPool(4, r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("自定义线程池-" + t.getId());
return t;
});
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
return "Hello, World!";
}, executor).thenApplyAsync((res) -> {
return res + "你好";
},executor);
System.out.println(cf.join());
}
thenCompose/thenComposeAsync方法
CompletableFuture
的 thenCompose
和 thenComposeAsync
方法是构建异步编程链的关键工具,它们允许你基于前一个异步操作的结果来启动另一个异步操作。这两个方法的主要区别在于它们的执行方式和线程控制:
-
执行方式:
thenCompose
方法以同步方式(相对于其内部逻辑而言)处理结果的传递,即它会等待当前CompletableFuture
完成,并立即(在同一线程或ForkJoinPool的公共线程中,取决于上下文)启动下一个异步操作。而thenComposeAsync
则明确地以异步方式执行,它允许你指定一个自定义的Executor
来控制下一个异步操作的执行线程,或者如果不指定,则使用系统默认的异步执行器。 -
线程控制:
thenCompose
依赖于其调用上下文中的执行器(通常是ForkJoinPool的公共线程池,但也可能受外部因素影响),而thenComposeAsync
提供了更大的灵活性,允许你通过参数指定一个自定义的线程池来精确控制后续异步任务的执行环境。 -
返回值:两者都返回一个新的
CompletableFuture
实例,该实例代表了链中下一个异步操作的结果。这个新CompletableFuture
的类型与传递给下一个异步操作的函数的返回类型相匹配,从而支持类型安全的链式调用。
总结:
thenCompose
适用于那些不需要额外线程控制或默认执行策略已满足需求的场景;
thenComposeAsync
则为需要更细致控制异步任务执行线程的场景提供了强大的灵活性。通过这两个方法,可以轻松地构建出复杂且高效的异步任务链。
使用场景
thenCompose 的使用场景
-
连续异步操作: 当你需要基于前一个异步操作的结果来启动另一个异步操作时,
thenCompose
非常有用。它允许你将多个异步任务链接起来,形成一个流畅的链式调用。这种方式可以避免回调地狱,使代码更加清晰和易于维护。 -
结果转换: 如果你需要对前一个异步操作的结果进行某种转换或处理,并希望这个转换过程也是异步的(即转换过程可能涉及另一个异步操作),那么
thenCompose
是合适的选择。通过它,你可以将转换逻辑封装在一个返回CompletableFuture
的函数中,并将这个函数作为thenCompose
的参数。 -
保持类型安全:
thenCompose
方法能够保持链式调用的类型安全。当你使用它时,返回的新CompletableFuture
的类型将自动匹配你提供的函数中返回的CompletableFuture
的类型。
thenComposeAsync 的使用场景
-
自定义异步执行环境: 当你需要更细粒度地控制异步操作的执行环境时,
thenComposeAsync
提供了这种能力。通过提供一个自定义的Executor
,你可以指定在哪个线程池上执行后续的异步操作,这有助于优化应用程序的性能和资源使用。 -
提高并发性: 在某些情况下,你可能希望尽快地启动后续的异步操作,以便更好地利用多核处理器的优势。
thenComposeAsync
通过确保后续操作在另一个线程上异步执行,有助于实现这一目标。 -
避免线程饥饿: 如果你正在使用默认的线程池(如 ForkJoinPool 的公共线程池),并且担心因为大量异步任务同时执行而导致线程饥饿问题,
thenComposeAsync
允许你通过提供自定义的线程池来减轻这种风险。
thenCompose 的具体使用方法
**thenCompose **方法允许你以链式调用的方式,将一个 **CompletableFuture **的结果作为参数传递给另一个返回 **CompletableFuture **的函数。一旦当前的 **CompletableFuture **完成(无论是正常完成还是异常完成),**thenCompose **会立即执行这个提供的函数,并返回该函数生成的新的 CompletableFuture。
/**
* 模拟异步获取用户ID
*
* @return
*/
public static CompletableFuture<Integer> fetchUserIdAsync() {
return CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return 1; // 假设这是从某处获取的用户ID
});
}
/**
* 模拟根据用户ID异步获取用户详细信息
*
* @param userId
* @return
*/
public static CompletableFuture<String> fetchUserInfoAsync(int userId) {
return CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "User " + userId + " details"; // 假设这是用户详细信息
});
}
@Test
public void testThenCompose() {
// 链接两个异步操作
CompletableFuture<String> userInfoFuture = fetchUserIdAsync()
.thenCompose(userId -> fetchUserInfoAsync(userId));
// 处理结果
userInfoFuture.thenAccept(System.out::println)
.exceptionally(e -> {
System.err.println("Error fetching user info: " + e.getMessage());
return null;
});
userInfoFuture.join();
}
thenComposeAsync 的具体使用方法
**thenComposeAsync **方法不仅接受一个返回 **CompletableFuture **的函数作为参数,而且它还确保这个函数的执行是异步的。此外,**thenComposeAsync **允许你提供一个自定义的 Executor,以便更灵活地控制这个异步函数的执行环境,包括线程池的大小、类型等
@Test
public void testThenComposeAsync() {
// 创建一个自定义的Executor
ExecutorService executor = Executors.newFixedThreadPool(2);
// 链接两个异步操作,第二个操作使用自定义的Executor
CompletableFuture<String> userInfoFuture = fetchUserIdAsync()
.thenComposeAsync(userId -> fetchUserInfoAsync(userId), executor);
// 处理结果(与上面相同)
userInfoFuture.thenAccept(userInfo -> System.out.println(userInfo))
.exceptionally(e -> {
System.err.println("Error fetching user info: " + e.getMessage());
return null;
});
userInfoFuture.join();
executor.shutdown();
}
/**
* 模拟异步获取用户ID
*
* @return
*/
public static CompletableFuture<Integer> fetchUserIdAsync() {
return CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return 1; // 假设这是从某处获取的用户ID
});
}
/**
* 模拟根据用户ID异步获取用户详细信息
*
* @param userId
* @return
*/
public static CompletableFuture<String> fetchUserInfoAsync(int userId) {
return CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "User " + userId + " details"; // 假设这是用户详细信息
});
}
thenAccept/thenAcceptAsync方法
CompletableFuture
的 thenAccept
和 thenAcceptAsync
方法提供了在异步操作完成后对结果进行处理的机制。这两个方法均接收一个 Consumer
函数式接口的实现作为参数,该函数定义了当异步操作成功完成时应该如何处理其结果。它们之间的主要区别体现在执行方式和线程控制上:
-
执行方式:
thenAccept
方法在CompletableFuture
完成后的当前线程(或系统默认的ForkJoinPool.commonPool()
中的某个线程,具体取决于上下文)上同步执行传入的Consumer
。这意味着,如果CompletableFuture
是在一个非默认线程池中完成的,而thenAccept
被调用时没有明确指定执行器,则Consumer
可能会在同一个线程上执行,但也可能不会,具体取决于系统的调度。相比之下,thenAcceptAsync
总是异步执行Consumer
,即它会将Consumer
的执行委托给另一个线程,这个线程可以是自定义的ExecutorService
中的线程,如果没有指定,则使用默认的异步执行器。 -
自定义线程池:
thenAccept
方法不提供直接指定执行器的选项,它依赖于系统默认的ForkJoinPool.commonPool()
(除非通过其他方式间接影响,如使用CompletableFuture.asyncSupplyStage()
等方法时指定了执行器)。而thenAcceptAsync
方法允许你通过传递一个ExecutorService
实例来明确指定用于执行Consumer
的线程池,从而提供了更高的灵活性和控制力。 -
返回值:两者都返回
CompletableFuture<Void>
类型的结果,表明这些方法本身不产生任何结果值。这反映了它们的主要目的是处理异步操作的结果,而不是返回新的计算结果。
总结:
thenAccept
适用于那些不需要额外线程控制且可以接受在当前线程(或默认线程池中的线程)上同步处理结果的场景。
thenAcceptAsync
则更适用于需要异步处理结果,或者需要精确控制执行结果的线程环境的场景。
thenAccept 的具体使用方法
**thenAccept **用于在前一个 CompletableFuture 任务成功完成后,立即(但不一定在同一线程上,尽管在大多数情况下可能如此,具体取决于执行器和上下文)在默认的执行器(通常是 ForkJoinPool 的公共线程池,但可能因上下文而异)上同步执行一个给定的 Consumer 函数。这个 Consumer 函数接受前一个任务的结果作为输入,但不返回任何结果(即返回类型为 void)。通过这种方式,thenAccept 允许你对异步操作的结果进行消费处理,而无需进一步的异步链式调用。
@Test
public void testThenAccept(){
// 创建一个ExecutorService,用于thenAcceptAsync
ExecutorService executor = Executors.newCachedThreadPool();
// 创建一个异步任务,这里只是模拟返回了一个结果
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟异步操作,比如网络请求或数据库查询
try {
Thread.sleep(1000); // 模拟耗时操作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Hello, World!";
});
// 使用thenAccept来处理异步结果
future.thenAccept(result -> {
System.out.println("Handling result synchronously: " + result);
// 注意:这里不能返回结果给调用者,只能进行消费操作
});
future.join();
executor.shutdown();
}
thenAcceptAsync 的具体使用方法
thenAcceptAsync允许你在前一个 **CompletableFuture **成功完成时,以异步的方式执行一个 **Consumer **函数。这个函数不会返回结果,但会以前一个 **CompletableFuture **的结果作为输入参数。重要的是,**thenAcceptAsync **允许你指定一个 ExecutorService,这个服务将用于异步地执行这个 Consumer 函数,从而提供了对执行线程的灵活控制。这种方式使得你可以在不影响主线程或其他任务执行的情况下,对异步操作的结果进行处理。
@Test
public void testThenAcceptAsync(){
// 创建一个ExecutorService,用于thenAcceptAsync
ExecutorService executor = Executors.newCachedThreadPool();
// 创建一个异步任务,这里只是模拟返回了一个结果
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟异步操作,比如网络请求或数据库查询
try {
Thread.sleep(1000); // 模拟耗时操作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Hello, World!";
});
future.join();
// 使用thenAcceptAsync来处理异步结果,并指定自定义的Executor
future.thenAcceptAsync(result -> {
System.out.println("Handling result asynchronously with custom Executor: " + result);
// 这个操作会在自定义的Executor中异步执行
}, executor);
System.out.println(future.join());
executor.shutdown();
}
whenComplete/whenCompleteAsync方法
CompletableFuture
的 whenComplete
和 whenCompleteAsync
方法提供了一种机制,允许在异步操作完成后(无论成功还是抛出异常)执行一个特定的操作。这两个方法都接收一个 BiConsumer<? super T, ? super Throwable>
类型的参数,该参数是一个函数式接口的实现,能够访问异步操作的结果(如果操作成功)或异常(如果操作失败)。
主要区别:
-
执行方式:
whenComplete
方法默认在CompletableFuture
使用的执行器(通常是ForkJoinPool.commonPool()
)上同步执行给定的BiConsumer
。这意味着如果异步操作已经完成,并且当前线程可用,BiConsumer
将立即在当前线程上执行。而whenCompleteAsync
方法则明确地将BiConsumer
的执行提交到另一个线程,从而异步地执行它。 -
自定义线程池:
whenComplete
方法不使用自定义线程池,它依赖于CompletableFuture
的默认执行环境。相反,whenCompleteAsync
方法允许你指定一个自定义的ExecutorService
,从而提供了对BiConsumer
执行线程的更细粒度控制。
返回值:
- 两者都返回与原始
CompletableFuture
相同类型的结果(即CompletableFuture<T>
),但重要的是要注意,这个返回的CompletableFuture
已经与原始CompletableFuture
的完成状态同步。这意味着,即使你通过whenComplete
或whenCompleteAsync
添加了额外的操作,返回的CompletableFuture
仍然会反映原始异步操作的成功或失败状态。
总结,
whenComplete
适用于那些不需要额外线程开销,且可以在当前执行环境中直接处理结果的场景。
whenCompleteAsync
则更适合于需要确保处理逻辑不会阻塞原始异步操作执行线程的场景,特别是当处理逻辑本身比较耗时或需要并发执行时。
使用场景
whenComplete
和 whenCompleteAsync
是 Java 中 CompletableFuture
类提供的两个方法,它们用于在异步操作完成时执行特定的操作,无论这个异步操作是成功完成还是出现了异常。以下是这两个方法的使用场景:
whenComplete 的使用场景
-
结果和异常处理:
-
当你需要在异步操作完成后同时处理结果和可能发生的异常时,
whenComplete
是一个很好的选择。它接收一个BiConsumer<T, Throwable>
类型的参数,这个参数允许你访问异步操作的结果(如果操作成功完成)或异常(如果操作失败)。 -
使用
whenComplete
,你可以在一个地方同时处理正常结果和异常,从而避免编写重复的异常处理代码。
-
-
清理资源:
- 如果你的异步操作涉及到了资源的使用(如文件句柄、数据库连接等),那么在操作完成后及时释放这些资源是非常重要的。
whenComplete
可以在操作完成后立即执行资源释放的逻辑,无论操作是成功还是失败。
- 如果你的异步操作涉及到了资源的使用(如文件句柄、数据库连接等),那么在操作完成后及时释放这些资源是非常重要的。
-
执行依赖于异步操作结果的后续操作:
- 有时,你可能需要在异步操作完成后执行一些依赖于其结果(或异常)的后续操作。虽然
thenApply
、thenAccept
和exceptionally
等方法提供了更专注于结果或异常处理的选项,但whenComplete
提供了更通用的方式,允许你在一个回调中同时处理两者。
- 有时,你可能需要在异步操作完成后执行一些依赖于其结果(或异常)的后续操作。虽然
whenCompleteAsync 的使用场景
-
非阻塞的后续操作:
- 当你希望异步操作完成后的处理逻辑本身也是异步的,以避免阻塞主线程或其他重要线程时,
whenCompleteAsync
是一个合适的选择。它允许你指定一个ExecutorService
来异步执行处理逻辑,从而提高了应用程序的响应性和吞吐量。
- 当你希望异步操作完成后的处理逻辑本身也是异步的,以避免阻塞主线程或其他重要线程时,
-
提高并发性:
- 在高并发的应用场景中,使用
whenCompleteAsync
可以帮助提高系统的并发处理能力。通过将处理逻辑异步化,你可以减少线程等待时间,使得系统能够更快地处理更多的请求。
- 在高并发的应用场景中,使用
-
细粒度的线程控制:
- 与
whenComplete
不同,whenCompleteAsync
允许你指定执行后续操作的具体线程池。这提供了更细粒度的线程控制能力,允许你根据任务的性质和系统资源的状况来优化线程的使用。
- 与
总结
-
如果你需要在异步操作完成后立即处理结果和异常,并且不需要额外的线程来执行这些处理逻辑,那么
whenComplete
是一个简单而有效的选择。 -
如果你希望处理逻辑本身是异步的,或者你需要对执行处理逻辑的线程进行细粒度的控制,那么
whenCompleteAsync
可能是更好的选择。
whenComplete 的具体使用方法
-
**whenComplete **允许你注册一个 BiConsumer<T, Throwable> 回调,该回调将在 **CompletableFuture **完成时被调用,无论是正常完成还是发生异常。
-
**whenComplete **的 **BiConsumer **接收两个参数:第一个是操作成功完成时的结果(如果操作有结果的话),第二个是操作失败时抛出的异常(如果操作正常完成,则此参数为 null)。重要的是,whenComplete 方法会在默认的执行器(通常是 ForkJoinPool 的公共线程池,但可能受上下文影响)上同步执行提供的 BiConsumer,这意味着回调的执行不会引入额外的异步性。这种方式使得 whenComplete 非常适合于需要在异步操作完成时执行清理操作、记录日志或执行与结果无关但需要等待异步完成的其他任务。
@Test
public void testWhenComplete(){
// 创建一个ExecutorService,用于whenCompleteAsync(可选)
ExecutorService executor = Executors.newCachedThreadPool();
// 创建一个异步任务,这里只是模拟返回了一个结果或抛出了一个异常
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
// 模拟耗时操作
Thread.sleep(1000);
// 假设这里是正常返回结果
// return "Operation successful";
// 这里模拟抛出异常
throw new RuntimeException("Operation failed");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted", e);
}
});
// 使用whenComplete来处理异步操作完成后的结果或异常
future.whenComplete((result, exception) -> {
if (exception != null) {
System.err.println("Async operation failed: " + exception.getMessage());
} else {
System.out.println("Async operation completed successfully: " + result);
}
// 注意:这里是在默认执行器上同步执行的
});
// 阻塞主线程以等待异步操作完成
// 只等待原始的CompletableFuture完成
future.join();
}
whenCompleteAsync 的具体使用方法
- whenCompleteAsync允许你在 **CompletableFuture **无论是正常完成还是发生异常时,都异步地执行一个 BiConsumer 函数。这个 BiConsumer 接收两个参数:第一个是异步操作的结果(如果操作成功完成),或者是 null(如果操作因为异常而失败);第二个是异常对象(如果操作因为异常而完成,则为该异常,否则为 null)。whenCompleteAsync 方法特别之处在于它允许你指定一个 ExecutorService,以便控制 BiConsumer 的执行线程。如果没有指定 ExecutorService,则默认使用公共的 ForkJoinPool。这意味着,即使原始的 CompletableFuture 操作是在某个特定的线程上完成的,whenCompleteAsync 也会确保 BiConsumer 的执行是异步的,并且可以在不同的线程上执行。这种机制使得 whenCompleteAsync 非常适合于需要在异步操作完成后执行清理工作、记录日志、发送通知等任务的场景
@Test
public void testWhenCompleteAsync(){
// 创建一个ExecutorService,用于whenCompleteAsync(可选)
ExecutorService executor = Executors.newCachedThreadPool();
// 创建一个异步任务,这里只是模拟返回了一个结果或抛出了一个异常
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
// 模拟耗时操作
Thread.sleep(1000);
// 假设这里是正常返回结果
// return "Operation successful";
// 这里模拟抛出异常
throw new RuntimeException("Operation failed");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted", e);
}
});
// 使用whenCompleteAsync来处理异步操作完成后的结果或异常,并指定自定义的Executor
future.whenCompleteAsync((result, exception) -> {
if (exception != null) {
System.err.println("Async operation failed (async callback): " + exception.getMessage());
} else {
System.out.println("Async operation completed successfully (async callback): " + result);
}
// 这个回调将在自定义的Executor上异步执行
}, executor);
// 阻塞主线程以等待异步操作完成
// 只等待原始的CompletableFuture完成
future.join();
executor.shutdown();
}
handle/handleAsync方法
CompletableFuture
的 handle
和 handleAsync
方法为处理异步操作完成后的结果或异常提供了强大的机制。这两个方法都接收一个 BiFunction<? super T, Throwable, ? extends U>
类型的函数式接口实现作为参数,该函数能够接受异步操作的结果(如果操作成功完成)或捕获的异常(如果操作失败),并基于这些信息返回一个新的值。
-
主要区别:
handle
方法会在CompletableFuture
完成的线程(通常是ForkJoinPool.commonPool()
中的某个线程,但具体取决于上下文)上同步执行提供的BiFunction
。而handleAsync
方法则明确地将BiFunction
的执行委托给另一个线程,这个线程可以是自定义的ExecutorService
中的线程,如果没有指定,则使用默认的异步执行器。这意味着handleAsync
提供了更高的灵活性和对并发执行的控制。 -
返回值:两者都返回一个新的
CompletableFuture<U>
实例,其中U
是由BiFunction
产生的结果的类型。这个新的CompletableFuture
代表了BiFunction
执行完成后的结果,允许进一步的链式调用或异步处理。 -
自定义线程池:
handle
方法通常依赖于CompletableFuture
创建时或后续操作中指定的执行器(默认为ForkJoinPool.commonPool()
),而handleAsync
方法允许通过其参数显式指定一个自定义的ExecutorService
,以便更精确地控制BiFunction
的执行环境。 -
执行方式:
handle
方法以同步方式执行BiFunction
,即它会等待异步操作完成,并在同一线程(或默认执行器中的线程)上立即执行BiFunction
。相反,handleAsync
方法以异步方式执行BiFunction
,将其提交给另一个线程执行,从而允许当前线程继续执行其他任务。
handle 的具体使用方法
**handle **允许你在 CompletableFuture 的计算完成(无论是正常完成还是异常完成)之后,执行一个给定的 BiFunction。这个 BiFunction 接受两个参数:第一个是 CompletableFuture 计算的结果(如果计算正常完成则为结果值,如果异常则为 null),第二个是计算过程中抛出的异常(如果计算正常完成则为 null),并返回一个结果。重要的是,handle 方法默认在当前线程(如果可能)或默认的 ForkJoinPool 上同步执行这个 BiFunction,而不会启动新的异步任务。这种方式使得你能够在单个步骤中处理正常结果和异常情况,并可能返回一个新的 CompletableFuture 以继续异步链式调用。
@Test
public void testHandle() {
// 创建一个异步任务,这里只是模拟返回了一个结果或抛出了一个异常
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
// 模拟耗时操作
Thread.sleep(1000);
// 假设这里是正常返回结果
return "Operation successful";
// 或者模拟抛出异常
// throw new RuntimeException("Operation failed");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted", e);
}
});
// 使用handle方法处理异步操作完成后的结果或异常,并返回一个新的CompletableFuture
CompletableFuture<String> handledFuture = future.handle((result, exception) -> {
if (exception != null) {
// 处理异常,这里返回了一个错误消息
return "Error: " + exception.getMessage();
} else {
// 正常处理结果
return "Processed result: " + result;
}
});
// 等待并打印handle方法的结果
try {
System.out.println(handledFuture.get()); // 输出可能是 "Processed result: Operation successful" 或 "Error: ..."
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
handleAsync 的具体使用方法
**handleAsync **允许你在 CompletableFuture 的计算无论是正常完成还是抛出异常时,都通过指定的 ExecutorService 异步地执行一个 BiFunction 函数。这个 BiFunction 函数接收两个参数:第一个是 CompletableFuture 的计算结果(如果计算成功完成)或者异常的 Throwable 对象(如果计算抛出异常),第二个参数是结果的类型(T)或者异常的类型(Throwable),但在这个上下文中第二个参数通常被忽略或用作类型推断的辅助。handleAsync 返回一个新的 CompletableFuture,其结果是由提供的 BiFunction 函数产生的。这种方式为处理异步操作的结果或异常提供了极大的灵活性。
@Test
public void testHandleAsync() {
// 创建一个ExecutorService(可选,用于handleAsync)
ExecutorService executor = Executors.newCachedThreadPool();
// 创建一个异步任务,这里只是模拟返回了一个结果或抛出了一个异常
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
// 模拟耗时操作
Thread.sleep(1000);
// 假设这里是正常返回结果
return "Operation successful";
// 或者模拟抛出异常
// throw new RuntimeException("Operation failed");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted", e);
}
});
// 使用handleAsync方法异步处理异步操作完成后的结果或异常,并返回一个新的CompletableFuture
CompletableFuture<String> handledAsyncFuture = future.handleAsync((result, exception) -> {
if (exception != null) {
// 异步处理异常,这里返回了一个错误消息
return "Async Error: " + exception.getMessage();
} else {
// 异步处理正常结果
return "Async Processed result: " + result;
}
}, executor);
// 注意:由于handleAsync是异步的,我们不能直接等待它的结果(至少不能在这里简单地等待),
// 但我们可以做一些其他事情来模拟等待,比如使用CompletableFuture的其他方法或外部同步机制。
// 为了演示,我们简单地打印出异步处理是否已提交(注意,这并不表示处理已完成)
System.out.println("handleAsync future submitted: " + handledAsyncFuture.isDone()); // 初始时应该是false
handledAsyncFuture.join();
System.out.println("handleAsync future submitted: " + handledAsyncFuture.isDone());
executor.shutdown();
}
总结:一元依赖之间的区别
方法 | 描述 | 是否有返回值 | 异步/同步执行 | 接收上一步结果 | 异常处理 |
---|---|---|---|---|---|
thenRun | 在前一个任务完成后执行Runnable任务,不依赖上一步结果,也不返回结果 | 无 | 同步 | 否 | 不处理 |
thenRunAsync | 类似于thenRun,但Runnable任务异步执行 | 无 | 异步 | 否 | 不处理 |
thenApply | 接收上一步结果作为参数,执行一个函数,并返回结果 | 有 | 同步 | 是 | 不处理 |
thenApplyAsync | 类似于thenApply,但函数异步执行 | 有 | 异步 | 是 | 不处理 |
thenCompose | 接收上一步结果作为参数,执行一个返回CompletableFuture的函数,并返回这个CompletableFuture的结果 | 有 | 同步 | 是 | 不处理 |
thenComposeAsync | 类似于thenCompose,但函数异步执行 | 有 | 异步 | 是 | 不处理 |
thenAccept | 接收上一步结果作为参数,执行一个消费操作,无返回值 | 无 | 同步 | 是 | 不处理 |
thenAcceptAsync | 类似于thenAccept,但消费操作异步执行 | 无 | 异步 | 是 | 不处理 |
whenComplete | 在任务完成时执行一个双参数(结果和异常)的Runnable任务,不改变结果或异常 | 无 | 同步 | 是(结果或异常) | 是(通过异常参数) |
whenCompleteAsync | 类似于whenComplete,但Runnable任务异步执行 | 无 | 异步 | 是(结果或异常) | 是(通过异常参数) |
handle | 类似于whenComplete,但提供了返回值,必须返回一个结果 | 有 | 同步 | 是(结果或异常) | 是(通过异常参数) |
handleAsync | 类似于handle,但Runnable任务异步执行 | 有 | 异步 | 是(结果或异常) | 是(通过异常参数) |
1. thenRun/thenRunAsync
-
目的:用于在
CompletableFuture
完成时执行一个Runnable
任务,但不关心原始结果或异常。 -
参数:
Runnable
。 -
返回值:返回一个新的
CompletableFuture<Void>
,它将在原始CompletableFuture
完成后完成,但结果始终为null
。 -
区别:
thenRun
在原始CompletableFuture
的默认执行器上同步执行Runnable
,而thenRunAsync
允许指定一个自定义的Executor
来异步执行Runnable
。
2. thenApply/thenApplyAsync
-
目的:用于在
CompletableFuture
完成时,对其结果进行转换。 -
参数:
Function<? super T, ? extends U>
,其中T
是原始结果的类型,U
是转换后结果的类型。 -
返回值:返回一个新的
CompletableFuture<U>
,它包含转换后的结果。 -
区别:
thenApply
在原始CompletableFuture
的默认执行器上同步执行转换函数,而thenApplyAsync
允许指定一个自定义的Executor
来异步执行转换函数。
3. thenCompose/thenComposeAsync
-
目的:用于在
CompletableFuture
完成时,根据其结果来生成一个新的CompletableFuture
。 -
参数:
Function<? super T, ? extends CompletionStage<U>>
,其中T
是原始结果的类型,U
是新CompletableFuture
的结果类型。 -
返回值:返回一个新的
CompletableFuture<U>
,它代表由转换函数生成的CompletableFuture
的结果。 -
区别:
thenCompose
在原始CompletableFuture
的默认执行器上同步执行转换函数,而thenComposeAsync
允许指定一个自定义的Executor
来异步执行转换函数。
4. thenAccept/thenAcceptAsync
-
目的:用于在
CompletableFuture
完成时,对其结果进行消费,但不返回新的结果。 -
参数:
Consumer<? super T>
,其中T
是原始结果的类型。 -
返回值:返回与原始
CompletableFuture
相同类型的CompletableFuture<T>
,但结果不受影响。 -
区别:
thenAccept
在原始CompletableFuture
的默认执行器上同步执行消费函数,而thenAcceptAsync
允许指定一个自定义的Executor
来异步执行消费函数。
5. whenComplete/whenCompleteAsync
-
目的:用于在
CompletableFuture
完成时执行一个操作,无论其成功还是失败。 -
参数:
BiConsumer<? super T, ? super Throwable>
,其中T
是原始结果的类型,Throwable
是可能的异常。 -
返回值:返回与原始
CompletableFuture
相同类型的CompletableFuture<T>
,但结果不受影响。 -
区别:
whenComplete
在原始CompletableFuture
的默认执行器上同步执行BiConsumer
,而whenCompleteAsync
允许指定一个自定义的Executor
来异步执行BiConsumer
。
6. handle/handleAsync
-
目的:用于在
CompletableFuture
完成时,无论其成功还是失败,都对其结果或异常进行处理,并返回一个新的结果。 -
参数:
BiFunction<? super T, Throwable, ? extends U>
,其中T
是原始结果的类型,Throwable
是可能的异常,U
是新结果的类型。 -
返回值:返回一个新的
CompletableFuture<U>
,它包含处理函数的结果。 -
区别:
handle
在原始CompletableFuture
的默认执行器上同步执行处理函数,而handleAsync
允许指定一个自定义的Executor
来异步执行处理函数。
二元依赖:AND组合关系
thenCombine/thenCombineAsync方法
优化后的表达:
CompletableFuture
提供的 thenCombine
和 thenCombineAsync
方法是处理多个异步操作结果组合的强大工具。当两个 CompletableFuture
实例都成功完成时,这两个方法允许你利用这两个结果来执行一个组合函数,生成一个新的结果。
主要区别:
-
执行方式:
thenCombine
在任一输入的CompletableFuture
完成时,会自动在其默认的ForkJoinPool.commonPool()
上同步执行提供的组合函数。这意味着组合操作会阻塞当前线程(如果当前线程是默认执行器的一部分)直到组合函数完成。相反,thenCombineAsync
允许你指定一个自定义的ExecutorService
来异步执行组合函数,从而不会阻塞调用线程。 -
自定义线程池:
thenCombine
默认使用 Java 提供的公共 ForkJoin 池来执行组合操作,而thenCombineAsync
提供了更大的灵活性,允许你根据应用需求选择或创建自定义的ExecutorService
,以便更好地控制并发级别和资源利用。
返回值:
两者都返回一个新的 CompletableFuture<V>
,其中 V
是由提供的组合函数返回的值的类型。这个新的 CompletableFuture
会在组合函数执行完毕后完成,并携带其结果。
使用场景:
-
并行数据聚合: 当你需要并行地从多个数据源获取数据,并将这些数据聚合成一个统一的结果时,
thenCombine
或thenCombineAsync
非常有用。例如,你可能需要从两个独立的 API 调用中获取用户信息和订单信息,然后将这些信息合并为一个用户概要对象。 -
并行计算: 在进行并行计算时,你可能需要将不同计算任务的结果结合起来以得出最终结果。使用
thenCombine
或thenCombineAsync
可以轻松地将这些异步计算的结果合并。 -
依赖多个异步操作的决策: 在某些情况下,你可能需要等待多个异步操作完成,并根据这些操作的结果来做出决策。
thenCombine
或thenCombineAsync
允许你等待所有操作完成,并同时获取它们的结果,以便进行后续的逻辑处理。 -
链式异步处理: 在复杂的异步处理流程中,你可能需要将多个
CompletableFuture
实例链接在一起,以便一个异步操作的结果能够影响下一个异步操作。虽然thenCombine
或thenCombineAsync
不是专门为此设计的(它们更侧重于并行结果的合并),但它们可以在某些链式处理场景中作为中间步骤使用。
thenCombine 的具体使用方法
thenCombine
方法确保在两个CompletableFuture
实例均完成时,利用这两个实例的结果,在其默认执行器(通常是ForkJoinPool.commonPool())
上同步地执行一个组合函数,以产生一个新的 CompletableFuture
,该 CompletableFuture
包含了组合函数的结果。
@Test
public void thesThenCombine(){
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return 1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return 2;
});
// 使用 thenCombine 来合并结果
CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);
// 获取合并后的结果
try {
System.out.println(combinedFuture.get()); // 输出: 3
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
thenCombineAsync 的具体使用方法
thenCombineAsync
方法在两个 CompletableFuture
实例都成功完成时,利用指定的 ExecutorService
来异步地执行一个组合函数。这意味着组合函数的执行不会阻塞当前线程,而是会在提供的执行器上排队等待执行,从而允许程序继续执行其他任务而不必等待组合结果。通过这种方式,thenCombineAsync
提供了更高的灵活性和并发性能,特别是在需要精细控制异步任务执行线程池的情况下。
@Test
public void thesThenCombineAsync(){
// 创建一个ExecutorService
ExecutorService executor = Executors.newCachedThreadPool();
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return 1;
},executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return 2;
},executor);
// 使用 thenCombine 来合并结果
CompletableFuture<Integer> combinedFuture = future1.thenCombineAsync(future2, Integer::sum, executor);
// 获取合并后的结果
try {
System.out.println(combinedFuture.get()); // 输出: 3
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}finally {
executor.shutdown();
}
}
thenAcceptBoth/thenAcceptBothAsync方法
CompletableFuture
的 thenAcceptBoth
和 thenAcceptBothAsync
方法设计用于处理两个异步操作完成后的情况,它们允许你使用这两个操作的结果来执行一个特定的动作(BiConsumer
函数),而不需要关心结果的进一步组合或转换。这两个方法的核心在于它们的协作性和灵活性。
主要区别:
-
执行方式:
thenAcceptBoth
方法采用同步执行模式,即当任一输入的CompletableFuture
完成时,它会自动在其默认的ForkJoinPool.commonPool()
上执行提供的BiConsumer
函数。这意味着,如果当前线程是ForkJoinPool.commonPool()
的一部分,它可能会被阻塞直到BiConsumer
函数执行完毕。相反,thenAcceptBothAsync
允许你指定一个自定义的ExecutorService
来异步执行BiConsumer
函数,从而避免阻塞调用线程。 -
自定义线程池:
thenAcceptBoth
默认使用 Java 的公共 ForkJoin 池,而thenAcceptBothAsync
提供了更高的灵活性,允许你根据应用需求选择合适的ExecutorService
,以优化资源利用和并发控制。
返回值:
两者都返回 CompletableFuture<Void>
,这反映了 BiConsumer
函数不产生返回值的事实。返回的 CompletableFuture
允许你进一步链接其他异步操作,尽管在这个特定的上下文中,它主要用于表示原始两个 CompletableFuture
实例的完成状态。
使用场景:
-
结果消费: 当两个异步操作的结果需要被消费(例如打印到控制台、更新UI、发送通知等),但不需要将结果作为另一个异步操作的输入时,
thenAcceptBoth
或thenAcceptBothAsync
是非常合适的选择。 -
无返回值操作: 在某些情况下,你可能只需要对两个异步操作的结果执行一些无返回值的操作,如日志记录、性能监控等。此时,使用
thenAcceptBoth
或thenAcceptBothAsync
可以避免不必要的CompletableFuture
链式调用和返回值处理。 -
并行处理与消费: 当你需要并行处理两个异步任务,并在它们完成后立即消费它们的结果时,
thenAcceptBoth
或thenAcceptBothAsync
可以帮助你实现这一目标。这两个方法确保了消费操作只会在两个异步任务都完成后执行。 -
避免阻塞: 虽然
thenAcceptBoth
本身是在其默认执行器(通常是ForkJoinPool.commonPool()
)上同步执行的,但thenAcceptBothAsync
允许你指定一个自定义的Executor
来异步执行消费操作。这可以避免在消费结果时阻塞当前线程,特别是在高并发场景下,这有助于提高程序的响应性和吞吐量。
thenAcceptBoth 的具体使用方法
thenAcceptBoth
方法在两个CompletableFuture
都完成时,会同步地使用这两个 的结果来执行一个BiConsumer
(双参数消费者),即BiAction
(虽然 Java 标准库中不直接称为 BiAction
,但这里可以理解为接受两个参数的操作)。
@Test
public void testThenAcceptBoth(){
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result A";
});
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result B";
});
// 使用 thenAcceptBoth 来消费两个异步操作的结果
CompletableFuture<Void> future = futureA.thenAcceptBoth(futureB, (resultA, resultB) -> {
System.out.println("Received results: " + resultA + " and " + resultB);
});
future.join();
}
thenAcceptBothAsync 的具体使用方法
thenAcceptBothAsync
方法在两个CompletableFuture
实例都成功完成时,会利用用户指定的 ExecutorService
来异步地执行一个 BiConsumer
(或称为 BiAction,尽管在 Java 标准库中更常被称为 BiConsumer)函数。这种异步执行模式允许程序在等待两个异步操作完成的同时,继续执行其他任务,从而提高整体程序的响应性和吞吐量。通过指定自定义的ExecutorService
,开发可以精细控制并发执行的细节,包括线程池的大小、线程的生命周期等,以更好地适应应用的需求。
@Test
public void testThenAcceptBothAsync() {
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result A";
});
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result B";
});
// 或者使用 thenAcceptBothAsync 来异步消费结果
ExecutorService executor = Executors.newCachedThreadPool();
CompletableFuture<Void> future = futureA.thenAcceptBothAsync(futureB, (resultA, resultB) -> {
System.out.println("Asynchronously received results: " + resultA + " and " + resultB);
}, executor);// 使用自定义的线程池
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
runAfterBoth/runAfterBothAsync方法
CompletableFuture
的 runAfterBoth
和 runAfterBothAsync
方法提供了一种机制,用于在两个异步操作都成功完成后执行一个 Runnable
任务,而这两个异步操作的完成顺序并不重要。这两个方法允许你定义一个在两个 CompletableFuture
实例都结束后自动触发的操作,而无需等待具体的操作结果或进一步处理它们。
主要区别:
-
执行方式:
runAfterBoth
方法会在任一输入的CompletableFuture
完成时,自动在其默认的ForkJoinPool.commonPool()
上同步执行提供的Runnable
任务。这意味着,如果当前线程是ForkJoinPool.commonPool()
的一部分,它可能会被阻塞直到Runnable
任务执行完毕。相反,runAfterBothAsync
允许你指定一个自定义的ExecutorService
来异步执行Runnable
任务,从而避免阻塞调用线程。 -
自定义线程池:
runAfterBoth
默认使用 Java 的公共 ForkJoin 池来执行Runnable
任务,而runAfterBothAsync
提供了更高的灵活性,允许你根据应用需求选择合适的ExecutorService
,以优化资源利用和并发控制。
返回值:
两者都返回 CompletableFuture<Void>
,这反映了 Runnable
任务不产生返回值的事实。返回的 CompletableFuture
允许你进一步链接其他异步操作,尽管在这个特定的上下文中,它主要用于表示原始两个 CompletableFuture
实例的完成状态以及 Runnable
任务的执行。
使用场景
runAfterBoth 使用场景
-
依赖两个异步结果的后续操作:当你有两个异步任务,并且这两个任务的结果都是后续操作所必需的,但你不关心它们的完成顺序时,可以使用
runAfterBoth
。这个后续操作(Runnable
)将在两个任务都完成后同步执行。 -
资源释放或清理:如果你启动了两个异步任务,并且这两个任务都完成后需要执行一些资源释放或清理工作,那么
runAfterBoth
是一个合适的选择。这可以确保无论任务的执行顺序如何,清理工作都会在两个任务都完成后进行。 -
同步更新状态:在需要基于两个异步任务的结果来同步更新某些状态或执行某些操作时,
runAfterBoth
可以确保这些操作在所有必要的数据都准备好之后执行。
runAfterBothAsync 使用场景
-
避免阻塞并优化响应性:与
runAfterBoth
不同,runAfterBothAsync
允许Runnable
异步执行,这意味着它不会阻塞当前线程。这在需要保持应用程序响应性的场景中非常有用,例如,在 GUI 应用程序中或在需要处理多个并发请求的服务器端应用程序中。 -
自定义执行器:
runAfterBothAsync
允许你指定一个自定义的ExecutorService
来控制Runnable
的执行。这提供了额外的灵活性,允许你根据应用程序的需求来优化线程的使用和性能。 -
复杂的异步操作链:在构建复杂的异步操作链时,
runAfterBothAsync
可以帮助你更灵活地组合任务。你可以根据两个或多个异步操作的结果来启动新的异步操作,而无需等待所有操作都完成。
runAfterBoth 的具体使用方法
runAfterBoth
方法在两个CompletableFuture
实例都成功完成时,会触发一个Runnable
任务的同步执行。这里需要注意的是,runAfterBoth
方法并不直接使用这两个 CompletableFuture
的结果作为 Runnable
任务的输入;相反,它只是简单地等待两个异步操作都完成,然后执行提供的Runnable。
假设你正在开发一个在线购物应用程序,并且有两个异步任务:一个用于检查用户的支付状态,另一个用于验证用户的库存可用性。这两个任务都完成后,你需要更新订单状态并通知用户。由于更新订单状态和通知用户不需要等待这两个任务以特定的顺序完成
@Test
public void testRunAfterBoth() {
CompletableFuture<Void> paymentFuture = checkPaymentStatus();
CompletableFuture<Void> inventoryFuture = verifyInventory();
// 使用 runAfterBoth 同步等待两个任务都完成,然后执行后续操作
paymentFuture.runAfterBoth(inventoryFuture, this::updateOrderAndNotifyUser);
//看下模拟的结果,实际这句代码是不需要的,因为runAfterBoth方法本身是异步的
paymentFuture.join();
}
/**
* 模拟检查支付状态的异步方法
*
* @return
*/
private CompletableFuture<Void> checkPaymentStatus() {
return CompletableFuture.runAsync(() -> {
// 模拟支付状态检查
try {
Thread.sleep(1000); // 假设这需要1秒钟
System.out.println("Payment status checked.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
/**
* 模拟验证库存可用性的异步方法
*
* @return
*/
private CompletableFuture<Void> verifyInventory() {
return CompletableFuture.runAsync(() -> {
// 模拟库存验证
try {
Thread.sleep(500); // 假设这需要0.5秒钟
System.out.println("Inventory verified.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
/**
* 更新订单状态和通知用户的方法
*/
private void updateOrderAndNotifyUser() {
// 这里编写更新订单状态和通知用户的逻辑
System.out.println("Order status updated and user notified.");
}
runAfterBothAsync 的具体使用方法
runAfterBothAsync
方法在两个 CompletableFuture
实例都完成时,将使用指定的 ExecutorService
来异步执行一个 Runnable
任务。这个特性允许你在两个异步操作都成功结束后,执行一些不依赖于这两个操作结果的后续操作,同时这些后续操作可以在指定的执行器上异步执行,从而不会阻塞当前线程。这种方法提高了程序的并发性和响应性,使得在等待异步操作完成的同时,可以执行其他不相关的任务。
假设你正在开发一个在线购物应用程序,并且有两个异步任务:一个用于检查用户的支付状态,另一个用于验证用户的库存可用性。这两个任务都完成后,你需要更新订单状态并通知用户。由于更新订单状态和通知用户不需要等待这两个任务以特定的顺序完成
@Test
public void testRunAfterBothAsync() {
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<Void> paymentFuture = checkPaymentStatus();
CompletableFuture<Void> inventoryFuture = verifyInventory();
// 使用 runAfterBoth 同步等待两个任务都完成,然后执行后续操作
paymentFuture.runAfterBothAsync(inventoryFuture, this::updateOrderAndNotifyUser,executor);
//看下模拟的结果,实际这句代码是不需要的,因为runAfterBoth方法本身是异步的
paymentFuture.join();
}
/**
* 模拟检查支付状态的异步方法
*
* @return
*/
private CompletableFuture<Void> checkPaymentStatus() {
return CompletableFuture.runAsync(() -> {
// 模拟支付状态检查
try {
Thread.sleep(1000); // 假设这需要1秒钟
System.out.println("Payment status checked.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
/**
* 模拟验证库存可用性的异步方法
*
* @return
*/
private CompletableFuture<Void> verifyInventory() {
return CompletableFuture.runAsync(() -> {
// 模拟库存验证
try {
Thread.sleep(500); // 假设这需要0.5秒钟
System.out.println("Inventory verified.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
/**
* 更新订单状态和通知用户的方法
*/
private void updateOrderAndNotifyUser() {
// 这里编写更新订单状态和通知用户的逻辑
System.out.println("Order status updated and user notified.");
}
总结
方法 | 返回类型 | 执行方式 | 自定义线程池 | 参数类型 | 使用场景 |
---|---|---|---|---|---|
thenCombine | CompletableFuture | 同步(与执行future相同的线程) | 支持 | CompletableFuture<? extends U>,CompletableFuture<? extends V>,BiFunction<? super U,? super V,? extends T> | 当两个CompletableFuture都完成时,以它们的结果作为参数,通过提供的函数生成新的结果,并返回一个新的CompletableFuture。 |
thenCombineAsync | CompletableFuture | 异步(使用线程池) | 支持 | 同上,并额外需要Executor | 类似于thenCombine,但使用线程池中的线程来执行提供的函数。 |
thenAcceptBoth | CompletableFuture | 同步(与执行future相同的线程) | 支持 | CompletableFuture<? extends U>,CompletableFuture<? extends V>,BiConsumer<? super U,? super V> | 当两个CompletableFuture都完成时,以它们的结果作为参数,执行提供的消费操作,不返回结果。 |
thenAcceptBothAsync | CompletableFuture | 异步(使用线程池) | 支持 | 同上,并额外需要Executor | 类似于thenAcceptBoth,但使用线程池中的线程来执行提供的消费操作。 |
runAfterBoth | CompletableFuture | 同步(由最后执行完的异步任务线程执行) | 支持 | CompletableFuture<?>,Runnable | 当两个CompletableFuture都完成时,执行提供的Runnable任务,不接收前面任务的结果,也不返回结果。 |
runAfterBothAsync | CompletableFuture | 异步(使用线程池) | 支持 | 同上,并额外需要Executor | 类似于runAfterBoth,但使用线程池中的线程来执行Runnable任务。 |
解释
-
返回类型:
-
thenCombine
和thenCombineAsync
返回一个新的CompletableFuture<T>
,其中T
是通过提供的函数计算出的新结果类型。 -
thenAcceptBoth
、thenAcceptBothAsync
、runAfterBoth
和runAfterBothAsync
返回CompletableFuture<Void>
,因为它们不产生新的结果。
-
-
执行方式:
-
带有
Async
后缀的方法(如thenCombineAsync
)使用线程池中的线程来异步执行提供的函数或Runnable。 -
不带
Async
后缀的方法(如thenCombine
)使用与执行原始CompletableFuture相同的线程(通常是调用线程或线程池中的线程)来同步执行。
-
-
自定义线程池:
- 所有方法都支持自定义线程池,通过提供额外的
Executor
参数来实现。
- 所有方法都支持自定义线程池,通过提供额外的
-
参数类型:
-
thenCombine
和thenCombineAsync
需要两个CompletableFuture
和一个BiFunction
,用于组合两个结果。 -
thenAcceptBoth
和thenAcceptBothAsync
需要两个CompletableFuture
和一个BiConsumer
,用于消费两个结果。 -
runAfterBoth
和runAfterBothAsync
需要两个CompletableFuture
和一个Runnable
,用于在两者都完成后执行。
-
-
使用场景:
-
thenCombine
和thenCombineAsync
适用于需要组合两个异步操作结果的场景。 -
thenAcceptBoth
和thenAcceptBothAsync
适用于需要消费两个异步操作结果但不产生新结果的场景。 -
runAfterBoth
和runAfterBothAsync
适用于在两个异步操作完成后执行某个不依赖于它们结果的任务。
-
二元依赖:OR组合的关系
applyToEither/applyToEitherAsync方法
applyToEither
和applyToEitherAsync
方法是CompletableFuture
类中用于处理两个异步操作竞争完成情形的工具。当这两个CompletableFuture
实例中的任何一个完成时,这两个方法都会将提供的函数应用于该先完成实例的结果,并返回一个新的CompletableFuture
,该CompletableFuture
将持有应用函数后的结果。
主要区别:
-
执行方式:
applyToEither
以同步方式执行提供的函数,使用默认的ForkJoinPool.commonPool()
。而applyToEitherAsync
则以异步方式执行,可以指定一个自定义的ExecutorService
来控制函数的执行线程,提供了更高的灵活性。 -
线程池:
applyToEither
默认使用公共的ForkJoin池,而applyToEitherAsync
允许你明确指定一个线程池,这对于需要细粒度控制线程使用或避免资源争用的情况非常有用。
返回值:两者都返回一个新的CompletableFuture
实例,该实例的类型是应用函数后的结果类型,使得结果可以进一步链式处理或以非阻塞方式查询。
使用场景:
- 冗余操作
当你执行两个或多个本质上相同或相似的异步操作作为冗余时,可以使用 applyToEither或applyToEitherAsync
来确保即使其中一个操作失败,另一个操作的结果仍然可以被使用。例如,你可能从两个不同的数据源获取相同的数据,但只希望处理第一个返回的数据。
- 优先处理
在某些情况下,你可能有两个操作,其中一个操作在业务逻辑上比另一个更重要或更优先。使用 applyToEither或applyToEitherAsync
,你可以确保一旦优先的操作完成,就立即处理其结果,而忽略其他操作的结果。(比如文件上传,哪个先上传用哪个)
- 并发优化
在并发编程中,有时你可能想要同时启动多个任务,但只关心第一个完成的任务的结果。使用 applyToEither或applyToEitherAsync
可以帮助减少等待时间,因为一旦有一个任务完成,你就可以立即开始处理其结果,而不需要等待其他任务也完成。
- 负载均衡
在分布式系统中,你可能将任务分配给多个节点以进行负载均衡。使用 applyToEither或applyToEitherAsync
,你可以将任务结果的处理逻辑与任务执行本身解耦,从而更容易地管理资源和使用情况。
- 超时处理
虽然 applyToEither或applyToEitherAsync
本身不直接处理超时,但你可以结合使用它来实现超时逻辑。例如,你可以启动一个定时任务来设置一个超时限制,并使用 applyToEither或applyToEitherAsync
来处理两个异步操作:一个是实际的业务操作,另一个是当达到超时时间时触发的回退操作。
applyToEither 的具体使用方法
**applyToEither **方法在两个 **CompletableFuture **实例中任意一个完成时,会立即且同步地执行传递给它的函数,并将该函数的结果封装在一个新的 **CompletableFuture **中返回。
@Test
public void testApplyToEither() {
CompletableFuture<String> smsFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 发送短信的逻辑
return "短信验证码已发送";
});
CompletableFuture<String> emailFuture = CompletableFuture.supplyAsync(() -> {
// 发送邮件的逻辑
return "邮件验证码已发送";
});
// 使用 applyToEither
CompletableFuture<String> firstResponse = smsFuture.applyToEither(emailFuture, result -> result);
System.out.println(firstResponse.join());
}
applyToEitherAsync 的具体使用方法
**applyToEitherAsync **方法在两个 **CompletableFuture **实例中任意一个完成时,会利用指定的 ExecutorService 来异步地执行提供的函数,并将该函数的执行结果封装在一个新的 **CompletableFuture **中返回。
@Test
public void testApplyToEitherAsync() {
CompletableFuture<String> smsFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 发送短信的逻辑
return "短信验证码已发送";
});
CompletableFuture<String> emailFuture = CompletableFuture.supplyAsync(() -> {
// 发送邮件的逻辑
return "邮件验证码已发送";
});
// 或者使用 applyToEitherAsync,并指定自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> firstResponseAsync = smsFuture.applyToEitherAsync(emailFuture, result -> result, executor);
System.out.println(firstResponseAsync.join());
executor.shutdown();
}
acceptEither/acceptEitherAsync方法
ompletableFuture
的 acceptEither
和 acceptEitherAsync
方法允许在两个异步操作中,一旦其中一个操作先完成,就立即执行一个 Consumer
函数式接口,该函数接口处理该操作的结果。这两个方法的主要区别在于执行方式和自定义线程池的使用。
-
执行方式:
acceptEither
使用默认的ForkJoinPool.commonPool()
同步执行Consumer
,即它会阻塞当前线程直到Consumer
完成。而acceptEitherAsync
允许你指定一个自定义的ExecutorService
来异步执行Consumer
,这样Consumer
的执行就不会阻塞当前线程。 -
自定义线程池:
acceptEither
依赖于默认的线程池,而acceptEitherAsync
提供了更大的灵活性,允许你根据应用的需求选择合适的线程池来执行Consumer
。 -
返回值:两者都返回
CompletableFuture<Void>
,因为Consumer
函数式接口设计为接受单个输入参数但不返回任何结果。这允许你进一步对返回的CompletableFuture
进行链式调用或处理,尽管在这种情况下,返回的CompletableFuture
主要用于表示Consumer
的执行状态。
使用场景
-
快速响应:当你需要尽快对某个操作的结果做出响应,而不关心是哪个操作先完成时。例如,在用户界面上,你可能希望无论哪个数据源先返回数据,都立即更新界面,以改善用户体验。
-
冗余操作:当执行两个或多个本质上相同或相似的异步操作作为冗余时,使用
applyToEither
可以确保即使其中一个操作失败,也能立即使用另一个操作的结果。 -
资源限制:在资源受限的环境中,你可能希望减少等待时间,尽快利用可用资源来处理结果。
使用场景:
acceptEither
使用场景
-
即时响应:当你需要立即对某个异步操作的结果做出响应,但不需要将这个响应作为后续操作的一部分时。例如,你可能只是想在日志中记录某个操作的结果,或者更新一个状态标志,而不需要等待这个操作完成后再继续。
-
资源释放:在某些情况下,你可能需要尽快释放与异步操作相关的资源,比如关闭一个数据库连接或文件句柄。使用
acceptEither
可以确保一旦有结果返回,就立即执行清理操作。 -
简化流程:当异步操作的结果处理逻辑相对简单,不需要进一步链式调用或复杂处理时,
acceptEither
提供了一种简洁的方式来处理结果。
2.acceptEitherAsync
使用场景
-
避免阻塞:与
acceptEither
类似,但acceptEitherAsync
允许你以异步方式执行Consumer
操作,从而避免在主线程或关键线程上执行耗时的操作。这对于保持应用的响应性和吞吐量非常重要。 -
自定义线程池:当默认的线程池(如
ForkJoinPool.commonPool()
)不满足你的性能需求或你希望更好地控制资源使用时,acceptEitherAsync
允许你指定一个自定义的ExecutorService
。 -
并发优化:在处理多个并发异步操作时,
acceptEitherAsync
可以帮助你更好地组织代码,确保关键路径上的操作能够尽快完成,同时非关键路径上的操作也不会阻塞主线程或关键线程。
共同点
-
结果处理:两者都允许你对第一个完成的异步操作的结果执行一个
Consumer
操作,但都不返回新的CompletableFuture
。 -
执行方式:
acceptEither
是同步执行Consumer
,而acceptEitherAsync
是异步执行。
acceptEither 的具体使用方法
acceptEither
方法在 CompletableFuture
的两个实例中任意一个完成时,会立即且同步地调用提供的 Consumer
函数,以便处理那个先完成的操作的结果。这个调用会阻塞当前线程,直到 Consumer
执行完毕。
@Test
public void testAcceptEither() {
CompletableFuture<Integer> sendToServer1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Hello, Server 1!");
return 1;
});
CompletableFuture<Integer> sendToServer2 = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello, Server 2!");
return 2;
});
// 使用 acceptEither 来记录第一个成功发送的消息
sendToServer1.acceptEither(
sendToServer2,
result -> System.out.println("Message sent to server: " + (result == 1 ? "Server 1" : "Server 2"))
);
}
acceptEitherAsync 的具体使用方法
acceptEitherAsync方法允许在两个**CompletableFuture **实例中任意一个完成时,利用指定的 ExecutorService 来异步地执行提供的 **Consumer **函数式接口,该 **Consumer **负责处理完成操作的结果,而不会阻塞当前线程。这种方法提高了应用程序的响应性和并发性能。
@Test
public void testAcceptEitherAsync() {
CompletableFuture<Integer> sendToServer1 = CompletableFuture.supplyAsync(() -> {
// try {
// TimeUnit.SECONDS.sleep(1);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
System.out.println("Hello, Server 1!");
return 1;
});
CompletableFuture<Integer> sendToServer2 = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello, Server 2!");
return 2;
});
// 如果你希望以异步方式记录消息,并且想指定线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
sendToServer1.acceptEitherAsync(
sendToServer2,
result -> System.out.println("Message sent to server: " + (result == 1 ? "Server 1" : "Server 2")),
executor
);
executor.shutdown();
// 注意:sendMessageToServer1 和 sendMessageToServer2 方法应该返回 CompletableFuture<Void>
// 并且它们的实现应该能够区分是哪个服务器成功发送了消息(在这个简化的示例中,我们假设 result 总是 null)
}
runAfterEither/runAfterEitherAsync 方法
这两个方法都用于在两个异步操作(CompletableFuture
实例)中的任何一个完成后执行一个 Runnable
。无论这两个异步操作以何种顺序完成,指定的 Runnable
都会执行。
主要区别:
-
执行方式:
-
runAfterEither
:在任一CompletableFuture
完成时,使用默认的ForkJoinPool.commonPool()
同步执行Runnable
。这意味着runAfterEither
的调用线程会阻塞,直到Runnable
执行完毕。 -
runAfterEitherAsync
:也在任一CompletableFuture
完成时执行Runnable
,但它是异步执行的。这意味着runAfterEitherAsync
会立即返回,而Runnable
将在另一个线程(可以是自定义的ExecutorService
中的线程)中执行。
-
自定义线程池:
runAfterEither
不允许你指定执行Runnable
的线程池,它总是使用默认的ForkJoinPool.commonPool()
。
使用场景
runAfterEither 使用场景
-
资源清理:当你启动了两个异步任务,并且无论哪个任务先完成都需要执行一些清理工作时,可以使用
runAfterEither
。例如,你可能需要关闭两个数据库连接,无论哪个查询先完成。 -
状态更新:在异步操作中,你可能需要更新某些状态或发送通知,而这些操作不需要等待两个任务都完成。使用
runAfterEither
可以在任一任务完成时立即进行这些操作。 -
依赖任务的启动:如果你有两个异步任务,并且想要根据它们的任意一个的结果来启动另一个任务(尽管这个新任务不依赖于具体哪个任务的结果),那么
runAfterEither
是一个很好的选择。
runAfterEitherAsync 使用场景
-
避免阻塞:如果你不希望
Runnable
的执行阻塞当前线程(例如,在 GUI 应用程序中或在需要保持高响应性的服务中),那么runAfterEitherAsync
是一个更好的选择。它允许Runnable
在另一个线程中异步执行。 -
自定义执行器:当你想要控制
Runnable
执行的线程池时,runAfterEitherAsync
允许你指定一个自定义的ExecutorService
。这有助于管理资源、限制并发数或优化性能。 -
复合异步操作:在构建复杂的异步操作链时,
runAfterEitherAsync
可以帮助你更灵活地组合任务。你可以根据一个或多个异步操作的结果来启动新的异步操作,而无需等待所有操作都完成。
runAfterEither 的具体使用方法
**runAfterEither **方法在两个 CompletableFuture 实例中的任何一个完成时,会立即且同步地在其默认的 ForkJoinPool.commonPool() 线程池中执行提供的 Runnable 任务,而无需等待另一个 CompletableFuture 完成。这种方式确保了无论哪个任务先完成,后续的操作都能及时执行,同时避免了阻塞当前线程。
假设你正在开发一个电商网站,并且有两个异步任务:一个用于从库存系统获取商品信息,另一个用于从价格系统获取商品价格。无论哪个任务先完成,你都想更新商品详情页面。但是,由于你希望保持页面响应性,因此你不想让更新操作阻塞 UI 线程。
@Test
public void testRunAfterEither() {
// 假设 fetchInventoryInfo 和 fetchPriceInfo 是返回 CompletableFuture<Void> 的异步方法
CompletableFuture<Void> inventoryFuture = fetchInventoryInfo();
CompletableFuture<Void> priceFuture = fetchPriceInfo();
// 使用 runAfterEitherAsync 在任一任务完成时更新页面
CompletableFuture<Void> updateFuture = inventoryFuture.runAfterEither(
priceFuture,
() -> updateProductDetailPage() // 这是更新页面的 Runnable
);
// 注意:在实际应用中,你可能还需要处理 updateFuture 的完成或异常
// 例如,你可以添加异常处理或使用 thenAccept/thenRun 等方法来进一步处理
}
private CompletableFuture<Void> fetchInventoryInfo() {
// 模拟异步获取库存信息
return CompletableFuture.runAsync(() -> {
// ... 从库存系统获取数据 ...
System.out.println("Inventory info fetched.");
});
}
private CompletableFuture<Void> fetchPriceInfo() {
// 模拟异步获取价格信息
return CompletableFuture.runAsync(() -> {
// ... 从价格系统获取数据 ...
System.out.println("Price info fetched.");
});
}
private void updateProductDetailPage() {
// 更新商品详情页面的逻辑
System.out.println("Product detail page updated.");
// 这里应该是实际的 UI 更新逻辑,但在控制台应用程序中只是打印一条消息
}
runAfterEitherAsync 的具体使用方法
**runAfterEitherAsync **方法在两个 **CompletableFuture **实例中的任何一个完成时,会利用指定的 **ExecutorService **异步地执行提供的 **Runnable **任务。这意味着 **Runnable **的执行不会阻塞当前线程,而是会在 **ExecutorService **管理的某个线程中执行,从而提高了程序的并发性和响应性。通过指定 ExecutorService,开发者可以更加灵活地控制任务的执行策略,比如限制并发线程数、优化资源利用等。
假设你正在开发一个电商网站,并且有两个异步任务:一个用于从库存系统获取商品信息,另一个用于从价格系统获取商品价格。无论哪个任务先完成,你都想更新商品详情页面。但是,由于你希望保持页面响应性,因此你不想让更新操作阻塞 UI 线程。
@Test
public void testRunAfterEitherAsync() {
ExecutorService executor = Executors.newFixedThreadPool(10); // 自定义线程池
// 假设 fetchInventoryInfo 和 fetchPriceInfo 是返回 CompletableFuture<Void> 的异步方法
CompletableFuture<Void> inventoryFuture = fetchInventoryInfo();
CompletableFuture<Void> priceFuture = fetchPriceInfo();
// 使用 runAfterEitherAsync 在任一任务完成时更新页面
CompletableFuture<Void> updateFuture = inventoryFuture.runAfterEitherAsync(
priceFuture,
() -> updateProductDetailPage(), // 这是更新页面的 Runnable
executor // 使用自定义线程池
);
// 注意:在实际应用中,你可能还需要处理 updateFuture 的完成或异常
// 例如,你可以添加异常处理或使用 thenAccept/thenRun 等方法来进一步处理
// ... 其他代码 ...
// 清理资源(在适当的时候)
executor.shutdown();
}
private CompletableFuture<Void> fetchInventoryInfo() {
// 模拟异步获取库存信息
return CompletableFuture.runAsync(() -> {
// ... 从库存系统获取数据 ...
System.out.println("Inventory info fetched.");
});
}
private CompletableFuture<Void> fetchPriceInfo() {
// 模拟异步获取价格信息
return CompletableFuture.runAsync(() -> {
// ... 从价格系统获取数据 ...
System.out.println("Price info fetched.");
});
}
private void updateProductDetailPage() {
// 更新商品详情页面的逻辑
System.out.println("Product detail page updated.");
// 这里应该是实际的 UI 更新逻辑,但在控制台应用程序中只是打印一条消息
}
总结
方法名称 | 返回类型 | 执行方式 | 自定义线程池 | 参数类型 | 使用场景 |
---|---|---|---|---|---|
applyToEither | CompletableFuture | 当两个CompletableFuture中的任何一个完成时,将结果应用于提供的函数,并返回一个新的CompletableFuture | 否(默认使用ForkJoinPool.commonPool()) | CompletionStage<? extends T> other, Function<? super T, ? extends U> fn | 当两个异步任务中任意一个完成时,使用完成任务的结果进行进一步处理,并需要新的结果 |
applyToEitherAsync | CompletableFuture | 异步执行applyToEither逻辑,可以指定线程池 | 是(可指定Executor) | CompletionStage<? extends T> other, Function<? super T, ? extends U> fn, Executor executor | 同上,但允许用户指定线程池,以便更好地控制并发和资源使用 |
acceptEither | CompletableFuture | 当两个CompletableFuture中的任何一个完成时,消费结果,但不返回新值 | 否(默认使用ForkJoinPool.commonPool()) | CompletionStage<? extends T> other, Consumer<? super T> action | 当两个异步任务中任意一个完成时,对完成任务的结果进行消费操作,不需要新结果 |
acceptEitherAsync | CompletableFuture | 异步执行acceptEither逻辑,可以指定线程池 | 是(可指定Executor) | CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor | 同上,但允许用户指定线程池 |
runAfterEither | CompletableFuture | 当两个CompletableFuture中的任何一个完成时,运行给定的Runnable | 否(默认使用ForkJoinPool.commonPool()) | CompletionStage<?> other, Runnable runnable | 当两个异步任务中任意一个完成时,执行一些不依赖于任务结果的额外操作 |
runAfterEitherAsync | CompletableFuture | 异步执行runAfterEither逻辑,可以指定线程池 | 是(可指定Executor) | CompletionStage<?> other, Runnable runnable, Executor executor | 同上,但允许用户指定线程池 |
详细说明
-
返回类型:
-
applyToEither
和applyToEitherAsync
返回CompletableFuture<U>
,其中U
是函数处理后的结果类型。 -
acceptEither
和acceptEitherAsync
返回CompletableFuture<Void>
,因为它们只消费结果而不产生新的结果。 -
runAfterEither
和runAfterEitherAsync
也返回CompletableFuture<Void>
,因为它们执行的操作与原始任务的结果无关。
-
-
执行方式:
-
所有方法都提供了异步执行的能力,但
applyToEither
、acceptEither
和runAfterEither
在默认情况下使用ForkJoinPool.commonPool()
作为线程池。 -
带有
Async
后缀的方法允许用户指定一个自定义的Executor
,以控制并发执行和资源使用。
-
-
自定义线程池:
-
默认情况下,不带
Async
后缀的方法使用ForkJoinPool.commonPool()
。 -
带有
Async
后缀的方法允许通过额外的Executor
参数来自定义线程池。
-
-
参数类型:
-
所有方法都需要一个
CompletionStage
作为另一个可能完成的CompletableFuture
。 -
applyToEither
和applyToEitherAsync
需要一个Function
来处理结果。 -
acceptEither
和acceptEitherAsync
需要一个Consumer
来消费结果。 -
runAfterEither
和runAfterEitherAsync
需要一个Runnable
来执行与结果无关的操作。
-
-
使用场景:
-
applyToEither
和applyToEitherAsync
适用于需要从两个异步任务中任意一个的结果进行进一步处理,并生成新结果的场景。 -
acceptEither
和acceptEitherAsync
适用于只需要消费两个异步任务中任意一个的结果,而不关心新结果的场景。 -
runAfterEither
和runAfterEitherAsync
适用于在两个异步任务中任意一个完成后执行一些与结果无关的操作的场景。
-
多元依赖:allOf/anyOf方法
CompletableFuture
的 allOf
和 anyOf
方法是处理多个异步操作完成状态的强大工具。
-
allOf:此方法用于等待所有指定的
CompletableFuture
实例都完成。只有当所有传入的CompletableFuture
对象都达到完成状态时(无论是正常完成还是异常完成),返回的CompletableFuture<Void>
才会完成。这适用于需要所有并行任务都成功完成才能继续执行的场景。 -
anyOf:与
allOf
相反,anyOf
方法等待的是传入的CompletableFuture
实例中任意一个完成。一旦有任何一个CompletableFuture
完成,返回的CompletableFuture<Void>
就会立即完成,而无需等待其他任务。这适用于在多个任务中只要有一个成功完成即可继续处理的场景。
区别:
- 核心区别在于触发条件,
allOf
要求所有任务完成,而anyOf
只要求至少一个任务完成。
返回值:
- 两者都返回一个
CompletableFuture<Void>
类型的对象,该对象不携带任何具体业务数据的返回值,仅用于表示所有或任意一个异步操作的完成状态。
自定义线程池:
allOf
和anyOf
方法本身并不直接执行异步操作,它们只是作为协调机制来等待其他CompletableFuture
实例的完成。这些实例背后的异步操作可以使用自定义的ExecutorService
(线程池)来执行,但allOf
和anyOf
方法的调用和执行是在当前线程上同步进行的,它们仅关注给定CompletableFuture
实例的完成状态。
执行方式:
- 从表面上看,
allOf
和anyOf
的调用似乎是“同步”的,因为它们会阻塞当前线程直到满足特定的完成条件。然而,这并不意味着它们与异步操作本身相冲突或限制了异步操作的并发性。实际上,它们允许开发者以一种声明式的方式处理多个异步操作的完成状态,而无需编写复杂的轮询或等待逻辑。真正的异步操作是在其他地方(可能是通过CompletableFuture
的其他方法,如supplyAsync
、runAsync
等)以非阻塞方式并行执行的。
使用场景
allOf 使用场景
CompletableFuture.allOf
方法适用于你需要等待一组 CompletableFuture
实例全部完成才能继续执行后续逻辑的场景。这种场景通常出现在:
-
并行数据处理:当你需要并行处理多个数据集,并且需要等待所有数据都处理完毕后才能进行下一步操作(如汇总、报告等)时。
-
资源释放:在某些情况下,你可能需要等待所有异步操作完成后才能释放相关资源(如关闭数据库连接、断开网络连接等)。
-
多步骤依赖:在复杂的异步编程中,某个步骤可能依赖于多个异步操作的结果,而这些操作之间没有直接的依赖关系,但需要全部完成后才能继续。
anyOf 使用场景
CompletableFuture.anyOf
方法适用于你只想等待一组 CompletableFuture
实例中的任意一个完成即可的场景。这种场景通常出现在:
-
超时处理:当你执行多个异步任务,并且希望一旦有任何一个任务成功就立即处理结果,而不必等待其他可能失败或耗时的任务时。
-
快速响应:在用户界面或Web应用中,你可能希望尽快响应用户的请求,即使只有一个任务成功完成了,也可以立即向用户展示部分结果或进行下一步操作。
-
冗余操作:在某些情况下,你可能会启动多个相同的异步任务作为冗余,以增加任务成功的可能性。在这种情况下,你只需要等待任务中的任何一个成功完成即可。
allOf 的具体使用方法
CompletableFuture 类提供了多种组合多个 CompletableFuture 实例的方式。然而,CompletableFuture API 本身并没有直接名为 allOf 的方法,但你可以使用 CompletableFuture.allOf 静态方法来实现这个需求。这个方法接受多个 CompletableFuture 实例作为参数,并返回一个新的 CompletableFuture,这个返回的 CompletableFuture 会在所有给定的 CompletableFuture 实例都完成时完成。
@Test
public void testAllOf(){
// 创建几个示例的CompletableFuture实例
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
// 模拟异步任务
try {
Thread.sleep(1000); // 假设这个任务需要1秒
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务1完成";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
// 模拟另一个异步任务
try {
Thread.sleep(500); // 假设这个任务需要0.5秒
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务2完成";
});
// 使用CompletableFuture.allOf等待所有CompletableFuture完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);
// 当所有CompletableFuture都完成时,可以处理结果
allFutures.thenRun(() -> {
System.out.println("所有任务完成");
// 注意:你不能直接从allFutures获取future1和future2的结果,
// 因为allOf返回的是Void的CompletableFuture。
// 你需要单独处理每个future的结果
try {
System.out.println(future1.get());
System.out.println(future2.get());
} catch (Exception e) {
e.printStackTrace();
}
});
// 注意:main线程需要等待,以便看到输出结果
try {
Thread.sleep(2000); // 等待足够长的时间以确保所有任务完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
anyOf 的具体使用方法
这个方法接受多个CompletableFuture实例作为参数,并返回一个新的CompletableFuture,这个返回的CompletableFuture会在任意一个给定的CompletableFuture实例完成时完成。完成的结果将是首先完成的那个CompletableFuture的结果,但结果会被包装成Object类型,因此你可能需要进行类型转换。
@Test
public void testAnyOf(){
// 创建几个示例的CompletableFuture实例
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
// 模拟异步任务
try {
Thread.sleep(1000); // 假设这个任务需要1秒
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务1完成";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
// 模拟另一个异步任务
try {
Thread.sleep(500); // 假设这个任务需要0.5秒
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务2完成";
});
// 使用CompletableFuture.anyOf等待任一CompletableFuture完成
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
// 当任一CompletableFuture完成时,处理结果
anyFuture.thenAccept(result -> {
// 因为结果是Object类型,所以需要进行类型转换
if (result instanceof String) {
String firstCompletedResult = (String) result;
System.out.println("任一任务完成,结果是:" + firstCompletedResult);
}
});
// 注意:main线程需要等待,以便看到输出结果
try {
Thread.sleep(2000); // 等待足够长的时间以确保至少有一个任务完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
注意事项
-
当使用
allOf
或anyOf
时,你需要注意返回的CompletableFuture
的类型(Void
或Object
),并根据需要进行类型转换或检查。 -
anyOf
方法返回的结果需要特别注意,因为它是一个Object
类型,你可能需要进行类型检查或转换才能使用实际的结果。 -
在使用这些方法时,请确保你理解异步编程的复杂性,并妥善处理可能的异常和错误情况。
-
在实际应用中,可能需要结合其他并发工具(如
ExecutorService
、CyclicBarrier
、CountDownLatch
等)来更好地控制和管理异步任务。
异常处理: exceptionally
-
exceptionally
方法用途:CompletableFuture
的exceptionally
方法专门用于处理异步操作中抛出的异常。当CompletableFuture
完成时,如果其计算过程中抛出了异常(即操作失败),则exceptionally
方法中提供的函数将被调用。这个函数接收异常作为参数,并允许你提供一个替代的结果值,该值将作为CompletableFuture
的最终结果。
-
返回值:
exceptionally
方法返回一个CompletableFuture<T>
,其中T
是提供的替代函数返回的类型。这个返回的CompletableFuture
将以正常方式完成,其结果是替代函数提供的值,而不是原始异步操作中抛出的异常。
-
自定义线程池:
- 需要注意的是,
exceptionally
方法本身并不直接涉及线程池的使用。它并不改变CompletableFuture
的执行线程,而是在CompletableFuture
的结果已经确定(无论是正常完成还是异常完成)之后,在调用exceptionally
方法的线程(或默认执行器线程,如果是在另一个线程中调用)上同步执行。因此,它不会启动新的异步任务或改变任务的执行线程。
- 需要注意的是,
-
执行方式:
exceptionally
方法的执行是同步的,但它并不意味着它会阻塞调用线程等待CompletableFuture
完成。相反,它是在CompletableFuture
的结果已经确定(可能是之前某个时间点已经完成的)之后,立即以同步方式在调用它的线程中执行的。如果CompletableFuture
还没有完成,exceptionally
方法会立即返回一个新的CompletableFuture
实例,该实例将在原始CompletableFuture
完成时被解析。
综上所述,
-
CompletableFuture
的exceptionally
方法提供了一种优雅的方式来处理异步操作中抛出的异常。当异步操作完成时,如果发生异常,exceptionally
方法中指定的函数将被调用,该函数可以返回一个替代结果,该结果将作为CompletableFuture
的最终结果。此方法返回一个CompletableFuture<T>
,其中T
是替代函数的返回类型。重要的是要注意,exceptionally
方法的执行是同步的,但它不依赖于特定的线程池,而是在调用它的线程上执行,且仅当原始CompletableFuture
已经完成时才会执行。 -
exceptionally 方法在 CompletableFuture 因异常而未能正常完成时,会自动触发并执行一个指定的函数来处理这个异常。该函数接受异常作为输入,并返回一个替代的结果,该结果随后被用作 CompletableFuture 的最终完成值。这样,即使原始异步操作失败,CompletableFuture 链也能以优雅的方式继续执行,避免了异常的直接传播和中断。
@Test
public void testExceptionally(){
// 创建一个可能失败的CompletableFuture
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟一个可能抛出异常的异步操作
if (Math.random() < 0.5) { // 假设有50%的概率失败
throw new RuntimeException("操作失败");
}
return "操作成功";
});
// 使用exceptionally来处理异常,并返回一个替代的结果
CompletableFuture<String> resultFuture = future.exceptionally(ex -> {
// 处理异常,这里简单地返回一个错误消息作为替代结果
System.err.println("捕获到异常: " + ex.getMessage());
return "操作失败,已恢复";
});
// 等待结果并打印
resultFuture.thenAccept(System.out::println).join();
}
注意事项
1、异常处理范围: exceptionally
方法仅捕获并处理 CompletableFuture
链中当前阶段发生的异常。如果异常发生在链中的更早阶段,并且该阶段的异常没有被 exceptionally
或其他异常处理方法捕获,那么 exceptionally
方法将不会接收到该异常。因此,在复杂的异步任务链中,需要确保每个可能抛出异常的阶段都被适当地处理了。
2、返回值类型: exceptionally
方法返回的 CompletableFuture
的类型与原始 CompletableFuture
相同,但其结果将是 exceptionally
方法中提供的函数返回的值。这意味着你需要确保提供的函数返回的类型与原始 CompletableFuture
的泛型类型兼容。
3、执行时机: exceptionally
方法本身并不改变异步任务的执行逻辑或线程。它只是在异步任务完成后(无论成功还是失败),如果发生了异常,则执行提供的函数。这个函数的执行是在调用 exceptionally
方法的线程上进行的,而不是在原始异步任务的执行线程上。
4、异常丢失: 如果在 exceptionally
方法中简单地返回一个值而没有记录或处理异常(例如,没有打印堆栈跟踪),则可能会丢失关于异常原因的重要信息。因此,即使你打算返回一个默认值,也建议记录异常信息以便后续调试。
5、与其他异常处理方法的比较:
-
handle
方法:与exceptionally
不同,handle
方法可以处理正常完成和异常完成两种情况,并返回一个新的CompletableFuture
。它提供了更灵活的异常处理机制,允许你同时访问结果和异常。 -
whenComplete
方法:虽然whenComplete
方法也可以用于处理正常完成和异常完成,但它不返回CompletableFuture
,而是直接接受一个BiConsumer
,用于在任务完成时执行某些操作。
6、链式调用: exceptionally
方法可以与其他 CompletableFuture
方法(如 thenApply
、thenAccept
等)链式调用,以构建复杂的异步任务链。然而,需要注意异常在链中的传播和处理方式,以确保程序的健壮性和可维护性。
7、性能考虑: 虽然 exceptionally
方法本身对性能的影响可能很小,但在构建复杂的异步任务链时,需要考虑到整个链的性能。过多的异常处理和链式调用可能会增加代码的复杂性和执行时间。
获取任务状态:isCancelled/isCompletedExceptionally
CompletableFuture 类中的 isCancelled() 和 isCompletedExceptionally() 方法用于检查异步任务的状态,但它们检查的状态类型不同。
isCancelled()的具体使用方法
isCancelled() 方法用于检查 CompletableFuture 是否被取消。如果异步任务在执行前或执行过程中被取消(通过调用 cancel(boolean mayInterruptIfRunning) 方法),则 isCancelled() 方法将返回 true。需要注意的是,即使任务被取消,它也可能已经完成(如果取消请求在任务完成后才到达),但这种情况下 isCancelled() 仍然会返回 false,因为任务并不是因为取消而结束的。
@Test
public void testCancel(){
// 创建一个可能抛出异常的CompletableFuture
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("操作失败");
}
return "操作成功";
});
// 尝试取消异步任务(注意:这里不能保证一定能够取消,因为任务可能已经完成或即将完成)
future.cancel(false);
// 检查任务是否被取消
boolean isCancelled = future.isCancelled();
System.out.println("任务是否被取消: " + isCancelled);
// 检查任务是否以异常方式完成(这里取决于任务是否实际抛出了异常)
boolean isCompletedExceptionally = future.isCompletedExceptionally();
// 注意:由于cancel调用和future的完成是异步的,这里可能需要额外的同步或等待来确保状态正确反映
// 但为了示例的简洁性,我们直接检查状态
if (isCompletedExceptionally) {
System.out.println("任务以异常方式完成");
// 可以通过future.exceptionally(...)来处理异常,但这里只是检查状态
} else {
// 如果任务正常完成(在这个例子中不太可能,因为我们尝试取消了它),则获取结果
// 注意:这里应该使用try-catch来捕获ExecutionException
try {
String result = future.get(); // 这将抛出ExecutionException如果任务以异常方式完成
System.out.println("任务结果: " + result);
} catch (Exception e) {
System.out.println("任务执行过程中抛出了异常: " + e.getCause().getMessage());
}
}
}
isCompletedExceptionally()的具体使用方法
isCompletedExceptionally() 方法用于检查 CompletableFuture 是否已经以异常方式完成。如果异步任务在执行过程中抛出了未捕获的异常,并且该异常没有被 **exceptionally **方法或其他异常处理机制捕获和处理,则 CompletableFuture 会以异常方式完成。此时,isCompletedExceptionally() 方法将返回 true。如果任务正常完成或被取消,则此方法将返回 false。
@Test
public void testCancel(){
// 创建一个可能抛出异常的CompletableFuture
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("操作失败");
}
return "操作成功";
});
// 尝试取消异步任务(注意:这里不能保证一定能够取消,因为任务可能已经完成或即将完成)
future.cancel(false);
// 检查任务是否被取消
boolean isCancelled = future.isCancelled();
System.out.println("任务是否被取消: " + isCancelled);
// 检查任务是否以异常方式完成(这里取决于任务是否实际抛出了异常)
boolean isCompletedExceptionally = future.isCompletedExceptionally();
// 注意:由于cancel调用和future的完成是异步的,这里可能需要额外的同步或等待来确保状态正确反映
// 但为了示例的简洁性,我们直接检查状态
if (isCompletedExceptionally) {
System.out.println("任务以异常方式完成");
// 可以通过future.exceptionally(...)来处理异常,但这里只是检查状态
} else {
// 如果任务正常完成(在这个例子中不太可能,因为我们尝试取消了它),则获取结果
// 注意:这里应该使用try-catch来捕获ExecutionException
try {
String result = future.get(); // 这将抛出ExecutionException如果任务以异常方式完成
System.out.println("任务结果: " + result);
} catch (Exception e) {
System.out.println("任务执行过程中抛出了异常: " + e.getCause().getMessage());
}
}
}
使用场景
isCancelled()
使用场景isCancelled()
方法用于检查 CompletableFuture
是否被取消。当一个 CompletableFuture
被取消时,它不会以正常或异常的方式完成,而是直接进入取消状态。这通常发生在以下场景中:
-
显式取消:你显式地调用了
cancel(boolean mayInterruptIfRunning)
方法来取消任务。如果任务尚未开始执行,或者正在执行且mayInterruptIfRunning
参数为true
且任务支持中断,则任务将被取消。 -
依赖取消:如果
CompletableFuture
依赖于其他CompletableFuture
(例如,通过thenApply
、thenCompose
等方法链式调用),并且其中一个依赖项被取消,那么整个链中的CompletableFuture
也可能会被取消(这取决于具体的链式调用和取消策略)。
使用 isCancelled()
的场景包括:
-
在任务执行过程中,你可能需要定期检查任务是否被取消,以便进行相应的资源清理或状态更新。
-
在任务完成后(无论是正常完成、异常完成还是被取消),你可能需要基于任务的状态(包括是否被取消)来执行不同的逻辑。
isCompletedExceptionally()
使用场景isCompletedExceptionally()
方法用于检查 CompletableFuture
是否以异常方式完成。当一个 CompletableFuture
抛出了异常并且没有通过 exceptionally
方法被捕获和处理时,它将以异常方式完成。使用 isCompletedExceptionally()
的场景包括:
-
在任务完成后,你可能需要检查任务是否以异常方式完成,以便进行相应的错误处理或日志记录。
-
在链式调用中,你可能需要基于某个
CompletableFuture
是否以异常方式完成来决定是否继续执行链中的后续任务,或者是否需要执行一些回退逻辑。
注意事项
**isCancelled()**
** 注意事项**
-
取消状态:
isCancelled()
方法会返回true
如果CompletableFuture
被取消了,并且没有通过完成(无论是正常完成还是异常完成)来覆盖这个取消状态。一旦CompletableFuture
被取消,它就不能再被完成。 -
与
cancel()
方法的关系:你可以通过调用cancel(boolean mayInterruptIfRunning)
方法来尝试取消一个CompletableFuture
。如果任务已经开始执行,并且mayInterruptIfRunning
参数为true
,那么会尝试中断执行该任务的线程(如果它支持中断)。但是,这并不意味着取消操作总是能够成功阻止任务完成;它仅仅是一个请求。 -
不可变性:一旦
CompletableFuture
被取消或完成(无论是正常还是异常),其状态就不能再被改变。
**isCompletedExceptionally()**
** 注意事项**
-
异常完成状态:
isCompletedExceptionally()
方法会返回true
如果CompletableFuture
异常地完成了。这意味着在异步任务执行过程中抛出了异常,并且没有通过调用exceptionally
方法或其他方式提供一个替代结果。 -
与
exceptionally()
方法的关系:如果你希望在CompletableFuture
异常完成时提供一个替代结果,你应该使用exceptionally()
方法。然而,如果exceptionally()
方法已经为异常提供了替代结果,那么isCompletedExceptionally()
仍然会返回false
,因为从外部来看,CompletableFuture
是以正常方式完成的(尽管它实际上完成的是一个替代结果)。 -
结合使用:通常,你会将
isCompletedExceptionally()
与get()
或join()
方法结合使用来检查是否发生了异常,并可能地获取异常信息。但是,请注意,在调用get()
或join()
时,如果CompletableFuture
异常完成,这些方法会抛出ExecutionException
或CompletionException
,你可以捕获这些异常来获取原始异常。
综合注意事项
-
线程安全性:
CompletableFuture
的状态是线程安全的,你可以在多线程环境中安全地检查其状态。 -
避免重复检查:在编写逻辑时,尽量避免在检查状态后立即再次检查状态,因为这可能导致竞态条件,特别是当
CompletableFuture
的状态可能在两次检查之间发生变化时。 -
处理异常:当使用
CompletableFuture
时,始终准备好处理可能的异常,无论是通过exceptionally()
方法还是通过捕获get()
或join()
方法抛出的异常。 -
链式调用:利用
CompletableFuture
的链式调用功能来简化异步编程模型,但请注意在链中适当地处理异常。
状态监控:getNumberOfDependents
CompletableFuture
的 getNumberOfDependents
方法返回一个整数,表示有多少异步操作或者依赖是注册在当前 CompletableFuture
实例上的。
@Test
public void testGetNumberOfDependents(){
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello";
});
// 添加一些依赖操作
future.thenApply(String::toUpperCase);
future.exceptionally(e -> {
System.out.println("异常为: " + e.getMessage());
return null;
});
// 获取依赖的数量
int numberOfDependents = future.getNumberOfDependents();
System.out.println("依赖数量: " + numberOfDependents);
}
强制处理:obtrudeValue/obtrudeException方法
obtrudeValue
方法用于将 CompletableFuture
的结果强制设置为提供的值。如果 CompletableFuture
已经以任何方式完成(无论是正常完成、异常完成还是被取消),那么调用 obtrudeValue
将不会有任何效果。
obtrudeValue(T value)的具体使用方法
obtrudeValue
方法用于将CompletableFuture
的结果强制设置为提供的值。如果CompletableFuture
已经以任何方式完成(无论是正常完成、异常完成还是被取消),那么调用obtrudeValue
将不会有任何效果。
@Test
public void testObtrudeValue() {
CompletableFuture<String> future = new CompletableFuture<>();
future.obtrudeValue("Hello");
future.thenAccept(System.out::println);
}
obtrudeException(Throwable throwable)的具体使用方法
obtrudeException
方法用于将CompletableFuture
的结果强制设置为由提供的异常引起的异常完成。同样,如果CompletableFuture
已经完成,调用obtrudeException
将不会有任何效果。
@Test
public void testObtrudeException() {
CompletableFuture<String> future = new CompletableFuture<>();
future.obtrudeException(new RuntimeException("dasdas"));
future.exceptionally(e -> {
System.out.println(e.getMessage());
return null;
});
}
使用场景
-
obtrudeValue 和 obtrudeException 可以在需要覆盖 CompletableFuture 的完成状态时使用,例如,在某些错误恢复场景中,你可能需要将 CompletableFuture 强制设置为成功或失败状态。
-
这些方法也可以用来在程序的其他部分手动触发 CompletableFuture 的完成,即使异步操作尚未开始或已经取消。
注意事项
-
obtrudeValue 和 obtrudeException 应该谨慎使用,因为它们会覆盖 CompletableFuture 的自然完成状态,可能导致程序逻辑的混乱。
-
一旦 CompletableFuture 完成,无论是通过正常完成、异常完成、取消还是通过这些方法之一,它的状态就不能被改变了。
-
这些方法适用于需要手动干预异步操作结果的场景,例如,在某些错误恢复策略中,或者当你需要根据外部事件强制完成 CompletableFuture 时。
在实际编程中,通常推荐使用 CompletableFuture 的标准完成方法,如 complete 和 completeExceptionally,因为它们更符合 CompletableFuture 的预期使用模式。obtrudeValue 和 obtrudeException 应该用于特定的、非标准的场景。
实战
场景1
需要查询一个订单信息,首先需要查询商品信息,然后查询支付信息,最后汇总成一个对象返回。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "商品信息")
.thenCombineAsync(CompletableFuture.supplyAsync(() -> "支付信息"), (a, b) -> {
// 组装信息
return a + b;
});
log.info("ret =========>{}",future.get());
场景2
用户注册,首先需要校验用户信息,然后生成账号信息,最后保存到数据库。这三个操作互相依赖。
// A -> B-> C
CompletableFuture<String> future = CompletableFuture.runAsync(() -> {
if (ThreadLocalRandom.current().nextBoolean()){
return;
}
throw new RuntimeException("该手机号码已经注册");
}).thenCompose(ret -> CompletableFuture.supplyAsync(() -> {
if (ThreadLocalRandom.current().nextBoolean()) {
// 生成账号信息
return "账号信息: 16289";
}
throw new RuntimeException("账号信息生成失败。。");
})).thenApplyAsync(ret -> {
// 保存账号信息
log.info("保存账号信息->{}", ret);
return "注册成功";
}).exceptionally(e -> "注册失败" + e.getMessage());
log.info("最终返回结果:===》 {}",future.get());
场景3
private List<DeviceProcessListExportVO> listByFlowJobIds(List<String> flowJobIds, Map<String, ProcessInfoVo> map, Map<Integer,UserInfoDTO> userInfoDTOMap, Map<Integer,HatCity> cityMap){
//result 列表保存组装完成的数据
List<DeviceProcessListExportVO> result = new LinkedList<>();
//每次组装100条数据
List<List<String>> partition = Lists.partition(flowJobIds,100);
List<CompletableFuture> futures = partition.stream().map(subList -> CompletableFuture.supplyAsync(() -> {
//packVOs 方法就是组装数据
return packVOs(subList,map,userInfoDTOMap,cityMap);
},ASYNC_IO_POOL).whenCompleteAsync((r,e)->result.addAll(r))
.exceptionally(e->{
log.error(e.getMessage(),e);
log.error("listByFlowJobIds error.");
return result;
})).collect(Collectors.toList());
CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
log.info("任务阻塞 ");
Instant start = Instant.now();
//阻塞,直到所有任务结束。
all.join();
log.info("任务阻塞结束 耗时 = {}",ChronoUnit.MILLIS.between(start, Instant.now()));
return result;
ist<DeviceProcessListExportVO> result = new LinkedList<>();
//每次组装100条数据
List<List<String>> partition = Lists.partition(flowJobIds,100);
List<CompletableFuture> futures = partition.stream().map(subList -> CompletableFuture.supplyAsync(() -> {
//packVOs 方法就是组装数据
return packVOs(subList,map,userInfoDTOMap,cityMap);
},ASYNC_IO_POOL).whenCompleteAsync((r,e)->result.addAll(r))
.exceptionally(e->{
log.error(e.getMessage(),e);
log.error("listByFlowJobIds error.");
return result;
})).collect(Collectors.toList());
CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
log.info("任务阻塞 ");
Instant start = Instant.now();
//阻塞,直到所有任务结束。
all.join();
log.info("任务阻塞结束 耗时 = {}",ChronoUnit.MILLIS.between(start, Instant.now()));
return result;
}
具体步骤如下:
-
将原始数据按照每组100条进行拆分。(具体每组拆分多少条需要根据实际的业务情况和服务器性能,多测试一下应该就知道了)
-
多线程组成数据,每个线程组装一组数据(上面拆分的100条原始数据)。packVOs 方法就是组装数据。为了高效,我建议 在组装数据的时候多采用批量,缓存的思想,能批量尽量批量,重复数据就尽量缓存下来。
-
CompletableFuture.supplyAsync() 方法说明如下。第一个参数是线程需要执行的动作,第二个参数是线程执行用的Executor,可以填自定义的,也可以不填写,不填写程序会使用默认的执行器。
-
whenCompleteAsync 方法含义和名字一样,将上一步执行的结果或者异常作为参数传给指定的参数。这里我们希望分批组装的结果能过add进result中。
-
exceptionally 是用来处理异常。当一个线程执行出现异常的时候应该执行怎样的操作。
-
all.join() 这个方法是等待所有的任务(所有的CompletableFuture)完成。组装数据是耗时的,如果我们不等待所有组装任务完成,直接返回result,相信result中不会有数据,或者数据是不完整的。我们期待的结果是所有的数据都正常组装完成,添加进result。
避坑指南
避免阻塞主线程
CompletableFuture
作为CompletionStage
接口的实现,提供了一套全面的回调机制,支持多种灵活的组合操作模式,每种组合场景均配备有同步和异步两种执行方式,以满足不同场景下的性能需求。
同步方法(非Async后缀):
-
即时执行:若注册回调时,所依赖的
CompletableFuture
操作已完成,则回调逻辑将由当前线程直接执行,无需等待或切换线程,从而提高执行效率。 -
延迟执行:若注册时,所依赖的操作尚未完成,则回调将暂时挂起,直至依赖的操作完成后,再由完成该操作的线程(即回调线程)执行回调逻辑,避免了不必要的线程切换开销。
异步方法(带Async后缀):
-
灵活线程控制:这些方法允许开发者指定
Executor
来执行回调逻辑,从而提供了对执行线程的精确控制。通过传递特定的线程池参数,可以轻松实现复杂的多线程调度策略。 -
默认线程池:若不显式指定
Executor
,CompletableFuture
将默认使用ForkJoinPool
中的CommonPool
来执行回调。CommonPool
的大小默认为CPU核心数减一,适用于计算密集型任务。然而,对于IO密集型应用,由于CommonPool
的线程数可能不足以支撑大量的IO操作,因此可能成为性能瓶颈。在这种情况下,建议通过传递自定义的线程池来避免潜在的性能问题。
总结:要用自定义线程池,并且最好,设置自定义线程池的名字,方便知道是哪一个线程执行的,出问题也好排查。
正确处理异常
由于异步执行的任务在其他线程上执行,而异常信息存储在线程栈中,因此当前线程除非阻塞等待返回结果,否则无法通过try\catch捕获异常。CompletableFuture提供了异常捕获回调exceptionally,相当于同步调用中的try\catch。使用方法如下所示:
public CompletableFuture<Integer> getThirdCallback() {
//业务方法,内部会发起异步rpc调用
-- 此处省略业务的rpc调用
return remarkResultFuture
.exceptionally(err -> {
//通过exceptionally 捕获异常,打印日志并返回默认值
log.error("XX类.XX方法 exception:",err);
return 0;
});
}
有一点需要注意,CompletableFuture在回调方法中对异常进行了包装。大部分异常会封装成CompletionException后抛出,真正的异常存储在cause属性中,因此如果调用链中经过了回调方法处理那么就需要用Throwable.getCause()方法提取真正的异常。但是,有些情况下会直接返回真正的异常,最好使用工具类提取异常,如下代码所示:
public CompletableFuture<Integer> getThirdCallback() {
//业务方法,内部会发起异步rpc调用
-- 此处省略业务的rpc调用
return remarkResultFuture
.exceptionally(err -> {
//通过exceptionally 捕获异常,打印日志并返回默认值
log.error("XX类.XX方法 exception:",ExceptionUtils.extractRealException(err));
return 0;
});
}
上面代码中用到了一个自定义的工具类ExceptionUtils,用于CompletableFuture的异常提取,在使用CompletableFuture做异步编程时,可以直接使用该工具类处理异常。实现代码如下:
public class ExceptionUtils {
public static Throwable extractRealException(Throwable throwable) {
//这里判断异常类型是否为CompletionException、ExecutionException,如果是则进行提取,否则直接返回。
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
if (throwable.getCause() != null) {
return throwable.getCause();
}
}
return throwable;
}
}
异步回调中显式传递线程池参数
在使用异步回调方法时,虽然可以选择是否传递Executor
线程池参数,但出于性能和资源隔离的考虑,我们强烈建议总是显式地传递线程池参数(最好强制使用)。这一做法有助于避免默认使用ForkJoinPool
的公共线程池CommonPool
可能带来的问题。CommonPool
虽然为所有异步任务提供了一个共享的线程资源,但其核心线程数通常设置为处理器数量减一(对于单核处理器则为1),这意味着在高并发或IO密集型的场景下,所有异步回调都将竞争这些有限的线程资源,极易导致性能瓶颈和延迟增加。
通过手动传递线程池参数,开发者可以更加灵活地控制回调任务的执行线程,包括但不限于调整线程池大小、设置线程优先级,设置线程的名字等。更重要的是,这种方式允许我们根据业务场景的不同,为不同的业务逻辑分配独立的线程池,实现资源隔离。这样一来,不同业务之间的异步回调将不会相互干扰,从而提升了系统的稳定性和可维护性。
因此,为了优化系统性能、实现资源的有效利用和业务间的良好隔离,我们建议在异步回调中始终显式地传递线程池参数。
异步RPC调用注意不要阻塞IO线程池
在将服务异步化后,多个服务流程步骤常常依赖于异步RPC调用的结果,特别是当采用基于NIO(如Netty)的异步RPC框架时,这一点尤为重要。这类框架中,RPC调用的返回结果通常是由专门的IO线程负责处理的,这意呀着回调方法(包括CompletableFuture的同步回调,如thenApply
、thenAccept
等不带Async
后缀的方法)将由这些IO线程直接触发。
由于整个服务通常共享一个有限的IO线程池,因此必须谨慎处理同步回调中的逻辑。如果同步回调中包含了阻塞或耗时过长的操作,这些操作将直接阻塞IO线程,导致该线程无法处理其他IO事件,进而影响整个服务的响应能力和吞吐量。
为了避免这种情况,应当确保同步回调中的逻辑尽可能轻量,避免任何形式的阻塞操作。对于确实需要执行耗时操作的情况,应考虑将这些操作异步化,即利用另一个线程池来执行这些操作,并通过异步回调(如CompletableFuture的**thenApplyAsync**
、**thenAcceptAsync**
等带**Async**
后缀的方法)将结果返回给原始流程。这样做不仅可以避免阻塞IO线程,还能更好地利用多核CPU的优势,提升服务的整体性能。
总之,在使用基于NIO的异步RPC时,必须特别注意同步回调对IO线程池的影响,确保回调逻辑的高效性和非阻塞性,以保障服务的稳定运行和高效响应。
防止资源泄露
异步编程中资源泄露是一个常见问题。使用CompletableFuture
时,虽然它本身不直接涉及资源关闭操作(如文件流、数据库连接等),但应确保所有由异步任务创建的资源在使用完毕后得到正确释放。这通常通过确保资源在不再需要时显式关闭,或在CompletableFuture
链的末尾添加清理逻辑来实现。对于资源密集型操作,还需考虑使用适当的资源管理机制,如连接池,以减少资源消耗和泄露的风险。
线程池循环引用会导致死锁
问题:
在使用supplyAsync
向ThreadPoolKit.get(ThreadPoolConstant.TENGXUN_TASK_THREAD_POOL)
请求线程执行父任务时,如果父任务内部又进一步向同一线程池提交子任务,且线程池大小有限(如26个线程),就存在潜在的死锁风险。当线程池被全部占用,新到达的子任务将被放入阻塞队列等待空闲线程,但此时如果父任务的完成依赖于这些尚未执行的子任务,就会导致父任务也无法完成,进而阻塞了所有等待父任务结果的线程,包括可能的主线程在执行completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList())
时也会被永久阻塞。
为了修复这一问题,应当采取线程池隔离的策略,即父任务与子任务应当使用不同的线程池来执行。这样做可以确保父任务的执行不会受到子任务线程池状态的影响,反之亦然,从而避免了因线程资源竞争和循环依赖导致的死锁问题。
具体实现时,可以为父任务定义一个专用的线程池(如ThreadPoolKit.get(ThreadPoolConstant.TENGXUN_TASK_THREAD_POOL)
),而为子任务定义另一个线程池(如继续使用或定义新的ThreadPoolKit.get(ThreadPoolConstant.TENGXUN_SUB_TASK_THREAD_POOL)
)。这样,父任务可以并发执行而不受子任务执行进度的影响,同时子任务也有足够的资源去处理,从而保证了整个异步处理流程的顺畅进行。
通过实施这种线程池隔离策略,可以显著提升应用程序的健壮性和响应能力,避免由于线程资源不当管理导致的性能瓶颈和潜在的运行时错误。
解决方案
总结
高效利用CompletableFuture
进行异步编程,关键在于避免阻塞主线程、妥善处理异常、有效管理多个异步任务以及防止资源泄露。通过遵循这些最佳实践,可以显著提升应用程序的并发性能、响应速度和稳定性。
沉淀的工具类
CompletableFutureUtils 工具类
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.BinaryOperator;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* CompletableFuture封装工具类
*
* @author zhouhengzhe
*/
@Slf4j
@UtilityClass
public class CompletableFutureUtils {
/**
* 设置CF状态为失败
*/
public <T> CompletableFuture<T> failed(Throwable ex) {
CompletableFuture<T> completableFuture = new CompletableFuture<>();
completableFuture.completeExceptionally(ex);
return completableFuture;
}
/**
* 设置CF状态为成功
*/
public <T> CompletableFuture<T> success(T result) {
CompletableFuture<T> completableFuture = new CompletableFuture<>();
completableFuture.complete(result);
return completableFuture;
}
/**
* 将List<CompletableFuture<T>> 转为 CompletableFuture<List<T>>
*/
public <T> CompletableFuture<List<T>> sequence(Collection<CompletableFuture<T>> completableFutures) {
return sequence(completableFutures, t -> true);
}
/**
* 将List<CompletableFuture<T>> 转为 CompletableFuture<List<T>>,并支持添加自定义的过滤策略
*
* @Param filterFunction 自定义过滤策略
*/
public <T> CompletableFuture<List<T>> sequence(Collection<CompletableFuture<T>> completableFutures,
Predicate<? super T> filterFunction) {
return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures
.stream()
.map(CompletableFuture::join)
.filter(filterFunction)
.collect(Collectors.toList())
);
}
/**
* 将List<CompletableFuture<T>> 转为 CompletableFuture<List<T>>,并过滤调null值
*/
public <T> CompletableFuture<List<T>> sequenceNonNull(Collection<CompletableFuture<T>> completableFutures) {
return CompletableFuture
.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList())
);
}
/**
* 将List<CompletableFuture<List<T>>> 转为 CompletableFuture<List<T>>
* 多用于分页查询的场景
*/
public <T> CompletableFuture<List<T>> sequenceList(Collection<CompletableFuture<List<T>>> completableFutures) {
return sequenceList(completableFutures, t -> true);
}
/**
* 将List<CompletableFuture<List<T>>> 转为 CompletableFuture<List<T>>,并过滤调null值
* 多用于分页查询的场景
*/
public <T> CompletableFuture<List<T>> sequenceListNonNull(Collection<CompletableFuture<List<T>>> completableFutures) {
return sequenceList(completableFutures, Objects::nonNull);
}
/**
* 将List<CompletableFuture<List<T>>> 转为 CompletableFuture<List<T>>,并支持添加自定义的过滤策略
*
* @Param filterFunction 自定义过滤策略
*/
public <T> CompletableFuture<List<T>> sequenceList(Collection<CompletableFuture<List<T>>> completableFutures,
Predicate<? super T> filterFunction) {
return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream()
.flatMap(listFuture -> listFuture.join().stream().filter(filterFunction))
.collect(Collectors.toList())
);
}
/**
* 将List<CompletableFuture<Map<K, V>>> 转为 CompletableFuture<Map<K, V>>,使用提供的合并函数处理键冲突
*
* @Param mergeFunction 自定义key冲突时的merge策略
*/
public <K, V> CompletableFuture<Map<K, V>> sequenceMap(
Collection<CompletableFuture<Map<K, V>>> completableFutures, BinaryOperator<V> mergeFunction) {
return CompletableFuture
.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures
.stream()
.map(CompletableFuture::join)
.flatMap(map -> map.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, mergeFunction)));
}
/**
* 将List<CompletableFuture<Map<K,V>>>转为 CompletableFuture<Map<K,V>>。 多个map合并为一个map。
* 如果key冲突,采用新的value覆盖。(使用后加入的 value 覆盖现有的 value)
*/
public <K, V> CompletableFuture<Map<K, V>> sequenceMapOverwrite(
Collection<CompletableFuture<Map<K, V>>> completableFutures) {
return sequenceMap(completableFutures, (oldValue, newValue) -> newValue);
}
/**
* 将Set<CompletableFuture<T>>转为 CompletableFuture<Set<T>>。
*
* @param completableFutures
* @param <T>
* @return
*/
public <T> CompletableFuture<Set<T>> sequenceSet(Collection<CompletableFuture<T>> completableFutures) {
return sequenceSet(completableFutures, t -> true);
}
/**
* 将Set<CompletableFuture<T>>转为 CompletableFuture<Set<T>>。
*
* @param completableFutures
* @param <T>
* @return
*/
public <T> CompletableFuture<Set<T>> sequenceSet(Collection<CompletableFuture<T>> completableFutures,
Predicate<? super T> filterFunction) {
return CompletableFuture
.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream()
.map(CompletableFuture::join)
.filter(filterFunction)
.collect(Collectors.toSet())
);
}
}
CompletableFuture异常捕获工具类
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
/**
* @author zhouhengzhe
*/
public class ExceptionUtils {
/**
* 提取真正的异常
*/
public static Throwable extractRealException(Throwable throwable) {
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
if (throwable.getCause() != null) {
return throwable.getCause();
}
}
return throwable;
}
}
异常日志处理
顶层日志类
package com.changzhi.utils.log;
import com.changzhi.exception.ApiException;
import com.changzhi.utils.ExceptionUtils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* @author zhouhengzhe
*/
@Slf4j
@AllArgsConstructor
public abstract class AbstractLogAction<R> {
protected final String methodName;
protected final Object[] args;
protected void logResult(R result, Throwable throwable) {
if (throwable != null) {
boolean isBusinessError = throwable.getCause() != null;
if (isBusinessError) {
logBusinessError(throwable);
//这里为内部rpc框架抛出的异常,使用时可以酌情修改
} else if (throwable instanceof ApiException) {
log.error("{} degrade exception, param:{} , error:{}", methodName, args, throwable.getMessage(), throwable);
} else {
log.error("{} unknown error, param:{} , error:{}", methodName, args, throwable.getMessage(), ExceptionUtils.extractRealException(throwable));
}
} else {
log.info("{} param:{}", methodName, args);
}
}
private void logBusinessError(Throwable throwable) {
log.error("{} business error, param:{} , error:{}", methodName, args, throwable.toString(), ExceptionUtils.extractRealException(throwable));
}
}
whenComplete场景下的异常处理
import com.changzhi.utils.log.AbstractLogAction;
import lombok.extern.slf4j.Slf4j;
import java.util.function.BiConsumer;
/**
* 发生异常时,根据是否为业务异常打印日志。
* 跟CompletableFuture.whenComplete配合使用,不改变completableFuture的结果(正常OR异常)
*
* @author zhouhengzhe
*/
@Slf4j
public class LogErrorAction<R> extends AbstractLogAction<R> implements BiConsumer<R, Throwable> {
public LogErrorAction(String methodName, Object... args) {
super(methodName, args);
}
@Override
public void accept(R result, Throwable throwable) {
logResult(result, throwable);
}
}
handle场景下的处理
import com.changzhi.utils.log.AbstractLogAction;
import java.util.Arrays;
import java.util.function.BiFunction;
/**
* 当发生异常时返回自定义的值
*
* 跟CompletableFuture.handle配合使用,可以实现对异常的处理
*
* @author zhouhengzhe
*/
public class DefaultValueHandle<R> extends AbstractLogAction<R> implements BiFunction<R, Throwable, R> {
private final R defaultValue;
/**
* 当返回值为空的时候是否替换为默认值
*/
private final boolean isNullToDefault;
/**
* @param methodName 方法名称
* @param defaultValue 当异常发生时自定义返回的默认值
* @param args 方法入参
*/
public DefaultValueHandle(String methodName, R defaultValue, Object... args) {
super(methodName, args);
this.defaultValue = defaultValue;
this.isNullToDefault = false;
}
/**
* @param isNullToDefault
* @param defaultValue 当异常发生时自定义返回的默认值
* @param methodName 方法名称
* @param args 方法入参
*/
public DefaultValueHandle(boolean isNullToDefault, R defaultValue, String methodName, Object... args) {
super(methodName, args);
this.defaultValue = defaultValue;
this.isNullToDefault = isNullToDefault;
}
@Override
public R apply(R result, Throwable throwable) {
logResult(result, throwable);
if (throwable != null) {
return defaultValue;
}
if (result == null && isNullToDefault) {
return defaultValue;
}
return result;
}
public static <R> DefaultValueHandle.DefaultValueHandleBuilder<R> builder() {
return new DefaultValueHandle.DefaultValueHandleBuilder<>();
}
public static class DefaultValueHandleBuilder<R> {
private boolean isNullToDefault;
private R defaultValue;
private String methodName;
private Object[] args;
DefaultValueHandleBuilder() {
}
public DefaultValueHandle.DefaultValueHandleBuilder<R> isNullToDefault(final boolean isNullToDefault) {
this.isNullToDefault = isNullToDefault;
return this;
}
public DefaultValueHandle.DefaultValueHandleBuilder<R> defaultValue(final R defaultValue) {
this.defaultValue = defaultValue;
return this;
}
public DefaultValueHandle.DefaultValueHandleBuilder<R> methodName(final String methodName) {
this.methodName = methodName;
return this;
}
public DefaultValueHandle.DefaultValueHandleBuilder<R> args(final Object... args) {
this.args = args;
return this;
}
public DefaultValueHandle<R> build() {
return new DefaultValueHandle<R>(this.isNullToDefault, this.defaultValue, this.methodName, this.args);
}
public String toString() {
return "DefaultValueHandle.DefaultValueHandleBuilder(isNullToDefault=" + this.isNullToDefault + ", defaultValue=" + this.defaultValue + ", methodName=" + this.methodName + ", args=" + Arrays.deepToString(this.args) + ")";
}
}
}
使用案例
public static void main(String[] args) {
CompletableFuture<String> cf=new CompletableFuture<>();
// cf.complete("123");
cf.completeExceptionally(new ApiException("123"));
cf.handle(new DefaultValueHandle<>("orderService.getOrder", Collections.emptyMap(), "yes"));
}
参考
https://tech.meituan.com/2022/05/12/principles-and-practices-of-completablefuture.html
扩展
可以学习到的体系
-
项目完全从0到1开始架构,包含前端,后端,架构,服务器,技术管理相关运维知识!
- 最佳包名设计,项目分层
-
破冰CRUD,手撕中间件!
-
基于MybatisPlus封装属于自己的DDD ORM框架
-
基于Easyexcel封装属于自己的导入导出组件
-
oss对象存储脚手架(阿里云,minio,腾讯云,七牛云等)
-
邮件脚手架
-
completefuture脚手架
-
redis脚手架
-
xxl-job脚手架
-
短信脚手架
-
常用工具类等
-
-
传统MVC代码架构弊端的解决方案
- DDD+CQRS+ES最难架构
-
结合实际代码的业务场景
-
多租户单点登录中心
-
用户中台
-
消息中心
-
配置中心
-
监控设计
-
-
程序员的职业规划,人生规划
-
打工永远没有出路!
-
打破程序员的35岁魔咒
-
技术带给你的优势和竞争力【启发】
-
万物互联网的淘金之路!
-
技术以外的赚钱路子
可以一起沟通
具体的文章目录
购买链接
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)