点击Mr.绵羊的知识星球解锁更多优质文章。

目录

一、介绍

1. 简介

2. 分类(按线程池执行任务分类)

3. 架构设计

4. 优点

二、使用场景

1. 快速响应用户请求,响应速度优先

2. 单位时间处理更多请求,吞吐量优先

三、线程池参数

1. 七大核心参数

2. 参数如何配置

四、执行流程

1. 线程池执行流程图

2. 文字+图片描述

五、实际应用

1. 案例一


一、介绍

1. 简介

一种线程使用模式。

线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

2. 分类(按线程池执行任务分类)

(1) cpu 密集型任务

cpu 密集型任务,需要线程长时间进行的复杂的运算,这种类型的任务需要少创建线程,过多的线程将会频繁引起上文切换,降低任务处理速度

(2) io 密集型任务

io 密集型任务,由于线程并不是一直在运行,可能大部分时间在等待 IO 读取/写入数据,增加线程数量可以提高并发度,尽可能多处理任务

3. 架构设计

(1) Executor

ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。

(2) ExecutorService接口增加了一些能力

a. 扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法。

b. 提供了管控线程池的方法,比如停止线程池的运行。

(3) AbstractExecutorService

AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。

(4) ThreadPoolExecutor

ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

4. 优点

(1) 降低资源消耗:通过重复利用已创建的线程,降低线程创建和销毁造成的开支。

(2) 提高响应速度:当任务到达时,任务可以不需要等待创建线程,直接使用线程池中创建好的线程。

(3) 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅消耗系统资源还会降低稳定性。

(4) 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

注意:线程池虽然好用,但是使用时一定要知道其线程池参数如何配置和使用,执行流程怎样的,详情见下文。

二、使用场景

1. 快速响应用户请求,响应速度优先

(1) 比如一个用户请求,需要通过 RPC 调用好几个服务去获取数据然后聚合返回,此场景就可以用线程池并行调用,响应时间取决于响应最慢的那个 RPC 接口的耗时。

(2) 某些批量操作,用户请求批量删除10个账号,希望能够快速得到响应,如果删除一个账号需要1秒,删除10个就需要10秒。如果你用了线程池异步执行,显然会很快得到响应结果。具体多快,取决于你服务器的性能和线程池的参数配置。

(3) 或者一个注册请求,注册完之后要发送短信、邮件通知,为了快速返回给用户,可以将该通知操作丢到线程池里异步去执行,然后直接返回客户端成功,提高用户体验。

这样的场景就建议不设置队列或设置短的队列去缓冲并发任务,调高corePoolSize和maxPoolSize去尽可能创造多的线程快速执行任务。

2. 单位时间处理更多请求,吞吐量优先

比如接受 MQ 消息,然后去调用第三方接口查询数据,此场景并不追求快速响应,主要利用有限的资源在单位时间内尽可能多的处理任务,可以利用队列进行任务的缓冲

这类场景任务量巨大,并不需要瞬时的完成,而是关注如何使用有限的资源,尽可能在单位时间内处理更多的任务。所以应该设置队列去缓冲并发任务,调整合适的corePoolSize去设置处理任务的线程数。在这里,设置的线程数过多可能还会引发线程上下文切换频繁的问题,也会降低处理任务的速度,降低吞吐量。

三、线程池参数

1. 七大核心参数

(1) corePoolSize:线程池中常驻的核心线程数。

(2) maximumPoolSize:线程池能容纳的同时执行的最大线程数。

(3) keepAliveeTim:多余的(除核心线程外的其他线程)空闲线程的存活时间。

(4) unit:keepAliveTime的单位(ms、s…)

(5) workQueue:任务队列,已提交但是未执行的任务。

(6) threadFactory:生成线程池中线程的工厂。

(7) handler:拒绝策略,当前队列满了并且正在工作的线程等于最大线程数(maximumPoolSize)时如何处理任务。

2. 参数如何配置

网上有很多计算公式,例如:CPU+1、CPU核数*2。或者根据cpu密集型或io密集型,进行配置。但是这样真的合理吗?可以适用所有场景吗?答案肯定是不行的。那该如何配置呢?

(1)根据经验和实践配制出合理的参数。

(2) 根据监控服务线程池资源利用情况结合业务场景动态配制合理参数。

有的兄弟可能觉得说了等于没说,我觉得你应该先知道线程池执行流程(四、执行流程)和各参数使用情况,需要结合服务器配置以及业务场景,动态调整线程池参数。你有什么比较好的线程池参数设置方式呢?欢迎评论留言!

四、执行流程

1. 线程池执行流程图

2. 文字+图片描述

