• 作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,阿里云专家博主
  • 系列专栏:Java设计模式、Spring源码系列、Netty源码系列、Kafka源码系列、JUC源码系列
  • 如果感觉博主的文章还不错的话,请三连支持一下博主哦
  • 博主正在努力完成2023计划中:以梦为马,扬帆起航,2023追梦人
  • 联系方式:hls1793929520,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬

文章目录

  • 线程池源码剖析
    • 一、引言
    • 二、使用
    • 三、源码
      • 1、初始化
        • 1.1 拒绝策略
          • 1.1.1 AbortPolicy
          • 1.1.2 CallerRunsPolicy
          • 1.1.3 DiscardOldestPolicy
          • 1.1.4 DiscardPolicy
          • 1.1.5 自定义拒绝策略
        • 1.2 其余变量
      • 2、线程池的execute方法
      • 3、线程池的addWorker方法
        • 3.1 校验
        • 3.2 添加线程
      • 4、线程池的 worker 源码
      • 5、线程池的 runWorker 方法
      • 6、线程池的 getTask 方法
      • 7、线程池的 processWorkerExit 方法
      • 8、线程池的关闭方法
        • 8.1 shutdownNow 方法
        • 8.2 shutdown 方法
    • 四、流程图
    • 五、总结

线程池源码剖析

一、引言

线程池技术在互联网技术使用如此广泛,几乎所有的后端技术面试官都要在线程池技术的使用和原理方面对小伙伴们进行 360° 的刁难。

作为一个在互联网公司面一次拿一次 Offer 的面霸,打败了无数竞争对手,每次都只能看到无数落寞的身影失望的离开,略感愧疚(请允许我使用一下夸张的修辞手法)。

于是在一个寂寞难耐的夜晚,暖男我痛定思痛,决定开始写 《吊打面试官》 系列,希望能帮助各位读者以后面试势如破竹,对面试官进行 360° 的反击,吊打问你的面试官,让一同面试的同僚瞠目结舌,疯狂收割大厂 Offer!

虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马

二、使用

我想大部分人应该都使用过线程池,我们的 JDK 中也提供了一些包装好的线程池使用,比如:

  • newFixedThreadPool:返回一个核心线程数为 nThreads 的线程池

    public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}
  • newSingleThreadExecutor:返回一个核心线程数为 1 的线程池

    public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
  • newCachedThreadPool:大同小异

    public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}

通过上面 JDK 提供的我们可以发现一个共识,他们其实都是调用了 ThreadPoolExecutor 的构造方法来进行线程池的创建

这时候,我们不免有疑问,我们难道不可以直接使用 ThreadPoolExecutor 的构造方法去进行创建嘛

是的,阿里巴巴Java开发手册中明确指出,『不允许』使用Executors创建线程池

所以,我们在生产中,一般使用 ThreadPoolExecutor 的构造方法自定义去创建线程池,比如:

public class ThreadPoolTest {public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(2,// 核心线程数5,// 最大线程数200, // 非核心工作线程在阻塞队列位置等待的时间TimeUnit.SECONDS,// 非核心工作线程在阻塞队列位置等待的单位new LinkedBlockingQueue<>(), // 阻塞队列,存放任务的地方new ThreadPoolExecutor.AbortPolicy() // 拒绝策略:这里有四种);for (int i = 0; i < 10; i++) {MyTask task = new MyTask();executor.execute(task);}// 关闭线程executor.shutdown();}}class MyTask implements Runnable {@Overridepublic void run() {System.out.println("我被执行了....");}}

三、源码

整体的流程如下:

1、初始化

聊源码不从初始化聊的,都是不讲道理的

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);}// 执行ThreadPoolExecutor的构造方法进行初始化// corePoolSize: 核心线程数// maximumPoolSize: 最大线程数// keepAliveTime: 非核心工作线程在阻塞队列位置等待的时间// unit: 非核心工作线程在阻塞队列位置等待的时间单位// workQueue: 存放任务的阻塞队列// threadFactory: 线程工厂(生产线程的地方)// RejectedExecutionHandler: 拒绝策略public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {// 核心线程数可以为0// 最大线程数不为0// 最大线程数 大于 核心线程数// 等待时间大于等于0if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();// 将当前的入参赋值给成员变量this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

我们上面初始化的过程主要对入参做了一些校验,然后将方法的入参赋予给成员变量

1.1 拒绝策略

1.1.1 AbortPolicy

简单粗暴,直接抛出异常

public static class AbortPolicy implements RejectedExecutionHandler {public AbortPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());}}
1.1.2 CallerRunsPolicy

