【Java踩坑笔记】15_Stream并行流不是银弹,用错反而更慢

📅 2026/6/30 23:25:11
【Java踩坑笔记】15_Stream并行流不是银弹,用错反而更慢
摘要parallelStream()一键并行听起来很美好。但默认共用ForkJoinPool.commonPool()CP 密集型任务和 IO 任务混在一起会互相拖慢。本文讲清并行流的适用场景和坑点。一、问题现象publicclassParallelStreamTest{publicstaticvoidmain(String[]args){ListIntegerlistIntStream.range(0,1000).boxed().collect(Collectors.toList());// 串行流longstartSystem.currentTimeMillis();list.stream().forEach(i-{try{Thread.sleep(1);}catch(InterruptedExceptione){}});System.out.println(串行耗时(System.currentTimeMillis()-start)ms);// 并行流startSystem.currentTimeMillis();list.parallelStream().forEach(i-{try{Thread.sleep(1);}catch(InterruptedExceptione){}});System.out.println(并行耗时(System.currentTimeMillis()-start)ms);}}运行结果参考值4 核 CP串行耗时约 1000ms 并行耗时约 250ms ← 看起来快了但如果任务是IO 密集型数据库查询、HTTP 调用并行流可能反而更慢因为commonPool的线程被占满其他任务得不到执行。二、踩坑现场场景 1并行流里做数据库查询// ❌ 危险并行流 数据库查询list.parallelStream().forEach(id-{UseruseruserMapper.selectById(id);// IO 操作// ...});问题数据库连接有上限如连接池大小 20并行流默认线程数 CP 核心数 - 1如果 CP 是 8 核并行流用 7 个线程同时查数据库连接池不够用导致等待场景 2并行流修改共享变量// ❌ 并行流修改共享变量线程不安全ListStringresultnewArrayList();list.parallelStream().forEach(i-{result.add(i.toString());// ❌ ArrayList 不是线程安全的});场景 3commonPool被占满影响其他并行流// ❌ 两个并行流共用 commonPool互相竞争list1.parallelStream().forEach(...);// 占满了 commonPoollist2.parallelStream().forEach(...);// 等待反而更慢三、原理解析3.1 并行流的线程池parallelStream()默认使用ForkJoinPool.commonPool()// ForkJoinPool 源码publicstaticForkJoinPoolcommonPool(){returncommonPool;// 全 JVM 共享一个线程池}默认线程数Runtime.getRuntime().availableProcessors() - 13.2 为什么 IO 任务不适合并行流ForkJoinPool.commonPool() - 线程数 CP 核心数 - 1假设 7 个线程 IO 任务数据库查询每次 50ms 7 个线程同时执行 → 7 个查询同时发出 如果连接池只有 5 个连接 → 2 个线程等待 等待的线程占着 commonPool 的线程 → 其他并行流也卡住结论IO 密集型任务会占满commonPool导致整个 JVM 里的其他并行流都变慢。3.3 数据量太小并行流反而慢数据量 10 串行主线程依次处理几乎没有开销 并行切线程 任务拆分 结果合并 → 开销 实际处理时间 → 并行反而慢经验值数据量 1000 或单条处理时间 1ms不要用并行流。四、正确写法4.1 用自定义线程池运行并行流// ✅ 用自定义的 ForkJoinPool 运行并行流不占用 commonPoolForkJoinPoolcustomPoolnewForkJoinPool(4);// 4 个线程try{customPool.submit(()-list.parallelStream().forEach(i-{// ...})).get();}finally{customPool.shutdown();}4.2 CP 密集型任务才用并行流// ✅ 适合并行流纯计算无 IOListLongresultlist.parallelStream().map(i-fibonacci(i))// 纯计算.collect(Collectors.toList());4.3 IO 密集型任务用线程池不用并行流// ✅ IO 任务用线程池ExecutorServiceexecutorExecutors.newFixedThreadPool(10);for(Longid:ids){executor.submit(()-{UseruseruserMapper.selectById(id);// ...});}executor.shutdown();4.4 并行流 线程安全的收集器// ✅ 并行流收集结果用线程安全的收集器ListStringresultlist.parallelStream().map(Object::toString).collect(Collectors.toList());// ✅ Collectors.toList() 内部是线程安全的// ❌ 手动向共享集合添加线程不安全ListStringresultnewArrayList();list.parallelStream().forEach(i-result.add(i.toString()));// ❌五、最佳实践✅ 并行流使用决策表场景是否用并行流替代方案纯 CP 计算数据量 10000✅ 适合-IO 操作数据库、HTTP❌ 不适合线程池数据量 1000❌ 不适合串行流单条处理时间 1ms❌ 不适合串行流需要自定义线程数⚠️ 可以用但要用自定义 ForkJoinPool线程池收集结果到共享集合⚠️ 可以用但要用线程安全的收集器Collectors.toList() 如何判断是否需要并行流基准测试// 用 JMH 做基准测试对比串行和并行的性能// 不要凭感觉决定是否用并行流️ 监控 commonPool 的使用情况// 打印 commonPool 的状态ForkJoinPoolcommonPoolForkJoinPool.commonPool();System.out.println(Pool size: commonPool.getPoolSize());System.out.println(Active threads: commonPool.getActiveThreadCount());六、小结并行流默认使用ForkJoinPool.commonPool()全 JVM 共享IO 密集型任务会占满commonPool影响其他并行流CP 密集型 大数据量才适合用并行流需要并行处理 IO 任务时用自定义线程池不要用并行流并行流收集结果时用Collectors.toList()等线程安全的收集器不确定是否要并行先基准测试别猜第二辑完。—— 下一篇进入第三辑并发编程篇首篇synchronized锁定的是对象不是代码块。