本文分享自华为云社区《走向批处理-交互式分析一体化: Trino容错模式深度测评与思考》,作者:HetuEngine九级代言 。

本文系华为云大数据研发团队原创,原创作者:文博,梦月

1 Trino简介

2020年12月27日,Presto社区大佬们——Martin Traverso、Dain Sundstrom以及David Phillips宣布将开源项目PrestoSQL的名字更名为TrinoDB(本文简称Trino)。

Trino是一款开源的高性能、分布式SQL查询引擎,专门用于对各种异构数据源运行交互式分析查询,支持从GB到PB的数据量范围。Trino专门为交互式分析而设计,可以对来自不同数据源的数据(包括:Hive、AWS S3、Alluxio、MySQL、Kafka、ES等等)进行合并查询,并提供良好的自定义连接器编程扩展框架。适用于期望响应时间从亚秒到数分钟不等的分析师场景。

在诞生之初,Trino是为了填补当时Facebook内部实时查询和ETL处理之间的空白。Trino的核心目标就是提供交互式查询,也就是我们常说的Ad-Hoc Query,很多公司都使用它作为OLAP计算引擎。近年来业务场景越来越复杂,除了交互式查询场景,很多公司也需要兼顾批处理作业,技术大佬们开始思考如何用Trino来进行大数据集的批加工处理。

2 传统Trino架构的局限性

在传统Trino运行架构中,Trino预先规划了处理特定查询的所有task。这些task彼此关联,一项task的结果是下一项task的输入。对于MPP引擎来说,这种相互依赖是必要的。一旦任何任务在此过程中失败,就会破坏整个任务链条,导致整个SQL执行退出。

Trino执行SQL任务过程如下图(来自Trino官网):

优点:

数据通过task进行流式传输,没有中间检查点,高吞吐低延迟

不足:

  • 缺乏细粒度的故障回复,出现问题只能从头运行整个Query
  • 完全依赖内存资源进行数据装载和交换
  • 执行规划一旦确定就无法根据实际执行进展灵活调整

3 Trino容错执行架构(FTE)

Trino开源社区设计了一种新的容错执行架构(fault-tolerant execution architecture),它允许我们实现具有细粒度重试的高级资源感知调度(advanced resource-awarescheduling)。该项目代号为“Tardigrade”。

Tardigrade项目旨在打破原有的全有或全无的执行障碍。它为资源管理、自适应查询优化和故障恢复带来了许多新的机会。该项目以水熊虫命名,水熊虫是世界上最坚不可摧的生物,类似于FTE为Trino带来的鲁棒性。

以下是Tardigrade项目带来的一些直观效果:

  • 当长时间运行的SQL Query遇到故障时,不必从头开始运行;
  • 当Query需要的内存超过集群中当前可用的内存时,仍然能够运行成功;
  • 当多个Query同时提交时,它们能够以公平的方式共享资源,并稳步运行

从代码实现角度看,Trino直接在内核中实现了task级容错、自动重试、shuffle等核心功能。如下图所示(来自Trino官网):

Trino会将一个Query执行分成多个stage。在容错模式下,上游stage的shuffle数据会进行落盘(支持写到AWS S3、HDFS及本地存储)。下游stage从中间存储里读取所需要的数据,并在该过程中对后续task任务进行重新优化与分配。

带来的改进:

  • 适应性规划:可以在缓冲数据时,动态调整查询计划
  • 资源管理:在查询运行时调整资源分配。当集群空闲时,我们可以允许单个查询利用集群上的所有可用资源。当更多工作负载开始时,可​​以逐渐减少初始查询的资源分配。
  • 细粒度的故障恢复:允许透明地重启失败的任务,使得ETL完成时间更可预测。

接下来,本文将带各位深入体验Trino容错执行模式。

4 基础性能测试

首先在计算资源充足的场景下进行基础性能测试。选取1TB数据量的TPCDS,计算资源规格为2CN+16Worker 136GB/进程,测试开启容错前后,执行TPCDS99,耗时统计如下:

测试写入性能选择TPCDS表中最大的表catalog_sales测试写入性能,SQL为:

— create table catalog_sales_copy as select * from catalog_sales;

测试数据如下:

数据集

计算资源

执行耗时(单位:秒)

不开容错和spill

Task容错

Task容错+spill

1TB

1CN+2Worker,20GB/进程

622.2

673

687

10TB

1CN+3Worker,136GB/进程

3445

1485

1486

小结:

  • 开启Task容错会进行中间交换区结果落盘,存在性能损耗,执行耗时约为之前的2倍;
  • Query容错没有落盘的过程,与不开启容错性能持平。
  • 1TB数据集时,Task容错写入性能也会有8%-10%损耗,但在10TB数据集时反而有性能提升,待深入分析;

