一、Flink集群架构
1.1 Flink架构模型
主要包含四个不同的组件:
作业管理器(JobManager)
资源管理器(ResourceManager)
任务管理器(TaskManager)
分发器(Application)
Flink首先是由Scala和Java实现的,所有的组件都会运行在jvm上,当flink集群启动的时候,首先会启动一个JobManager和一个或多个TaskManager。由client提交任务给JobManager,JobManager再调度任务到一个或多个TaskManager上,然后TaskManager将心跳和统计信息汇报给JobManager,JobManager和TaskManager以流的形式进行数据传输。以上为一个独立的jvm进程。
1.2 角色分配
1.2.1 JobManager
作业管理器是管理整个应用程序执行的主进程
作业管理器首先会接收到各种需要执行的程序,包括作业图(JobGrap)、逻辑数据流图(logical dataflow Grap)和打包起来的各种jar包、类库等。
然后作业管理器会把JobGrap转换成一个物理层面的数据流图,这个图被称为执行图(ExecutionGrap),包含了可以并发执行的任务。
之后作业管理器会向资源管理器申请资源,一旦JobManager获取到所需的资源,就会将执行流图发送给TaskManager去执行。
在执行的过程种,作业管理器会负责所有需要中央协调的工作,比如说检查点的协调。
1.2.2 ResourceManager
资源管理器主要负责管理TaskManager种的slot。
作用:
资源管理器可以向JobManager分配有空闲solt的TaskManager。
如果ResourceManager无法满足作业管理器所需的资源的时候,Resource会向资源提供平台发起会话,以提供启动TaskManager的容器。
ResourceManager还负责结束终止空闲的TaskManager,释放计算机资源。
1.2.3 TaskManager
任务管理器是Flink种的工作进程。
TaskManager启动之后会向ResourceManager注册它的插槽
收到资源管理器的指令后,TaskManager会向JobManager提供一个或多个插槽,作业管理器就可以向JobManager插槽分配任务。
在执行过程中,一个TaskManager可以和同一应用程序种的其它TaskManager交换数据。
1.2.4 Dispatcher
分发器可以跨作业运行,它为应用提供了REST接口。
当一个应用被提交时,分发器就会启动并将移交给作业管理器。
由于是REST接口,所以Dispatcher可以作为集群的一个HTTP接入点,这样就能不受防火墙的阻碍。
二、Flink集群搭建
2.1 搭建略
2.2 FlinkOnYarn模式
2.2.1 运行原理
步骤1:当启动一个新的Flink Yarn Client会话,客户端会检查请求的资源是否可用,之后它会上传包含了Flink配置文件和jar包到HDFS。
步骤2:客户端会请求一个container去启动一个ApplicationMaster
步骤3:RsourceManager选一台JobManager去启动Application
注意:由于一些客户端已经将Flink配置文件和jar包注册到HDFS了,所以JobManager会负责准备一些初始化(例如,下载文件)。一旦这些工作完成了,ApplicationMaster就启动了
注意2:JobManager和Application运行在同一个容器中,一旦它们运行成功了,JobManager会根据自己的和Application的地址生成一个新的配置文件,这样才能和JobManager建立连接。该文件同样也会上传到HDFS,另外,AM容器提供了Flink的web界面服务。
步骤4:AM开始为Flink的TaskManager非陪容器containter,在对应的nodeManager上启动TaskManager
步骤5:初始化工作,从HDFS上下载jar文件和修改过的文件。一旦这些步骤完成了,Flink就安装完成了并准备接收任务了。
2.2.2 Yarn-Session模式
该模式是预先在yarn上面划分一部分资源给flink集群用,flink提交的所有任务,共用这些资源。
2.2.3 Single-job模式
该模式是每次提交任务都会创建一个新的flink集群,任务之间互不影响,方便管理,任务执行完成后,flink集群也会消失。
三、TaskSlot与Parallelism
3.1 逻辑执行计划图
图形解释
Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。
StreamGrap
根据用户代码生成的最初的图,用来表示程序的拓扑结构。
JobGrap
StreamGrap经过优化后生成JobGrap。提交给JobManager的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
ExecutionGrap
JobManager根据JobGrap生成ExecutionGrap
ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
物理执行图
JobManager根据ExecutionGrap对job进行调度后,在各个TaskManager上部署Task后形成的“图”。
3.2 Slot与Task
一个TaskManager可以同时执行多个任务(tasks)。
这些任务可以是同一个算子的子任务(数据并行)
这些任务可以是来自不同算子(任务并行)
这些任务可以是另一个不同应用程序(作业并行)。
TaskManager提供了一定数量的处理插槽(processing slots),用于控制可以并行执行的任务 数。 一个slot可以执行应用的一个分片,也就是应用中每一个算子的一个并行任务。 Slot对资源的隔离仅仅是对内存进行隔离,策略是均分 比如taskmanager的管理内存是3GB,假如有两个slot,那么每个slot就仅仅有1.5GB内存可 用。 插槽的数量通常与每个TaskManager的可用CPU内核数成比例。一般情况下你的slot数是你每个 TM的cpu的核数
3.3 Flink任务并行度
Flink程序由多个任务(转换/运算符,数据源和接收器)组成,Flink中的程序本质上是并行和分布 式的。 在执行期间,流具有一个或多个流分区,并且每个operator具有一个或多个operator子任务。 operator子任务彼此独立,并且可以在不同的线程中执行,这些线程又可能在不同的机器或容器上 执行。 operator子任务的数量是该特定operator的并行度。同一程序的不同operator可能具有不同的并 行度。
3.4 任务并行度设置
Flink程序的任务并行度设置分为四个级别。
3.4.1 算子级别
通过调用其setParallelism()方法来定义单个运算符,数据源或数据接收器的并行度 。
3.4.2 执行环境级别
执行环境级别的并行度是本次任务中所有的操作符,数据源和数据接收器的并行度。可以通过显式 的配置运算符并行度来覆盖执行环境并行度。
3.4.3 客户端级别
在向Flink提交作业时,可以在客户端设置并行度,通过使用指定的parallelism参数-p。例如: flink run -c com.xxxx.WordCount -p 6 /root/flink_wordcount-1.0-SNAPSHOT.jar
3.4.4 系统级别
通过设置 ${flink_home}/conf/flink-conf.yaml 配置文件中的 parallelism.default`配置项来定 义默认并行度。
3.5Operator Chains(操作链)
Flink出于分布式执行的目的,将operator的subtask链接在一起形成task。 每个task在一个线程中执行。 将operators链接成task是非常有效的优化: 它可以减少线程与线程间的切换和数据缓冲的开销,并在降低延迟的同时提高整体吞吐量。 链接的行为可以在编程API中进行指定开启操作链(默认) 和 禁用操作链的
四、Flink的Window窗口机制
4.1. 基本概念
Windows是flink处理无限流的核心,Windows将流拆分为有限大小的“桶”,我们可以在其上应用计 算。Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实 现了流处理和批处理。而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。Flink 提供了 非常完善的窗口机制.
在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们 可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内 有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的 数据,并对这个窗口内的数据进行计算.
窗口可以是
基于时间驱动的(Time Window,例如:每30秒钟),
基于数据驱动的(Count Window,例如:每一百个元素)。
基于不同事件驱动的窗口又可以分成以下几类:
翻滚窗口(Tumbling Window,无重叠)
滑动窗口(Sliding Window,有重叠)
会话窗口(Session Window,活动间隙)
全局窗口 (Global Window 全局窗口)
4.2. 翻滚窗口(Tumbling Window)
翻滚窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口. 翻滚窗具有固定的尺寸,不重叠。
4.2.1. 基于时间驱动
场景1:统计每一分钟中用户购买的商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分 被成为翻滚时间窗口(Tumbling Time Window)。
4.2.2. 基于事件驱动
场景2:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满100个”相同”元素了,就会 对窗口进行计算。
4.3. 滑动窗口(Sliding Window)
滑动窗口和翻滚窗口类似,区别在于:滑动窗口可以有重叠的部分。在滑窗中,一个元素可以对应多个 窗口。如下图所示:
4.3.1. 基于时间驱动
场景3:我们可以每30秒计算一次最近一分钟用户购买的商品总数。
4.3.2. 基于事件驱动
场景4:每10个 “相同”元素计算一次最近100个元素的总和.
4.4. 会话窗口(session windown)
会话窗口不重叠,没有固定的开始和结束时间. 与翻滚窗口和滑动窗口相反,当会话窗口在一段时间内没 有接收到元素时,会关闭会话窗口。后续元素将分配给新的会话窗口.
4.4.1. 基于会话驱动
例子:计算每个用户在活跃期间总共购买的商品数量,如果用户30秒没有活动则视为会话断开。
4.5. 全局窗口(Global Windows)
将所有相同的key的数据分配到单个窗口中计算结果,窗口没有起始和结束时间,窗口需要借助于 Triger来触发计算,如果不对Global Windows指定Triger,窗口是不会触发计算的。 使用Global Windows需要非常慎重,用户需要非常明确自己在整个窗口中统计出的结果是什么, 并指定对应的触发器,同时还需要有指定相应的数据清理机制,否则数据将一直留在内存中。
4.6. window窗口聚合函数
Flink提供了两大类窗口函数,分别为增量聚合函数和全量窗口函数。 增量聚合函数的性能要比全量窗口函数高,因为增量聚合窗口是基于中间结果状态计算最终结 果的,即窗口中只维护一个中间结果状态,不要缓存所有的窗口数据。 全量窗口函数,需要对所有进入该窗口的数据进行缓存,等到窗口触发时才会遍历窗口内所有 数据,进行结果计算。如果窗口数据量比较大或者窗口时间较长,就会耗费很多的资源缓存数 据,从而导致性能下降。
五、Time与WaterMark
事件时间和水位线诞生的背景: 某数据源中的某些数据由于某种原因(如:网络原因,外部存储自身原因)会有2秒的延时,也就是在实际 时间的第1秒产生的数据有可能在第3秒中产生的数据之后到来。假设在一个5秒的滚动窗口中,有一个 EventTime是 9秒的数据,在第11秒时候到来了。图示第9秒的数据,在11秒到来了。
如下图所示:
5.1 时间
5.1.1. Processing time(处理时间)
处理时间是指当前机器处理该条事件的时间。 它是当数据流入到具体某个算子时候相应的系统时间。他提供了最小的延时和最佳的性能。但是在分布 式和异步环境中,处理时间不能提供确定性,因为它对事件到达系统的速度和数据流在系统的各个 operator之间处理的速度很敏感。
5.1.2. Event Time(事件时间)
事件时间是每个事件在其生产设备上发生的时间。此时间通常在进入Flink之前嵌入到记录中,并且可以 从每个记录中提取该事件时间戳。事件时间对于乱序、延时、或者数据重放等情况,都能给出正确的结 果。事件时间依赖于事件本身,而跟物理时钟没有关系。基于事件时间的程序必须指定如何生成事件时 间水位线(wartermark),这是指示事件时间进度的机制。水位线机制在后面部分中会描述。 事件时间处理通常存在一定的延时,因此需要为延时和无序的事件等待一段时间。因此,使用事件时间 编程通常需要与处理时间相结合。
5.1.3. Ingestion time(摄入时间)
摄入时间是数据进入Flink框架的时间,是在Source Operator中设置的。与ProcessingTime相比可以提 供更可预测的结果,因为摄入时间的时间戳比较稳定(在源处只记录一次),同一数据在流经不同窗口操作 时将使用相同的时间戳,而对于ProcessingTime同一数据在流经不同窗口算子会有不同的处理时间戳。
5.2. WaterMark(水位线)
5.2.1. WaterMark产生背景
在使用EventTime处理Stream 数据的时候会遇到数据乱序的问题,流处理从Event(事件)产生, 流经Source,再到Operator,这中间需要一定的时间。虽然大部分情况下,传输到 Operator 的数据都 是按照事件产生的时间顺序来的,但是也不排除由于网络延迟等原因而导致乱序的产生,特别是使用 Kafka 的时候,多个分区之间的数据无法保证有序。因此,在进行Window计算的时候,不能无限期地 等下去,必须要有个机制来保证在特定的时间后,必须触发 Window 进行计算,这个特别的机制就是 Watermark(水位线)。Watermark 是用于处理乱序事件的
Watermark是Flink为了处理EventTime时间类型的窗口计算提出的一种机制,本质上也是一种时间 戳。
Watermark是用于处理乱序事件的,通常用watermark机制结合window来实现。
5.2.2. Watermark原理 在 Flink 的窗口处理过程中,如果确定全部数据到达,就可以对 Window 的所有数据做窗口计算操 作(如汇总、分组等),如果数据没有全部到达,则继续等待该窗口中的数据全部到达才开始处 理。这种情况下就需要用到水位线(WaterMarks)机制,它能够衡量数据处理进度(表达数据到 达的完整性),保证事件数据(全部)到达 Flink 系统,或者在乱序及延迟到达时,也能够像预期一样计算出正确并且连续的结果。当任何 Event 进入到 Flink系统时,会根据当前最大事件时间产 生 Watermarks 时间戳。
有 Watermark 的 Window 是怎么触发窗口函数的呢?
如果有窗口的停止时间等于或者小于 maxEventTime – t(当时的 warkmark),那么这个窗 口被触发执行。
注意:
Watermark 本质可以理解成一个延迟触发机制。
案例: 一个5秒的滚动窗口, 延时3秒 第一条进入4, watermark=4-3=1 < 5, 不触发 第二条进入6, watermark=6-3=3 < 5, 不触发 第三条进入8, watermark =8-3=5 = 5, 触发窗口执行 watermark只是让窗口延时触发,并不会将其他窗口数据计算,如本例中,只会计算4的 这条数据,其他数据保存在内存中 第四条进入5, watermark=8-3=5=5, 但是注意0~5秒的窗口已经触发了
5.2.3. Watermark三种使用情况
Flink内部传播水位线的策略可以归纳为3点: 首先,水位线是以广播的形式在算子之间进行传播 Long.MAX_VALUE表示事件时间的结束,即未来不会有数据到来了
单个分区的输入取最大值,多个分区的输入取最小值
本来有序的Stream中的Watermark 如果数据元素的事件时间是有序的,Watermark时间戳会随着数据元素的事件时间按顺序生 成 此时水位线的变化和事件时间保持一致(因为既然是有序的时间,就不需要设置延迟了,那么 t 就是 0。 所以 watermark=maxtime-0 = maxtime),也就是理想状态下的水位线。 当 Watermark 时间大于 Windows 结束时间就会触发对 Windows 的数据计算,以此类推, 下一个 Window 也是一样。
乱序事件中的Watermark 现实情况下数据元素往往并不是按照其产生顺序接入到Flink系统中进行处理,而频繁出现乱 序或迟到的情况,这种情况就需要使用Watermarks来应对。
并行数据流中的Watermark 在多并行度的情况下,Watermark会有一个对齐机制,这个对齐机制会取所有Channel中最 小的Watermark。 有4个上游数据,watermark分为是4,7,6 分区数据watermark数据是2,4,3,6 第一张图: 当上游第1,2,3个数据都没来的时候,全部分区数据最小的是2,因此输出当前 事件时间时间为2 第二张图:上游第1个数据来了,也就是4数据把原来的2覆盖了,这时候数据变成了4,4, 3,6 最小的数据变成3,因此输出当前事件时间为3 第三张图:上游第2个数据7来了,也就是7把原来的4覆盖了,这时候数据变成了4,7,3,6 最小的数据仍是3,因此输出当前事件时间仍是3 第四张图:上游第3个数据6来了,也就是6把3给覆盖了,这时候数据变成了4,7,6,6 最小 的数据变成了4,因此输出当前事件时间是4
一个任务会为它的每个分区都维护一个分区水位线(partition watermark),当收到每个分区传 来的水位线时,任务首先会让当前分区水位线的值与接收的水位线值相比较,如果新接收的水 位线值大于当前分区水位线值,则会将对应的分区水位线值更新为较大的水位线值(如上图中 的2步骤),接着,任务会把事件时钟调整为当前分区水位线值的最小值,如上图步骤2 ,由于 当前分区水位线的最小值为3,所以将事件时间时钟更新为3,然后将值为3的水位线广播到下 游任务。步骤3与步骤4的处理逻辑同上。 同时我们可以注意到这种设计其实有一个局限,具体体现在没有对分区(partition)是否来自于 不同的流进行区分,比如对于两条流或多条流的Union或Connect操作,同样是按照全部分区 水位线中最小值来更新事件时间时钟,这就导致所有的输入记录都会按照基于同一个事件时间 时钟来处理,这种一刀切的做法对于同一个流的不同分区而言是无可厚非的,但是对于多条流 而言,强制使用一个时钟进行同步会对整个集群带来较大的性能开销,比如当两个流的水位线 相差很大是,其中的一个流要等待最慢的那条流,而较快的流的记录会在状态中缓存,直到事 件时间时钟到达允许处理它们的那个时间点。
5.2.4. Watermark的产生方式
目前Apache Flink 有两种生产Watermark的方式,如下: 其中一种方式为在数据源完成的,即利用SourceFunction在应用读入数据流的时候分配时间 戳与水位线。 另一种方式是通过实现接口的自定义函数,该方式又包括两种实现方式: 一种为周期性生成水位线,即实现AssignerWithPeriodicWatermarks接口, 周期性的生成水位线 另一种为定点生成水位线,即实AssignerWithPunctuatedWatermarks接口。 每条数据都会产生水位线
数据源方式
该方式主要是实现自定义数据源,数据源分配时间戳和水位线主要是通过内部的 SourceContext对象实现的
周期分配器(AssignerWithPeriodicWatermarks)
该分配器是实现了一个AssignerWithPeriodicWatermarks的用户自定义函数,通过重写 extractTimestamp()方法来提取时间戳,提取出来的时间戳会附加在各自的记录上,查询得 到的水位线会注入到数据流中。 周期性的生成水位线是指以固定的时间间隔来发出水位线并推进事件时间的前进,关于默认的 时间间隔在上文中也有提到,根据选择的时间语义确定默认的时间间隔,如果使用 Processing Time或者Event Time,默认的水位线间隔时间是200毫秒,当然用户也可以自己 设定时间间隔所以,如果要调整默认的200毫秒的间隔,可以调用setAutoWatermarkInterval()方法
上面指定了每隔3秒生成一次水位线,即每隔3秒会自动向流里注入一个水位线,在代码层 面,Flink会每隔3秒钟调用一次AssignerWithPeriodicWatermarks的 getCurrentWatermark()方法,每次调用该方法时,如果得到的值不为空并且大于上一个水位 线的时间戳,那么就会向流中注入一个新的水位线。这项检查可以有效地保证了事件时间的递 增的特性,一旦检查失败也就不会生成水位线。
定点水位线分配器(AssignerWithPunctuatedWatermarks)
定点水位线分配器是基于某些事件(指示系统进度的特殊元祖或标记)触发水位线的生成与发 送,基于特定的事件向流中注入一个水位线,流中的每一个元素都有机会判断是否生成一个水 位线,如果得到的水位线不为空并且大于之前的水位线,就生成水位线并注入流中。 实现AssignerWithPunctuatedWatermarks接口,重写checkAndGetNextWatermark()方 法,该方法会在针对每个事件的extractTimestamp()方法后立即调用,以此来决定是否生成 一个新的水位线,如果该方法返回一个非空并且大于之前值的水位线,就会将这个新的水位线 发出。
5.2.5. Watermark的迟到的数据
现实中很难生成一个完美的水位线,水位线就是在延迟与准确性之前做的一种权衡。那么,如果生 成的水位线过于紧迫,即水位线可能会大于后来数据的时间戳,这就意味着数据有延迟,关于延迟 数据的处理,Flink提供了一些机制,具体如下: 直接将迟到的数据丢弃 将迟到的数据输出到单独的数据流中,即使用sideOutputLateData(new OutputTag())实 现侧输出 根据迟到的事件更新并发出结果