当前拒绝策略会在线程池无法处理任务时,将任务交给调用者处理

public static class CallerRunsPolicy implements RejectedExecutionHandler {public CallerRunsPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {// 如果当前的if (!e.isShutdown()) {r.run();}}}
1.1.3 DiscardOldestPolicy

如果当前的阻塞队列满了,弹出时间最久的

public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {// 获取阻塞队列,弹出一个时间最久的e.getQueue().poll();// 执行当前的e.execute(r);}}}
1.1.4 DiscardPolicy

简单粗暴,不做任何操作

public static class DiscardPolicy implements RejectedExecutionHandler {public DiscardPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}
1.1.5 自定义拒绝策略

自己写的业务逻辑,可以将拒绝的任务放至数据库等存储,等后续在执行

public static class MyRejectedExecution implements RejectedExecutionHandler{@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("这是我自己的拒绝策略");}}

1.2 其余变量

// 该数值代表两个意思:// 高3位表示当前线程池的状态// 低29位表示当前线程池工作线程的个数private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//COUNT_BITS = 29private static final int COUNT_BITS = Integer.SIZE - 3;// CAPACITY就是当前工作线程能记录的工作线程的最大个数private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 111:代表RUNNING状态,RUNNING可以处理任务,并且处理阻塞队列中的任务。private static final int RUNNING= -1 << COUNT_BITS;// 000:代表SHUTDOWN状态,不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完。private static final int SHUTDOWN =0 << COUNT_BITS;// 001:代表STOP状态,不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管。private static final int STOP =1 << COUNT_BITS;// 010:代表TIDYING状态,这个状态是否SHUTDOWN或者STOP转换过来的,代表当前线程池马上关闭,就是过渡状态。private static final int TIDYING=2 << COUNT_BITS;// 011:代表TERMINATED状态,这个状态是TIDYING状态转换过来的,转换过来只需要执行一个terminated方法。private static final int TERMINATED =3 << COUNT_BITS;// 基于&运算的特点,保证只会拿到ctl高三位的值private static int runStateOf(int c) { return c & ~CAPACITY; }// 基于&运算的特点,保证只会拿到ctl低29位的值private static int workerCountOf(int c){ return c & CAPACITY; }

线程池的状态变化流程图:

2、线程池的execute方法

