什么是Fork/Join框架?
Fork/Join框架是Java 7引入的一种并行计算框架,位于java.util.concurrent包中,旨在帮助开发者分治复杂任务,充分利用多核处理器的并行能力。通过“分而治之”的策略,将大任务拆分成若干小任务(Fork),再将结果合并(Join),以实现更高效的计算。
适用场景
Fork/Join框架特别适用于以下场景:
- 递归任务:可以将任务不断拆分成子任务,并行执行。
- 大数据计算:如数组求和、排序等需要分段处理的任务。
- 分治策略:适合问题规模大,且能够分割成多个独立子问题的场景。
Fork/Join框架的核心概念
- Fork:将任务分解成多个子任务,并将子任务提交到线程池执行。
- Join:等待所有子任务完成,并合并结果。
- ForkJoinTask:代表可分解的任务,是Fork/Join框架的核心抽象类。
- RecursiveTask:继承自ForkJoinTask,表示有返回结果的任务。
- RecursiveAction:继承自ForkJoinTask,表示无返回结果的任务。
代码示例:使用Fork/Join框架求和
下面通过一个示例来讲解如何使用Fork/Join框架计算数组的元素之和。
1. 创建任务类
创建一个继承自RecursiveTask的任务类SumTask,用于将大数组分割成小数组,并行计算每个小数组的和。
import java.util.concurrent.RecursiveTask;public class SumTask extends RecursiveTask<Long> {private static final int THRESHOLD = 1000; // 阈值private long[] array; // 数组private int start; // 起始索引private int end; // 结束索引public SumTask(long[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}@Overrideprotected Long compute() {// 当任务足够小的时候,直接计算if (end - start <= THRESHOLD) {long sum = 0;for (int i = start; i < end; i++) {sum += array[i];}return sum;} else {// 将任务一分为二int middle = (start + end) / 2;SumTask leftTask = new SumTask(array, start, middle);SumTask rightTask = new SumTask(array, middle, end);leftTask.fork(); // 异步执行子任务long rightResult = rightTask.compute(); // 计算右侧子任务long leftResult = leftTask.join(); // 等待左侧子任务完成并获取结果return leftResult + rightResult; // 合并结果}}
}
2. 执行Fork/Join任务
在主类中,使用ForkJoinPool来提交任务,并获取最终的计算结果。
import java.util.concurrent.ForkJoinPool;public class Main {public static void main(String[] args) {// 创建随机数组long[] array = new long[5000];for (int i = 0; i < array.length; i++) {array[i] = i + 1;}// 创建ForkJoinPoolForkJoinPool pool = new ForkJoinPool();// 提交SumTaskSumTask task = new SumTask(array, 0, array.length);Long result = pool.invoke(task);System.out.println("Sum: " + result);}
}
输出结果:
Sum: 12502500
通过Fork/Join框架,程序能够充分利用多核处理器的能力,并行执行分割的任务,从而在大数据场景下显著提升计算效率。
Fork/Join与普通线程池的对比
- 更高效的任务分解:普通线程池通常是将任务提交到一个线程执行,而Fork/Join能够将大任务不断拆分成更小的子任务,并在多个核心上并行执行。
- 工作窃取算法(Work-Stealing):Fork/Join框架使用工作窃取算法,如果某个线程完成了自己的任务,会主动从其他线程的队列中窃取任务来执行,从而保持所有线程的高效运转。
使用RecursiveAction执行无返回值任务
在某些场景下,我们可能只需要执行一些操作,而不需要返回结果。此时可以使用RecursiveAction来代替RecursiveTask。
例如,我们可以使用RecursiveAction来将数组中的所有元素加1:
import java.util.concurrent.RecursiveAction;public class IncrementTask extends RecursiveAction {private static final int THRESHOLD = 1000;private long[] array;private int start;private int end;public IncrementTask(long[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}@Overrideprotected void compute() {if (end - start <= THRESHOLD) {for (int i = start; i < end; i++) {array[i]++;}} else {int middle = (start + end) / 2;IncrementTask leftTask = new IncrementTask(array, start, middle);IncrementTask rightTask = new IncrementTask(array, middle, end);invokeAll(leftTask, rightTask);}}
}
有兴趣的小伙伴可以关注我的公众号【知识星球站】一起讨论学习!!