一.引言
ProcessFunction 原始执行状态为每个 key 注册一个较长时间 TimeTimer 并在这期间将所有对应 key 的数据都收集起来,到期完成触发。现在接到新的需求,要求判断数据类型,当特殊标识的数据到达后,需要将 TimeTimer 到期的时间提前。因此需要删掉当前 key 之前注册的老的 TimeTimer,下面铺下自己踩坑的完整过程。
二.情景复现
1.数据源
为了测试数据,我们自定义数据流,其中 SourceInfo 为 CaseClass,包含了 key、sendTime 与 isRealTime 三个字段,前两个常规字段,ProcessFunction 会根据先到的 SourceInfo 获取其 SendTime 并注册 TimeTimer,isRealTime 为新增需求字段,该字段默认 False,为 True 时代表提前 TimeTimer 的触发时间。
case class SourceInfo(key: String, sendTime: Long, isRealTime: Boolean = false) { override def toString: String = { s"key: $key isRealTime: $isRealTime sendTime: $sendTime" } }