5 大数据量场景的稳定性测试

本节将在计算资源严重不足的场景下进行TPCDS压力测试。测试结果如下:

数据量

计算资源

错误率

不开容错

Task容错

Task容错+
spill to disk

1TB

1CN+2Worker,40GB/进程

7.07%

0%

0%

1CN+2Worker,20GB/进程

12.12%

0%

0%

1CN+2Worker,10GB/进程

16.16%

4.04%

0%

10TB

1CN+3Worker,136GB/进程

8.08%

0%

0%

50TB

1CN+16Worker,136GB/进程

13.13%

6.06%

5.05%

小结:

  • 内存不足情况下使用Task容错,能够大幅度提高SQL执行成功率。与spill to disk特性结合使用能带来更好的容错效果;
  • 在50TB数据集时,Task容错仍然能够提高执行成功率,但某些复杂SQL可能会存在单点瓶颈。目前观察到主要是单点聚合瓶颈。

6 高并发场景测试6.1 1TB TPCD标准数据集

计算资源规格:1CN+8Worker,136GB/进程

测试SQL用例:Q01(多事实表关联查询,即TPCDS99中的Q29)

测试结果如下表所示:

测试场景

1并发

100并发

200并发

不开启容错

QUERY容错

TASK容错

不开启容错

QUERY容错

TASK容错

不开启容错

QUERY容错

TASK容错

多表关联查询(多事实表)Q01-1轮

4.1/min

5.2/min

2.6/min

7.3/min

7.2/min

8.1/min

17.50%失败

18%失败

7.9/min

多表关联查询(多事实表)Q01-5轮

5.2/min

4.8/min

3.4/min

8.3/min

8.6/min

8.6/min

64.9%失败

74.9%失败

8.5/min

6.2 10TB TPCD标准数据集

计算资源规格:1CN+8Worker,136GB/进程

测试SQL用例:

单表多列聚合排序查询Q02:

select

  • ws_item_sk,
  • ws_web_site_sk,
  • sum(ws_sales_price) total

from

  • web_sales

where

  • ws_sold_date_sk >= 2450815
  • and ws_sold_date_sk <= 2451179

group by

  • ws_item_sk,
  • ws_web_site_sk

having

  • sum(ws_sales_price) > 0

order by

  • total desc

limit 100;

开启TASK容错全部能够执行成功。测结果如下表所示:

测试场景

1并发

100并发

200并发

300并发

400并发

不开容错

TASK容错

不开容错

TASK容错

不开容错

TASK容错

不开容错

TASK容错

不开容错

TASK容错

单表多列聚合排序查询Q02_1轮

3.3/min

1.3/min

7.9/min

5.7/min

9.7/min

8.8/min

8.5/min

5.9/min

97.25%失败

6.8/min

单表多列聚合排序查询Q02_5轮

7.1/min

2.0/min

10.7/min

9.5/min

10.3/min

9.3/min

8.20%失败

8.0/min

99.1%失败

6.6/min

小结:

Task容错能够提升Trino引擎的并发上限,很大程度上减少诸如“Encountered too many errors talking to a worker node.”报错的产生。

7 多个引擎横向对比测试

首先从TPCDS99中挑选出计算资源受限前提下,Trino不开启容错100%会跑失败的SQL用例,包括:

Q04,Q11,Q23,Q38,Q64,Q65,Q67,Q74,Q75,Q78,Q80,Q81,Q85,Q87,Q93,Q95,Q97

基于相同计算资源(内存、CPU、Container个数),横向对比Trino、Spark、Hive(TEZ)的性能表现。

注:测试Trino时实际采用的是华为云HetuEngine 2.0的内核版本。

7.1 1TB TPCD标准数据集

可看出,在1TB数据量、使用相同资源情况下,开启Task容错,Trino能够将原先跑失败的SQL执行成功,且性能约为Spark的3倍左右,是Hive(TEZ)的数十倍。

7.2 10TB TPCDS标准数据集

针对10TB TPCDS标准数据集,进行对比测试:

可看出,在10TB数据量、使用相同资源情况下,开启Task容错,Trino能够将原先跑失败的SQL执行成功,且性能约为Spark的3倍左右。

8 综合评价

综上,基于测试数据总结归纳如下——

单并发基础性能

  • 内存资源充足:不开启容错= Query容错> Task容错
  • 内存资源不足:Task容错可以跑过,不开启容错/Query容错跑不出结果

大数据量场景的稳定性

