大家好,我叫王磊。是SelectDB 大数据研发。今天给大家带来的分享是《Apache Flink X Apache Doris构建极速易用的实时数仓架构》。
下面是我们的个人介绍:我是Apache Doris Contributor 和阿里云 MVP。同时著有《 图解 Spark 大数据快速分析实战》等书籍。

接下来咱们进入本次演讲的正题。本次演讲分:实时数仓需求和挑战、基于 Apache Doris 和 Apache Flink 构建实时数仓、用户案例与最佳实践分享、未来展望与计划四部分。

1. 实时数仓需求和挑战。

首先我们来看下第一部分:实时数仓需求和挑战。
首先我们站在数据流的角度分析下传统的数据架构。
从图中我们可以看到数据分为实时的数据流和离线数据流。

  • 在实时的数据数据流部分,我们通过binlog的方式将业务数据库RDS中的数据的变更采集到实时数仓;通过Flume Kafka Sink对文件中的数据变更进行实时采集。当数据源上不同来源的数据数据都实时接入后便可以开始构建实时数仓。在实时数仓内部的构建过程中依然遵守数仓分层理论:ODS层、DWD层、DWS层和ADS层。
  • 在离线数据流部分通过DataX定时同步的方式采集业务中的RDS数据。通过Flume File Sink采集日志数据。并构建离线数据。离线数仓的构建主要依赖HiveSQL或者Spark SQL对数据进行定时处理。分离出不同维度的数据,并将数据存储在HDFS或者对象存储中。
  • 同时在实时数据和离线数据中为了保障数据的一致性,我们需要一个数据清洗的逻辑使用离线数据对实时数据进行清洗,以保障最终数据的一致性。


接下来这张图是站在技术架构的角度对传统的数据仓库架构进行分析。从图中我们可以看到不同的应用我们采用了不同的技术栈。

  • 在湖仓部分我们基于Iceberg、Hudi、Hive等组件。

  • 基于湖仓之上的Ad hoc查询可能会采用impala或者Kudu。

  • OLAP的高并发报表使用Druid或者Kylin。

  • 同时业务还有一些半结构化的需求,这时我们可能会使用ES进行日志的检索和分析,使用HBase构建高效的点查服务。

  • 在有的业务中为了对外提供统一的数据服务,还有在各个数据服务之上构建一个查询网关服务或者联邦查询服务,业界比较常见的是Presto和Trino。
    那么这样的架构有什么问题呢?

  • 首先是组件多、架构复杂,不易运维。

  • 同时由于维护了实时和离线两套数据架构,因此在计算成本,存储成本和研发成本上都是比较高的。

  • 最后也是最核心的一点是他无法保证实时流和离线流数据的一致性,只能通过离线数据定时清洗实时数据的的方式实现最终的数据一致。
    基于以上的痛点,一个易用、统一和实时的架构的需求呼之欲出。这里的统一一般指数据结构的统一(结构化和半结构化数据统一),数据存储的统一,数据计算的统一。
    接下来我们看下如何通过基于 Apache Doris 和 Apache Flink 构建实时数仓来满足用户实时、易用和统一的架构需求。

2. 基于Apache Doris构建实时数仓

Apache Doris 是一个基于 MPP 架构的高性能、实时的分析型数据库,以极速易用的特点被人们所熟知,仅需亚秒级响应时间即可返回海量数据下的查询结果,不仅可以支持高并发的点查询场景,也能支持高吞吐的复杂分析场景。基于此,Apache Doris 能够较好的满足报表分析、即席查询、统一数仓构建、数据湖联邦查询加速等使用场景,用户可以在此之上构建用户行为分析、AB 实验平台、日志检索分析、用户画像分析、订单分析等应用。
从图中可以看到,数据源经过各种数据集成和加工处理后,通常会入库到实时数仓 Doris 和离线湖仓(Hive, Iceberg, Hudi 中),然后基于Apache Doris 去构建一个统一的数仓。

