心态决定一切
“平安是幸,知足是福,清心是禄,寡欲是寿。”此言揭示了一个重要的人生哲学:内心的平静比任何外在的成功都更为珍贵。一个懂得珍惜当下、不贪婪的人,往往更容易获得真正的幸福。同时,它也提醒我们要学会控制自己的欲望,避免被过多的需求所累。
坚持不懈的力量
“滴水穿石,不是力量大,而是功夫深。”这句名言形象地比喻了持续努力所能带来的巨大影响。即使是最微小的努力,只要坚持不懈地积累下去,最终也能达成看似不可能的目标。这种精神鼓励我们在遇到困难时不轻易放弃,而是要相信时间的力量,一步一个脚印地向前迈进。
目录
介绍
核心组件
优点
示例代码
使用RecursiveTask实现带返回值的任务
使用RecursiveAction实现不带返回值的任务
在Fork/Join框架中使用线程池
创建ForkJoinPool
提交任务给ForkJoinPool
关闭ForkJoinPool
示例:利用ForkJoinPool进行数组求和
介绍
Fork/Join框架是Java 7引入的一项重要特性,它特别适合处理可以被递归地分割成更小子问题的任务。该框架不仅实现了分治算法(divide-and-conquer),还通过工作窃取算法(work-stealing algorithm)来优化多核处理器上的任务执行效率。接下来我们将详细介绍Fork/Join框架的核心组件、其优点以及如何使用RecursiveTask<V>
和RecursiveAction
类实现具体的并行计算任务。
核心组件
-
ForkJoinPool:这是Fork/Join框架中的线程池实现,用于管理一组工作线程,并支持“工作窃取”机制。每个线程都有自己的双端队列(deque),用来存放待执行的任务。当某个线程完成了自己队列中的所有任务时,它可以去其他线程的队列末端“偷取”任务来执行,从而保证了所有CPU核心都能得到充分利用。
-
ForkJoinTask:这是所有Fork/Join任务的基础抽象类,提供了
fork()
方法用于异步启动子任务,以及join()
方法等待子任务完成并获取结果。根据是否需要返回值,可以选择继承RecursiveTask<V>
(有返回值)或RecursiveAction
(无返回值)。这两个类都定义了一个抽象方法compute()
,用户需要在这个方法中实现具体的任务逻辑。
优点
- 自动负载均衡:由于采用了工作窃取策略,即使某些线程提前完成了分配给它们的任务,也可以继续参与其他未完成的工作,避免了资源闲置的情况发生。
- 高效利用多核处理器:对于那些能够自然地分解为多个独立子任务的问题,如排序、搜索或者大规模数据处理等,Fork/Join框架可以显著提升程序性能,因为它能更好地适应现代计算机架构中的多核特性。
示例代码
使用RecursiveTask<V>
实现带返回值的任务
下面是一个简单的例子,展示了如何用RecursiveTask<Integer>
来计算一个整数数组元素之和:
import java.util.concurrent.RecursiveTask; import java.util.Random;public class SumCalculator extends RecursiveTask<Integer> {private static final int THRESHOLD = 10; // 阈值,决定何时停止拆分任务private int[] numbers;private int start, end;public SumCalculator(int[] nums, int s, int e) {this.numbers = nums;this.start = s;this.end = e;}@Overrideprotected Integer compute() {if (end - start <= THRESHOLD) { // 如果任务足够小,则直接计算结果int sum = 0;for (int i = start; i < end; i++) {sum += numbers[i];}return sum;} else { // 否则进一步拆分任务int middle = (start + end) / 2;SumCalculator leftTask = new SumCalculator(numbers, start, middle);SumCalculator rightTask = new SumCalculator(numbers, middle, end);// 异步执行左侧任务leftTask.fork();// 同步执行右侧任务int rightResult = rightTask.compute();// 等待左侧任务完成并合并结果return leftTask.join() + rightResult;}}public static void main(String[] args) throws Exception {int[] arr = new Random().ints(1000000, 0, 100).toArray(); // 生成随机数组ForkJoinPool pool = new ForkJoinPool(); // 创建ForkJoinPool实例SumCalculator task = new SumCalculator(arr, 0, arr.length); // 初始化任务System.out.println("Sum: " + pool.invoke(task)); // 提交任务并打印结果} }
使用RecursiveAction
实现不带返回值的任务
如果我们有一个不需要返回值的任务,比如遍历文件夹查找特定类型的文件,我们可以选择使用RecursiveAction
。这里给出一个简化版的例子:
import java.io.File; import java.util.concurrent.RecursiveAction;public class FileSearcher extends RecursiveAction {private final File directory;private final String fileType;public FileSearcher(File dir, String type) {this.directory = dir;this.fileType = type;}@Overrideprotected void compute() {File[] files = directory.listFiles();if (files != null) {for (File file : files) {if (file.isDirectory()) {FileSearcher subtask = new FileSearcher(file, fileType);subtask.fork(); // 异步执行子任务} else if (file.getName().endsWith(fileType)) {System.out.println("Found: " + file.getAbsolutePath());}}}}public static void main(String[] args) throws Exception {ForkJoinPool pool = new ForkJoinPool();File rootDir = new File("/path/to/search"); // 替换为实际路径String extension = ".txt"; // 查找文本文件pool.invoke(new FileSearcher(rootDir, extension));} }
以上两个例子分别演示了如何在Java中使用RecursiveTask<V>
和RecursiveAction
来创建并行任务。通过这种方式,开发者可以轻松地将复杂任务分解为更小的部分,并利用多核处理器的优势加速任务的完成。
在Fork/Join框架中使用线程池
在Fork/Join框架中使用线程池主要是通过ForkJoinPool
类来实现的。ForkJoinPool
是专门为执行Fork/Join任务设计的一种特殊类型的线程池,它不仅能够有效地管理线程资源,还实现了工作窃取算法以提高多核处理器上的并行计算效率。
创建ForkJoinPool
你可以通过多种方式创建一个ForkJoinPool
实例。最简单的方法是调用无参构造函数,这将创建一个默认配置的线程池,其并行度等于当前机器上的可用处理器数量。如果你想要自定义线程池的行为,比如设置特定的并行级别或指定异常处理程序,则可以通过带参数的构造函数来完成。
// 使用默认配置创建ForkJoinPool ForkJoinPool forkJoinPool = new ForkJoinPool();// 或者根据需要定制化创建 int parallelism = Runtime.getRuntime().availableProcessors(); // 获取CPU核心数 ForkJoinPool customPool = new ForkJoinPool(parallelism);
此外,从Java 8开始,ForkJoinPool
提供了一个静态方法commonPool()
,可以用来获取一个公共的ForkJoinPool实例,这个实例被所有未明确指定线程池的任务所共享。
ForkJoinPool commonPool = ForkJoinPool.commonPool();
提交任务给ForkJoinPool
一旦有了ForkJoinPool
对象,就可以向其中提交任务了。对于有返回值的任务,应该继承RecursiveTask<V>
;而对于没有返回值的任务,则应选择RecursiveAction
。提交任务的方式有两种:一种是直接调用invoke()
方法同步执行任务,另一种是使用submit()
异步提交任务,并通过Future
接口获取结果。
// 同步执行任务 SumTask sumTask = new SumTask(array, 0, array.length); long result = forkJoinPool.invoke(sumTask);// 异步提交任务 ForkJoinTask<Long> asyncTask = forkJoinPool.submit(() -> {SumTask task = new SumTask(array, 0, array.length);return task.invoke(); }); try {long asyncResult = asyncTask.get(); // 等待任务完成并获取结果 } catch (InterruptedException | ExecutionException e) {e.printStackTrace(); }
关闭ForkJoinPool
当不再需要ForkJoinPool
时,应当调用它的shutdown()
方法来停止接受新的任务,并等待已提交的任务完成。如果希望立即终止所有正在运行的任务,则可以调用shutdownNow()
。不过需要注意的是,在大多数情况下,推荐使用shutdown()
而不是强制关闭,因为后者可能会导致数据丢失或其他不稳定行为。
forkJoinPool.shutdown(); try {if (!forkJoinPool.awaitTermination(60, TimeUnit.SECONDS)) {forkJoinPool.shutdownNow();} } catch (InterruptedException e) {forkJoinPool.shutdownNow(); }
示例:利用ForkJoinPool进行数组求和
下面是一个具体的例子,展示了如何使用ForkJoinPool
来进行数组元素求和的操作。我们将数组分为多个部分,每个部分作为一个单独的任务交给线程池处理,最后汇总各个子任务的结果得到最终答案。
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask;public class ArraySumExample {static class SumTask extends RecursiveTask<Long> {private static final int THRESHOLD = 1000; // 设定阈值private final long[] array;private final int start;private final int end;public SumTask(long[] arr, int s, int e) {this.array = arr;this.start = s;this.end = e;}@Overrideprotected Long compute() {if (end - start <= THRESHOLD) { // 如果任务足够小,直接计算long sum = 0L;for (int i = start; i < end; i++) {sum += array[i];}return sum;} else { // 否则继续拆分任务int middle = (start + end) / 2;SumTask left = new SumTask(array, start, middle);SumTask right = new SumTask(array, middle, end);left.fork(); // 异步执行左侧任务Long rightResult = right.compute(); // 同步执行右侧任务return left.join() + rightResult; // 汇总结果}}}public static void main(String[] args) throws Exception {long[] largeArray = new long[1_000_000]; // 创建一个较大的数组for (int i = 0; i < largeArray.length; i++) {largeArray[i] = i + 1L; // 初始化数组元素}ForkJoinPool pool = new ForkJoinPool(); // 创建ForkJoinPoolSumTask task = new SumTask(largeArray, 0, largeArray.length); // 创建任务long startTime = System.currentTimeMillis();long totalSum = pool.invoke(task); // 执行任务并获得结果long endTime = System.currentTimeMillis();System.out.println("Total sum: " + totalSum);System.out.println("Time taken: " + (endTime - startTime) + " ms");pool.shutdown(); // 关闭线程池} }
在这个示例中,我们定义了一个名为SumTask
的类,它继承自RecursiveTask<Long>
,用于表示一个可以递归分割成更小任务的工作单元。然后,我们在主函数里创建了一个包含一百万个整数的数组,并将其作为输入传递给SumTask
实例。接着,我们创建了一个ForkJoinPool
实例并将任务提交给它执行。最后,我们打印出计算得到的总和以及花费的时间,并确保正确地关闭了线程池。
综上所述,ForkJoinPool
为开发者提供了一种强大而灵活的方式来构建高性能的并行应用程序,特别是在面对那些能够自然地分解为多个独立子任务的问题时表现尤为出色。通过合理地配置和使用ForkJoinPool
,我们可以充分利用现代计算机硬件的能力,显著提升程序性能.