  • Step1:当前的线程池个数低于核心线程数,直接添加核心线程即可
  • Step2:当前的线程池个数大于核心线程数,将任务添加至阻塞队列中
  • Step3:如果添加阻塞队列失败,则需要添加非核心线程数处理任务
  • Step4:如果添加非核心线程数失败(满了),执行拒绝策略
public void execute(Runnable command) {// 如果当前传过来的任务是null,直接抛出异常即可if (command == null)throw new NullPointerException();// 获取当前的数据值int c = ctl.get();//==========================线程池第一阶段:启动核心线程数开始==================================================// Step1:获取ctl低29位的数值,与我们的核心线程数相比if (workerCountOf(c) < corePoolSize) {// Step2:添加一个核心线程if (addWorker(command, true)){return;}// 更新一下当前值c = ctl.get();}//==========================线程池第一阶段:启动核心线程数结束==================================================// 如果走到下面会有两种情况:// 1、核心线程数满了,需要往阻塞队列里面扔任务// 2、核心线程数满了,阻塞队列也满了,执行拒绝策略//==========================线程池第二阶段:任务放至阻塞队列开始==================================================// 判断当前的状态是不是Running的状态(RUNNING可以处理任务,并且处理阻塞队列中的任务)// 如果是Running的状态,则可以将任务放至阻塞队列中// 这里如果放阻塞队列失败了,证明阻塞队列满了if (isRunning(c) && workQueue.offer(command)) {// 再次更新数值int recheck = ctl.get();// 再次校验当前的线程池状态是不是Running// 如果线程池状态不是Running的话,需要删除掉刚刚放的任务if (!isRunning(recheck) && remove(command)){// 执行拒绝策略reject(command);} // 如果到这里,说明上面阻塞队列中已经有数据了// 如果线程池的个数为0的话,需要创建一个非核心工作线程去执行该任务// 不能让人家堵塞着else if (workerCountOf(recheck) == 0){addWorker(null, false);}}//==========================线程池第二阶段:任务放至阻塞队列结束==================================================// 如果走到这里的逻辑,证明上面的逻辑没走通,有以下两种情况:// 1、线程池的状态不是Running//1.1 如果是这种情况,下面的添加非核心工作线程失败执行拒绝策略,但这个并不是这个逻辑的重点// 2、阻塞队列添加任务失败(阻塞队列满了)//2.1 这种情况才是我们需要关心的//2.2 阻塞队列满了,添加非核心工作线程//2.3 若添加非核心工作线程失败,证明已经到达maximumPoolSize的限制,执行拒绝策略//==========================线程池第三阶段:启动非核心线程数开始==================================================// 添加一个非核心工作线程else if (!addWorker(command, false))// 工作队列中添加任务失败,执行拒绝策略reject(command);//==========================线程池第三阶段:启动非核心线程数结束==================================================}

流程图如下:

3、线程池的addWorker方法

3.1 校验

  • 校验当前线程池的状态
  • 校验当前线程池工作线程的个数(核心线程数、最大工作线程数)
private boolean addWorker(Runnable firstTask, boolean core) {// 这里主要是为了结束整个循环retry:for (;;) {// 获取当前线程池的数值(ctl)int c = ctl.get();// runStateOf:基于&运算的特点,保证只会拿到ctl高三位的值int rs = runStateOf(c);//==========================线程池状态判断=============================================================// rs >= SHUTDOWN:代表当前线程池状态为:SHUTDOWN、STOP、TIDYING、TERMINATED,线程池状态异常// 但这里SHUTDOWN状态稍许不同(不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完)// 如果当前的状态是SHUTDOWN状态并且阻塞队列任务不为空且新任务为空// 需要新起一个非核心工作线程去执行任务// 如果不是前面的,直接返回false即可if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())){return false;}//==========================工作线程个数判断==========================================================for (;;) {// 获取当前线程池中线程的个数int wc = workerCountOf(c);// 1、如果线程池线程的个数是否超过了工作线程的最大个数// 2、core=true(核心线程)=false(工作线程)// 2.1 根据当前core判断创建的是核心线程数(corePoolSize)还是非核心线程数(maximumPoolSize)if (wc >= CAPACITY || wc >= (core " />: maximumPoolSize)){return false;}// 尝试将线程池线程加一if (compareAndIncrementWorkerCount(c)){// CAS成功后,直接退出外层循环,代表可以执行添加工作线程操作了。break retry;}// 获取当前线程池的数值(ctl)c = ctl.get(); // 获取当前线程池的状态// 判断当前线程池的状态等不等于我们上面的rs// 我们线程池的状态被人更改了,需要重新跑整个for循环判断逻辑if (runStateOf(c) != rs){continue retry;}}}// 省略下面的代码}

3.2 添加线程

{// 省略校验的步骤// 两个标记boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 将当前的任务封装成Workerw = new Worker(firstTask);// 拿到当前Worker的线程final Thread t = w.thread;// 线程不为空if (t != null) {// 上锁,保证线程安全(workers、largestPoolSize)final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 获取线程池的状态int rs = runStateOf(ctl.get());// 1、rs < SHUTDOWN:保证当前线程池的状态一定是RUNNING状态// 2、rs == SHUTDOWN && firstTask == null:如果当前线程池是SHUTDOWN状态且新任务为空if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {// 判断线程是否还活着if (t.isAlive())throw new IllegalThreadStateException();// private final HashSet workers = new HashSet();// 添加到我们的work队列中workers.add(w);// 获取works的大小int s = workers.size();// largestPoolSize在记录最大线程个数的记录// 如果当前的大小比最大的还要打,替换即可if (s > largestPoolSize)largestPoolSize = s;// worker添加成功workerAdded = true;}} finally {// 解锁mainLock.unlock();}// 如果添加成功if (workerAdded) {// 启动线程t.start();// 线程启动标志位workerStarted = true;}}} finally {// 如果线程没有启动成功,从workers集合中删除掉该workerif (!workerStarted)addWorkerFailed(w);}// 返回线程是否启动成功return workerStarted;}// Worker的初始化Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;// 从线程工厂里面拿一个线程出来this.thread = getThreadFactory().newThread(this);}// 从workers集合中删除掉该workerprivate void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (w != null)workers.remove(w);decrementWorkerCount();tryTerminate();} finally {mainLock.unlock();}}

4、线程池的 worker 源码

// Worker继承了AQS,目的就是为了控制工作线程的中断。// Worker实现了Runnable,内部的Thread对象,在执行start时,必然要执行Worker中断额一些操作private final class Worker extends AbstractQueuedSynchronizer implements Runnable{private static final long serialVersionUID = 6138294804551838833L;// 当前线程工厂创建的线程(也是执行任务使用的线程)final Thread thread;// 当前的第一个任务Runnable firstTask;// 记录执行了多少个任务volatile long completedTasks;// 构造方法Worker(Runnable firstTask) {// 将State设置为-1,代表当前不允许中断线程setState(-1); // 设置任务this.firstTask = firstTask;// 设置线程this.thread = getThreadFactory().newThread(this);}// 线程启动执行的方法public void run() {runWorker(this);}// =======================Worker管理中断================================ // 当前方法是中断工作线程时,执行的方法void interruptIfStarted() {Thread t;// 只有Worker中的state >= 0的时候,可以中断工作线程if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {// 如果状态正常,并且线程未中断,这边就中断线程t.interrupt();} catch (SecurityException ignore) {}}} }

5、线程池的 runWorker 方法

public void run() {runWorker(this);}final void runWorker(Worker w) {// 拿到当前的线程Thread wt = Thread.currentThread();// 拿到当前Worker的第一个任务(如果携带的话)Runnable task = w.firstTask;// 置空w.firstTask = null;// 解锁w.unlock(); boolean completedAbruptly = true;try {// 如果任务不等于空 或者 从阻塞队列中拿到的任务不等于空while (task != null || (task = getTask()) != null) {// 加锁w.lock();// 如果线程池状态 >= STOP,确保线程中断了if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 在任务执行前做一些操作,自己实现的钩子beforeExecute(wt, task);Throwable thrown = null;try {// 任务执行task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 任务执行后一些操作:自己实现的钩子afterExecute(task, thrown);}} finally {// 任务置空task = null;// 执行任务+1w.completedTasks++;// 解锁w.unlock();}}completedAbruptly = false;} finally {// 删除线程的方法processWorkerExit(w, completedAbruptly);}}

6、线程池的 getTask 方法

private Runnable getTask() {// 超时的标记boolean timedOut = false; // 死循环拿数据for (;;) {// 拿到当前的ctlint c = ctl.get();// 获取其线程池状态int rs = runStateOf(c);// 如果线程池状态是STOP,没有必要处理阻塞队列任务,直接返回null// 如果线程池状态是SHUTDOWN,并且阻塞队列是空的,直接返回nullif (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// 线程池中的线程个数减一decrementWorkerCount();return null;}// 当前线程池中线程个数int wc = workerCountOf(c);// 这里是个重点// allowCoreThreadTimeOut:是否允许核心线程数超时(开启这个之后),核心线程数也会执行下面超时的逻辑// wc > corePoolSize:当前线程池中的线程个数大于核心线程数boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// wc > maximumPoolSize:基本不存在// timed && timedOut:第一次肯定是失败的(超时标记为false)if ((wc > maximumPoolSize || (timed && timedOut))// 1、线程个数为1// 2、阻塞队列是空的&& (wc > 1 || workQueue.isEmpty())) {// 线程池的线程个数减一if (compareAndDecrementWorkerCount(c)){return null;}continue;}try {// 根据我们前面的timed的值(当前线程池中的线程个数是否大于核心线程数)// 如果大于,执行workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)带有时间的等待,超过时间无任务,会返回null// 如果小于,执行workQueue.take(),死等任务,不会返回nullRunnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();if (r != null){return r;}// 到这里,说明上面的等待超时了// 这里要注意一下,如果这里超时后,我们上面 if ((wc > maximumPoolSize || (timed && timedOut)) 这个判断要起作用了// (timed && timedOut) true// wc > 1 || workQueue.isEmpty():当线程大于1或者阻塞队列无数据,直接返回null,让外部循环删除timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

7、线程池的 processWorkerExit 方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {// 正常退出的:completedAbruptly=false// 不是正常退出的:completedAbruptly=trueif (completedAbruptly) decrementWorkerCount();// 加锁——上锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 将当前worker的执行任务数累加到线程池中completedTaskCount += w.completedTasks;// 线程池删除该工作线程workers.remove(w);} finally {// 解锁mainLock.unlock();}tryTerminate();// 获取ctl的数据int c = ctl.get();// 这里只有SHUTDOWN、RUNNING会进入判断if (runStateLessThan(c, STOP)) {// 正常退出的if (!completedAbruptly) {// 是否允许超时// 允许:0// 不允许:核心线程数int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 如果min=0并且阻塞队列不为空// 将min设置成1if (min == 0 && ! workQueue.isEmpty())min = 1;// 当前线程池的大小大于最小值,直接返回即可if (workerCountOf(c) >= min){return;}}// 如果没有的话,说明线程池中没有线程了,并且还有阻塞任务// 只能添加一个非核心线程去处理这些任务addWorker(null, false);}}

8、线程池的关闭方法

8.1 shutdownNow 方法

  • 将线程池状态修改为Stop(不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管)
  • 将线程池中的线程全部中断
  • 删除当前线程池所有的工作线程
  • 将线程池的状态从:Stop –> TIDYING –> TERMINATED,正式标记线程池的结束(唤醒一下等待的主线程)
public List<Runnable> shutdownNow() {// 声明返回结果List<Runnable> tasks;// 加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 将线程池状态修改为STOPadvanceRunState(STOP);// 将线程池中的线程全部中断interruptWorkers();// 删除当前所有的工作线程tasks = drainQueue();} finally {// 解锁mainLock.unlock();}// 查看当前线程池是否可以变为TERMINATED状态// 从 Stop 状态修改为 TIDYING,在修改为 TERMINATEDtryTerminate();return tasks;}// targetState = STOP// 作用:将当前线程池的状态修改为Stopprivate void advanceRunState(int targetState) {// 进来直接死循环for (;;) {// 拿到当前的ctlint c = ctl.get();// runStateAtLeast(c, targetState):当前的c是不是大于STOP(如果大于Stop的话,说明线程池状态已经G了// 基于CAS,将ctl从c修改为Stop状态,不修改工作线程个数,仅仅将状态修改为Stop// 如果可以修改成功,直接退出即可if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))break;}}// 将线程池中的线程全部中断private void interruptWorkers() {// 加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 循环遍历线程组for (Worker w : workers)// 中断线程 // 这里会给线程打一个中断的标记,具体什么时候中断线程,需要我们自己去控制w.interruptIfStarted();} finally {// 解锁mainLock.unlock();}}// 删除当前所有的工作线程private List<Runnable> drainQueue() {// 存放工作线程的队列BlockingQueue<Runnable> q = workQueue;// 返回的结果ArrayList<Runnable> taskList = new ArrayList<Runnable>();// 清空阻塞队列并将数据放入taskList中q.drainTo(taskList);// 校验当前的数据是够真的清空if (!q.isEmpty()) {// 如果确实有遗漏的,毕竟这哥们也没上锁// 手动的将线程从workQueue删除掉并且放到taskList中for (Runnable r : q.toArray(new Runnable[0])) {if (q.remove(r))taskList.add(r);}}// 最终返回即可return taskList;}final void tryTerminate() {for (;;) {// 拿到ctlint c = ctl.get();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// CAS将当前的ctl设置成TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {// 该方法是一个钩子函数,我们自己定义,在线程池销毁之前做最后的处理terminated();} finally {// 将ctl设置成TERMINATED标志着线程池的正式结束ctl.set(ctlOf(TERMINATED, 0));// 线程池提供了一个方法,主线程在提交任务到线程池后,是可以继续做其他操作的。// 咱们也可以让主线程提交任务后,等待线程池处理完毕,再做后续操作// 这里线程池凉凉后,要唤醒哪些调用了awaitTermination方法的线程// 简单来说,当时等待线程池返回的主线程,由于线程池已经销毁了,他们也必须要唤醒termination.signalAll();}return;}} finally {// 解锁mainLock.unlock();}}}

8.2 shutdown 方法

public void shutdown() {// 加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 将线程池状态修改为SHUTDOWNadvanceRunState(SHUTDOWN);// 将线程池中的线程全部中断interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}// 查看当前线程池是否可以变为TERMINATED状态// 从 SHUTDOWN 状态修改为 TIDYING,在修改为 TERMINATEDtryTerminate();}

四、流程图

五、总结

鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。

其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。

如果你也对 后端架构和中间件源码 有兴趣,欢迎添加博主微信:hls1793929520,一起学习,一起成长

我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,喜欢后端架构和中间件源码。

我们下期再见。

我从清晨走过,也拥抱夜晚的星辰,人生没有捷径,你我皆平凡,你好,陌生人,一起共勉。

往期文章推荐:

  • 从源码全面解析LinkedBlockingQueue的来龙去脉
  • 从源码全面解析 ArrayBlockingQueue 的来龙去脉
  • 从源码全面解析ReentrantLock的来龙去脉
  • 阅读完synchronized和ReentrantLock的源码后,我竟发现其完全相似
  • 从源码全面解析 ThreadLocal 关键字的来龙去脉
  • 从源码全面解析 synchronized 关键字的来龙去脉
  • 阿里面试官让我讲讲volatile,我直接从HotSpot开始讲起,一套组合拳拿下面试