Task容错+ spill to disk > Task容错>不开启容错

  • 1-10TB数据集:Task容错的表现很稳定,通过率100%
  • 50TB数据集: 结合使用Task容错、spill to disk相比单独用Task容错表现更好(少失败1个用例)

并发场景的稳定性

Task容错>不开启容错

多个引擎横向性能对比

  • 1TBTPCDS数据集:Trino(Task容错) > Spark > Hive(TEZ)
  • 10TBTPCDS数据集:Trino (Task容错) > Spark

总体而言,Trino的FTE功能在性能、稳定性维度的测试表现超出了预期。随着该能力的逐步演进与完善,相信Trino将在一站式数据加工与分析场景发挥出更大的价值。

9 思考与改进

在拥有了第一手的测试数据与分析结论后,接下来我们将思考如何利用好Trino容错模式,最大化的发挥其价值,同时要提前识别可能存在的问题,探索解决之道。

9.1 容错模式启用决策

从前面的测试数据可以看出,开启容错模式对于短查询性能存在一定的影响(对大查询性能反而存在优化的可能)。因此需要思考何时、何种方式来开启容错模式。

有如下思路可供选择——

  • 用户自主择机启用

最简单的办法就是让业务用户自主择机选择启用或者关闭容错模式。通常情况下,有经验的用户知道哪些查询可能是计算量大或者运行时间久的查询。他们可以通过改变JDBC连接的session参数来实现在“交互式模式”和“容错模式”之间灵活切换;

  • 基于代价决策

可以基于SQL执行的预测代价来决定是否开启“容错模式”。一般来说,这个技术需要依赖实现统计获得的列级别统计信息。然而,列级别统计信息有时候是不可用的,而且基于代价估算的预测精度往往不够理想;

  • 自适应选择技术

默认情况下,查询可以“交互式模式”启动,然后在运行N分钟后,经过一段时间学习后,由引擎内核根据可用资源情况、业务特点等维度信息,自主决策启动或关闭“容错模式”。这个思路需要将Trino引擎与机器学习、AI技术结合起来,践行数智融合路线;

  • 基于历史信息决策

针对特定数据源的某些类型的查询,可以预先收集历史运行记录并进行分析建模。基于事先学习掌握的先验知识模型,在SQL执行前选择最优的执行模式。

9.2 水平扩展规模应用

Trino具备了容错执行模式,测试数据显示效果不错,那么接下来大家就会思考:是否可以基于该能力提供更大规模的分析查询加速服务呢?

实际业务场景中,企业可能需要按需进行任务提交与弹性资源调度,尤其是在大规模、云原生环境中,即使开启容错模式,对于单个Trino集群,其协调节点(Coordinator)依然可能存在并发能力的瓶颈。此外,从软件架构角度看,单一Trino集群的可用性也存在一定的风险,影响云服务环境下的SLA目标达成。

针对上述问题,华为云交互式分析引擎HetuEngine提供了三层分布式架构,通过统一的SQL访问入口——HSFabric来向业务提供全局唯一的JDBC服务地址。

通过HSFabric统一SQL访问入口,HetuEngine实现了将业务层逻辑与某个特定的计算实例解耦,单个资源租户内部可以横向扩展多个计算实例,同一个租户内部的SQL任务可以在不同计算实例间灵活分配。

无论从多租户还是单一租户角度看,HetuEngine的并发容量可水平扩展,同时也提升了服务可用性和资源利用率。

基于上述架构,HetuEngine支持服务管理员自由决定是否开启/关闭单个租户的容错执行模式,以便更好的满足不同场景的业务诉求。

9.3 故障处理与恢复

在Trino容错执行过程中,Stage间的Shuffle数据会大量落入到分布式文件系统上。这里以HDFS为例进行讨论可能存在问题。

假设——1个大SQL在执行过程中,Trino正在往HDFS上写shuffle数据,突然Trino所在物理机节点发生意外(比如,停电、断网、操作系统崩溃等),或者Trino本身出现故障停止工作(比如,过载等)。这可能会导致整个Trino集群都彻底停止工作。此时,需要管理员人工介入才能重新恢复Trino集群的正常工作状态。

显而易见,对Trino来说,至少存在2个问题需要思考和解决:

  • 如何实现Trino集群的应急快速恢复
  • 确保HDFS上的残留文件及时被清理,避免存储空间耗尽

华为云交互式分析引擎HetuEngine基于三层服务化+容器化架构,可有效应对上述挑战:

针对问题1

借助于全容器化的部署架构,HetuEngine的任一计算实例(对应于1个分布式Trino集群)中的任一软件进程在发生故障/意外时,均可由Service层快速自动拉起新的容器进程来接管和补齐服务缺失,在人工介入前快速完成故障自愈。

