Java多线程系列--掌握CompletableFuture,驾驭异步编程

前言

本文隶属于我归纳整理的Android知识体系的第四部分,属于 异步 部分的多线程内容

您可以通过访问 总纲 阅读系列内的其他文章。

desc

作者按:草稿进行了几次大改,移除了Demo部分、源码解析部分、设计原理部分。结合实际工作经验,"掌握API能熟练使用、能无障碍阅读相关框架源码" 已基本够用。

读者可结合下面的导图进行快速的知识自查

desc

一个美好的期望

通常情况下,我们希望代码的执行顺序和代码的组织顺序一致,即代码表述了同步执行的程序,这样可以减少很多思考。

阅读异步的程序代码,需要在脑海中建立事件流,当程序业务复杂时,将挑战人的记忆力和空间想象力,并非所有人都擅长在脑海中构建并分析异步事件流模型

所以,我们期望拥有一个非常友好的框架,能够让我们方便地进行异步编程,并且在框架内部设计有线程同步、异常处理机制。

并且,基于该框架编写的代码具有很高的可读、可理解性。

而Future基本无法满足这一期望。

Future的不足与CompletableFuture的来源

Future的不足

在先前的系列文章中,我们已经回顾了Future类的设计,在绝大多数场景下,我们选择使用多线程,是为了 充分利用机器性能 以及 避免用户交互线程出现长时间阻塞 以致影响体验。

所以我们将耗时的、会引起长时间阻塞的任务分离到其他线程执行,并在 合适时机 进行线程同步,于主线程(一般负责用户交互处理、界面渲染)中处理结果。

详见拙作 掌握Future,轻松获取异步任务结果掘金链接

Future 于 Java 1.5版本引入,它类似于 异步处理的结果占位符 , 提供了两个方法获取结果:

  • get(), 调用线程进入阻塞直至得到结果或者异常。
  • get(long timeout, TimeUnit unit), 调用线程将仅在指定时间 timeout 内等待结果或者异常,如果超时未获得结果就会抛出 TimeoutException 异常。

Future 可以实现 RunnableCallable 接口来定义任务,一定程度上满足 使用框架进行异步编程 的期望,但通过整体源码可知它存在如下 3个问题

  • 调用 get() 方法会一直阻塞直到获取结果、异常,无法在任务完成时获得 "通知" ,无法附加回调函数
  • 不具备链式调用和结果聚合处理能力,当我们想链接多个 Future 共同完成一件任务时,没有框架级的处理,只能编写业务级逻辑,合并结果,并小心的处理同步
  • 需要单独编写异常处理代码

使用 get(long timeout, TimeUnit unit)isDone() 判断,确实可以缓解问题1,但这需要结合业务单独设计(调优),存在大量的不确定性。不再展开

Java 8中引入 CompletableFuture 来解决 Future 的不足。

CompletableFuture来源

CompletableFuture 的设计灵感来自于 Google Guava 库的 ListenableFuture 类,它实现了 Future接口CompletionStage接口 , 并且新增一系列API,支持Java 8的 lambda特性,通过回调利用非阻塞方法,提升了异步编程模型。

它解决了Future的不足,允许我们在非主线程中运行任务,并向启动线程 (一般是主线程) 通知 任务完成任务失败,编写异步的、非阻塞的程序。

使用CompletableFuture

最简方式获取实例

使用 CompletableFuture.completedFuture(U value) 可以获取一个 执行状态已经完成CompletableFuture 对象。

这可以用于快速改造旧程序,并进行逐步过渡