接下来咱们来看下如何基于Doris构建极速易用的实时数仓架构。
因为Doris可以承载数据仓库服务、AD-hoc、OLAP等多种场景的应用。因此整个实时数仓架构就变的简单起来。我们只需要通过Flink CDC将RDS数实时同步到Doris、通过Routine Load将Kafka等消息队列的数据实时同步到Doris。然后在Doris内部基于Doris的不同表模型(明细模型、主键模型、统计模型)和Rollup以及物化视图的能力来构建实时数仓。

  • 通常ODS层使用明细模型构建
  • DWD层通过定时调度SQL对ODS进行抽取而得到。
  • DWS和ADS则可以通过Rollup、视图或者物化视图等技术手段构建。
  • 除此之外Doris还支持基于Iceberg和Hudi等数据湖服务的联邦分析和湖仓加速。
    这样我们便完成了实时数仓的构建,在实时数仓之上我们可以构建BI分析、Ad-hoc查询、多维分析等应用。

3. Flink -> Doris 数据一致性解决方案

接下来咱们看在构建实时数仓中有哪些挑战?首先大家最关心的问题就是数据一致性如何保障。
数据一致性分为:最多一次、至少一次和精确一次。
最多一次:发送方仅仅发送消息而不期待得到任何确认,在这种模型中数据的生产和消费过程中都可能出现数据的丢失。
至少一次:发送方重试直到收到确认为止,在这种模型中数据的生产和消费过程中都可能出现数据的重复。
精确一次:消息只严格传递一次到接收方并能成功被接收方处理。这种数据模型能够严格保障数据的生产的准确一次性。
Doris基于两阶段提交(2PC)实现了数据的准确一致性。接下来我们看下Flink CDC ->Doris数据准确一致性。

Flink CDC ->Doris数据准确一致性是通过Flink CheckPoint 和 Doris的2PC来实现的。具体过程分为4步:

  1. 事务开启:Flink Job的启动和Doris事务的开启。当Flink任务启动后,Doris Sink会发起一个pre-commit的请求来开启一个写入事务。
  2. 数据传输:Flink Job的运行和数据的传输。在Fink运行过程中Doris Sink会不断的从上游算子获取数据并通过http chunked的方式持续将传输(写入)到Doris。
  3. 事务预提交:Flink 开始进行CheckPoint,当Flink发起一个CheckPoint任务后,Flink各个算子会进行barrier对齐,快照的保存。这时Doris Sink会发出一个停止 Stream Load数据的写入,并发起一个事务预提交(PreCommitted)的请求到Doris,这时这一批数据已经完全成功写入BE,只是BE还没有进行数据的发布(Publish),因此写入 BE的数据对用户是但不可见的。
  4. 事务提交:当Flink CheckPoint完成后完成后会通知各个算子,此时Doris Sink会发生一次事务提交(Commit)请求到BE,BE会对此次写入的数据进行发布,完成此次数据的写入过程。
    这便是Flink CDC到Doris数据的准确一致性原理。当我们预提交成功,但是Flink CheckPoint失败,此时Doris因为没有收到事务提交请求,Doris内部会对写入的数据进行事务的回滚。

介绍完了数据的一致性原理,接下来咱们看下如何基于Flink CDC实现全量和增量数据的同步。这个原理很简单,因为Flink CDC实现了基于Snapshot的全量数据同步和基于BINlog的增量数据同步,同时全量数据同步和增量数据同步可以自动切换。
因此在数据迁移的过程中用户只需要配置要同步的表即可,Flink 任务启动的时候会首先进行表历史数据的全量同步,当全量数据同步完成后会自动切换为增量数据同步。

4. Light Schema Change原理和使用

当我们完成实时数据同步后,用户又产生了RDS Schema变更同步的需求。因为随着业务的发展RDS的表结构会发生变化,用户希望CDC不但能将数据的变化实时同步到Doris,也希望能将RDS上的表结构变更也同步到Doris。这样的话用户就不用担心RDS表结构和Doris表结构不一致问题了。
要能满足DDL同步的前提是Doris能够支持快速的Schema Change,这样即使瞬间有很多Schema Change需求能够快速响应。之前Doris的Schema Change支持Hard Link Schema Change、Hard Sort Schema Change和Hard Sort Schema Change三中模式。这三种Schema Change的耗时都比较高。因此在最新的版本中我们引入了Light Schema Change,
相较于 Hard Linked Schema Change 的作业流程, Light Schema Change 的实现原理就要简单的多,只需要在加减 Value 列的时候,对 FE 中表的元数据进行修改并持久化。这样Schema Change的过程只需要更新FE 中的源数据,因此其效率很高,并且该响应是一个同步的过程。
由于 Light Schema Change 只修改了 FE 的元数据,没有同步给 BE,而 BE 对读写操作依赖于自身的 Schema,这时候就会出现 Schema 不一致的问题。为了解决此问题,我们对 BE 读写流程进行了修改。主要包含以下方面:

  1. 对于数据导入而言:FE会将 Schema 持久化到 Rowset 的元数据中。当FE 发起导入任务的时候,会把最新的 Schema 一起下发给 BE,BE 会根据最新的 Schema 对数据进行写入并与 Rowset 绑定,将该 Schema 持久化到 Rowset 的元数据之中,实现了 Rowset 数据的自解析,解决了写过程中 Schema 的不一致。
  2. 对于数据读取而言:FE 在生成查询计划时,会把最新的 Schema 附在其中,并一起发给 BE,BE 会拿最新的 Schema 读取数据,以此来解决读过程中 Schema 的不一致。
  3. 对于数据的Compaction而言:当BE进行 Compaction 的时候,选取需要进行 Compaction 的 Rowset 中最新的 Schema作为Compaction 之后 Rowset 所对应的 Schema,以此来解决拥有不同 Schema 的 Rowset 合并问题。
    经过Light Schema Change优化只有,Doris的Schema Change耗时从之前的310ms降低到7ms。整体性能提升近百倍。彻底解决了海量数据Schema Change难问题。


