【1】为什么要使用线程池?
示例演示:
//设置业务模拟class MyRunnable implements Runnable { private int count; public MyRunnable(int count) { this.count = count; } public int getCount() { return count; } @Override public void run() { for (int i = 0; i < 100000; i++) { count += i; } System.out.println("结果:"+count); }}//模拟线程池复用线程执行业务public static void main(String[] args) throws InterruptedException { Long start = System.currentTimeMillis(); int count =0; ExecutorService executorService = Executors.newSingleThreadExecutor(); MyRunnable myRunnable = new MyRunnable(count); for (int i = 0; i < 1000; i++) { executorService.execute(myRunnable); } executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); System.out.println("时间:"+(System.currentTimeMillis() - start));}//模拟每次执行业务都开一个线程public static void main(String[] args) throws InterruptedException { Long start = System.currentTimeMillis(); int count =0; MyRunnable myRunnable = new MyRunnable(count); for (int i = 0; i < 1000; i++) { Thread thread = new Thread(myRunnable); thread.start(); thread.join(); } System.out.println("时间:" + (System.currentTimeMillis() - start));}
示例结果:
采用每次都开一个线程的结果是292毫秒,而线程池的是69毫秒。(随着业务次数的增多这个数值的差距会越大)
示例说明:
如果每个请求到达就创建一个新线程,开销是相当大的。在实际使用中,服务器在创建和销毁线程上花费的时间和消耗的系统资源都相当大,甚至可能要比在处理实际的用户请求的时间和资源要多的多。除了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源。
如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。(说明了我们什么时候使用线程池:1.单个任务处理时间比较短;2.需要处理的任务数量很大;)
线程池主要用来解决线程生命周期开销问题和资源不足问题。通过对多个任务重复使用线程,线程创建的开销就被分摊到了多个任务上了,而且由于在请求到达时线程已经存在,所以消除了线程创建所带来的延迟。这样,就可以立即为请求服务,使用应用程序响应更快。另外,通过适当的调整线程中的线程数目可以防止出现资源不足的情况。
【2】线程池的介绍
(1)线程池优势
1.重用存在的线程,减少线程创建,消亡的开销,提高性能
2.提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
3.提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
(2)常见线程池
1.newSingleThreadExecutor :单个线程的线程池,即线程池中每次只有一个线程工作,单线程串行执行任务
2.newFixedThreadExecutor(n) :固定数量的线程池,每提交一个任务就是一个线程,直到达到线程池的最大数量,然后后面进入等待队列直到前面的任务完成才继续执行
3.newCacheThreadExecutor(推荐使用) :可缓存线程池, 当线程池大小超过了处理任务所需的线程,那么就会回收部分空闲(一般是60秒无执行)的线程,当有任务来时,又智能的添加新线程来执行。
4.newScheduleThreadExecutor :大小无限制的线程池,支持定时和周期性的执行线程
5.常见线程池的说明
在阿里的开发手册中其实不推荐我们使用默认的线程池,为什么?
【1】Executors 返回的线程池对象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
【2】其次newCacheThreadExecutor,没有核心线程数,且非核心线程数是最大值,不断创建线程容易出现CPU100%的问题。
(3)默认线程池
1.ThreadPoolExecutor
1)说明
实际上不管是newSingleThreadExecutor,newFixedThreadExecutor还是newCacheThreadExecutor,他们都是使用ThreadPoolExecutor去生成的。
只不过由于参数不同导致产生的线程池的不同,因此,我们常使用是ThreadPoolExecutor去自建自己想要的线程池。
2)参数解析
1.corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到 阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
2.maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;
3.keepAliveTime
线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;
4.unit
keepAliveTime的单位;
5.workQueue
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有优先级的无界阻塞队列;
6.threadFactory
它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
7.handler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;
上面的4种策略都是ThreadPoolExecutor的内部类。
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。(自定义的才是最常用的)
【3】线程池相关的类分析
1.ExecutorService接口与Executor接口
//定义了一个用于执行Runnable的execute方法public interface Executor { void execute(Runnable command);}/** * 接口ExecutorService,其中定义了线程池的具体行为 * 1,execute(Runnable command):履行Ruannable类型的任务, * 2,submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future 对象 * 3,shutdown():在完成已提交的任务后封闭办事,不再接管新任务, * 4,shutdownNow():停止所有正在履行的任务并封闭办事。 * 5,isTerminated():测试是否所有任务都履行完毕了。 * 6,isShutdown():测试是否该ExecutorService已被关闭。 */public interface ExecutorService extends Executor { // 停止线程池 void shutdown(); // 立即停止线程池,返回尚未执行的任务列表 List shutdownNow(); // 线程池是否停止 boolean isShutdown(); // 线程池是否终结 boolean isTerminated(); // 等待线程池终结 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 提交Callable类型任务 Future submit(Callable task); // 提交Runnable类型任务,预先知道返回值 Future submit(Runnable task, T result); // 提交Runnable类型任务,对返回值无感知 Future submit(Runnable task); // 永久阻塞 - 提交和执行一个任务列表的所有任务 List<Future> invokeAll(Collection<? extends Callable> tasks) throws InterruptedException; // 带超时阻塞 - 提交和执行一个任务列表的所有任务 List<Future> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException; // 永久阻塞 - 提交和执行一个任务列表的某一个任务 T invokeAny(Collection<? extends Callable> tasks) throws InterruptedException, ExecutionException; // 带超时阻塞 - 提交和执行一个任务列表的某一个任务 T invokeAny(Collection<? extends Callable> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}
2.抽象类AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService { protected RunnableFuture newTaskFor(Runnable runnable, T value) { return new FutureTask(runnable, value); } protected RunnableFuture newTaskFor(Callable callable) { return new FutureTask(callable); } public Future submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, null); execute(ftask); return ftask; } public Future submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, result); execute(ftask); return ftask; } public Future submit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); execute(ftask); return ftask; } ....}
3.ThreadPoolExecutor类
public class ThreadPoolExecutor extends AbstractExecutorService {... public void execute(Runnable command) { if (command == null) int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }...}
4.ScheduledThreadPoolExecutor类
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {... public void execute(Runnable command) { schedule(command, 0, NANOSECONDS); } public Future submit(Runnable task) { return schedule(task, 0, NANOSECONDS); } public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture t = decorateTask(callable, new ScheduledFutureTask(callable, triggerTime(delay, unit))); delayedExecute(t); return t; } private void delayedExecute(RunnableScheduledFuture task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }...}
5.问题点
1)execute方法与submit方法的区别?
【1】最明显的就是 :
void execute() //提交任务无返回值
Future submit() //任务执行完成后有返回值
【2】另外一个不明显的就是队列的提交方法(add【ScheduledThreadPoolExecutor类中使用】与offer【ThreadPoolExecutor类中使用】)
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full");}
明显当队列满了的时候,add方法会抛出异常,而offer不会。
【4】线程池的状态分析
1.线程池存在5种状态
1)RUNNING = ‐1 << COUNT_BITS; //高3位为111 运行状态
2)SHUTDOWN = 0 << COUNT_BITS; //高3位为000 关闭状态
3)STOP = 1 << COUNT_BITS; //高3位为001 停止状态
4)TIDYING = 2 << COUNT_BITS; //高3位为010 整理状态
5)TERMINATED = 3 << COUNT_BITS; //高3位为011 销毁状态
2.状态说明
1、RUNNING
(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
(02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!
2、 SHUTDOWN
(1)状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
(2)状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
3、STOP
(1)状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
(2)状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
4、TIDYING
(1)状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING 状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理; 可以通过重载terminated()函数来实现。
(2)状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。
5、 TERMINATED
(1)状态说明:线程池彻底终止,就变成TERMINATED状态。
(2)状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。
进入TERMINATED的条件如下:
线程池不是RUNNING状态;
线程池状态不是TIDYING状态或TERMINATED状态;
如果线程池状态是SHUTDOWN并且workerQueue为空;
workerCount为0;
设置TIDYING状态成功。
3.汇总
默认情况下,如果不调用关闭方法,线程池会一直处于 RUNNING 状态,而线程池状态的转移有两个路径:当调用 shutdown() 方法时,线程池的状态会从 RUNNING 到 SHUTDOWN,再到 TIDYING,最后到 TERMENATED 销毁状态;当调用 shutdownNow() 方法时,线程池的状态会从 RUNNING 到 STOP,再到 TIDYING,最后到 TERMENATED 销毁状态。
4.图示
【5】线程池的源码解析
1.针对自定义线程池的运行分析
1)示例代码:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(10));//自定义线程for (int i = 1; i <= 100; i++) { threadPoolExecutor.execute(new MyTask(i));}
2)示例结果:
3)示例疑问:
输出的顺序并不是预想的1-5,6-10,11-15,16-20。反而是1-5,16-20,6-10,11-15。(深入源码查探原因)
2.针对自定义线程池ThreadPoolExecutor类的运行分析
1)ThreadPoolExecutor类重要属性private final AtomicInteger ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //默认值-536870912private static final int COUNT_BITS = Integer.SIZE - 3; //默认值29,转为2进制11101private static final int CAPACITY = (1 << COUNT_BITS)-1; //默认值536870911,转为2进制11111111111111111111111111111private static final int RUNNING = -1 << COUNT_BITS; //-536870912private static final int SHUTDOWN = 0 << COUNT_BITS; //0private static final int STOP = 1 << COUNT_BITS; //536870912private static final int TIDYING = 2 << COUNT_BITS; //1073741824private static final int TERMINATED = 3 << COUNT_BITS; //1610612736//ctl相关方法private static int runStateOf(int c) { return c & ~CAPACITY; } //runStateOf:获取运行状态;//~x=-(x+1) //默认值0private static int workerCountOf(int c) { return c & CAPACITY; } //workerCountOf:获取活动线程数; //默认值0,当线程数+1是值也会+1private static int ctlOf(int rs, int wc) { return rs | wc; } //ctlOf:获取运行状态和活动线程数的值。//默认值-536870912说明:ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),
可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。PS:1.&和&&的区别 相同点: 最终得到的boolean值结果一样,都是“并且and”的意思不同点: &既是逻辑运算符也是位运算符;&&只是逻辑运算符 &不具有短路效果,即左边false,右边还会执行;&&具有短路效果,左边为false,右边则不执行2.| 和 || 的区别 相同点: 最终得到的boolean值结果一样,都是“或者or”的意思不同点: | 既是逻辑运算符也是位运算符;|| 只是逻辑运算符 | 不具有短路效果,即左边true,右边还会执行;|| 具有短路效果,左边为true,右边则不执行
2)ThreadPoolExecutor类#execute方法【这里涉及到一个概念,提交优先级: 核心线程>队列>非核心线程】
展示
public void execute(Runnable command) { if (command == null) //不能提交空任务 throw new NullPointerException(); int c = ctl.get(); //获取运行的线程数 //核心线程数不满 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) //在addWorker中创建工作线程执行任务 return; c = ctl.get(); } //线程还在运行,且核心数满了,放入线程池队列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command))//线程池是否处于运行状态,如果不是,则刚塞入的任务要移除 reject(command); //走拒绝策略 //这一步其实没有很大意义,除非出现线程池所有线程完蛋了,但是队列还有任务的情况。(一般是进入时时运行态,然后遇到状态变更的情况) else if (workerCountOf(recheck) == 0) addWorker(null, false); } //插入队列不成功,且当前线程数数量小于最大线程池数量,此时则创建新线程执行任务,创建失败抛出异常 else if (!addWorker(command, false)) reject(command); //走拒绝策略}
说明
在正常运行状态下,线程池:核心线程执行任务-》塞入队列-》非核心线程执行任务。
体现了在并发不激烈的情况下,尽量减少创建线程的操作,用已有的线程。而且核心线程数并不是提前创建的,而是用到的时候才会创建。而且核心线程数不满,优先以创建线程来执行任务。
逻辑展示
3)ThreadPoolExecutor类#addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //获取线程池的状态 int c = ctl.get(); int rs = runStateOf(c); //如果是非运行状态(因为只有运行状态是负数) if (rs >= SHUTDOWN && ! //判断是不是关闭状态,不接收新任务,但能处理已添加的任务 //任务是不是空任务,队列是不是空(这一步说明了关闭状态不接受任务) (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //获取活动线程数 int wc = workerCountOf(c); //检验线程数是否大于容量值【这是避免设置的非核心线程数没有限制大小】 //根据传入参数判断核心线程数与非核心线程数是否达到了最大值 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //尝试增加workerCount数量【也就是活跃线程数+1】,如果成功,则跳出第一个for循环 if (compareAndIncrementWorkerCount(c)) break retry; // 如果增加workerCount失败,则重新获取ctl的值 c = ctl.get(); // 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行 if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; //线程启动标志 boolean workerAdded = false; //线程添加标志 Worker w = null; try { //根据firstTask来创建Worker对象,每一个Worker对象都会创建一个线程 w = new Worker(firstTask); //【调用1】 final Thread t = w.thread; //如果过线程不为空,则试着将线程加入工作队列中 if (t != null) { final ReentrantLock mainLock = this.mainLock; //加重入锁 mainLock.lock(); try { // 重新获取线程的状态 int rs = runStateOf(ctl.get()); //是否线程池正处于运行状态 if (rs < SHUTDOWN || //线程池是否处于关闭状态 且 传入的任务为空(说明关闭状态还是能添加工作者,但是不允许添加任务) (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) //判断线程是否存活 throw new IllegalThreadStateException(); //workers是一个HashSet,将该worker对象添加其中 workers.add(w); //记录线程工作者的值 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; //修改添加标记 workerAdded = true; } } finally { //解锁 mainLock.unlock(); } //如果添加成功,则启动线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w);//【调用2】 } return workerStarted;}
//调用1Worker(Runnable firstTask) { setState(-1); // 创建时不允许中断 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this);}//调用2:添加工作者失败方法private void addWorkerFailed(Worker w) { //加重入锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //移除工作者 if (w != null) workers.remove(w); //任务数量减一 decrementWorkerCount(); //进入整理状态() tryTerminate(); } finally { mainLock.unlock(); }}
说明
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?
可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:
1)lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
2)如果正在执行任务,则不应该中断线程;
3)如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
4)线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
5)之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。
所以,Worker继承自AQS(AbstractQueuedSynchronizer类),用于判断线程是否空闲以及是否可以被中断。
此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢?是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断。tryAcquire方法是根据state是否是0来判断的,所以,setState(-1);将state设置为-1是为了禁止在执行任务前对线程进行中断。正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0.
4)ThreadPoolExecutor类#runWorker方法【这里有涉及到一个概念,执行优先级:核心线程>非核心线程>队列】
代码展示
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; //取出任务 w.firstTask = null; //将工作者持有任务清空 w.unlock(); //将线程置为可中断,因为创建时候设置不可中断 boolean completedAbruptly = true; // 是否因为异常退出循环 try { //当没有任务的时候,优先从队列里面获取(自旋方式) while (task != null || (task = getTask()) != null) { w.lock(); //如果线程池正在停止,那么要保证当前线程是中断状态; if ((runStateAtLeast(ctl.get(), STOP) || // 如果不是的话,则要保证当前线程不是中断状态;(这里要考虑在执行该if语句期间可能也执行了shutdownNow方法,shutdownNow方法会把状态设置为STOP。STOP状态要中断线程池中的所有线程,而这里使用Thread.interrupted()来判断是否中断是为了确保在RUNNING或者SHUTDOWN状态时线程是非中断状态的,因为Thread.interrupted()方法会复位中断的状态。) (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { //如果不是中断状态,则调用task.run()执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。 processWorkerExit(w, completedAbruptly); }}
汇总说明
总结一下runWorker方法的执行过程:
1)while循环不断地通过getTask()方法获取任务;
2)getTask()方法从阻塞队列中取任务;
3)如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;调用task.run()执行任务;
4)如果task为null则跳出循环,执行processWorkerExit()方法;
5)ThreadPoolExecutor类#getTask方法
代码展示
private Runnable getTask() { // timeOut变量的值表示上次从阻塞队列中取任务时是否超时 boolean timedOut = false; for (;;) { //获取线程池状态 int c = ctl.get(); int rs = runStateOf(c); //是否是非运行状态(因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。) //如果是非运行状态:是不是STOP,TIDYING,TERMINATED ,三种状态之一;或者队列为空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //如果以上条件满足,则将workerCount减1并返回null decrementWorkerCount(); //CAS线程数减一 return null; } //重新获取线程的数量 int wc = workerCountOf(c); //allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时; //wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;对于超过核心线程数量的这些线程,需要进行超时控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//有非核心线程必定为true //wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法; //timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时。 if ((wc > maximumPoolSize || (timed && timedOut)) //接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1; //如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。 && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; //如果减1失败,则返回重试。 continue; } try { //根据timed来判断,如果为true(大概率是有非核心线程),则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null; //否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; //判断任务超时 } catch (InterruptedException retry) { // 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试 timedOut = false; } }}
汇总说明
运行状态下(这种情况下会把超出核心线程数的部分进入回收,也有一定概率回收核心线程):
情况1:当有非核心线程数的时候,timed为true,导致调用poll方法,这时候如果没有任务且超时,timedOut变为true,第二次进入自旋,timed还是true,进入判断会走compareAndDecrementWorkerCount,线程数减一,并返回null。(这种情况存在极端情况就是,全部线程走到同一逻辑去减,导致全部线程数都被减完了【即时有着wc > 1的判断,因为多线程并发情况,你懂得】)
情况2:没有非核心线程数,timed为false,导致调用take方法,线程一致阻塞直至,拿到任务。(这时候不存在减少线程)
非运行状态下(这种情况下是线程都会进入回收):
情况3:如果线程状态是STOP,TIDYING,TERMINATED,那么调用decrementWorkerCount,线程数减一,返回null。
情况4:如果线程状态是SHUTDOWN,队列不为空,则继续任务,如果队列为空,那么调用decrementWorkerCount,线程数减一,返回null。
所以,综上所述,非核心线程和核心线程其实都存在被回收的概率。
6)ThreadPoolExecutor类#processWorkerExit方法
代码展示
//主要用于线程的清理工作private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1; // 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //统计完成的任务数 completedTaskCount += w.completedTasks; //从workers中移除,也就表示着从线程池中移除了一个工作线程 workers.remove(w); } finally { mainLock.unlock(); } //根据线程池状态进行判断是否结束线程池 tryTerminate(); int c = ctl.get(); //当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;如果是其他三种,就不会去补Worker。 if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //如果allowCoreThreadTimeOut=true(可设置),并且等待队列有任务,至少保留一个worker; if (min == 0 && ! workQueue.isEmpty()) min = 1; //如果allowCoreThreadTimeOut=false(默认值),workerCount不少于corePoolSize。【靠后面的addWorker】 if (workerCountOf(c) >= min) return; } addWorker(null, false); }}
代码说明
通过设置allowCoreThreadTimeOut参数,我们可以选择核心线程的回收,在不用的时候保留一个worker。(这种更适用于某时间段高并发,其余时间段工作量不足的情况)
7)ThreadPoolExecutor类#tryTerminate方法
final void tryTerminate() { for (;;) { int c = ctl.get(); /** * 当前线程池的状态为以下几种情况时,直接返回: * 1. RUNNING,因为还在运行中,不能停止; * 2. TIDYING或TERMINATED,因为线程池中已经没有正在运行的线程了; * 3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task; */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 如果线程数量不为0,则中断一个空闲的工作线程,并返回 if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 这里尝试设置状态为TIDYING,如果设置成功,则调用terminated方法 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { // 设置状态为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS }}
8)ThreadPoolExecutor类#shutdown方法
//shutdown方法要将线程池切换到SHUTDOWN状态,并调用 interruptIdleWorkers方法请求中断所有空闲的worker,最后调用tryTerminate尝试结束线程池。public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 安全策略判断 checkShutdownAccess(); // 切换状态为SHUTDOWN advanceRunState(SHUTDOWN); // 中断空闲线程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试结束线程池 tryTerminate();}
9)ThreadPoolExecutor类#interruptIdleWorkers方法
private void interruptIdleWorkers() { interruptIdleWorkers(false);}
//interruptIdleWorkers遍历workers中所有的工作线程,若线程没有被中断tryLock成功,就中断该线程。
//为什么需要持有mainLock?因为workers是HashSet类型的,不能保证线程安全。private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); }}
10)ThreadPoolExecutor类#hutdownNow方法
public List shutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // 中断所有工作线程,无论是否空闲 interruptWorkers(); // 取出队列中没有被执行的任务 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks;}
11)问题思考
1.在runWorker方法中,执行任务时对Worker对象w进行了lock操作,为什么要在执行任务的时候对每个工作线程都加锁呢?
(1)在getTask方法中,如果这时线程池的状态是SHUTDOWN并且workQueue为空,那么就应该返回null来结束这个工作线程,而使线程池进入SHUTDOWN状态需要调用shutdown方法;
(2)shutdown方法会调用interruptIdleWorkers来中断空闲的线程,interruptIdleWorkers持有mainLock,会遍历workers来逐个判断工作线程是否空闲。但getTask方法中没有mainLock;
(3)在getTask中,如果判断当前线程池状态是RUNNING,并且阻塞队列为空,那么会调用workQueue.take()进行阻塞;
(4)如果在判断当前线程池状态是RUNNING后,这时调用了shutdown方法把状态改为了SHUTDOWN,这时如果不进行中断,那么当前的工作线程在调用了workQueue.take()后会一直阻塞而不会被销毁,因为在SHUTDOWN状态下不允许再有新的任务添加到workQueue中,这样一来线程池永远都关闭不了;
(5)由上可知,shutdown方法与getTask方法(从队列中获取任务时)存在竞态条件;
(6)解决这一问题就需要用到线程的中断,也就是为什么要用interruptIdleWorkers方法。在调用workQueue.take()时,如果发现当前线程在执行之前或者执行期间是中断状态,则会抛出InterruptedException,解除阻塞的状态;
(7)但是要中断工作线程,还要判断工作线程是否是空闲的,如果工作线程正在处理任务,就不应该发生中断;
(8)所以Worker继承自AQS,在工作线程处理任务时会进行lock,interruptIdleWorkers在进行中断时会使用tryLock来判断该工作线程是否正在处理任务,如果tryLock返回true,说明该工作线程当前未执行任务,这时才可以被中断。
【6】额外拓展
(1)有关阻塞队列部分(可查看java原生阻塞队列详解索引)
(2)有关Future和Callable的部分(可查看 针对Future部分的详解)
tryTerminate