咱也可以按照生活中的场景,讲一下这个线程池参数的使用和执行流程,方便兄弟们理解。

某某公司分部(threadFactory)为客户办理业务,一共有4个柜员(maximumPoolSize),不忙的时段就分配2个柜员(corePoolSize)办理业务,剩余2人休息。摆了4个椅子(workQueue),供客户进行等待。并且这个公司给员工定了个规定,就是当工作区的柜员都在工作,并且等待区的座位都做满时,那么处于休息区的柜员要出来帮忙。直到所有柜员都在工作,等待区也坐满了。那为了我们公司员工身体的考虑,暂时拒绝任何人来办理业务(handler),如果有某个员工空闲下来并且超过了10(keepAliveTime)分钟(unit),那就可以回到休息区休息,但是必须保证有2个柜员在工作区。

(1) 两个柜员为两位顾客办理业务

(2) 给客户1,2办理过程中,陆续来客户把坐位坐满了。这时客户7来了~

(3) 柜员3出来帮忙,至于柜员3给新来的客户办理业务还是给等待区的客户办理业务,那得取决于选择什么队列。这时候客户8来了~

(4) 由于柜员1,2,3,4都在办理业务,并且等待区都坐满了,那么就关门拒绝给客户办理业务。

(5) 客户1,2业务办理结束,继续办理等待区客户的业务。

(6) 继续办理等待区客户业务,但是柜员4等了10分钟都没有客户要办理业务,于是他就去休息区休息了。

(7) 当所有客户的业务都办理完毕,工作区还需要留下两个人。

根据上面的图例,我们发现客户9被拒绝后走掉了。那就说明我们人员或者设置的座位大小不正确。如果你是这个公司的总裁,在这个公司分部怎样设置你觉的最合理呢?欢迎补充。

1. 分部店面(服务器)大小合适,节约成本。

2. 柜员(线程池中工作线程)尽量处于工作状态,但是不能累坏(我就是柜员[微笑])。

3. 座位(队列)尽量都坐满。

4. 不要让任何客户被拒之门外。

五、实际应用

说之前,先来说说Java提供的三种创建线程池的工具(注意:自己做Demo可用,做项目咱不用)

1. Executors.newScheduledThreadPool();

2. Executors.newWorkStealingPool();

3. Executors.newFixedThreadPool();

阿里编码规约曾说:

【强制】线程池不允许使用Executors去创建, 而是通过ThreadPoolExecutor的方式, 这样的处理方式让写的同学更加明确线程池的运行规则, 规避资源耗尽的风险。

说明:Executors返回的线程池对象的弊端如下:

1)FixedThreadPool和SingleThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM

2)CachedThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM

3)ScheduledThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM

我的天这要是上线了,搞不好年终奖与你擦肩而过。

1. 案例一

(1) 场景

接下来就是编码时间,就简单介绍下ThreadPoolExecutor在代码中的使用。

(2) 代码:git地址

a. application.yml

# 线程池task:threadPool:# 核心线程corePoolSize: 4# 最大线程maximumPoolSize: 8# 等待工作的超时时间 (秒)keepAliveTime: 60# 队列长度cacheQueueLen: 20

b.ThreadPool

import lombok.Setter;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;import java.util.concurrent.*;/** * 线程池 * 注解: @ConfigurationProperties和@Setter一起使用, 否则成员变量无法赋值 * * @author wxy * @date 2023-02-12 */@Component@Setter@ConfigurationProperties(prefix = "thread.pool")public class ThreadPool {private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPool.class);private ExecutorService executorService;private int queueCapacity;private int corePoolSize;private int maximumPoolSize;private int keepAliveTime;/** * 初始化线程池 * 调用时间: SpringBoot项目启动自动调用 */@PostConstructpublic void init() {LOGGER.info("thread pool init start");if (corePoolSize <= 0|| maximumPoolSize < corePoolSize|| keepAliveTime <= 0) {// 强烈建议增加参数校验, 如果不加有些jdk版本可能会有问题throw new IllegalArgumentException();}executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime, TimeUnit.SECONDS,new ArrayBlockingQueue(queueCapacity),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());LOGGER.info("thread pool init end");}/** * 获取线程池 * * @return ThreadPool Executor */public ExecutorService getThreadPoolExecutor() {return this.executorService;}/** * 关闭线程池 * 调用时间: SpringBoot项目结束自动调用 */@PreDestroypublic void destroy() {executorService.shutdown();LOGGER.info("thread pool destroy success");}}

参考文章:

1.Java线程池实现原理及其在美团业务中的实践(推荐观看)