【编码心得】CompletableFuture的使用心得
文章目录
- 前言
- 干货篇
- Future 介绍
- CompletableFuture 介绍
- CompletableFuture常用操作
- 处理异步结算的结果
- 异常处理
- 你可以通过 handle() 方法来处理任务执行过程中可能出现的抛出异常的情况。
- 你还可以通过 exceptionally() 方法来处理异常情况。
- 如果你想让 CompletableFuture 的结果就是异常的话,可以使用 completeExceptionally() 方法为其赋值。
- 组合 CompletableFuture
- 顺序连接
- 并行连接
- 随机连接
- 并行运行多个 CompletableFuture
- CompletableFuture 使用建议
- 使用自定义线程池
- 尽量避免使用 get()
- 异常处理的常见操作
- 总结
前言
实际项目中,一个接口可能需要同时获取多种不同来源的数据,然后再做处理返回。
如果是串行(按顺序依次执行每个任务)执行的话,接口的响应速度会非常慢。
串行流程:
但通过并行执行多个任务的方式,接口的响应速度会得到大幅优化。
并行流程:
这篇文章是 CompletableFuture 的简单使用心得,废话不多说,开始上干货~
干货篇
Future 介绍
讲CompletableFuture之前,先得回顾下Future,该类主要用在一些需要执行耗时任务的场景,避免程序一直原地等待耗时任务执行完成,等我们的事情干完后,我们再通过 Future 类获取到耗时任务的执行结果。这样一来,程序的执行效率就明显提高了。
在 Java 中,Future 类只是一个泛型接口,位于 java.util.concurrent 包下,其中定义了 5 个方法
主要包括下面这 4 个功能:
- 取消任务
- 判断任务是否被取消
- 判断任务是否已经执行完成
- 获取任务执行结果
CompletableFuture 介绍
Future不支持异步任务的编排组合、获取计算结果的 get() 方法为阻塞调用。
所以引入CompletableFuture 类解决Future 的这些缺陷。
CompletableFuture 类的定义。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}
可以发现,CompletableFuture 同时实现了 Future 和 CompletionStage 接口。
CompletableFuture 除了提供了更为好用和强大的 Future 特性之外,还提供了函数式编程的能力。
CompletionStage 接口描述了一个异步计算的阶段。很多计算可以分成多个阶段或步骤,此时可以通过它将所有步骤组合起来,形成异步计算的流水线。CompletableFuture 的函数式能力就是这个接口赋予的
CompletableFuture常用操作
- 通过 new 关键字创建 CompletableFuture 对象这种使用方式可以看作是将 CompletableFuture 当做 Future 来使用。
CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
- 假设在未来的某个时刻,我们得到了最终的结果。这时,我们可以调用 complete() 方法为其传入结果,这表示 resultFuture 已经被完成了。
// complete() 方法只能调用一次,后续调用将被忽略。
resultFuture.complete(rpcResponse);
- 通过 isDone() 方法来检查是否已经完成。
public boolean isDone() {return result != null;
}
- 获取异步计算的结果也非常简单,直接调用 get() 方法即可。调用 get() 方法的线程会阻塞直到 CompletableFuture 完成运算。
rpcResponse = completableFuture.get();
- 如果你已经知道计算的结果的话,可以使用静态方法 completedFuture() 来创建 CompletableFuture 。
CompletableFuture<String> future = CompletableFuture.completedFuture("hello!");
assertEquals("hello!", future.get());
- runAsync() 方法接受的参数是 Runnable ,这是一个函数式接口,不允许返回值。当你需要异步操作且不关心返回结果的时候可以使用 runAsync() 方法。
@FunctionalInterface
public interface Runnable {public abstract void run();
}
- supplyAsync() 方法接受的参数是 Supplier ,这也是一个函数式接口,U 是返回结果值的类型。
@FunctionalInterface
public interface Supplier<T> {/*** Gets a result.** @return a result*/T get();
}
- 当你需要异步操作且关心返回结果的时候,可以使用 supplyAsync() 方法。
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("hello!"));
future.get();// 输出 "hello!"
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "hello!");
assertEquals("hello!", future2.get());
处理异步结算的结果
当我们获取到异步计算的结果之后,还可以对其进行进一步的处理,比较常用的方法有下面几个:
thenApply()
thenAccept()
thenRun()
whenComplete()
thenApply() 方法接受一个 Function 实例,用它来处理结果。
// 沿用上一个任务的线程池
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {return uniApplyStage(null, fn);
}
//使用默认的 ForkJoinPool 线程池(不推荐)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {return uniApplyStage(defaultExecutor(), fn);
}
// 使用自定义线程池(推荐)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {return uniApplyStage(screenExecutor(executor), fn);
}
thenApply() 方法使用示例如下:
CompletableFuture<String> future = CompletableFuture.completedFuture("hello!").thenApply(s -> s + "world!");
assertEquals("hello!world!", future.get());
// 这次调用将被忽略。
future.thenApply(s -> s + "nice!");
assertEquals("hello!world!", future.get());
你还可以进行 流式调用:
CompletableFuture<String> future = CompletableFuture.completedFuture("hello!").thenApply(s -> s + "world!").thenApply(s -> s + "nice!");
assertEquals("hello!world!nice!", future.get());
如果你不需要从回调函数中获取返回结果,可以使用 thenAccept() 或者 thenRun()。这两个方法的区别在于 thenRun() 不能访问异步计算的结果。
thenAccept() 方法的参数是 Consumer<? super T> 。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {return uniAcceptStage(null, action);
}public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {return uniAcceptStage(defaultExecutor(), action);
}public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor) {return uniAcceptStage(screenExecutor(executor), action);
}
顾名思义,Consumer 属于消费型接口,它可以接收 1 个输入对象然后进行“消费”。
@FunctionalInterface
public interface Consumer<T> {void accept(T t);default Consumer<T> andThen(Consumer<? super T> after) {Objects.requireNonNull(after);return (T t) -> { accept(t); after.accept(t); };}
}
thenRun() 的方法是的参数是 Runnable 。
public CompletableFuture<Void> thenRun(Runnable action) {return uniRunStage(null, action);
}public CompletableFuture<Void> thenRunAsync(Runnable action) {return uniRunStage(defaultExecutor(), action);
}public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor) {return uniRunStage(screenExecutor(executor), action);
}
thenAccept() 和 thenRun() 使用示例如下:
CompletableFuture.completedFuture("hello!").thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenAccept(System.out::println);//hello!world!nice!CompletableFuture.completedFuture("hello!").thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenRun(() -> System.out.println("hello!"));//hello!
whenComplete() 的方法的参数是 BiConsumer<? super T, ? super Throwable> 。
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {return uniWhenCompleteStage(null, action);
}public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {return uniWhenCompleteStage(defaultExecutor(), action);
}
// 使用自定义线程池(推荐)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {return uniWhenCompleteStage(screenExecutor(executor), action);
}
相对于 Consumer , BiConsumer 可以接收 2 个输入对象然后进行“消费”。
@FunctionalInterface
public interface BiConsumer<T, U> {void accept(T t, U u);default BiConsumer<T, U> andThen(BiConsumer<? super T, ? super U> after) {Objects.requireNonNull(after);return (l, r) -> {accept(l, r);after.accept(l, r);};}
}
whenComplete() 使用示例如下:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello!").whenComplete((res, ex) -> {// res 代表返回的结果// ex 的类型为 Throwable ,代表抛出的异常System.out.println(res);// 这里没有抛出异常所有为 nullassertNull(ex);});
assertEquals("hello!", future.get());
异常处理
你可以通过 handle() 方法来处理任务执行过程中可能出现的抛出异常的情况。
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {return uniHandleStage(null, fn);
}public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) {return uniHandleStage(defaultExecutor(), fn);
}public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {return uniHandleStage(screenExecutor(executor), fn);
}
示例代码如下:
CompletableFuture<String> future= CompletableFuture.supplyAsync(() -> {if (true) {throw new RuntimeException("Computation error!");}return "hello!";
}).handle((res, ex) -> {// res 代表返回的结果// ex 的类型为 Throwable ,代表抛出的异常return res != null ? res : "world!";
});
assertEquals("world!", future.get());
你还可以通过 exceptionally() 方法来处理异常情况。
CompletableFuture<String> future= CompletableFuture.supplyAsync(() -> {if (true) {throw new RuntimeException("Computation error!");}return "hello!";
}).exceptionally(ex -> {System.out.println(ex.toString());// CompletionExceptionreturn "world!";
});
assertEquals("world!", future.get());
如果你想让 CompletableFuture 的结果就是异常的话,可以使用 completeExceptionally() 方法为其赋值。
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// ...
completableFuture.completeExceptionally(new RuntimeException("Calculation failed!"));
// ...
completableFuture.get(); // ExecutionException
组合 CompletableFuture
顺序连接
你可以使用 thenCompose() 按顺序链接两个 CompletableFuture 对象,实现异步的任务链。它的作用是将前一个任务的返回结果作为下一个任务的输入参数,从而形成一个依赖关系。
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {return uniComposeStage(null, fn);
}public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {return uniComposeStage(defaultExecutor(), fn);
}public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor) {return uniComposeStage(screenExecutor(executor), fn);
}
thenCompose() 方法会使用示例如下:CompletableFuture<String> future= CompletableFuture.supplyAsync(() -> "hello!").thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "world!"));
assertEquals("hello!world!", future.get());
在实际开发中,这个方法还是非常有用的。比如说,task1 和 task2 都是异步执行的,但 task1 必须执行完成后才能开始执行 task2(task2 依赖 task1 的执行结果)。
并行连接
thenCombine() 会在两个任务都执行完成后,把两个任务的结果合并。两个任务是并行执行的,它们之间并没有先后依赖顺序。
随机连接
例如,如果我们想要实现 task1 和 task2 中的任意一个任务执行完后就执行 task3 的话,可以使用 acceptEither()。
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) {return orAcceptStage(null, other, action);
}public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) {return orAcceptStage(asyncPool, other, action);
}
任务组合操作acceptEitherAsync()会在异步任务 1 和异步任务 2 中的任意一个完成时触发执行任务 3,但是需要注意,这个触发时机是不确定的。如果任务 1 和任务 2 都还未完成,那么任务 3 就不能被执行。
并行运行多个 CompletableFuture
- 通过 CompletableFuture 的 allOf()这个静态方法来并行运行多个 CompletableFuture 。
allOf() 方法会等到所有的 CompletableFuture 都运行完成之后再返回
示例代码如下:
CompletableFuture<Void> task1 =CompletableFuture.supplyAsync(()->{//自定义业务操作});
......
CompletableFuture<Void> task6 =CompletableFuture.supplyAsync(()->{//自定义业务操作});
......CompletableFuture<Void> headerFuture=CompletableFuture.allOf(task1,.....,task6);try {headerFuture.join();} catch (Exception ex) {......}
System.out.println("all done. ");
- 调用 join() 可以让程序等future1 和 future2 都运行完了之后再继续执行。
CompletableFuture<Void> completableFuture = CompletableFuture.allOf(future1, future2);
completableFuture.join();
assertTrue(completableFuture.isDone());
System.out.println("all futures done...");
输出:
future1 done...
future2 done...
all futures done...
- anyOf() 方法不会等待所有的 CompletableFuture 都运行完成之后再返回,只要有一个执行完成即可!
CompletableFuture<Object> f = CompletableFuture.anyOf(future1, future2);
System.out.println(f.get());
输出结果可能是:
future2 done...
efg
也可能是:
future1 done...
abc
CompletableFuture 使用建议
使用自定义线程池
CompletableFuture 默认使用ForkJoinPool.commonPool() 作为执行器,这个线程池是全局共享的,可能会被其他任务占用,导致性能下降或者饥饿。因此,建议使用自定义的线程池来执行 CompletableFuture 的异步任务,可以提高并发度和灵活性。
private ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());CompletableFuture.runAsync(() -> {//...
}, executor);
尽量避免使用 get()
CompletableFuture的get()方法是阻塞的,尽量避免使用。如果必须要使用的话,需要添加超时时间,否则可能会导致主线程一直等待,无法执行其他任务。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(10_000);} catch (InterruptedException e) {e.printStackTrace();}return "Hello, world!";});// 获取异步任务的返回值,设置超时时间为 5 秒try {String result = future.get(5, TimeUnit.SECONDS);System.out.println(result);} catch (InterruptedException | ExecutionException | TimeoutException e) {// 处理异常e.printStackTrace();}
}
异常处理的常见操作
上面这段代码在调用 get() 时抛出了 TimeoutException 异常。这样我们就可以在异常处理中进行相应的操作,比如取消任务、重试任务、记录日志等。
总结
下面是我对 CompletableFuture 使用的一些心得总结:
1. 异步编程的利器
CompletableFuture 使得异步编程变得更加容易和直观。你可以将耗时的操作(如网络请求、文件读写等)封装成 CompletableFuture,然后继续执行其他任务,当异步操作完成时,你可以通过回调函数来处理结果。使用 whenComplete 方法可以在任务完成时触发回调函数,并正确地处理异常,而不是让异常被吞噬或丢失。
2. 丰富的API支持
CompletableFuture 提供了丰富的API来支持链式调用、组合多个异步操作、异常处理等。例如,.thenApply() 用于对异步操作的结果进行转换;.thenCompose() 用于组合多个 CompletableFuture;.exceptionally() 用于处理异步操作中抛出的异常。使用 exceptionally 方法可以处理异常并重新抛出,以便异常能够传播到后续阶段,而不是让异常被忽略或终止。
3. 易于理解和使用
虽然异步编程本身可能有些复杂,但 CompletableFuture 的API设计得相对直观,通过链式调用可以清晰地表达异步逻辑。此外,CompletableFuture 还支持Lambda表达式,使得代码更加简洁。
4. 注意避免嵌套地狱
虽然 CompletableFuture 提供了链式调用的能力,但如果过度使用或嵌套过多,可能会导致代码难以阅读和维护。这时候,可以考虑使用 .thenCompose() 来代替 .thenApply() 或 .thenAccept(),以减少嵌套层次。
5. 合理利用线程池
CompletableFuture 默认使用 ForkJoinPool.commonPool() 来执行异步任务,但在某些情况下,你可能需要自定义线程池来更好地控制并发级别和线程资源。可以通过 supplyAsync() 或 runAsync() 方法传入自定义的 Executor 来实现。
6. 异常处理的重要性
异步编程中,异常处理尤为重要。CompletableFuture 提供了 .exceptionally() 方法来处理异步操作中的异常,但请注意,这只会捕获到异步任务执行过程中抛出的未检查异常(RuntimeException 和 Error)。对于检查型异常,你需要在 CompletableFuture 的工厂方法(如 supplyAsync())中显式捕获并处理。
7. 替代方案
虽然 CompletableFuture 是 Java 8 引入的一个强大工具,但在某些情况下,你可能会考虑使用其他异步编程框架或库,如 Reactor(在响应式编程中广泛使用)或 CompletableRxJava(基于 RxJava 的一个轻量级版本)或京东的 asyncTool 。。这些框架提供了更高级的抽象和更丰富的操作符,可能会更适合某些特定的场景。
我是杰叔叔,一名沪漂的码农,下期再会!