JUC高并发编程—Fork / Join 📅 2026/6/20 19:46:09 一、Fork/Join 解决什么问题1. 传统 ThreadPoolExecutor 缺点普通线程池任务提交到阻塞队列线程从队列抢任务任务粒度不能拆分。 如果一个超大耗时任务比如遍历 100 万条数据求和丢给单一线程其他线程全部空闲CPU 利用率极低无法充分多核并行。2. Fork/Join 核心思想分而治之类比归并排序Fork拆分大任务拆成若干小任务直到小任务足够简单不用再拆Join合并等待所有子小任务执行完成收集所有子结果汇总成最终结果。一句话通俗理解 大活拆成一堆小活分给多个人干所有人干完把结果汇总最大化利用多核 CPU。3. 适用场景 vs 不适用场景✅ 适合CPU 密集型、可递归拆分、计算量大大数据分段求和、数组归并排序、树形递归遍历、海量数据统计 ❌ 不适合IO 密集型数据库 / 网络等待线程阻塞会严重降低效率二、Fork/Join 底层核心原理重中之重1. 核心类结构plaintextExecutorService 线程池顶层接口 ↑ ForkJoinPool Fork/Join专属线程池替代ThreadPoolExecutor ↓ ForkJoinTaskV 任务抽象类不能直接new ├ RecursiveTaskV 有返回值的拆分任务最常用 └ RecursiveAction 无返回值只执行逻辑不汇总结果2. 最关键机制工作窃取 Work-StealingForkJoinPool 灵魂普通线程池全局共享 1 个队列多线程竞争锁抢任务空闲线程只能等待。 ForkJoinPool每个工作线程自带双端队列 Deque线程自己 fork 拆分出来的子任务放到自己队列队尾线程优先从自己队列尾部取任务执行如果某个线程自己队列为空干完了会去其他忙碌线程的队列头部偷任务执行优势减少锁竞争性能更高不会出现部分线程忙死、部分线程闲置CPU 充分打满双端队列自己取尾部别人偷头部互不冲突。3. Fork、Join 底层流程fork()把拆分后的子任务提交到当前线程的本地双端队列异步执行join()阻塞等待当前子任务完成并获取任务返回结果如果子任务还没执行当前线程会去偷其他任务执行不会空等浪费 CPU递归拆分阈值重点必须设置阈值否则无限递归拆分栈溢出 例数组长度小于 1000 就不拆直接串行计算大于 1000 继续二分拆分。4. ForkJoinPool 线程池参数java运行public ForkJoinPool( int parallelism, // 并行度默认CPU核心数代表最大并发线程数 ForkJoinWorkerThreadFactory factory, // 线程工厂 UncaughtExceptionHandler handler, // 异常处理器 boolean asyncMode // 是否异步模式true只从队列头部取不用于递归合并 )日常开发直接用默认构造new ForkJoinPool()即可并行度等于 CPU 核心数。三、两大任务类区分RecursiveTask / RecursiveAction1. RecursiveTaskV 有返回值需要拆分计算后汇总结果求和、求最大值、数据统计 重写compute()方法返回泛型结果2. RecursiveAction 无返回值只做遍历、写入、打印不需要合并结果批量文件处理、批量数据更新 重写compute()返回 void四、完整代码示例 1RecursiveTask 数组分段求和最经典需求超大数组 1~100000 求和递归二分拆分最后合并所有分段结果java运行import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; // 拆分求和任务有返回值继承RecursiveTask class SumTask extends RecursiveTaskLong { // 数组、计算区间 [start, end] private final long[] arr; private final int start; private final int end; // 阈值区间长度小于该值不再拆分直接计算 private static final int THRESHOLD 1000; public SumTask(long[] arr, int start, int end) { this.arr arr; this.start start; this.end end; } Override protected Long compute() { // 1. 区间长度小于阈值直接串行计算终止递归 if (end - start THRESHOLD) { long sum 0; for (int i start; i end; i) { sum arr[i]; } return sum; } // 2. 区间过大二分拆分 int mid (start end) / 2; SumTask leftTask new SumTask(arr, start, mid); SumTask rightTask new SumTask(arr, mid 1, end); // fork异步提交子任务到本地队列 leftTask.fork(); rightTask.fork(); // join阻塞获取两个子任务结果 Long leftSum leftTask.join(); Long rightSum rightTask.join(); // 合并结果返回 return leftSum rightSum; } } public class ForkJoinSumDemo { public static void main(String[] args) { // 构造10万长度数组存放1~100000 long[] arr new long[100000]; for (int i 0; i arr.length; i) { arr[i] i 1; } // 创建ForkJoin线程池 ForkJoinPool pool new ForkJoinPool(); // 提交根任务 SumTask rootTask new SumTask(arr, 0, arr.length - 1); Long total pool.invoke(rootTask); System.out.println(1~100000总和 total); // 校验结果公式 n*(n1)/2 System.out.println(公式计算校验 100000L * 100001 / 2); pool.shutdown(); } }执行流程拆解根任务区间 0~99999远超阈值 1000拆左右两半两半各自继续 fork 拆分直到所有子任务区间≤1000空闲线程自动窃取其他线程未执行的子任务底层子任务全部计算完成后逐层 join 向上汇总最终得到全局总和。五、代码示例 2RecursiveAction 无返回值批量遍历打印场景只遍历处理数据不需要汇总结果继承 RecursiveActionjava运行import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; class PrintTask extends RecursiveAction { private final int start; private final int end; private static final int THRESHOLD 20; public PrintTask(int start, int end) { this.start start; this.end end; } Override protected void compute() { // 区间小直接打印 if (end - start THRESHOLD) { for (int i start; i end; i) { System.out.print(Thread.currentThread().getName() - i ); } System.out.println(); return; } // 拆分 int mid (start end) / 2; PrintTask left new PrintTask(start, mid); PrintTask right new PrintTask(mid 1, end); // 两个子任务异步执行 left.fork(); right.compute(); // 优化一个fork一个当前线程直接执行减少队列压力 left.join(); } } public class ForkJoinActionDemo { public static void main(String[] args) { ForkJoinPool pool new ForkJoinPool(); PrintTask task new PrintTask(1, 200); pool.invoke(task); pool.shutdown(); } }小优化技巧不要全部都调用fork()可以一个 fork、一个直接调用compute() 减少队列任务数量降低内存开销性能更好。六、关键 API 方法详解1. 任务方法ForkJoinTaskfork()将子任务提交到当前工作线程的本地双端队列异步执行无返回值。join()阻塞等待任务完成返回计算结果线程阻塞时会自动窃取其他任务不浪费 CPU。invoke()同步执行当前任务内部自动完成 forkjoin主线程调用阻塞等待最终结果最常用提交方式。invokeAll(task1,task2)批量 fork 多个任务等价于多个 forkjoin 组合。2. ForkJoinPool 提交任务三种方式invoke(task)同步阻塞直接返回任务结果递归计算首选submit(task)异步提交返回 ForkJoinTask后续可 join/get 获取结果execute(task)异步无返回类似普通线程池 execute3. get () 和 join () 核心区别join()只能抛出未检查异常RuntimeException递归拆分场景专用get()继承 Future 方法会抛出受检异常InterruptedException、ExecutionException需要 try-catch 捕获 递归分治代码优先使用join()代码更简洁。七、工作窃取机制演示对比通俗举例假设 CPU4 核4 条线程 T1/T2/T3/T4T1 分到超大任务不断 fork 拆分本地队列堆满子任务T2/T3/T4 很快干完自己的小任务队列为空T2 去偷 T1 队列头部的子任务执行T3、T4 同理所有 CPU 核心同时干活不会出现资源闲置。普通线程池做不到如果 T1 持有大任务其他线程只能空等全局队列无法拆分窃取。八、Fork/Join 典型业务应用场景场景 1大规模数值计算最主流数组求和、求最大 / 最小值、矩阵运算、统计学批量计算。场景 2分治类算法并行化归并排序、快速排序、二分查找并行优化。场景 3树形结构递归遍历文件夹递归遍历、树形菜单批量数据处理、树节点统计。场景 4海量内存数据批量处理千万级内存集合过滤、转换、聚合非数据库 IO。九、ForkJoinPool 与 ThreadPoolExecutor 对比表格特性ForkJoinPool普通 ThreadPoolExecutor任务模型支持递归拆分fork/join任务不可拆分只能整体执行队列模型每个线程独立双端队列工作窃取全局共享单阻塞队列竞争锁适用负载CPU 密集、可分治计算IO 密集、独立零散小任务线程等待join 阻塞时自动偷任务无空闲get 阻塞线程原地等待CPU 空转结果合并原生支持子任务结果汇总需要手动维护集合汇总 Future十、使用注意事项避坑重点1. 必须设置合理阈值 THRESHOLD阈值太小拆分层级极深创建大量任务对象GC 压力大 阈值太大拆分太少并发度不足多核利用率低 经验值单任务串行执行 1~5ms 左右最合适。2. 禁止在 ForkJoin 任务中执行 IO 阻塞如果子任务出现数据库 / 网络阻塞对应工作线程被占死无法窃取任务并行效率大幅下降。IO 场景使用普通线程池。3. 避免无限递归拆分区间划分逻辑出错会无限 fork直接栈溢出 StackOverflowError。4. 线程池尽量复用不要频繁新建ForkJoinPool 创建开销大全局单例复用不要在循环里 new ForkJoinPool。5. JDK8 Stream 底层就是 ForkJoin并行流list.parallelStream()底层使用 ForkJoinPool 实现分治并行本质是对 Fork/Join 框架的封装。 示例// 并行流求和底层ForkJoin long sum LongStream.rangeClosed(1, 100000).parallel().sum();十一、整体核心总结核心设计分治算法 工作窃取双端队列专门优化多核 CPU 密集计算两大任务RecursiveTask有返回合并、RecursiveAction无返回纯执行核心方法fork 拆分任务、join 阻塞合并结果、invoke 同步执行优势线程空闲时自动窃取任务CPU 利用率拉满局限只适合纯内存计算IO 阻塞场景不推荐优先普通线程池。