系列文章目录

文章目录
- 系列文章目录
- 前言
- 一、项目背景
- 二、第十一轮对话-修正运行时数据
- 三、修正任务计数器
- 四、第十二轮对话-生成单元测试
- 五、验证通过
- 七、参考文章
前言
在这个充满技术创新的时代,AI大模型正成为开发者们的新宠。它们可以帮助我们完成从简单的问答到复杂的编程任务,所以AI编程将会是未来的主流方向,利用AI大模型的能力,本文将介绍从零到一用AI大模型编写一个多线程并发框架。
一、项目背景
经过上两篇文章和AI的对话,我们基本捣鼓出来了多线程并发框架的雏形,并且接入了监听器,但还是存在一些问题,限于篇幅,本文继续优化监听器。
本多线程框架使用示例如下:源码地址
1、引入依赖。
<dependency><groupId>io.github.vipjoey</groupId><artifactId>mmc-juc</artifactId><version>1.0</version>
</dependency>
2、使用示例。
// 创建一个MmcTaskExecutor实例,用于执行单次长耗时任务
// 下面是创建一个计算从1加到100的任务,总共100个任务,采用fork分治算法,阈值为10,总共任务为100 / 10 * 2 = 20个大任务,执行速率约为10/s
MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder().taskSource(IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList())) // 设置任务源.taskProcessor(x -> x.stream().reduce(0, Integer::sum)) // 设置任务处理方法.taskMerger(Integer::sum) // 设置结果处理方法(可选).threshold(10) // 设置任务处理阈值(可选).taskName("mmcTaskExample") // 设置任务名称.rateLimiter(10, 20) // 设置速率限制,容量为10,每秒产生令牌为20,休眠时间为10ms.forkJoinPoolConcurrency(4) // 设置ForkJoinPool的并发度为4.build();// 同步执行并打印结果
System.out.println("result: " + mmcTaskExecutor.execute());// 任务执行过程监控
[mmcTaskExample] Tasks submitted. Total tasks: 100
[mmcTaskExample] Task started. Completed tasks: 14, remaining tasks: 86
[mmcTaskExample] Task started. Completed tasks: 14, remaining tasks: 86
[mmcTaskExample] Task started. Completed tasks: 20, remaining tasks: 80
[mmcTaskExample] Task started. Completed tasks: 26, remaining tasks: 74
[mmcTaskExample] Task started. Completed tasks: 33, remaining tasks: 67
[mmcTaskExample] Task started. Completed tasks: 40, remaining tasks: 60
[mmcTaskExample] Task started. Completed tasks: 46, remaining tasks: 54
[mmcTaskExample] Task started. Completed tasks: 52, remaining tasks: 48
[mmcTaskExample] Task started. Completed tasks: 58, remaining tasks: 42
[mmcTaskExample] Task started. Completed tasks: 64, remaining tasks: 36
[mmcTaskExample] Task started. Completed tasks: 70, remaining tasks: 30
[mmcTaskExample] Task started. Completed tasks: 76, remaining tasks: 24
[mmcTaskExample] Task started. Completed tasks: 82, remaining tasks: 18
[mmcTaskExample] Task started. Completed tasks: 88, remaining tasks: 12
[mmcTaskExample] Task started. Completed tasks: 94, remaining tasks: 6
[mmcTaskExample] Task started. Completed tasks: 100, remaining tasks: 0
[mmcTaskExample] Tasks completed. Elapsed time: 1107 ms
result: 5050
二、第十一轮对话-修正运行时数据
1、上文提到,它在MmcTask来初始化运行时数据,这明显是不对的。TaskRuntime 应该从MmcTaskExecutor中首次提交任务时初始化并透传。
2、以下是AI的回复。
- 【抱歉,我理解了您的问题。我们可以将TaskRuntime的初始化从MmcTask构造函数中移除,并将其传递给MmcTask。这样,在任务fork时,TaskRuntime将保持不变。首先,我们需要修改MmcTask类以接收TaskRuntime作为参数。】
import java.util.List;
import java.util.concurrent.RecursiveTask;public class MmcTask<T, R> extends RecursiveTask<R> {// ...省略其他成员变量和方法private TaskRuntime taskRuntime;public MmcTask(Builder<T, R> builder) {// ...省略其他初始化代码this.taskRuntime = builder.taskRuntime;}@Overrideprotected R compute() {if (end - start <= threshold) {// 在处理任务之前调用onTaskStarted方法taskListener.onTaskStarted(taskRuntime);// ...省略其他代码// 在任务完成后更新已完成任务的计数taskRuntime.getCompletedTasks().incrementAndGet();taskRuntime.getRemainingTasks().decrementAndGet();} else {// ...省略其他代码MmcTask<T, R> leftTask = new Builder<T, R>().taskSource(taskSource).taskProcessor(taskProcessor).taskMerger(taskMerger).threshold(threshold).start(start).end(mid).rateLimiter(rateLimiter).taskListener(taskListener).taskRuntime(taskRuntime).build();MmcTask<T, R> rightTask = new Builder<T, R>().taskSource(taskSource).taskProcessor(taskProcessor).taskMerger(taskMerger).threshold(threshold).start(mid).end(end).rateLimiter(rateLimiter).taskListener(taskListener).taskRuntime(taskRuntime).build();// ...省略其他代码}}public static class Builder<T, R> {// ...省略其他成员变量private TaskRuntime taskRuntime;// ...省略其他方法public Builder<T, R> taskRuntime(TaskRuntime taskRuntime) {this.taskRuntime = taskRuntime;return this;}public MmcTask<T, R> build() {return new MmcTask<>(this);}}
}
- 【接下来,我们需要修改MmcTaskExecutor类以在创建MmcTask时传递TaskRuntime。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;public class MmcTaskExecutor<T, R> {// ...省略其他成员变量和方法// 提交新任务public R submitTask(MmcTask<T, R> mmcTask) {// 调用onTasksSubmitted方法taskListener.onTasksSubmitted(mmcTask.getTaskRuntime(), System.currentTimeMillis());R result = forkJoinPool.invoke(mmcTask);// 调用onTasksCompleted方法taskListener.onTasksCompleted(mmcTask.getTaskRuntime(), mmcTask.getElapsedTime(), System.currentTimeMillis());return result;}// 同步执行并返回结果public R execute() {TaskRuntime taskRuntime = new TaskRuntime(taskName, taskSource.size());MmcTask<T, R> mmcTask = new MmcTask.Builder<T, R>()// ...省略其他方法调用.taskRuntime(taskRuntime).build();return submitTask(mmcTask);}
}
三、修正任务计数器
AI这次修改基本上符合我的诉求,现在就剩下计数器问题了。上文我提到过,AI理解的累计完成任务个数和我理解的任务个数不一样,可能跟我的提示词表达方式有关(有根更好表达方式的同学可以评论区回复留言),这里我就不纠结了,直接给他修正一下。
1、监听器的默认实现方式。
package com.mmc.juc;public class DefaultMmcTaskListener implements MmcTaskListener {@Overridepublic void onTasksSubmitted(TaskRuntime taskRuntime) {System.out.println("[" + taskRuntime.getTaskName() + "] Tasks submitted. Total tasks: " + taskRuntime.getTotalTasks());}@Overridepublic void onTaskStarted(TaskRuntime taskRuntime) {System.out.println("[" + taskRuntime.getTaskName() + "] Task started. Completed tasks: " + taskRuntime.getCompletedTasks().get() + ", remaining tasks: " + taskRuntime.getRemainingTasks().get());}@Overridepublic void onTasksCompleted(TaskRuntime taskRuntime, long elapsedTime) {System.out.println("[" + taskRuntime.getTaskName() + "] Tasks completed. Elapsed time: " + elapsedTime + " ms ");}
}
2、修正compute方法,修正计数器计数方式。
package com.mmc.juc;import java.util.List;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Data;
import lombok.Getter;
import lombok.experimental.Accessors;@Getter
public class MmcTask<T, R> extends RecursiveTask<R> {// 省略其他代码@Overrideprotected R compute() {// 在处理任务之前尝试消耗令牌while (!rateLimiter.tryConsume()) {try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}}if (end - start <= threshold) {R result = taskProcessor.process(taskSource.subList(start, end));// 在任务完成后更新已完成任务的计数taskRuntime.getCompletedTasks().addAndGet(end - start);taskRuntime.getRemainingTasks().set(taskRuntime.getTotalTasks() - taskRuntime.getCompletedTasks().get());// 调用onTaskStarted方法,以便在每个小任务完成时更新已完成任务的计数taskListener.onTaskStarted(taskRuntime);return result;}int middle = (start + end) / 2;MmcTask<T, R> leftTask = new MmcTask.Builder<T, R>().taskSource(taskSource).taskProcessor(taskProcessor).taskMerger(taskMerger).threshold(threshold).start(start).end(middle).rateLimiter(rateLimiter).taskListener(taskListener).taskName(taskName).taskRuntime(taskRuntime).build();MmcTask<T, R> rightTask = new MmcTask.Builder<T, R>().taskSource(taskSource).taskProcessor(taskProcessor).taskMerger(taskMerger).threshold(threshold).start(middle).end(end).rateLimiter(rateLimiter).taskListener(taskListener).taskName(taskName).taskRuntime(taskRuntime).build();leftTask.fork();R rightResult = rightTask.compute();R leftResult = leftTask.join();if (taskMerger != null) {return taskMerger.merge(leftResult, rightResult);} else {return null;}}
}
四、第十二轮对话-生成单元测试
1、我们要求AI生成单元测试,验证整个框架计算逻辑是否准确。
2、以下是AI的回复。
- 【为了验证整个框架的计算逻辑、监听器、限流和执行方式的准确性,我们可以创建一个单元测试。在这个测试中,我们将使用1加到100的任务作为示例。以下是一个单元测试的示例:】
package com.mmc.juc;import static org.junit.Assert.assertEquals;import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Test;public class MmcTaskExecutorTest {@Testpublic void testMmcTaskExecutor() {// 创建一个任务源,包含从1到100的整数List<Integer> taskSource = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());// 创建一个任务处理器,将每个整数相加MmcTaskProcessor<Integer, Integer> taskProcessor = (integer) -> integer.stream().mapToInt(Integer::intValue).sum();// 创建一个任务合并器,将所有整数的和合并MmcTaskMerger<Integer> taskMerger = Integer::sum;// 创建一个任务监听器,打印任务执行过程中的相关信息MmcTaskListener taskListener = new DefaultMmcTaskListener();// 创建一个MmcTaskExecutor实例MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = new MmcTaskExecutor.Builder<Integer, Integer>().taskSource(taskSource).taskProcessor(taskProcessor).taskMerger(taskMerger).taskListener(taskListener).rateLimiter(10, 10) // 每秒处理10个任务.taskName("SumTask").build();// 执行任务并获取结果Integer result = mmcTaskExecutor.execute();// 验证结果是否正确(1到100的和等于5050)assertEquals(5050, result.intValue());}
}
五、验证通过
1、我们运行一下单元测试,看看效果。
[SumTask] Tasks submitted. Total tasks: 100
[SumTask] Task started. Completed tasks: 7, remaining tasks: 87
[SumTask] Task started. Completed tasks: 13, remaining tasks: 87
[SumTask] Task started. Completed tasks: 19, remaining tasks: 81
[SumTask] Task started. Completed tasks: 26, remaining tasks: 74
[SumTask] Task started. Completed tasks: 32, remaining tasks: 68
[SumTask] Task started. Completed tasks: 38, remaining tasks: 62
[SumTask] Task started. Completed tasks: 44, remaining tasks: 56
[SumTask] Task started. Completed tasks: 50, remaining tasks: 50
[SumTask] Task started. Completed tasks: 56, remaining tasks: 44
[SumTask] Task started. Completed tasks: 62, remaining tasks: 38
[SumTask] Task started. Completed tasks: 68, remaining tasks: 32
[SumTask] Task started. Completed tasks: 74, remaining tasks: 26
[SumTask] Task started. Completed tasks: 80, remaining tasks: 20
[SumTask] Task started. Completed tasks: 87, remaining tasks: 13
[SumTask] Task started. Completed tasks: 93, remaining tasks: 7
[SumTask] Task started. Completed tasks: 100, remaining tasks: 0
[SumTask] Tasks completed. Elapsed time: 2161 ms
很完美!整个框架运行正常!下一篇,我们将发布这个小巧玲珑的框架,让全世界都看到他,作为第一个版本的里程碑!
七、参考文章
- 《AI大模型编写多线程并发框架(六十一):从零开始搭建框架》
- 《AI大模型编写多线程并发框架(六十二):限流和并发度优化》
- 《AI大模型编写多线程并发框架(六十三):监听器优化·上》
- 《AI大模型编写多线程并发框架(六十四):监听器优化·下》
- 《AI大模型编写多线程并发框架(六十五):发布和应用》
加我加群(备注csdn)一起交流学习!更多干货下载、项目源码和大厂内推等着你
![]() | ![]() |