class Demo {
    @Test
    public void testSimpleCompletableFuture() {
        CompletableFuture<String> completableFuture =
                CompletableFuture.completedFuture("testSimpleCompletableFuture");

        assertTrue(completableFuture.isDone());
        try {
            assertEquals("testSimpleCompletableFuture", completableFuture.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

改造线程同步部分

部分老旧程序已经建立了多线程业务模型,我们可以使用 CompletableFuture 改造其中的线程同步部分,但暂不改造数据传递。

使用 runAsync() 方法,该方法接收一个 Runnable 类型的参数返回 CompletableFuture<Void>:

//并不改变原项目中数据传递的部分、或者不关心结果数据,仅进行同步
class Demo {
    @Test
    public void testCompletableFutureRunAsync() {
        AtomicInteger variable = new AtomicInteger(0);
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> process(variable));
        runAsync.join();
        assertEquals(1, variable.get());
    }

    public void process(AtomicInteger variable) {
        System.out.println(Thread.currentThread() + " Process...");
        variable.set(1);
    }
}

进一步改造结果数据传递

当我们关心异步任务的结果数据、或者改造原 多线程业务模型数据传递方式 时,可以使用 supplyAsync() 方法,该方法接收一个 Supplier<T> 接口类型的参数,它实现了任务的逻辑,方法返回 CompletableFuture<T> 实例。

class Demo {
    @Test
    public void testCompletableFutureSupplyAsync() {
        CompletableFuture<String> supplyAsync =
                CompletableFuture.supplyAsync(this::process);
        try {
            // Blocking 
            assertEquals("testCompletableFutureSupplyAsync", supplyAsync.get());
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String process() {
        return "testCompletableFutureSupplyAsync";
    }
}

指定执行线程池

"获取用于执行任务的线程" 类似 Java 8 中的 parallelStreamCompletableFuture 默认从全局 ForkJoinPool.commonPool() 获取线程,用于执行任务。同时也提供了指定线程池的方式用于获取线程执行任务,您可以使用API中具有 Executor 参数的重载方法。

class Demo {
    @Test
    public void testCompletableFutureSupplyAsyncWithExecutor() {
        ExecutorService newFixedThreadPool =
                Executors.newFixedThreadPool(2);
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process,
                newFixedThreadPool);
        try {
            // Blocking 
            assertEquals("testCompletableFutureSupplyAsyncWithExecutor", supplyAsync.get());
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String process() {
        return "testCompletableFutureSupplyAsyncWithExecutor";
    }
}

CompletableFuture 中有众多API,方法命名中含有 Async 的API可使用线程池。

截至此处,以上使用方式均与 Future 类似,接下来演示 CompletableFuture 的不同

回调&链式调用

CompletableFutureget()API是阻塞式获取结果,CompletableFuture 提供了

  • thenApply
  • thenAccept
  • thenRun

等API来避免阻塞式获取,并且可添加 任务完成 后的回调。这几个方法的使用场景如下:

  • <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) 收到结果后,可以进行转化
  • CompletableFuture<Void> thenAccept(Consumer<? super T> action) 收到结果后,对其进行消费
  • CompletableFuture<Void> thenRun(Runnable action) 收到结果后,执行回调,无法消费结果只能消费 这一事件

API较为简单,不再代码演示

显然,通过链式调用可以组装多个执行过程。

有读者可能会疑惑:FunctionConsumer 也可以进行链式组装,是否存在冗余呢?

两种的链式调用特性确实存在重叠,您可以自行选择用法,但 thenRun 只能采用 CompletableFuture的链式调用。

另外,前面提到,我们可以指定线程池执行任务,对于这三组API,同样有相同的特性,通过 thenXXXXAsync 指定线程池,这是 FunctionConsumer 的链式组装所无法完成的。

class Demo {
    @Test
    public void testCompletableFutureApplyAsync() {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        // 从线程池 newFixedThreadPool 获取线程执行任务 
        CompletableFuture<Double> completableFuture =
                CompletableFuture.supplyAsync(() -> 1D, newFixedThreadPool)
                        .thenApplyAsync(d -> d + 1D, newSingleThreadScheduledExecutor)
                        .thenApplyAsync(d -> d + 2D);
        Double result = completableFuture.join();
        assertEquals(4D, result);
    }

}

聚合多个CompletableFuture

通过 聚合 多个 CompletableFuture,可以组成更 复杂 的业务流,可以达到精细地控制粒度、聚焦单个节点的业务。

注意:操作符并不能完全的控制 CompletableFuture 任务执行的时机,您需要谨慎的选择 CompletableFuture 的创建时机

thenCompose、thenComposeAsync

compose 原意为 组成, 通过多个 CompletableFuture 构建异步流。

在操作的 CompletableFuture 获得结果时,将另一个 CompletableFuture compose 到异步流中,compose的过程中,可以根据操作的 CompletableFuture 的结果编写逻辑。

thenApply 相比,thenCompose 返回逻辑中提供的 CompletableFuturethenApply 返回框架内处理的新实例。

注意,这一特性在使用 FP编程范式进行编码时,会显得非常灵活,一定程度上提升了函数的复用性

API含义直观,不再进行代码演示

thenCombine、thenCombineAsync

thenCombine 可以用于合并多个 独立任务 的处理结果。

注意: thenCompose 进行聚合时,下游可以使用上游的结果,在业务需求上一般表现为依赖上一步结果,而非两者相互独立。

例如,产品希望在博客详情页同时展示 "博客的详情" 和 "作者主要信息" ,以避免内容区抖动或割裂的骨架占位。这两者 可以独立获取时 ,则可以使用 thenCombine 系列API,分别获取,并合并结果。

combine 的特点是 被合并的两个 CompletableFuture 可以并发,等两者都获得结果后进行合并。

但它依旧存在使用上的不便捷,合并超过2个 CompletableFuture 时,显得不够灵活。可以使用 static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) API。

allOf 创建了 CompletableFuture<Void>,并不会帮助我们合并结果,所以需要自行编写业务代码合并,故存在 Side Effects

runAfterBoth、runAfterBothAsync;runAfterEither、runAfterEitherAsync

  • runAfterBoth 系列API在两个 CompletableFuture 都获得结果后执行回调
  • runAfterEither 系列API在两个 CompletableFuture 任意一个获得结果后执行回调

通过API,不难理解它们需要使用者自行处理结果

  • CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
  • CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)

同样可以增加编码灵活性,不再赘述。

applyToEither、applyToEitherAsync;acceptEither、acceptEitherAsync;thenAcceptBoth、thenAcceptBothAsync

  • applyToEither 系列API表现如 thenApplyEither 的组合,两个同类型的 CompletableFuture 任意一个获得结果后,可消费该结果并进行改变,类似 thenApply
  • acceptEither 系列API表现如 thenAcceptEither 的组合,两个同类型的 CompletableFuture 任意一个获得结果后,可消费该结果,类似 thenAccept
  • thenAcceptBoth 系列API表现如 thenCombine,但返回 CompletableFuture<Void>

同样可以增加编码灵活性,不再赘述

结果处理

使用回调处理结果有两种API,注意,除了正常获得结果外还可能获得异常,而这两组API簇差异体现在对 异常 的处理中。

<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)

handle 使用 BiFunction,无论是正常结果还是异常情况,均视作可被逻辑接受,消费后转化

whenComplete 使用 BiConsumer,仅可消费但不能转化,异常情况被视作不可被逻辑接受,仍会抛出。

举个例子,进行网络编程时会遇到 Exception, 如果业务设计中使用的模型实体包含了 正常结果异常 两种情况:

open class Result<T>(val t: T?) {
    open val isThr: Boolean = false
}

class FailResult<T>(val tr: Throwable) : Result<T>(null) {
    override val isThr: Boolean = true
}

则适合使用 handle API在底层处理。否则需要额外的异常处理,可依据项目的设计选择处理方式,一般在依据FP范式设计的程序中,倾向于使用handle,避免增加side effect。

异常处理

在多线程背景下,异常处理并不容易。它不仅仅是使用 try-catch 捕获异常,还包含程序异步流中,节点出现异常时流的业务走向。

CompletableFuture 中,节点出现异常将跳过后续节点,进入异常处理。

_如果您不希望某个节点抛出异常导致后续流程中断,则可在节点的处理中捕获并包装为结果、或者对子 CompletableFuture 节点采用 handleexceptionally API转换异常 _

除前文提到的 handle whenCompleteCompletableFuture 中还提供了 exceptionally API用于处理异常

CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

从表现结果看,它类似于 handle API中对异常的处理,将异常转换为目标结果的一种特定情形。