点击Mr.绵羊的知识星球解锁更多优质文章。
目录
一、介绍
1. 简介
2. 分类(按线程池执行任务分类)
3. 架构设计
4. 优点
二、使用场景
1. 快速响应用户请求,响应速度优先
2. 单位时间处理更多请求,吞吐量优先
三、线程池参数
1. 七大核心参数
2. 参数如何配置
四、执行流程
1. 线程池执行流程图
2. 文字+图片描述
五、实际应用
1. 案例一
一、介绍
1. 简介
一种线程使用模式。
线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
2. 分类(按线程池执行任务分类)
(1) cpu 密集型任务
cpu 密集型任务,需要线程长时间进行的复杂的运算,这种类型的任务需要少创建线程,过多的线程将会频繁引起上文切换,降低任务处理速度。
(2) io 密集型任务
io 密集型任务,由于线程并不是一直在运行,可能大部分时间在等待 IO 读取/写入数据,增加线程数量可以提高并发度,尽可能多处理任务。
3. 架构设计
(1) Executor
ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。
(2) ExecutorService接口增加了一些能力
a. 扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法。
b. 提供了管控线程池的方法,比如停止线程池的运行。
(3) AbstractExecutorService
AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。
(4) ThreadPoolExecutor
ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。
4. 优点
(1) 降低资源消耗:通过重复利用已创建的线程,降低线程创建和销毁造成的开支。
(2) 提高响应速度:当任务到达时,任务可以不需要等待创建线程,直接使用线程池中创建好的线程。
(3) 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅消耗系统资源还会降低稳定性。
(4) 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
注意:线程池虽然好用,但是使用时一定要知道其线程池参数如何配置和使用,执行流程怎样的,详情见下文。
二、使用场景
1. 快速响应用户请求,响应速度优先
(1) 比如一个用户请求,需要通过 RPC 调用好几个服务去获取数据然后聚合返回,此场景就可以用线程池并行调用,响应时间取决于响应最慢的那个 RPC 接口的耗时。
(2) 某些批量操作,用户请求批量删除10个账号,希望能够快速得到响应,如果删除一个账号需要1秒,删除10个就需要10秒。如果你用了线程池异步执行,显然会很快得到响应结果。具体多快,取决于你服务器的性能和线程池的参数配置。
(3) 或者一个注册请求,注册完之后要发送短信、邮件通知,为了快速返回给用户,可以将该通知操作丢到线程池里异步去执行,然后直接返回客户端成功,提高用户体验。
这样的场景就建议不设置队列或设置短的队列去缓冲并发任务,调高corePoolSize和maxPoolSize去尽可能创造多的线程快速执行任务。
2. 单位时间处理更多请求,吞吐量优先
比如接受 MQ 消息,然后去调用第三方接口查询数据,此场景并不追求快速响应,主要利用有限的资源在单位时间内尽可能多的处理任务,可以利用队列进行任务的缓冲
这类场景任务量巨大,并不需要瞬时的完成,而是关注如何使用有限的资源,尽可能在单位时间内处理更多的任务。所以应该设置队列去缓冲并发任务,调整合适的corePoolSize去设置处理任务的线程数。在这里,设置的线程数过多可能还会引发线程上下文切换频繁的问题,也会降低处理任务的速度,降低吞吐量。
三、线程池参数
1. 七大核心参数
(1) corePoolSize:线程池中常驻的核心线程数。
(2) maximumPoolSize:线程池能容纳的同时执行的最大线程数。
(3) keepAliveeTim:多余的(除核心线程外的其他线程)空闲线程的存活时间。
(4) unit:keepAliveTime的单位(ms、s…)
(5) workQueue:任务队列,已提交但是未执行的任务。
(6) threadFactory:生成线程池中线程的工厂。
(7) handler:拒绝策略,当前队列满了并且正在工作的线程等于最大线程数(maximumPoolSize)时如何处理任务。
2. 参数如何配置
网上有很多计算公式,例如:CPU+1、CPU核数*2。或者根据cpu密集型或io密集型,进行配置。但是这样真的合理吗?可以适用所有场景吗?答案肯定是不行的。那该如何配置呢?
(1)根据经验和实践配制出合理的参数。
(2) 根据监控服务线程池资源利用情况结合业务场景动态配制合理参数。
有的兄弟可能觉得说了等于没说,我觉得你应该先知道线程池执行流程(四、执行流程)和各参数使用情况,需要结合服务器配置以及业务场景,动态调整线程池参数。你有什么比较好的线程池参数设置方式呢?欢迎评论留言!
四、执行流程
1. 线程池执行流程图
2. 文字+图片描述
咱也可以按照生活中的场景,讲一下这个线程池参数的使用和执行流程,方便兄弟们理解。
某某公司分部(threadFactory)为客户办理业务,一共有4个柜员(maximumPoolSize),不忙的时段就分配2个柜员(corePoolSize)办理业务,剩余2人休息。摆了4个椅子(workQueue),供客户进行等待。并且这个公司给员工定了个规定,就是当工作区的柜员都在工作,并且等待区的座位都做满时,那么处于休息区的柜员要出来帮忙。直到所有柜员都在工作,等待区也坐满了。那为了我们公司员工身体的考虑,暂时拒绝任何人来办理业务(handler),如果有某个员工空闲下来并且超过了10(keepAliveTime)分钟(unit),那就可以回到休息区休息,但是必须保证有2个柜员在工作区。
(1) 两个柜员为两位顾客办理业务
(2) 给客户1,2办理过程中,陆续来客户把坐位坐满了。这时客户7来了~
(3) 柜员3出来帮忙,至于柜员3给新来的客户办理业务还是给等待区的客户办理业务,那得取决于选择什么队列。这时候客户8来了~
(4) 由于柜员1,2,3,4都在办理业务,并且等待区都坐满了,那么就关门拒绝给客户办理业务。
(5) 客户1,2业务办理结束,继续办理等待区客户的业务。
(6) 继续办理等待区客户业务,但是柜员4等了10分钟都没有客户要办理业务,于是他就去休息区休息了。
(7) 当所有客户的业务都办理完毕,工作区还需要留下两个人。
根据上面的图例,我们发现客户9被拒绝后走掉了。那就说明我们人员或者设置的座位大小不正确。如果你是这个公司的总裁,在这个公司分部怎样设置你觉的最合理呢?欢迎补充。
1. 分部店面(服务器)大小合适,节约成本。
2. 柜员(线程池中工作线程)尽量处于工作状态,但是不能累坏(我就是柜员[微笑])。
3. 座位(队列)尽量都坐满。
4. 不要让任何客户被拒之门外。
五、实际应用
说之前,先来说说Java提供的三种创建线程池的工具(注意:自己做Demo可用,做项目咱不用)
1. Executors.newScheduledThreadPool();
2. Executors.newWorkStealingPool();
3. Executors.newFixedThreadPool();
阿里编码规约曾说:
【强制】线程池不允许使用Executors去创建, 而是通过ThreadPoolExecutor的方式, 这样的处理方式让写的同学更加明确线程池的运行规则, 规避资源耗尽的风险。
说明:Executors返回的线程池对象的弊端如下:
1)FixedThreadPool和SingleThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
2)CachedThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
3)ScheduledThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
我的天这要是上线了,搞不好年终奖与你擦肩而过。
1. 案例一
(1) 场景
接下来就是编码时间,就简单介绍下ThreadPoolExecutor在代码中的使用。
(2) 代码:git地址
a. application.yml
# 线程池task:threadPool:# 核心线程corePoolSize: 4# 最大线程maximumPoolSize: 8# 等待工作的超时时间 (秒)keepAliveTime: 60# 队列长度cacheQueueLen: 20
b.ThreadPool
import lombok.Setter;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;import java.util.concurrent.*;/** * 线程池 * 注解: @ConfigurationProperties和@Setter一起使用, 否则成员变量无法赋值 * * @author wxy * @date 2023-02-12 */@Component@Setter@ConfigurationProperties(prefix = "thread.pool")public class ThreadPool {private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPool.class);private ExecutorService executorService;private int queueCapacity;private int corePoolSize;private int maximumPoolSize;private int keepAliveTime;/** * 初始化线程池 * 调用时间: SpringBoot项目启动自动调用 */@PostConstructpublic void init() {LOGGER.info("thread pool init start");if (corePoolSize <= 0|| maximumPoolSize < corePoolSize|| keepAliveTime <= 0) {// 强烈建议增加参数校验, 如果不加有些jdk版本可能会有问题throw new IllegalArgumentException();}executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime, TimeUnit.SECONDS,new ArrayBlockingQueue(queueCapacity),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());LOGGER.info("thread pool init end");}/** * 获取线程池 * * @return ThreadPool Executor */public ExecutorService getThreadPoolExecutor() {return this.executorService;}/** * 关闭线程池 * 调用时间: SpringBoot项目结束自动调用 */@PreDestroypublic void destroy() {executorService.shutdown();LOGGER.info("thread pool destroy success");}}
参考文章:
1.Java线程池实现原理及其在美团业务中的实践(推荐观看)