当前位置: 首页> 文旅> 酒店 > Java 并发(五)—— 线程池

Java 并发(五)—— 线程池

时间:2025/8/26 12:40:30来源:https://blog.csdn.net/weixin_68074170/article/details/141278272 浏览次数:0次
  • 线程池核心参数?(核心线程数、最大线程数、任务队列)
  • 线程池构造方法中除了保存参数以外还要做什么事?(设置线程工厂、任务拒绝策略)
  • 提交任务时线程池要做什么?(任务执行机制)
  • 创建非核心线程时,线程run方法中的内容是什么?(调用runWorker方法getTask()获取任务,然后执行task.run()方法)
  • 如何将提交的任务交给线程运行?(execute / submit
  • 创建线程时需要区分核心线程和非核心线程吗?(需要)
  • 线程池中线程的任务执行完毕以后是什么状态?(TIDYING状态 / TERMINATED 状态)
  • 线程池中的线程如何回收?(消除线程引用后,由JVM进行回收)

一、创建线程池

1.ThreadPoolExecutor 自定义线程池

继承关系

 内部包含四个构造函数如下:

public class ThreadPoolExecutor extends AbstractExecutorService {public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
}

7个核心参数

  1. corePoolSize:the number of threads to keep in the pool, even if they are idle, unless {@code allowCoreThreadTimeOut} is set

    (核心线程数大小:不管它们创建以后是不是空闲的。线程池需要保持 corePoolSize 数量的线程,除非设置了 allowCoreThreadTimeOut。)

  2. maximumPoolSize:the maximum number of threads to allow in the pool。

    (最大线程数:线程池中最多允许创建 maximumPoolSize 个线程。)

  3. keepAliveTime:when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating。

    (存活时间:如果经过 keepAliveTime 时间后,超过核心线程数的线程还没有接受到新的任务,那就回收。)

  4. unit:the time unit for the {@code keepAliveTime} argument

    (keepAliveTime 的时间单位。)

  5. workQueue:the queue to use for holding tasks before they are executed.  This queue will hold only the {@code Runnable} tasks submitted by the {@code execute} method。

    (存放待执行任务的队列:当提交的任务数超过核心线程数大小后,再提交的任务就存放在这里。它仅仅用来存放被 execute 方法提交的 Runnable 任务。所以这里就不要翻译为工作队列了,好吗?不要自己给自己挖坑。)

  6. threadFactory:the factory to use when the executor creates a new thread。

    (线程工程:用来创建线程工厂。比如这里面可以自定义线程名称,当进行虚拟机栈分析时,看着名字就知道这个线程是哪里来的,不会懵逼。)

  7. handler :the handler to use when execution is blocked because the thread bounds and queue capacities are reached。

    (拒绝策略:当队列里面放满了任务、最大线程数的线程都在工作时,这时继续提交的任务线程池就处理不了,应该执行怎么样的拒绝策略。)

2.Executors 工具类(不推荐)

可以看到内部其实还是调用 ThreadPoolExecutor 实现线程池的创建。

public class Executors {//1.固定线程数量的线程池public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}//2.只有一个线程的线程池public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));}//3.可根据实际情况调整线程数量的线程池public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);}//4.给定的延迟后运行任务或者定期执行任务的线程池public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);}
}

Executors 返回线程池对象的弊端如下:

  • FixedThreadPoolSingleThreadExecutor:使用的是有界阻塞队列是 LinkedBlockingQueue ,其任务队列的最大长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM。
  • CachedThreadPool:使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM。
  • ScheduledThreadPoolSingleThreadScheduledExecutor :使用的无界的延迟阻塞队列 DelayedWorkQueue ,任务队列最大长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM。

二、线程池的生命周期管理

线程池内部使用一个变量 ctl 维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// runState is stored in the high-order bitsprivate static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;private static int runStateOf(int c)     { return c & ~CAPACITY; } //计算当前运行状态private static int workerCountOf(int c)  { return c & CAPACITY; }  //计算当前线程数量private static int ctlOf(int rs, int wc) { return rs | wc; }   //通过状态和线程数生成ctl

ThreadPoolExecutor的运行状态有5种,分别为:

 转换过程



 上面都是些基础知识,下面进入任务提交到回收的全流程:

线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。

线程池的运行主要分成两部分:任务管理、线程管理。

  • 任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。
  • 线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。 

四、任务执行机制(生产者)

1.提交任务——executor、submit

“线程池中线程异常后:销毁还是复用?” (qq.com)

2.任务调度——execute

所有任务的调度都是由execute方法完成的。这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。

    public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {  //线程数<核心线程数if (addWorker(command, true))  //增加核心线程return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {  //加入阻塞队列int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);  //任务拒绝else if (workerCountOf(recheck) == 0)  //线程数<最大线程数addWorker(null, false);  //增加非核心线程}else if (!addWorker(command, false))reject(command);}

3.任务缓冲——Queue 

4.任务申请——getTask  

5.任务拒绝

用户可以通过实现 RejectedExecutionHandler 接口去定制拒绝策略,也可以选择JDK提供的四种已有拒绝策略 

四、Worker 线程管理(消费者)

1.线程池内部类——Worker

线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。


Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。

  • thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;
  • firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。

Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

  • lock方法一旦获取了独占锁,表示当前线程正在执行任务中。 如果正在执行任务,则不应该中断线程。
  • 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。
  private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;final Thread thread;Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;// 通过ThreadFactory来创建线程Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}// 线程执行任务public void run() {runWorker(this);}protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}//AQS 实现独占锁// The value 0 represents the unlocked state.// The value 1 represents the locked state.public void lock()        { acquire(1); }public boolean tryLock()  { return tryAcquire(1); }public void unlock()      { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}

2.Worker线程增加——addWorker

图片中的上面一排是核心线程自带firstWorker任务执行,下面则是非核心线程取任务执行,由上面的executor方法里也可以看出来。

    private final ReentrantLock mainLock = new ReentrantLock();private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c = ctl.get();;) {// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty()))return false;for (;;) {if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateAtLeast(c, SHUTDOWN))continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {//加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();if (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) {if (t.getState() != Thread.State.NEW)throw new IllegalThreadStateException();workers.add(w);   //创建线程workerAdded = true;int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (w != null)workers.remove(w);decrementWorkerCount();tryTerminate();} finally {mainLock.unlock();}}

线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。addWorker方法有两个参数:firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,其执行流程如下图所示: 

3.Worker线程执行任务——runWorker

Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

    final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//1.while循环不断地通过getTask()方法获取任务//2.getTask()方法从阻塞队列中取任务while (task != null || (task = getTask()) != null) {//AQS实现独占锁w.lock();//3.如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);try {//4.执行任务task.run();afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//5.如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程processWorkerExit(w, completedAbruptly); }}

4.Worker线程回收——processWorkerExit

    private final ReentrantLock mainLock = new ReentrantLock();private final HashSet<Worker> workers = new HashSet<>();private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);  //将线程移出线程池} finally {mainLock.unlock();}tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}}

线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。

事实上,在这个方法中,将线程引用移出线程池就已经结束了线程销毁的部分。



五、动态设置线程池参数

如何设置线程池参数?美团给出了一个让面试官虎躯一震的回答。 (qq.com)



六、参考

 Java线程池实现原理及其在美团业务中的实践 - 美团技术团队 (meituan.com)

关键字:Java 并发(五)—— 线程池

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: