Java多线程系列--掌握CompletableFuture,驾驭异步编程
前言
本文隶属于我归纳整理的Android知识体系的第四部分,属于 异步
部分的多线程内容
您可以通过访问 总纲 阅读系列内的其他文章。
作者按:草稿进行了几次大改,移除了Demo部分、源码解析部分、设计原理部分。结合实际工作经验,"掌握API能熟练使用、能无障碍阅读相关框架源码" 已基本够用。
读者可结合下面的导图进行快速的知识自查
一个美好的期望
通常情况下,我们希望代码的执行顺序和代码的组织顺序一致,即代码表述了同步执行的程序,这样可以减少很多思考。
而 阅读异步的程序代码,需要在脑海中建立事件流,当程序业务复杂时,将挑战人的记忆力和空间想象力,并非所有人都擅长在脑海中构建并分析异步事件流模型。
所以,我们期望拥有一个非常友好的框架,能够让我们方便地进行异步编程,并且在框架内部设计有线程同步、异常处理机制。
并且,基于该框架编写的代码具有很高的可读、可理解性。
而Future基本无法满足这一期望。
Future的不足与CompletableFuture的来源
Future的不足
在先前的系列文章中,我们已经回顾了Future类的设计,在绝大多数场景下,我们选择使用多线程,是为了 充分利用机器性能 以及 避免用户交互线程出现长时间阻塞 以致影响体验。
所以我们将耗时的、会引起长时间阻塞的任务分离到其他线程执行,并在 合适时机 进行线程同步,于主线程(一般负责用户交互处理、界面渲染)中处理结果。
详见拙作 掌握Future,轻松获取异步任务结果 、掘金链接
Future
于 Java 1.5版本引入,它类似于 异步处理的结果占位符
, 提供了两个方法获取结果:
get()
, 调用线程进入阻塞直至得到结果或者异常。get(long timeout, TimeUnit unit)
, 调用线程将仅在指定时间 timeout 内等待结果或者异常,如果超时未获得结果就会抛出 TimeoutException 异常。
Future
可以实现 Runnable
或 Callable
接口来定义任务,一定程度上满足 使用框架进行异步编程
的期望,但通过整体源码可知它存在如下 3个问题 :
- 调用
get()
方法会一直阻塞直到获取结果、异常,无法在任务完成时获得 "通知" ,无法附加回调函数 - 不具备链式调用和结果聚合处理能力,当我们想链接多个
Future
共同完成一件任务时,没有框架级的处理,只能编写业务级逻辑,合并结果,并小心的处理同步 - 需要单独编写异常处理代码
使用 get(long timeout, TimeUnit unit)
和 isDone()
判断,确实可以缓解问题1,但这需要结合业务单独设计(调优),存在大量的不确定性。不再展开
Java 8中引入 CompletableFuture
来解决 Future
的不足。
CompletableFuture来源
CompletableFuture
的设计灵感来自于 Google Guava
库的 ListenableFuture
类,它实现了 Future接口
和 CompletionStage接口
,
并且新增一系列API,支持Java 8的 lambda特性
,通过回调利用非阻塞方法,提升了异步编程模型。
它解决了Future的不足,允许我们在非主线程中运行任务,并向启动线程 (一般是主线程) 通知 任务完成
或 任务失败
,编写异步的、非阻塞的程序。
使用CompletableFuture
最简方式获取实例
使用 CompletableFuture.completedFuture(U value)
可以获取一个 执行状态已经完成
的 CompletableFuture
对象。
这可以用于快速改造旧程序,并进行逐步过渡
class Demo {
@Test
public void testSimpleCompletableFuture() {
CompletableFuture<String> completableFuture =
CompletableFuture.completedFuture("testSimpleCompletableFuture");
assertTrue(completableFuture.isDone());
try {
assertEquals("testSimpleCompletableFuture", completableFuture.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
改造线程同步部分
部分老旧程序已经建立了多线程业务模型,我们可以使用 CompletableFuture
改造其中的线程同步部分,但暂不改造数据传递。
使用 runAsync()
方法,该方法接收一个 Runnable
类型的参数返回 CompletableFuture<Void>
:
//并不改变原项目中数据传递的部分、或者不关心结果数据,仅进行同步
class Demo {
@Test
public void testCompletableFutureRunAsync() {
AtomicInteger variable = new AtomicInteger(0);
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> process(variable));
runAsync.join();
assertEquals(1, variable.get());
}
public void process(AtomicInteger variable) {
System.out.println(Thread.currentThread() + " Process...");
variable.set(1);
}
}
进一步改造结果数据传递
当我们关心异步任务的结果数据、或者改造原 多线程业务模型 的 数据传递方式 时,可以使用 supplyAsync()
方法,该方法接收一个 Supplier<T>
接口类型的参数,它实现了任务的逻辑,方法返回 CompletableFuture<T>
实例。
class Demo {
@Test
public void testCompletableFutureSupplyAsync() {
CompletableFuture<String> supplyAsync =
CompletableFuture.supplyAsync(this::process);
try {
// Blocking
assertEquals("testCompletableFutureSupplyAsync", supplyAsync.get());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
public String process() {
return "testCompletableFutureSupplyAsync";
}
}
指定执行线程池
"获取用于执行任务的线程" 类似 Java 8 中的 parallelStream
, CompletableFuture
默认从全局
ForkJoinPool.commonPool()
获取线程,用于执行任务。同时也提供了指定线程池的方式用于获取线程执行任务,您可以使用API中具有 Executor
参数的重载方法。
class Demo {
@Test
public void testCompletableFutureSupplyAsyncWithExecutor() {
ExecutorService newFixedThreadPool =
Executors.newFixedThreadPool(2);
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process,
newFixedThreadPool);
try {
// Blocking
assertEquals("testCompletableFutureSupplyAsyncWithExecutor", supplyAsync.get());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
public String process() {
return "testCompletableFutureSupplyAsyncWithExecutor";
}
}
CompletableFuture
中有众多API,方法命名中含有 Async
的API可使用线程池。
截至此处,以上使用方式均与 Future
类似,接下来演示 CompletableFuture
的不同
回调&链式调用
CompletableFuture
的 get()
API是阻塞式获取结果,CompletableFuture
提供了
thenApply
thenAccept
thenRun
等API来避免阻塞式获取,并且可添加 任务完成
后的回调。这几个方法的使用场景如下:
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
收到结果后,可以进行转化CompletableFuture<Void> thenAccept(Consumer<? super T> action)
收到结果后,对其进行消费CompletableFuture<Void> thenRun(Runnable action)
收到结果后,执行回调,无法消费结果只能消费 这一事件
API较为简单,不再代码演示
显然,通过链式调用可以组装多个执行过程。
有读者可能会疑惑:
Function
和Consumer
也可以进行链式组装,是否存在冗余呢?
两种的链式调用特性确实存在重叠,您可以自行选择用法,但 thenRun
只能采用 CompletableFuture
的链式调用。
另外,前面提到,我们可以指定线程池执行任务,对于这三组API,同样有相同的特性,通过 thenXXXXAsync
指定线程池,这是 Function
和 Consumer
的链式组装所无法完成的。
class Demo {
@Test
public void testCompletableFutureApplyAsync() {
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
// 从线程池 newFixedThreadPool 获取线程执行任务
CompletableFuture<Double> completableFuture =
CompletableFuture.supplyAsync(() -> 1D, newFixedThreadPool)
.thenApplyAsync(d -> d + 1D, newSingleThreadScheduledExecutor)
.thenApplyAsync(d -> d + 2D);
Double result = completableFuture.join();
assertEquals(4D, result);
}
}
聚合多个CompletableFuture
通过 聚合
多个 CompletableFuture
,可以组成更 复杂
的业务流,可以达到精细地控制粒度、聚焦单个节点的业务。
注意:操作符并不能完全的控制 CompletableFuture
任务执行的时机,您需要谨慎的选择 CompletableFuture
的创建时机
thenCompose、thenComposeAsync
compose
原意为 组成
, 通过多个 CompletableFuture
构建异步流。
在操作的 CompletableFuture
获得结果时,将另一个 CompletableFuture
compose
到异步流中,compose的过程中,可以根据操作的 CompletableFuture
的结果编写逻辑。
与 thenApply
相比,thenCompose
返回逻辑中提供的 CompletableFuture
而 thenApply
返回框架内处理的新实例。
注意,这一特性在使用 FP编程范式
进行编码时,会显得非常灵活,一定程度上提升了函数的复用性
API含义直观,不再进行代码演示
thenCombine、thenCombineAsync
thenCombine
可以用于合并多个 独立任务 的处理结果。
注意:
thenCompose
进行聚合时,下游可以使用上游的结果,在业务需求上一般表现为依赖上一步结果,而非两者相互独立。
例如,产品希望在博客详情页同时展示 "博客的详情" 和 "作者主要信息" ,以避免内容区抖动或割裂的骨架占位。这两者 可以独立获取时 ,则可以使用 thenCombine
系列API,分别获取,并合并结果。
combine
的特点是 被合并的两个 CompletableFuture
可以并发,等两者都获得结果后进行合并。
但它依旧存在使用上的不便捷,合并超过2个 CompletableFuture
时,显得不够灵活。可以使用
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
API。
allOf
创建了 CompletableFuture<Void>
,并不会帮助我们合并结果,所以需要自行编写业务代码合并,故存在 Side Effects
。
runAfterBoth、runAfterBothAsync;runAfterEither、runAfterEitherAsync
runAfterBoth
系列API在两个CompletableFuture
都获得结果后执行回调runAfterEither
系列API在两个CompletableFuture
任意一个获得结果后执行回调
通过API,不难理解它们需要使用者自行处理结果
CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
;CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
同样可以增加编码灵活性,不再赘述。
applyToEither、applyToEitherAsync;acceptEither、acceptEitherAsync;thenAcceptBoth、thenAcceptBothAsync
applyToEither
系列API表现如thenApply
和Either
的组合,两个同类型的CompletableFuture
任意一个获得结果后,可消费该结果并进行改变,类似 thenApplyacceptEither
系列API表现如thenAccept
和Either
的组合,两个同类型的CompletableFuture
任意一个获得结果后,可消费该结果,类似 thenAcceptthenAcceptBoth
系列API表现如thenCombine
,但返回CompletableFuture<Void>
同样可以增加编码灵活性,不再赘述
结果处理
使用回调处理结果有两种API,注意,除了正常获得结果外还可能获得异常,而这两组API簇差异体现在对 异常
的处理中。
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
handle
使用 BiFunction
,无论是正常结果还是异常情况,均视作可被逻辑接受,消费后转化
而 whenComplete
使用 BiConsumer
,仅可消费但不能转化,异常情况被视作不可被逻辑接受,仍会抛出。
举个例子,进行网络编程时会遇到 Exception
, 如果业务设计中使用的模型实体包含了 正常结果
、异常
两种情况:
open class Result<T>(val t: T?) {
open val isThr: Boolean = false
}
class FailResult<T>(val tr: Throwable) : Result<T>(null) {
override val isThr: Boolean = true
}
则适合使用 handle
API在底层处理。否则需要额外的异常处理,可依据项目的设计选择处理方式,一般在依据FP范式设计的程序中,倾向于使用handle,避免增加side effect。
异常处理
在多线程背景下,异常处理并不容易。它不仅仅是使用 try-catch
捕获异常,还包含程序异步流中,节点出现异常时流的业务走向。
在 CompletableFuture
中,节点出现异常将跳过后续节点,进入异常处理。
_如果您不希望某个节点抛出异常导致后续流程中断,则可在节点的处理中捕获并包装为结果、或者对子 CompletableFuture 节点采用 handle
、exceptionally
API转换异常 _
除前文提到的 handle
whenComplete
,CompletableFuture
中还提供了 exceptionally
API用于处理异常
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
从表现结果看,它类似于 handle
API中对异常的处理,将异常转换为目标结果的一种特定情形。