在可用资源可能存在不足时,HetuEngine支持计算实例在线弹性伸缩,通过自动调整Worker数量来动态平衡资源利用率,快速补充因故障而丢失的Worker节点资源。

在Coordinator节点发生故障时,HetuEngine从三方面入手进行应对——

  1. 同一计算实例中的Worker节点立即与备Coordinator进行组网;
  2. 备Coordinator升为新的主Coordinator;
  3. 统一SQL入口立即将新的SQL请求引流到新的主Coordinator

针对问题2

HetuEngine的Service层全天24小时不间断监控,跟踪并及时发现、清理各层级作业残留(包括:数据、文件、目录、元数据等)。

同时针对历史任务进行多维度地深入洞察,生成高价值SQL运维图表和决策推荐信息,最终呈现在控制台页面。

Service层提供的全方位贴心服务,极大降低了对数据分析平台管理员的专业知识要求,解决管理员对于长期运营的后顾之忧。

9.4 大数据平台业务无损的弹性扩缩容

通常来说,大数据平台的弹性伸缩方案只会涵盖Hive、Spark这类批处理引擎。因Hive、Spark本身具备了容错执行能力,即使因为大数据平台的管控面下发指令强制缩容一个正在运行Hive/Spark作业的物理节点,也不会影响相关作业的最终执行成功,最多只是引发了局部task的重试,增加了执行时长。因此,面向Hive、Spark引擎的大数据平台弹性伸缩方案相对来说比较容易,只需要关注资源层面的管理操作即可。

但对Trino这类MPP架构引擎来说,上述大数据平台的弹性伸缩管理模式就可能会面临如下几个方面的挑战:

  • MPP架构的SQL引擎一般都是常驻形态,在缩容过程中任何一个节点被强杀都可能导致该节点上正在运行中的SQL任务失败;
  • Trino的协调节点Coordinator默认为1个,在缩容过程中,强杀Coordinator所在的节点会导致整个Trino集群不可用,运行中的所有SQL任务失败;
  • Trino集群的扩容,需要平台管理面深入理解Trino集群的内部服务发现与工作机制,针对具体集群的IP和端口号定制配置,才能顺利的将新节点加入到一个已经存在的Trino集群中。

综上,要想在大数据平台服务上实现对Trino生态引擎的弹性伸缩,且做到业务无损,需要在大数据平台服务层和Trino内核层之间抽象出一个面向多资源租户+多个计算实例(Trino集群)的资源管理&业务接入service层。

HetuEngine的service层对大数据平台服务层屏蔽底层Trino内核细节,对上提供Rest API调用,并将大数据平台服务层的管理运维诉求转换为对具体Trino集群的实际变更。同时要做到对多个Trino集群的日常状态监控与自维护。

在上述架构基础之上,可以基于Trino容错执行的能力,在开启弹性伸缩时,进一步降低大数据平台层面弹性伸缩的等待时间。

一种可行的思路大致是——

大数据平台服务层向HetuEngine的service层下发缩容指令,service确定即将被缩容的节点上正在运行的计算实例,并将其动态切换到容错模式。在通常情况下,service层可以快速向上层服务层答复缩容操作准备继续,不用等待SQL任务执行完。

9.5 小结

基于上述架构与思路,华为云HetuEngine能很好地应对容错执行模式可能引入的新问题,显著提升生产环境实际运维效率,助力用户很方便地享受容错执行的新红利。

接下来,HetuEngine将逐步引入和完善在两个不同执行模式间的智能切换能力,进一步完善对大数据云服务弹性伸缩的场景适配,在数据湖内一站式SQL分析领域持续创新、长期演进。

10 HetuEngine 2.0版本预告

预计2023年9月30日,HetuEngine 2.0将随华为云MRS 3.3.0-LTS正式发布。在该版本中,可以看到一系列的新能力,例如——

  • 基于Java17运行全新内核,基础性能、稳定性再上一个新台阶,TPCDS提速30%
  • 大SQL主动防御:事前提示/拦截,事中熔断,事后统计
  • 支持容错执行模式:适用范围更广泛,使能一站式SQL加工 & 分析
  • 租户内多计算实例架构:自动负载均衡、针对单个业务的并发能力可水平扩展
  • 新增数据源类型:Hudi,MySQL
  • 新增支持新建Hudi表、Insert数据
  • 新增支持Hue对接HetuEngine,提供可视化SQL编辑页面
  • 新增支持代理用户模式,支持对客户的自有用户体系的代理鉴权及审计

相关链接:https://support.huaweicloud.com/intl/zh-cn/cmpntguide-lts-mrs/mrs_01_1711.html

点击关注,第一时间了解华为云新鲜技术~