前言
本篇文章主要整理hive-3.1.2版本的企业调优经验,有误请指出~
一、性能评估和优化
1.1 Explain查询计划
使用explain命令可以分析查询计划,查看计划中的资源消耗情况,定位潜在的性能问题,并进行相应的优化。
explain执行计划见文章:
Hive调优——explain执行计划-CSDN博客文章浏览阅读843次,点赞18次,收藏11次。Hive调优——explain执行计划https://blog.csdn.net/SHWAITME/article/details/136092007″ />– 开启Stage并行化,默认为falseSET hive.exec.parallel=true;– 指定并行化线程数,默认为8SET hive.exec.parallel.thread.number=16;
ps:调整并行度的措施,建议在数据量大,sql 逻辑复杂的时候使用。当数据量小或sql逻辑简单时开启并行度,优化效果不明显。
1.3本地模式
使用hive的过程中,有一些数据量不大的表也会抓换成MapReduce处理,提交到yarn集群时,需要申请资源,等待资源分配,启动JVM进行,再运行Task,一系列的过程比较繁琐,严重影响性能。Hive为解决这个问题,延用了MapReduce中的设计,提供本地计算模式,允许程序不提交给yarn,直接在本地运行。
-- 开启本地模式set hive.exec.mode.local.auto = true;
1.4Fetch 抓取
Fetch 抓取是指:Hive 中对某些简单的查询可以不必使用 MapReduce 计算。hive-default.xml.template 配置文件的hive.fetch.task.conversion 默认是 more。此时在全局查找、字段查找、limit 查找等都不走mapreduce。例如:select *from employees;
二、Hive建表优化
2.1 分区表、分桶表
Hive的相关概念——分区表、分桶表-CSDN博客文章浏览阅读419次,点赞15次,收藏7次。Hive的相关概念——分区表、分桶表https://blog.csdn.net/SHWAITME/article/details/136111924″ />2.2文件格式及数据压缩优化 详细信息见文章: (10)Hive的相关概念——文件格式和数据压缩-CSDN博客文章浏览阅读36次。Hive的相关概念——文件格式和数据压缩https://blog.csdn.net/SHWAITME/article/details/136122673 Hive Join的底层是通过MapReduce来实现的,Hive实现Join时,为了提高MapReduce任务的性能,提供了多种Join方案来实现。例如:适合小表Join大表的Map Join,大表Join大表的Reduce Join, 以及大表Join的优化方案Bucket Join等。 1)应用场景:小表join大表、小表Join小表 2)概述:Map Join是直接在Map阶段完成join工作,没有Shuffle阶段,从而避免了数据倾斜。 3)工作机制:使用hadoop中DistributedCache(分布式缓存)将小表广播到每个map任务节点,转换成哈希表加载到内存中,之后在mapper端和大表的分散数据做笛卡尔积,直接输出结果。 4)Map Join的特点: 5)参数设置: 1)应用场景:大表Join大表 2)概述:将两张表按照相同的规则将数据划分、根据对应的规则的数据进行join、减少了比较次数,提高了性能 Sort Merge Bucket Join:基于有序的数据Join 1)应用场景:大表Join大表 2)概述:Skew Join是Hive中一种专门为了避免数据倾斜而设计的特殊的Join过程。这种Join的原理是将Map Join和Reduce Join进行合并,如果某个值出现了数据倾斜,就会将产生数据倾斜的数据单独使用Map Join来实现其他没有产生数据倾斜的数据由Reduce Join来实现,这样就避免Reduce Join中产生数据倾斜的问题,最终将Map Join的结果和Reduce Join的结果进行Union合并 3)SkewJoin Hint 语法 在 4)Skew Join参数: 1)应用场景:大表Join大表 2)概述:两张表的数据关联会经过shuffle阶段,Hive会自动判断是否满足Map Join,如果不满足Map Join,则自动执行Reduce Join(普通的join方式) 3)阶段阐述: 生成键值对,以join on 条件中的列作为key,以join之后所关心的列作为value值,在value中还会包含Tag标记信息,用于标明此value对应哪张表 根据key值进行hash分区, 按照hash值将键值对(key-value)发送到不同的reducer中 Reducer通过Tag来识别不同的表中的数据,然后分别做合并操作 4)sql举例: sql转化为mr任务流程如下图: 5)Reduce Join方法缺点: 如果分组字段本身存在大量重复值(该字段值也叫做热点key值),group by底层走shuffle,分组聚合会出现数据倾斜的现象,可以使用如下三种方案解决: 该参数可以将顶层的聚合操作放在 Map 阶段执行,从而减轻shuffle清洗阶段的数据传输和 Reduce阶段的执行时间,提升总体性能 开启该参数以后,当前程序会自动通过两个MapReduce来运行 该参数的优化原理是:将M->R阶段 拆解成 M->R->R阶段 引入两级reduce: 第一阶段的shuffle key = group key + 随机数,将热点数据打散,多个reduce并发做部分聚合; 第二阶段的shuffle key = group key,保障相同key分发到同一个reduce做最终聚合; 优化思路为: count (distinct) 使得map端无法预聚合,容易引发reduce端长尾。数据量大可以使用group by替换count ..distinct去实现去重,但是需要注意:如果count(distinct 大量重复数据),group by可能会带来数据倾斜问题(对热点key分组操作,容易导致数据倾斜)。 用group by 替换count(distinct)的案例见文章: (12)Hive调优——count distinct去重优化-CSDN博客文章浏览阅读187次,点赞2次,收藏2次。Hive调优——count distinct替换https://blog.csdn.net/SHWAITME/article/details/136118294″ />六、HQL—优化器引擎 根据不同的应用场景,可以选择CBO,设置方式如下: 用于提前运行一个MapReduce程序,基于表或者分区的信息构建元数据信息【包含表的信息、分区信息、列的信息】,搭配CBO引擎一起使用。 谓词下推Predicate Pushdown(PPD):在不影响结果的情况下,尽量将过滤条件提前执行。谓词下推后,过滤条件在map端执行,减少了map端的输出,降低了数据在集群上传输的量,提升任务性能。 谓词下推的场景分析见文章: (08)Hive——Join连接、谓词下推-CSDN博客文章浏览阅读1k次,点赞16次,收藏16次。Hive的Join连接https://blog.csdn.net/SHWAITME/article/details/136105973 in/exists操作,推荐使用Hive的left semi join(左半连接)进行替代 left semi join(左半连接)的详细说明见文章: hive/spark–left semi/anti join_sparksql left semi join-CSDN博客 拖慢HQL查询效率的原因除了join引发的shuffle过程外,还有一个就是子查询调用次数较多,存在冗余代码块。因此我们可以借助CTE 公共表达式,简单来讲就是:with as 语句,将代码中的子查询事先提取出来(类似临时表),可以避免重复计算。 ps:hive中的CTE公共表达式文章: (09)Hive——CTE 公共表达式-CSDN博客文章浏览阅读519次,点赞6次,收藏6次。Hive的CTE 公共表达式https://blog.csdn.net/SHWAITME/article/details/136108359″ />十、合理设置 Map 及 Reduce 数 1)通常情况下,作业会通过input的目录产生一个或者多个map 任务,map数量主要的决定因素有:input 的文件总个数,input 的文件大小,集群设置的文件块大小。 2)map数并非越多越好 如果一个任务有很多小文件(远远小于块大小 128m ),则每个小文件会启动一个 map 任务来执行,而一个 map 任务启动和初始化的时间远远大 于逻辑处理的时间,造成很大的资源浪费。 当input的文件很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,公式: 让maxSize低于blocksize,此时公式 computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize))) =maxSize 原map个数= 输入文件的数据量/computeSliteSize = 输入文件的数据量/blocksize, 现在map个数=输入文件的数据量/computeSliteSize =输入文件的数据量/maxSize reducer数的计算公式 : 个数N = min ( hive.exec.reducers.max ,总输入数据量/ hive.exec.reducers.bytes.per.reducer) 根据该公式可以得知:reduce个数(hdfs上的落地文件数量)是动态计算的 在hadoop的mapred-default.xml 文件中修改下列参数, ps:reduce个数并不是越多越好 Hive小文件问题及解决方案,见文章: (14)Hive调优——合并小文件-CSDN博客文章浏览阅读775次,点赞10次,收藏17次。Hive的小文件问题https://blog.csdn.net/SHWAITME/article/details/136108785″ />(15)Hive调优——数据倾斜的解决指南-CSDN博客文章浏览阅读326次,点赞10次,收藏9次。Hive调优——数据倾斜指南https://blog.csdn.net/SHWAITME/article/details/136092028 参考文章: 大数据从业者必知必会的Hive SQL调优技巧 | 京东云技术团队 – 掘金 Hive SQL优化思路-腾讯云开发者社区-腾讯云 https://zhugezifang.blog.csdn.net/article/details/127447167 https://blog.51cto.com/u_15320818/3253292 3年数据工程师总结:Hive数据倾斜保姆教程(手册指南)三、HQL—Join优化
3.1Map Join
select /*+ mapjoin(b,c)*/--mapjoin hint 定义小表,多个小表用逗号分隔...from t0 aleft join t1 b on a.id = b.idleft join t2 con a.id = c.id;# MapJoin中多个小表用半角逗号(,)分隔,例如/*+ mapjoin(a,b,c)*/。
#设置自动选择 Mapjoin,默认为trueset hive.auto.convert.join = true; #大表小表的阈值设置(默认25M以下认为是小表)set hive.mapjoin.smalltable.filesize = 25*1000*1000;
3.2 Bucket Join
3.2.1Bucket Join
3.2.2SMB Join
set hive.optimize.bucketmapjoin = true;set hive.auto.convert.sortmerge.join=true;set hive.optimize.bucketmapjoin.sortedmerge = true;set hive.auto.convert.sortmerge.join.noconditionaltask=true;
# 创建分桶表 bigtable_buck1create table bigtable_buck1( id bigint, t bigint, uid string, keyword string, url_rank int, click_num int, click_url string)clustered by(id)sorted by(id)into 6 bucketsrow format delimited fields terminated by '\t'; # 加载数据load data local inpath '/opt/module/data/bigtable' into table bigtable_buck1;#创建分桶表bigtable_buck2,分桶数和bigtable_buck1的分桶数为倍数关系create table bigtable_buck2( id bigint, t bigint, uid string, keyword string, url_rank int, click_num int, click_url string)clustered by(id)sorted by(id)into 6 bucketsrow format delimited fields terminated by '\t'; #加载数据load data local inpath '/opt/module/data/bigtable' into table bigtable_buck2;#================ SMB Join调优步骤#设置参数set hive.optimize.bucketmapjoin = true;set hive.optimize.bucketmapjoin.sortedmerge = true;set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;# SMB Joininsert overwrite table jointableselect b.id, b.t, b.uid,b.keyword,b.url_rank,b.click_num,b.click_urlfrom bigtable_buck1 sjoin bigtable_buck2 bon b.id = s.id;
3.3Skew Join
select /*+ skewJoin(
[([,,...])][((,)[,(,)...])]*/
select
语句中使用上述的Hint提示才会执行MapJoin,其中table_name
为倾斜表名,column_name
为倾斜列名,value
为倾斜Key值。使用示例如下:#=========性能效率 方法1 < 方法2 <方法3#=========下面三个级别的hint,指定热点信息越详细,效率越高。 #-- 方法1:Hint表名(注意Hint的是表的别名)。select /*+ skewjoin(a)*/ ... from t0 a join t1 b on a.id = b.id and a.code = b.code; #-- 方法2:Hint表名和认为可能产生倾斜的列,下面认为a表的id和code列 存在倾斜select /*+ skewjoin(a(id,code))*/ ... from t0 a join t1 b on a.id = b.id and a.code = b.code; #-- 方法3:Hint表名和列,并提供产生倾斜的列的值,如果是string类型需要加引号,#--此案例 认为 (a.id=1 and a.code='xxx') 和 (a.id=3 and a.code='yyy') 的值出现倾斜select /*+ skewjoin(a(id,code)((1,'xxx'),(3,'yyy')))*/ ... from t0 a join t1 b on a.id = b.id and a.code = b.code;
#-- 开启运行过程中skewjoinset hive.optimize.skewjoin=true;#-- 如果这个key的出现的次数超过这个范围set hive.skewjoin.key=100000;#-- 在编译时判断是否会产生数据倾斜set hive.optimize.skewjoin.compiletime=true;#-- 不合并,提升性能set hive.optimize.union.remove=true;#-- 如果Hive的底层走的是MapReduce,必须开启这个属性,才能实现不合并set mapreduce.input.fileinputformat.input.dir.recursive=true;
3.4Reduce Join
SELECT pageid,age FROM page_view JOIN userinfo ON page_view.userid = userinfo.userid;
四、HQL—Group By
序号 方案 说明 方案一 开启Map端聚合 set hive.map.aggr=true; 方案二 数据倾斜时自动负载均衡 set hive.groupby.skewindata = true; 方案三 添加随机数,两阶段聚合 热点key加随机数,阶段拆分,两阶段聚合 优化方案一:开启Map端聚合
#--开启Map端聚合,默认为trueset hive.map.aggr = true;#--在Map 端预先聚合操作的条数set hive.groupby.mapaggr.checkinterval = 100000;
优化方案二:数据倾斜时自动负载均衡
#---有数据倾斜的时候自动负载均衡(默认是 false)set hive.groupby.skewindata = true;
优化方案三:添加随机数,两阶段聚合(推荐)
#===============优化前insert overwrite table tblB partition (dt = '2022-10-19')selectcookie_id,event_query,count(*)as cntfrom tblAwhere dt >= '20220718'and dt = '20220718' and dt R) group by concat_ws('_', cast(ceiling(rand() * 99) as string), cookie_id),event_query ) temp #--- 第二阶段1:对拼接的key值进行切分,还原原本的key值split(tkey, '_')[1] =cookie_id ( R -->R)group by split(tkey, '_')[1], event_query;
五、HQL—非count(distinct) 去重
6.1CBO优化器
set hive.cbo.enable=true;set hive.compute.query.using.stats=true;set hive.stats.fetch.column.stats=true;set hive.stats.fetch.partition.stats=true;
6.2Analyze分析器
-- 构建分区信息元数据ANALYZE TABLE tablename[PARTITION(partcol1[=val1], partcol2[=val2], ...)]COMPUTE STATISTICS [noscan];-- 构建列的元数据ANALYZE TABLE tablename[PARTITION(partcol1[=val1], partcol2[=val2], ...)]COMPUTE STATISTICS FOR COLUMNS ( columns name1, columns name2...) [noscan];-- 查看元数据DESC FORMATTED [tablename] [columnname];--分析优化器--构建表中分区数据的元数据信息ANALYZE TABLE tb_login_part PARTITION(logindate) COMPUTE STATISTICS;--构建表中列的数据的元数据信息ANALYZE TABLE tb_login_partCOMPUTE STATISTICS FOR COLUMNSuserid;--查看构建的列的元数据desc formatted tb_login_part userid;
七、HQL—谓词下推(PPD)
八、HQL—in/exists 语句
# in / exists 实现select a.id, a.name from a where a.id in (select b.id from b);select a.id, a.name from a where exists (select id from b where a.id =b.id);#可以使用 join 来改写select a.id, a.name from a join b on a.id = b.id;#left semi join 实现select a.id, a.name from a left semi join b on a.id = b.id;
九、HQL—CTE 公共表达式
#==============优化前selectuid,--每个用户一月份的订单数sum(if(dt = '2018-01', 1, 0)) as m1_count,--每个用户二月份的订单数sum(if(dt = '2018-02', 1, 0)) as m2_count,--每个用户三月份的订单数(当月订单金额超过10元的订单个数)sum(if(dt = '2018-03' and oamount > 10, 1, 0)) m3_count,--当月(3月份)首次下单的金额sum(if(dt = '2018-03' and rk = 1, oamount, 0)) m3_first_amount,--当月(3月份)末次下单的金额(rk =cnt小技巧)sum(if(dt = '2018-03' and rk = cnt, oamount, 0)) m3_last_amountfrom ( select oid, uid, otime, date_format(otime, 'yyyy-MM') asdt, oamount, ---计算rk的目的是为了获取记录中的第一条 row_number() over (partition by uid,date_format(otime, 'yyyy-MM') order by otime) rk, --- 计算cnt的目的是为了获取记录中的最后一条 count(*) over (partition by uid,date_format(otime, 'yyyy-MM'))cnt from t_order order by uid ) tmpgroup by uidhaving m1_count > 0 and m2_count = 0;#================优化后with tmp as (selectoid,uid,otime,date_format(otime, 'yyyy-MM') asdt,oamount,---计算rk的目的是为了获取记录中的第一条row_number() over (partition by uid,date_format(otime, 'yyyy-MM') order by otime) rk,--- 计算cnt的目的是为了获取记录中的最后一条count(*) over (partition by uid,date_format(otime, 'yyyy-MM'))cntfrom t_orderorder by uid)selectuid,--每个用户一月份的订单数sum(if(dt = '2018-01', 1, 0)) as m1_count,--每个用户二月份的订单数sum(if(dt = '2018-02', 1, 0)) as m2_count,--每个用户三月份的订单数(当月订单金额超过10元的订单个数)sum(if(dt = '2018-03' and oamount > 10, 1, 0)) m3_count,--当月(3月份)首次下单的金额sum(if(dt = '2018-03' and rk = 1, oamount, 0)) m3_first_amount,--当月(3月份)末次下单的金额(rk =cnt小技巧)sum(if(dt = '2018-03' and rk = cnt, oamount, 0))m3_last_amountfrom tmpgroup by uidhaving m1_count >0 and m2_count=0;
10.1 复杂文件增加Map数
computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))
10.2 合理设置 Reduce 数
10.2.1 调整reduce数的方法一
# 每个reduce任务处理的数据量默认是 256MBhive.exec.reducers.bytes.per.reducer=256*1000*1000# 整个MR任务支持开启的reduce数的上限值,默认为1009个hive.exec.reducers.max=1009
10.2.2 调整reduce数的方法二
#设置MR job的Reduce总个数set mapreduce.job.reduces = 15;
十一、Hive的小文件合并