Victor You know, I see

Flink水印理解

2020-05-28
Victor

水印的概念

水印简单的来说,就是衡量Event Time的一种机制,给定Event Time 减去一个可容忍的延迟时间,然后再触发窗口计算。对于存在延迟的元素,不能无限期的等下去,必须要有个机制来保证一个特定的时间后,触发Window去进行计算。

水印,用于告诉系统事件时间中的执行进度,时间戳的分配随着水印的生成。水印的依赖条件是窗口,水印只是决定了窗口的触发时间,watermark是用于处理乱序事件的。 有两种方式可以分配时间戳和生成水印:

  • 直接在数据流源中进行。标点水位线(Punctuated Watermark)-数据流中每一个递增的EventTime都会产生一个Watermark,只有在实时性要求非常高的场景才会选择Punctuated的方式进行 Watermark的生成。
  • 通过timestamp assigner和watermark generator生成;定期水位线(Periodic Watermark)-周期性的(一定时间间隔或者达到一定的记录条数)产生一个 Watermark。在实际的生产中 Periodic 的方式必须结合时间和积累条数两个维度继续周期性产生 Watermark,否则在极端情况下会有很大的延时。

    注意:如果指定多次watermark,后面指定的会覆盖前面的值;多并行度的情况下,watermark对齐会取所有channel最小的watermark;

Watermark工作原理(拷贝自tqz):

假设设置水印的时候允许最大延迟时间为 10s,则水印值会在 Event Time 上面减 10  
Event Time:2019-8-27 20:48:20  
Watermark:2019-8-27 20:48:10  
同时还设置了时间窗口为 3s。3s 意味着一分钟将被划分为如下的形式:  
[00:00:00, 00:00:03)、[00:00:03, 00:00:06)、... 、[00:00:57, 00:01:00)  
区间前面是 Window_start_time,后面是 Window_end_time,左闭右开  

假设现在有一个 Event Time :2019-8-27 20:34:33  
那么,他的 Watermark :2019-8-27 20:34:23  
那么按照 3s 的窗口划分的话,离当前 watermark 时间最近的窗口应该是:  
[2019-8-27 20:34:18, 2019-8-27 20:34:21)  
以及  
[2019-8-27 20:34:21, 2019-8-27 20:34:24)  
那么该 Watermark 时间最后的数字是 23s,则它落在 [21, 24) 这个区间里面,24 也叫 Window_end_time,但是 Watermark 的这个 23 < 24,所以无法触发窗口。  
当再来一条数据Event Time :2019-8-27 20:34:34  
那么,他的 Waretmark :2019-8-27 20:34:24  
此时离该 watermark 最近的窗口依然为 [21, 24),并且 watermark 的 24 >= 窗口的 24,于是触发了窗口  

Window划分

Window的设定无关数据本身,而是系统定义好了的。 Window是flink中划分数据一个基本单位,Window的划分方式是固定的,默认会根据自然时间划分Window,并且划分方式是前闭后开。 Window 的触发要符合以下几个条件
对于late element太多的数据而言 Event Time < Watermark时间 对于out-of-order以及正常的数据而言 Watermark时间 >= Window_end_time 在[Window_start_time,Window_end_time)中有数据存在

个人理解(后续有误再更改)

窗口的触发和水印之间有着不可分割的关系,其中,数据本身对于窗口来说可以分为:乱序数据和迟到太多的数据。

  • 乱序数据
    有水印公式为:Watermark=当前最大的时间戳-可容忍的delay,当乱序数据到达时,当前的Watermark在容忍时间内并没有大于等于该乱序数据的Window_end_time(窗口结束时间),则代表该乱序数据任然可以被正确处理,将其放到对应的Window区间内做计算即可。 举个例子,设置窗口间隔划分为5s,那么对应的划分规则就是:

    Window划分 w1 w2 w3 ……
    5s [00:00:00~00:00:05) [00:00:05~00:00:10) [00:00:10~00:00:15) ……

    假设设置水印的时候允许最大延迟时间为10s,则水印值会在Event Time上面减10,例如:
    Event Time:2020-05-29 10:48:20
    Watermark:2020-05-29 10:48:10
    设当前数据包含的最大事件时间为2020-05-29 10:48:34,那么水印为2020-05-29 10:48:24,距离最近的窗口为[2020-05-29 10:48:20~2020-05-29 10:48:25)。那么当来了一个乱序数据流事件时间为2020-05-29 10:48:22时,水印时间尚未大于等于窗口结束时间,则将乱序数据归于改窗口做计算。

  • 迟到太多的数据
    按照水印公式,假如窗口划分标准也是如上述乱序数据,设当前数据包含的最大事件时间为2020-05-29 10:48:34,那么水印为2020-05-29 10:48:24,距离最近的窗口为[2020-05-29 10:48:20~2020-05-29 10:48:25)。那么当来了一个乱序数据流事件时间为2020-05-29 10:48:19时,该事件时间所在的最近窗口已经被触发,且事件时间 < Watermark,则最终该事件默认会被丢弃。

    迟到事件出现时窗口已经关闭并产出了计算结果,处理的方法有3种:
    重新激活已经关闭的窗口并重新计算以修正结果;
    将迟到事件收集起来另外处理;
    将迟到事件视为错误消息并丢弃;
    Flink 默认的处理方式是直接丢弃,其他两种方式分别使用Side Output和Allowed Lateness。 Side Output机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。
    Allowed Lateness机制允许用户设置一个允许的最大迟到时长。Flink 会再窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。


Similar Posts

Comments