有了Light Schema Change的保障,我们的Flink CDC就同时支持DML的同步和DDL的同步,DML同步包括Insert、Update和Delete等语句,DDL语句包括ALTER TABLE ADD COLUMN 和ALTER TABLE DROP COLUMN。
有了DDL同步后,在Flink CDC中如何使用呢?

  1. 首先需要在CDC的MySQL Source 中开启 DDL 变更同步
  2. 接着Doris Sink会识别 DDL 操作(add/drop column)并解析
  3. 当发现有DDL语句后,Doris Sink会对Table进行校验,看其是否可以进行 Light Schema Change。具体需要在Doris的表的properties中设置light_schema_change = true。
  4. 校验通过后Doris Sink会发起Schema Change操作到Doris,完成Schema Change操作。

5. Flink->Doris进行数据集成

当我们解决了基于Flink和Doris数据同步过程中的数据一致性、全增量数据同步和DDL变更同步后。一个完整的数据同步方案就完成了。接下来我们站在数据模型的角度,看出如何基于Flink在Doris中构建不同的数据模型。

  1. 第一种情况是最简单的数据同步,也就是MYSQL一个表对应Doris中的一个表,通过MySQL Source + Doris Sink来实现数据和表结构的变更。
  2. 第二种情况是我们可以将MySQL中的两个表数据同步到Flink后,在Flink内部进行对流Join来完成数据的打宽,生成宽表并写入Doris
  3. 第三种情况是MYSQL中的数据和Kafka中的数据进行多流Join,生成大宽表数据,写入Doris。
  4. 第四种情况是,我们还可以对上游Kafka数据进行清洗,通过Doris Sink写入Doris表。
  5. 第五种情况是,我们可以在Doris侧基于unique模型建立一个大宽表,将MySQL中的不同表中的数据通过按列更新的方式写入到同一个Doris表中。
  6. 最后就是我们可以将MySQL中表数据同步到Doris后通过SQL ETL来抽取一个大宽表,来实现复杂业务查询。

6. oris实现高并发的写入和更新

介绍完数据模型后,我们来看下如何基于Doris实现高并发的写入和更新。
对于大数据架构来说。高并发的写入并不难,难点在于高并发的更新。如何在上亿数据中快速找到要更新的数据并对数据进行更新一直是大数据领域比较难处理的问题。在这个问题上Doris通过MVCC多版本并发控制的机制来实现。
特别的在unique模型中,当我们写入一个数据时,如果数据在库中不存在则会写入一个版本数据,当我们再次对该数据进行更新时会直接再写入一个版本,此时数据的变更(新增、修改、删除)在Doris中以多版本的形式存在。用户查询的时Doris会将最新的版本对应的数据返回给用户,并在Compaction时对历史变更数据进行清理。这种设计很好的解决了海量数据更新难问题。
同时Doris支持Merge On Write和Merge On Read两种模式。接下来我们分别进行介绍。

7. Merge On Read

