Flink 作业运行时,最常见的问题就是积压问题, 当作业出现积压时,如何才能快速定位到积压原因,并针对性解决呢?
积压的发现
通过我们会通过配置作业的积压报警来及时发现作用的积压情况,下面是一些常用的积压监控指标:
freshness
freshness 一般代表当前消费的消息体时间和当前时刻的差值,如果差值越大,说明积压也就越严重。
无论是消息队列还是数据湖,消息体本身就带有时间戳,因此可以非常方便计算当前消费的消息时间戳和当前时间的差值。
offsetLag
积压的条目数,适用于消息队列, 一般指当前消费的位点和相比消息队列的头节点的 offset 差值。
snapshotLag
snapshot 积压的个数,适用于数据湖,代表当前消费的 snapshot 和最新的 snapshot 版本的差异数量
splitLag
剩余的 split 数量,适用于数据湖, 代表剩余待消费的分片数量
积压问题的排查
反压排查时,一般分为两步:
- 发现存在积压的 task
- 结合 jstack 造成积压的具体原因
发现积压的 task
通常有多种手段来找到积压的 task
inpoolUsage/outpoolUsage
An estimate of the input/output buffers usage. (ignores LocalInputChannels)
一般情况下, 有这个一个原则:
- inpool 高的 task 处理比较慢 (可能原因:1. 自身处理慢 2. 下游处理慢导致反压)
- outpool 高的 task 下游处理比较慢
反压
If you see a back pressure warning (e.g. High) for a task, this means that it is producing data faster than the downstream operators can consume. Records in your job flow downstream (e.g. from sources to sinks) and back pressure is propagated in the opposite direction, up the stream.
通常如果 task 存在反压,并不是这个 task 处理慢了, 而是他的下游太慢,因此当作业存在多个 task 时, 反压是连续的,直到慢节点
如下图所示:一般情况是 task1, task2, task3 的反压比较高,从 task4 开始降低,则基本可以判断 task4 处理比较慢
checkpoint
数据积压通常会导致 checkpoint 超时, 因此通过 checkpoint 的耗时情况,也能反映出作业处理速度的快慢,从而能够定位到处理比较耗时的 task 。
结合 stack 分析原因
当找到了可能存在积压的 task 之后,再结合 stack 进一步确定原因。
常见的原因:
状态访问比较慢(rocksdb)
常见于使用 Rocksdb 的作业, 状态量比较大,作业的 stack 经常
GC 严重
针对不同的 statebackend, 常见的原因大不相同
使用 Rocksdb 的作业,确定主要占用在哪里(业务逻辑占用,broadcast state 占用),结合实际的需求,扩充堆内内存。
使用 Filesystem 的作业, 有较大概率是状态量增加,导致堆内内存不足,导致频繁 GC,及时扩内存即可
外部系统访问比较慢
在 Flink 作业中, 一般使用三种方式访问外部系统:source,sink,维度表
source
常见两种提速手段:
- 扩 source partition的个数
- 扩 source task 的并发数
sink
常见三种提速手段:
- 扩 sink 表的 partition
- 扩 sink task 的并发
- sink 修改逻辑, 使用异步 io
维度表
常见两种提速手段
- 扩 join 算子的并发
- 采用异步 join方式,提升 join 速度
作业处理达到瓶颈
一般 Flink 的 task 处理速度在 2~3w, 处理快的5~6w, 因此如果单个 task 的处理速度在预期范围, 但依然出现了积压, 则可以需要考虑扩容。