CompletableFuture的cancel和handleAsync的一个小坑
CompletableFuture的cancel和handleAsync的一个小坑
环境
- Ubuntu 22.04
- Java 17.0.3.1
- IntelliJ IDEA 2022.1.3
背景
最近遇到了一个CompletableFuture的一个小坑,记录一下,顺便复习一下CompletableFuture。
简单的说,就是调用CompletableFuture的 cancel()
方法之后,貌似没有触发 handleAsync()
所指定的回调方法。经过一番测试和研究,最终发现是我自己的问题。
测试1
首先写了一个最简单的测试程序:通过CompletableFuture的 supplyAsync()
方法运行一个task runTask()
,并通过CompletableFuture的 handleAsync()
方法添加一个回调函数 handleResult()
,用于处理task的运行结果。代码如下:
public class Test0524 {
private String runTask() {
System.out.println(Thread.currentThread().getName() + ": runTask start " + System.currentTimeMillis());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
System.out.println("runTask interrupted! " + System.currentTimeMillis());
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ": runTask end " + System.currentTimeMillis());
return "task result: success";
}
public void test() {
System.out.println(Thread.currentThread().getName() + ": test start " + System.currentTimeMillis());
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> runTask());
future.handleAsync((result, throwable) -> handleResult(result, throwable));
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + ": test end " + System.currentTimeMillis());
}
private String handleResult(String result, Throwable throwable) {
System.out.println(Thread.currentThread().getName() + ": handleResult start " + System.currentTimeMillis());
System.out.println(Thread.currentThread().getName() + ": result = " + result + ", throwable = " + throwable);
System.out.println(Thread.currentThread().getName() + ": handleResult end " + System.currentTimeMillis());
return "handle result: success";
}
public static void main(String[] args) {
Test0524 obj = new Test0524();
obj.test();
}
}
注意:在task里sleep了3秒钟,以模拟task需要做一些事情。同时,在主线程里sleep了4秒钟,以确保主线程在task之后结束。如果主线程比task早结束,则整个程序随着主线程一起结束了。
运行程序,结果如下:
main: test start 1684930792470
ForkJoinPool.commonPool-worker-1: runTask start 1684930792495
ForkJoinPool.commonPool-worker-1: runTask end 1684930795496
ForkJoinPool.commonPool-worker-2: handleResult start 1684930795498
ForkJoinPool.commonPool-worker-2: result = task result: success, throwable = null
ForkJoinPool.commonPool-worker-2: handleResult end 1684930795510
main: test end 1684930796496
可见,主线程、task、result handler分别为3个不同的线程,这是因为使用了 supplyAsync()
和 handleAsync()
。顾名思义,它们都是以“Async”结尾,表示异步线程。
本次测试没有问题,程序工作正常。
测试2
在task运行的过程中,把task cancel掉。
修改 test()
方法如下:
public void test() {
System.out.println(Thread.currentThread().getName() + ": test start " + System.currentTimeMillis());
CompletableFuture<String> future;
future = CompletableFuture.supplyAsync(() -> runTask());
future.handleAsync((result, throwable) -> handleResult(result, throwable));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + ": cancel start " + System.currentTimeMillis());
boolean cancel = future.cancel(true);
System.out.println(Thread.currentThread().getName() + ": cancel end " + System.currentTimeMillis() + ", result = " + cancel);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + ": test end " + System.currentTimeMillis());
}
注意:因为task持续运行3秒钟,所以在主线程里先sleep了2秒钟,以确保task运行起来,然后cancel task,最后再sleep 2秒钟,确保task有充足的时间运行结束。
运行程序,结果如下:
main: test start 1684931916879
ForkJoinPool.commonPool-worker-1: runTask start 1684931916899
main: cancel start 1684931918901
ForkJoinPool.commonPool-worker-2: handleResult start 1684931918902
ForkJoinPool.commonPool-worker-2: result = null, throwable = java.util.concurrent.CancellationException
ForkJoinPool.commonPool-worker-2: handleResult end 1684931918914
main: cancel end 1684931918902, result = true
ForkJoinPool.commonPool-worker-1: runTask end 1684931919900
main: test end 1684931920917
从结果可见, cancel()
操作立即触发了 handleAsync()
回调方法,二者只差1毫秒。
然而,我们可以看到,实际上task并没有受到影响,它仍然在运行。尽管给 cancel()
方法的参数(表示 mayInterruptIfRunning
)传了true值,但是并没有给task发中断信号(如果有中断信号,会被task的sleep方法捕获)。
JDK文档里是这样说明的:
mayInterruptIfRunning – this value has no effect in this implementation because interrupts are not used to control processing.
所以,实际的行为是:在运行的task上调用 cancel()
方法时,task运行本身并不受影响,而会立即触发其 handle()
/ handleAsync()
所指定的回调方法。当然,task运行结束后,就不再触发回调方法了。
handle()
/ handleAsync()
的回调方法有两个参数:一个是task的运行结果,另一个是error(Throwable)。正常情况下error是null,异常情况下运行结果是null。
从运行结果可见,cancel task之后,Throwable是 CancellationException
,而运行结果是null。
测试3
task运行过程中出现异常。
修改 runTask()
方法如下:
private String runTask() {
System.out.println(Thread.currentThread().getName() + ": runTask start " + System.currentTimeMillis());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("runTask interrupted! " + System.currentTimeMillis());
e.printStackTrace();
}
throw new RuntimeException("runTask exception!");
}
注意:task的sleep时间改为1秒,表示在task cancel之前就出现异常。
运行程序,结果如下:
main: test start 1684933211432
ForkJoinPool.commonPool-worker-1: runTask start 1684933211450
ForkJoinPool.commonPool-worker-2: handleResult start 1684933212454
ForkJoinPool.commonPool-worker-2: result = null, throwable = java.util.concurrent.CompletionException: java.lang.RuntimeException: runTask exception!
ForkJoinPool.commonPool-worker-2: handleResult end 1684933212487
main: cancel start 1684933213452
main: cancel end 1684933213453, result = false
main: test end 1684933215484
可见:
- task抛出异常后,立即触发了
handleAsync()
的回调方法,task是异常结束的,结果为null,Throwable为CompletionException
(caused byRuntimeException
)。 - task在cancel之前已经结束了,所以cancel task无效,
cancel()
方法返回值为false。
测试4
同测试3,唯一区别是先cancel,然后task运行过程中才出现异常。
把 runTask()
方法里的sleep时间改为3秒。
运行程序,结果如下:
main: test start 1684933790852
ForkJoinPool.commonPool-worker-1: runTask start 1684933790878
main: cancel start 1684933792880
ForkJoinPool.commonPool-worker-2: handleResult start 1684933792884
ForkJoinPool.commonPool-worker-2: result = null, throwable = java.util.concurrent.CancellationException
main: cancel end 1684933792883, result = true
ForkJoinPool.commonPool-worker-2: handleResult end 1684933792904
main: test end 1684933794904
可见,cancel操作立即触发了 handleAsync()
的回调方法。当然,实际上task还在运行,并且抛出了异常,但是没有被捕获处理。
测试5
前面几个测试,经多次运行,结果都是期望的,没有问题。
那为什么我的代码中,感觉cancel task之后,并没有触发 handleAsync()
的回调方法呢。因为系统比较复杂,不方便调试,我的判断依据是看log,在回调方法中有一些记录log的逻辑,而查看log时,只看到了cancel的log,并没有找到回调方法的log,所以我感觉是回调方法没有被触发。
经过好几天的测试和分析,我始终没有找到原因,甚至开始怀疑是JDK的问题,今天才终于发现,还是我自己的问题。囧……
直接说结论,因为 handleAsync()
是启动异步线程,所以如果在回调方法中出现异常,并没有被捕获。
说白了就是,在回调方法中,出现了异常,没有捕获,所以线程一下子挂了。
那么出现了什么异常了?为什么正常结束时,回调方法里就没有异常了呢,说来惭愧,因为cancel task,导致task结果是null,而我在回调方法里使用了类似 result.getXxx()
的代码,所以出现异常,而正常结束时task结果不是null,所以就没有异常了。
在 handleResult()
方法最前面添加一行代码:
System.out.println(result.length());
运行程序,结果如下:
main: test start 1684934679760
ForkJoinPool.commonPool-worker-1: runTask start 1684934679788
main: cancel start 1684934681789
main: cancel end 1684934681792, result = true
main: test end 1684934683830
看,是不是感觉cancel操作没有触发 handleAsync()
的回调方法?
当然,本例是十分简化的模型,看起来一目了然,而在实际项目中,情况要复杂百倍,花费了我好几天时间,才最终找到原因。
测试6
在回调方法中捕获异常。
找到问题原因,接下来就好办了,只需在回调方法中捕获异常。修改 handleResult()
方法如下:
private String handleResult(String result, Throwable throwable) {
try {
System.out.println(result.length());
System.out.println(Thread.currentThread().getName() + ": handleResult start " + System.currentTimeMillis());
System.out.println(Thread.currentThread().getName() + ": result = " + result + ", throwable = " + throwable);
throwable.printStackTrace();
System.out.println(Thread.currentThread().getName() + ": handleResult end " + System.currentTimeMillis());
return "handle result: success";
} catch (Exception e) {
System.out.println("handleResult exception!");
e.printStackTrace();
throw e;
}
}
运行程序,结果如下:
main: test start 1684935436681
ForkJoinPool.commonPool-worker-1: runTask start 1684935436699
main: cancel start 1684935438700
handleResult exception!
java.lang.NullPointerException: Cannot invoke "String.length()" because "result" is null
at com.example.test0524.Test0524.handleResult(Test0524.java:51)
at com.example.test0524.Test0524.lambda$test$1(Test0524.java:28)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
main: cancel end 1684935438703, result = true
main: test end 1684935440732
OK,现在一切都正常了。
测试7
思考:如果把 handleAsync()
替换成 handle()
,也就是用同步线程,会怎么样呢?
修改 test()
方法,把 handleAsync
替换为 handle
。
修改 handleResult()
方法,去除try…catch块,在最前面加上: System.out.println(Thread.currentThread().getName() + "====================");
运行程序,结果如下:
main: test start 1684936772747
ForkJoinPool.commonPool-worker-1: runTask start 1684936772769
main: cancel start 1684936774770
main====================
main: cancel end 1684936774776, result = true
main: test end 1684936776796
结论:回调方法和主线程使用的是同一个线程,但是如果回调方法有异常,不会被捕获,而且不影响主线程的逻辑。所以还是得捕获回调方法的异常。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)