首先我们来看Merge On Read。他的特点是
写入快:由于其无论是insert还是update,对于Doris来说都会以Append多版本的方式写入,因此其写入性能很高。
查询慢:因为查询过程中,需要对Key进行聚合、去重,然后查询,因此其查询性能慢。
接下来我们通过一个例子看下:

  1. 首先我们执行insert 写入3条的订单数据,此时Doris数据中不但包含了原始数据,还包含SEQUENCE列(DORIS_SEQUENCE_COL)和删除标记(DORIS_DELETE_SIGN)。SEQUENCE列用于支持在高并发更新时数据更新的顺序性问题,删除标记用于对删除的数据进行记录。
  2. 当我们将订单1的cost修改为30的时候,数据通过Append的形式以新的版本写入Doris。
  3. 当我们对订单2的数据进行删除时,数据仍然通过Append的形式以新的版本写入Doris,只不过此处写入的数据DELETE_SIGN字段变为1了,表示该条数据被删除了,当Doris读取数据的时候发现最新版本的数据被标记删除了,则会将该数据从查询结果中过滤。
  4. 当用户用户使用 Unique Key 模型进行查询的时候,会进行两次聚合操作。第一次是使用多路归并排序,将重复的 Key 排在一起,并使用高版本数据覆盖低版本数据的方式对数据进行聚合操作;第二次是按照查询的聚合条件对数据进行聚合。这就带来了比较严重的查询性能问题:
  • 多路归并排序代价高,对全量 Key 的比较非常消耗 CPU 的计算资源。
  • 数据读取过程中无法进行有效的数据裁剪,引入大量额外的数据 IO
    基于此我们引入和Merge On Write来优化查询速度。

8. Merge On Write

Merge On Write兼顾查询性能和写入性能。Merge On Write写入过程中引入了Delete Bitmap,用来标记Rowset中某一行是否被删除,使用了兼顾性能和存储空间的Roaring Bitmap,将Bitmap和tablet meta一起存储在RocksDB中,每个segment对应一个bitmap。为了保持Unique Key原有的version语义,Delete Bitmap也支持多版本。每次导入产生该版本的增量bitmap,查询时需要合并所有此前版本的delete bitmap。
接下来咱们看下基于Delete Bitmap的Merge On Write写入流程:

  1. DeltaWriter会先将数据flush到磁盘
  2. 在publish阶段去批量的点查所有的key,点查过程中经过首先会经过一个区间树找到对应的RowSet,然后在RowSet内部通过BloomFilter和index来提高查询效率。
  3. 当查询到Key对应的RowSet后,会更新被覆盖的key对应的bitmap。
  • 我们选择在publish阶段去更新bitmap,保证了批量点查key和更新bitmap期间不会有新增可见的rowset,保证了bitmap更新的正确性。
  • 另外,同时如果某个segment没有被修改,则不会有对应版本的bitmap记录。

  • Merge On Write在查询某个版本数据的过程中
  1. 首先会去LRU cache 的delete bitmap中查找该版本对应的delete bitmap缓存
  2. 如果缓存不存在再到RowSet中读取对应的delete bitmap。
  3. 最后使用delete bitmap对RowSet中的数据进行过滤,最终的数据返回。同时在这个阶段我们还可以通过bloomfilter和bitmap的二级索引来提高查询的效率,查询过程中Doris 会进行有效的谓词下推。

9. 数据模型介绍

Doris针对不同的应用场景提供了不同的数据模型,分别为:明细模型,主键模型和聚合模型。
明细模型:明细数据存储(日志分析、行为分析)
主键模型:相同key的数据覆盖更新(订单状态、用户状态)
聚合模型:相同key列value列合并(报表统计、指标计算)

10. Doris物化视图

接下来我们看下如何通过物化视图来加速查询。
物化视图的概念是: 根据预定义的SQL分析语句执行预计算并将计算结果物化(持久化)起来用以加速查询的一种手段。
他主要的应用场景是:数据预聚合、聚合数据和明细数据同时查询、匹配不同前缀索引三大场景。
我们这里创建一个名叫 advertiser_view_record的Base表和名为advertiser_uv的物化视图。
数据的更新过程首先会更新Base表,然后更新物化视图。

Doris通过物化视图来加速查询。并且在查询的过程中Doris会自动进行路由选择,当我们的查询在物化视图中能查询到数据时会直接走物化视图的数据。当数据在物化视图中不存在时会查询Base表,同时借助Doris的列式存储和向量化计算快速完成查询。
物化视图的智能路由选择遵循最小匹配原则,也就是说只有查询的数据的集合比物化事务小的时候才有可能走物化视图。其智能路由过程包括:最优选择和查询改写两个过程。
Doris 支持两级分区存储, 第一层为分区(partition), 第二层为 HASH 分桶(bucket)。例如我们可以按照时间首先对数据进行分区,在分区内部使用site进行分桶。当我们查询某个site上的数据的时候借助分区分桶裁剪能够快速命中数据。
除此之外为了加速查询,我们还可以使用索取来加速查询,Doris内部也提供智能的查询优化。

