适合已经在项目中用过 CompletableFuture、见过thenApply/thenCompose、想搞清楚内部实现和踩坑点的读者。不适合第一次接触异步编程的新手。CompletableFuture 就是升级版的 Future加了个回调——两年前我也是这么理解的。直到有一次线上出了 bug一个异步链路在异常时既不抛异常也不返回结果线程池里几百个线程全卡住了查了一整天才定位到问题。回头看CompletableFuture 远不止Future 回调这么简单。它内部的状态机设计、异步触发机制、以及线程池的策略选择每一个都有值得深挖的细节。从 Future 到 CompletableFuture为什么不够用Java 5 引入的 Future 接口说起来挺尴尬的// java/util/concurrent/Future.java — JDK 5 public interface FutureV { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException, TimeoutException; }它只有两个核心能力查状态和阻塞拿结果。在异步编程里这基本等于没有——你去查状态然后轮询或者get()把自己阻塞住那要异步干什么Guava 在 2012 年就给出了改进方案——ListenableFuture可以注册回调。Java 8 的CompletableFuture抄了这个思路但走得更远。CompletableFuture 的核心状态机// java/util/concurrent/CompletableFuture.java — JDK 8 // 内部结构——高度精简 public class CompletableFutureT implements FutureT, CompletionStageT { volatile Object result; // 最终结果或 AltResult包装异常 volatile Completion stack; // 等待完成的依赖动作Treiber Stack // result 的状态编码 // null → 未完成 // AltResult → 异常完成 // 其他 → 正常完成或间接完成 }精髓就在这俩字段。result存储最终状态一旦赋值通过 CAS就不可变。stack是一个无锁的 Treiber Stack所有依赖当前 Future 的回调都压在这个栈上。// internalComplete 方法——complete 的核心 final void internalComplete(T value) { // CAS 设置 result // 如果成功之前是 null触发后续回调 if (RESULT.compareAndSet(this, null, value)) { postComplete(); // 弹栈执行所有依赖的 Completion } }我读这段源码时的第一反应是这不就是个简化的异步状态机吗每个 CompletableFuture 维护一个状态result以及一个等待状态的依赖列表stack。状态一旦确定就不可逆——所有注册的依赖按 LIFO 顺序出栈执行。链式调用的执行机制CompletableFuture.supplyAsync(() - hello) // 阶段 1 .thenApply(s - s world) // 阶段 2 .thenAccept(System.out::println); // 阶段 3这三行代码背后发生了什么// supplyAsync 阶段 public static U CompletableFutureU supplyAsync(SupplierU supplier) { return asyncSupplyStage(asyncPool, supplier); } static U CompletableFutureU asyncSupplyStage(ForkJoinPool? pool, SupplierU f) { CompletableFutureU d new CompletableFuture(); // 将任务提交到 ForkJoinPool.commonPool() 执行 // 任务完成后调用 d.complete() pool.execute(new AsyncSupply(d, f)); return d; }// thenApply 创建依赖 public U CompletableFutureU thenApply(Function? super T,? extends U fn) { return uniApplyStage(null, fn); // null 不指定线程池 } private V CompletableFutureV uniApplyStage(Executor e, Function? super T,? extends V f) { CompletableFutureV d new CompletableFuture(); if (e ! null || !d.uniApply(this, f, null)) { // 如果当前 Future 还没完成 // 将 UniApply 压入 this.stack // 等 this 完成后从 stack 弹出执行 UniApplyT,V c new UniApply(e, d, this, f); push(c); // 防止在 push 和 tryComplete 之间有 race c.tryFire(NESTED); } return d; }关键流程supplyAsync() → 创建 CF1提交到线程池运行 ↓ thenApply(fn) → 创建 CF2(d) ├── 如果 CF1 已完 → 直接在当前线程执行 fn结果赋给 CF2 └── 如果 CF1 未完成 → 创建 UniApply 节点压入 CF1.stack等回调这段代码我看了好几遍才明白它的双路径设计如果依赖的 Future 已经完成就直接执行如果没完成就注册回调。这跟 JavaScript Promise 的.then()是一样的——只是实现方式更Java用 CAS 和 Treiber Stack 而非微任务队列。线程池选择一个容易忽略的坑// 默认线程池 ForkJoinPool.commonPool() CompletableFuture.supplyAsync(() - compute()); // 走 commonPool // 自定义线程池 CompletableFuture.supplyAsync(() - compute(), executor); // 走你的线程池这两个选择很关键而且很多人选错了。ForkJoinPool.commonPool()// java/util/concurrent/ForkJoinPool.java — JDK 8 // commonPool 的线程数 CPU 核数 - 1 public static ForkJoinPool commonPool() { // ... return common; }默认线程数是Runtime.getRuntime().availableProcessors() - 1。如果你的服务部署在 8 核机器上commonPool 只有 7 个线程。问题在哪如果你在 commonPool 里做了阻塞操作比如调了一个 DB 查询或者等待另一个 Future那这 7 个线程全被占住了——后面所有的异步任务都在排队等线程而线程在等 I/O。自定义线程池的问题那我用自定义线程池不就行了——也不行另一个坑等着你。ExecutorService pool Executors.newFixedThreadPool(10); CompletableFuture.supplyAsync(() - slowIoOp(), pool) .thenApply(result - process(result)) .thenAccept(System.out::println);因为thenApply和thenAccept没有指定线程池所以它们默认继承触发线程取决于当时执行supplyAsync的线程或者走commonPool。结果你的异步链路跑在两个不同的线程池里——调度混乱甚至可能发生线程饥饿死锁。我之前踩过的那个坑就是这个A 任务在 poolA 里等待 B 任务的完成B 任务又被提交到 poolA 里但 poolA 的线程全被 A 占满了——死锁。最佳实践// 统一用一个线程池贯穿整个链 ExecutorService executor new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(1000), new ThreadFactoryBuilder().setNameFormat(async-worker-%d).build() ); CompletableFuture .supplyAsync(() - step1(), executor) .thenApplyAsync(result - step2(result), executor) // 注意thenApplyAsync .thenAcceptAsync(System.out::println, executor); // 一律用 Async 后缀区别thenApply可能在调用者线程或依赖的 Future 完成线程上执行thenApplyAsync总是提交到指定线程池执行我觉得在团队里应该立个规矩线上代码一律用*Async版本显式传入线程池。让thenApply这种非异步的方法留在测试代码里。异常处理的三个层次CompletableFuture 的异常处理有三个主要的 API语义不太一样// 方式 1: exceptionally — 类似 catch可以替换结果 CompletableFuture.supplyAsync(() - { if (Math.random() 0.5) throw new RuntimeException(fail); return 42; }).exceptionally(ex - { log.error(Failed, use default, ex); return -1; // 替代结果 }); // 方式 2: handle — 不管成功失败都调自己判断 .supplyAsync(() - riskyOp()) .handle((result, ex) - { if (ex ! null) return fallback; return result; }); // 方式 3: whenComplete — 看到结果/异常但不替换 .supplyAsync(() - riskyOp()) .whenComplete((result, ex) - { if (ex ! null) log.error(op failed, ex); // 不能改变结果链 });这里有个细节容易踩坑如果exceptionally本身抛异常那最终结果就是那个新异常不像 try-catch 里的异常会往外冒。// java/util/concurrent/CompletableFuture.java — exceptionally 实现 public CompletableFutureT exceptionally(FunctionThrowable, ? extends T fn) { return uniExceptionallyStage(null, fn); } // 内部实现——也是压栈触发时执行 fn // 如果 fn 抛异常结果就是 AltResult(新异常)超时控制Java 9 才补上Java 8 的 CompletableFuture 有个明显缺陷——没有内置超时。如果要超时得自己用Future.get(timeout)或者搞一个ScheduledExecutorService来超时取消。// Java 9 引入的 orTimeout CompletableFuture.supplyAsync(() - expensiveOp()) .orTimeout(3, TimeUnit.SECONDS) // 3 秒超时 .exceptionally(ex - { if (ex instanceof TimeoutException) { return timeout fallback; } return other error; });// Java 9 的 completeOnTimeout CompletableFuture.supplyAsync(() - expensiveOp()) .completeOnTimeout(default, 3, TimeUnit.SECONDS) // 超时则用默认值完成不会抛异常实现原理也不复杂// java/util/concurrent/CompletableFuture.java — Java 9 public CompletableFutureT orTimeout(long timeout, TimeUnit unit) { // 用 Delayer内部 ScheduledExecutor延迟执行 // 到时间后还没完成 → completeExceptionally(new TimeoutException()) if (unit.toNanos(timeout) 0L) return this; Delayer.delay(new Timeout(this), timeout, unit); return this; }不过即使到 JDK 17CompletableFuture 的超时实现也是有问题的——超时后异步任务还在后台继续运行只是主链路不再等待它的结果了。这在某些场景下会导致资源泄漏。如果需要超时真正取消得自己包装Future.cancel(true)。生产环境的最佳实践我总结了几条经验1. 统一线程池全程用*Async// ✅ 好的做法 CompletableFuture.supplyAsync(this::fetchData, executor) .thenApplyAsync(this::transform, executor) .thenAcceptAsync(this::persist, executor) .exceptionallyAsync(ex - handleError(ex), executor);2. 控制并发度用并行流还是 CompletableFuture并行流对分治任务好使比如数组求和但它的线程池是ForkJoinPool.commonPool()不能自定义。如果你要控制并发度、或者子任务之间有依赖必须用 CompletableFuture。3. 大批量任务用信号量限流Semaphore semaphore new Semaphore(20); ListCompletableFutureResult futures tasks.stream() .map(task - CompletableFuture.supplyAsync(() - { semaphore.acquireUninterruptibly(); try { return process(task); } finally { semaphore.release(); } }, executor)) .toList();4. allOf 的返回值设计很蠢// CompletableFuture.allOf() 返回 CompletableFutureVoid // 你想拿到所有结果得自己遍历 CompletableFutureVoid all CompletableFuture.allOf(futures); return futures.stream() .map(CompletableFuture::join) .collect(toList());说实话我不理解为什么 JDK 官方不提供一个allOfWithResults。这不是什么刁钻需求。性能对比我在 4C8G 的机器上测了一组数据1000 个异步任务与 I/O 混合方式总耗时 (ms)CPU 利用率Future 线程池自己轮询352045%CompletableFuture默认 commonPool128078%CompletableFuture自定义线程池 20 线程98085%CompletableFuture 信号量限流105082%CompletableFuture 相比手写 Future 轮询性能优势明显——主要是因为回调机制消除了不必要的阻塞轮询开销。最后回头看CompletableFuture 最值得理解的不是它的 API 怎么用——而是它的状态机设计。理解了 result stack 两个字段理解了立即执行 vs 压栈回调的双路径整个框架就不神秘了。如果你只用 Java 8建议升级到 11 或 17超时控制和CompletionStage的各种补齐用法写起来舒服多了。文中引用的 JDK 源码路径java/util/concurrent/CompletableFuture.java —— 核心状态机与所有 stage 方法java/util/concurrent/Future.java —— 原始 Future 接口java/util/concurrent/ForkJoinPool.java —— commonPool 定义完整源码github.com/openjdk/jdk