应用案例

用户案例-某跨境电商

  1. 新的准实时数仓,对数据计算效率、数据时效的提升是巨大的。以 On Time Delivery 业务场景报表计算为例,计算 1000w 单轨迹节点时效变化,使用 Apache Doris 之前需要计算 2 个多小时,并且计算消耗的资源非常大,只能在空闲时段进行错峰计算;使用 Apache Doris 之后,只需要 3 分钟就可以完成计算,之前每周更新一次的全链路物流时效报表,现在可以做到每 10 分钟更新最新的数据,达到了准实时的数据时效。2. 得益于 Apache Doris 的标准化 SQL,上手难度小,学习成本低,报表的迁移工作全员都可以参与。原来报表使用 PowerBI 进行开发,需要对 PowerBI 有非常深入的了解,学习成本很高,开发周期也很长,而且 PowerBI 不使用标准 SQL,代码可读性差;现在基于 Doris SQL 加上自研的拖拉拽形式的报表平台,报表的开发成本直线下降,大部分需求的开发周期从周下降到了天。

用户案例-某运营服务商


Apache Doris 构建的离线+实时数仓一体化,采用 SQL 开发,并用 Dolphinscheduler 一键部署调度,极大的降低开发难度和开发工作量,可进行快速迭代以满足目前行业日益增长的数据需求。
新架构采用 Flink+Doris 的架构体系,FlinkCDC+StreamLoad 可以做到流批一体化数据接入,减少了组件的使用,解决了数据的冗余存储,服务器资源节省了 30%,数据存储磁盘占用减少 40%,同时组件的运维成本大大减少。
Doris的易用性极高,支持 MySQL 协议和标准 SQL,各业务线均可通过查询 MySQL 的方式进行数据查询,极大的减少了学习成本。
从 2021 年 Apache Doris上线云积互动的第一个业务至今,Apache Doris 在云积互动内部已成为大数据服务的基础,承担了包括人群分析、报表查询、指标计算等场景下的在线/离线需求,在较小的集群规模下支持了每天近2万次的用户在线分析查询。

用户案例-某供应链企业

当前我们的集群配置为 5 台阿里云 ESC,16 核 64G。在相同集群配置下,1000 个表的每日增量数据合并任务,用 Hive 需要 3-5 小时,用 Spark需要 2-3 小时,然而同样的需求 Drois 运用 Unique Key 模型完成只需要 10 分钟,大大提前了后续计算任务的开始时间。
另外,因 Hive 跑得慢,我们后续的几百个 Hive 计算任务,排队情况很严重,不得不把一些优先级低的任务排到下午甚至晚上,日任务全部跑完需要十几个小时。而我们把全部批任务迁移到 Doris 上计算后,全部任务跑完只需要 2 小时不到,后续增加新的需求任务完全无压力。
总而言之,使用 Doris 后,报表数据的更新时间大大提前,临时的数据查询需求响应时长大大缩短,至少节约了每年几万的大数据集群扩容成本,同时获得了各部门的认可。
提升开发效率
随着公司业务快速的发展,会不断的有新的数据分析需求,就需要我们接入新库新表,给老表加字段等,这对于 Hive 数仓是非常痛苦的,表要重建、全量数据要抽,这就需要每周有半天时间都在处理这些事情。
在使用 Doris 作为数仓后,通过我们的数据易平台配置 Flink CDC 任务快速接入 MySQL 库表的全量+增量数据,同时利用 Doris 的 Online Schema Change 特性,实时同步 Binlog 里的 DDL 表结构变更到 Doris,数据接入数仓零开发成本。
另外因为 Doris 支持 MySQL 协议直接对接数据可视化应用,我们不需要再把结果数据从 Hive 推到 MySQL 里提供数据服务,节约了数据库资源,减少了开发步骤。
体现数据价值
Doris 有审计日志,我们可以通过日志,分析出每个表每天的查询使用情况,以便我们评估跟进数据价值、下线废弃报表及任务。另外还可以预警资源消耗多、查询慢的查询语句,帮助用户进行语法优化等。