18

打开窗,让 Flink 驻进

 4 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzI4NjE2MTIyOQ%3D%3D&%3Bmid=2652608863&%3Bidx=1&%3Bsn=3cff50cf3311c8bec45f3200d2d33211
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

最近天气时好时坏,忽冷忽热,感冒的人有点多,注意防寒保暖。

笔者讲解 Apache Flink 培训系列课程已经有一段时间了,一些读者反馈完成了所有实验并应用到生产实际案例,这真的非常棒,学有所成。

笔者今天继续讲解 Apache Flink 培训系列课程中的 Window 生态的内容。

Window 引入

打开窗,让春风驻进。

哦,不,是打开窗,让 Flink 驻进。

对 Flink 有所了解的读者应该都知道,Flink 实现了批处理和流处理,即 Flink 批流一体。而 Flink 的批处理又是流处理的一个特例,其中窗口就是从流处理到批处理的一个重要的桥梁。

大家查看 Flink 官网时,会发现 Flink 提供了非常全面的窗口机制:

e67byeI.jpg!web

当然笔者不会按部就班,洋洋洒洒长篇大论,读者看起来太累,笔者也没有那么多完整的时间,笔者希望采取渐进式,即分章分层次渐入佳境式,最后以达到完全掌握窗口的知识。

Window 含义

从字面意思理解,窗口就是一段区间。

  • 比如我们想要统计在过去的 10 分钟内有多少车流量。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

  • 再比如我们想要监控事件流数据每一千个元素,发出报警。

从上面可以理解出,Flink 中窗口可以是基于时间的窗口(Count-based windows),也可以是基于计数的窗口(Time-based windows)。

其实笔者在前面的文章中提到过 Flink 的窗口,它是将一个无界数据流拆分成有界的数据集合,也是处理有限流的核心组件。

Window 是实时流应用程序中的常见操作,它支持在无界流的有界间隔上进行聚合之类的转换操作。通常,这些间隔是基于时间的逻辑定义的。Window operator 提供了一种方法来将事件分组到有限大小的 bucket 中(其实就是有界的数据集合),并对这些 bucket 的有界内容应用计算逻辑。例如 Window operator 可以将流的事件分组到 5 分钟的窗口中,并计算每个窗口已经接收了多少事件。

针对不同的需求场景,Flink 提供了几种不同的 Window 类型。

Window 类型

其实上面抛出了两个问题:

  • 1. 何时创建存储数据的 bucket

  • 2. 流数据分配到哪个 bucket

那么 Flink 定义了哪些策略来解决上面的问题呢?

其实 Flink 使用不同 window 类型的策略来解决这些问题,如下:

  • Tumbling Windows

    即滚动窗口,比如计算每隔 10 分钟的高铁客流量,需要使用滚动窗口,每 10 分钟累计一次。

  • Sliding Windows

    即滑动窗口,比如每 2 分钟计算一下最近 10 分钟的高铁客流量呢,需要使用滑动窗口,滑动的大小为 2 分钟。

  • Session Windows

    即会话窗口,比如统计用户在一次网页交互的会话内点击页面的次数,那么此时就需要用会话窗口了。

  • Global Windows

    即全局窗口,全局窗口分配器将具有相同键的所有元素分配给同一单个全局窗口。仅当指定自定义触发器时,此窗口模式才有用。否则,将不会执行任何计算,因为全局窗口没有可以处理聚合元素的自然的结束点。

另外,Flink 也支持自定义 window 类型。如果需要定制数据分发策略,则可以实现一个 Class,继承自 WindowAssigner。

下面笔者会详细讲解 Flink 内置的 window 类型,即 WindowAssigner。

内置的 Window Assigners

Flink 为最常见的窗口使用案例提供了内置的 WindowAssigner。笔者在此处讨论的所有 assigners 都是基于时间的,基于时间的 WindowAssigner 根据 event-time 时间戳或当前 processing-time 向窗口分配元素。时间窗口有开始和结束的时间戳。

所有内置的 WindowAssigner 都提供一个默认触发器,一旦 processing-time 或 event-time 时间超过了窗口的结尾,它将触发窗口的计算。重要的是要注意,当第一个元素分配给它时,就会创建一个窗口。Flink 永远不会计算空的窗口。

基于计数的窗口

除了基于时间的窗口外,Flink 还支持基于计数的窗口,即按到达 window operator 的顺序将固定数量的元素分组的窗口。由于它们取决于摄取的顺序,因此基于计数的窗口不是确定性的。此外,如果在没有自定义触发器的情况下使用它们,它们可能会引起问题,该触发器在某个时候会丢弃不完整和过时的窗口。

Flink 的内置 WindowAssigner 创建 TimeWindow 类型的窗口。此窗口类型实质上表示两个时间戳之间的时间间隔,其中包含开始时间,不包含结束时间。

下面,笔者展示 DataStream API 的不同内置 WindowAssigner 以及如何使用它们来定义 窗口算子。

Tumbling Windows

即滚动窗口,滚动窗口分配器将元素放入不重叠的固定大小的窗口中,如图所示:

73ABJru.jpg!web

Datastream API 提供两个分配器:TumblingEventTimeWindows 和 TumblingProcessingTimeWindows

TumblingEventTimeWindows 分别用于滚动 event-time window 和 processing-time window。滚动窗口分配器接收一个参数,窗口大小以时间单位表示;可以使用分配器的 of(Time size)方法指定。时间间隔可以设置为毫秒、秒、分钟、小时或天。

以下代码显示如何在传感器数据测量流上定义 event-time 和 processing-time 滚动窗口:

可以使用简便的方式指定 window 时间:

默认情况下,滚动窗口与纪元时间 1970-01-01-00:00:00.000 对齐。例如,一个大小为 1 小时的 assigner 将在 00:00:00、01:00:00、02:00:00 等处定义窗口。当然,也可以在 assigner 中定义第二个参数,表示 offset。下面的代码显示了偏移量为 15 分钟的窗口,偏移量分别从 00:15:00、01:15:00、02:15:00 开始,依次类推:

Sliding Windows

即滑动窗口,滑动窗口分配器将元素分配给固定大小的窗口,这些窗口按指定的滑动间隔移动,如图所示:

na2aInV.jpg!web

对于滑动窗口,必须指定窗口大小和滑动间隔,以定义新窗口的启动频率。当滑动间隔小于窗口大小时,窗口重叠,可以将元素分配给多个窗口。如果滑动间隔大于窗口的大小,一些元素可能不会被分配到任何窗口,因此可能被删除。

下面的代码展示了如何将传感器读数分组到 1 小时大小的滑动窗口中,滑动间隔为15分钟。每个读数将被添加到四个窗口。DataStream API 提供了 event-time 和 processing-time 分配器,以及使用的快捷方法,并且可以将时间间隔 offset 设置为 window assigner 的第三个参数:

Session Windows

即会话窗口,会话窗口分配器将元素放入大小不同的活动的非重叠窗口中。会话窗口的边界由不活动的间隙定义,在这些间隙中没有接收到任何记录。下图说明了如何将元素分配给会话窗口:

qauqe22.png!web

笔者将在下面的示例中演示如何将传感器读数分组到会话窗口,其中每个会话都定义为 15 分钟的不活动时间:

由于会话窗口的开始和结束取决于接收到的元素,所以 window assigner 不能立即将所有元素分配到正确的窗口。取而代之的是,SessionWindows assigner 最初将每个传入元素映射到它自己的窗口中,以元素的时间戳作为开始时间,会话间隔作为窗口大小。随后,它将合并具有重叠范围的所有窗口。

Global Windows

mYv2Inb.png!web

全局窗口分配器将具有相同键的所有元素分配给同一单个全局窗口。仅当指定自定义 trigger 时,此窗口模式才有用。否则,将不会执行任何计算,因为全局窗口没有可以处理聚合元素的自然的结束点。

为了方便大家理解,笔者举一个例子,就拿点餐来说:

  • 1. 用 户登录小程序或 APP,会进行一系列点点点操作,比如点击、浏览、搜索、购买等,用户的这些操作可以被记录为用户操作的事件流,也可以理解为数据流。

  • 2. 使 用小程序或 APP 的用户会有很多,每个用户的操作都是独立的,没有必然的联系,各自的数据流分别分配在单独的全局窗口。

  • 3. 如果该全局窗口没有指定 trigger 条件,永远不会发生计算。 所以需要指定自定义的 trigger 才会执行运算。

Window 使用方式

Flink 的 DataStream API 为最常见的窗口操作提供了各种内置方法,并提供了非常灵活的窗口机制来定义自定义窗口逻辑,如下为常用的 Keyed Windows 和 Non-Keyed Windows。

qqQv6bq.jpg!web

我们可以将 Window 应用于分组的流中,如下所示:

也可以将 window 应用在非分组的流中,如下所示:

大家先熟悉一下即可,里面有几个算子,笔者解释一下:

  • window/windowAll

    window 方法接收的输入是一个 WindowAssigner。WindowAssigner 负责将每条输入的事件数据分发到正确的窗口中。

  • trigger

    用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的 trigger,如果默认的 trigger 不能满足需求,则可以自定义一个类,继承自 Trigger。笔者结合 Trigger 的抽象类定义,讲解具体的含义:

  • onElement

    每次往 window 增加一个元素的时候会触发

  • onProcessingTime

    当 processing-time timer 被触发的时候会调用

  • onEventTime

    当 event-time timer 被触发的时候会调用

  • onMerge

    对两个 trigger 的 state 进行 merge 的时候会调用

  • clear

    window 销毁的时候被调用

上面前三个会返回一个 TriggerResult enum,TriggerResult 有如下几种可能的选择:

  • CONTINUE

    Window 上不做任何事情

  • FIRE_ AND_ PURGE

    触发窗口,发送 window 结果,然后销毁窗口

  • FIRE

    触发 window,发送结果

  • PURGE

    清空整个 window 的元素并销毁窗口

  • evictor

    主要用作一些数据的自定义操作,可以在执行用户代码之前,也可以在执行用户代码之后。笔者带大家看一下 Evictor 的接口定义:

查看 Evictor 接口实现类时,发现 Flink 已经提供了三种的 Evictor:

  • 1. CountEvictor

    保持一定数量的元素。 其实就是在窗口中保留指定数量的元素,并从窗口头部开始丢弃其余元素。

  • 2. DeltaEvictor

    根据 DeltaFunction 和阈值保留元素。 Eviction 从 buffer 的第一个元素开始,并从 buffer 中删除所有比阈值具有更高增量的元素。如果不好理解的话,笔者换个说法,即计算窗口中最后一个元素与其余每个元素之间的增量,丢弃那些增量大于或等于阈值的元素。

  • 3. TimeEvictor

    保留窗口中最近一段时间内的元素,并丢弃其余元素。

CountEvictor 和 DeltaEvictor 其实还是很好理解的。这里笔者重点讲解一下 TimeEvictor。

为了更好地理解,除了实战外(但是笔者会在下篇文章中进行窗口方面的实战),剩下的就是直接上源码,重点看 evict 核心方法,evictBefore 和 evictAfter 都会调用 evict 方法:

evictor 是可选的方法,如果用户不选择,则默认没有。

Window 实现

最后,笔者会将窗口的整个实现过程梳理一遍,先看一张流传甚广的图:

jyIJfmn.jpg!web

上图描述了 Flink 的窗口机制以及各组件之间是如何相互工作的。估计图一眼看去,第一感觉是有点乱,不过不要着急,笔者给大家梳理清楚。

  • 1. 首先看图的最上面,Input Stream 源源不断地发送数据进入 window operator。

  • 2. 每一个到达的元素都会被交给 WindowAssigner。 WindowAssigner 会决定元素被放到哪个或哪些 Window,可能也会创建新窗口,如图中 Window3 为数字 6 创建一个新 Window。因为一个元素可以被放入多个窗口中,所以同时存在多个窗口是可能的。注意,Window 本身可以看作是一个 ID 标识符,其内部可能存储了一些元数据,如 TimeWindow 中有开始和结束时间,但是并不会存储窗口中的元素。窗口中的元素实际存储在 Key/Value State 中,Key 为 Window,Value 为元素集合(或聚合值)。为了保证窗口的容错性,该实现依赖了 Flink 的 State 机制。

  • 3. 图中的每一个窗口都拥有一个属于自己的 Trigger,Trigger 上会有定时器,用来决定一个窗口何时能够被计算或清除。 每当有元素加入到该 Window,或者之前注册的定时器超时了,那么 Trigger 都会被调用。 Trigger 的返回结果可以是 CONTINUE(不做任何操作)、FIRE(触发 Window,处理数据)、 PURGE(清空整个 window 的元素并销毁窗口)或者 FIRE AND PURGE(触发窗口,然后销毁窗口)。

  • 4. 当 Trigger 执行 FIRE 后,窗口中的元素集合就会交给 Evictor,如果没有设置 Evictor 则跳过。 Evictor 主要用来根据遍历窗口中的元素列表,并决定最先进入窗口的多少个元素需要被移除。

  • 5. Evictor 处理后,剩余的元素会交给用户指定的 UserFunction 进行窗口的计算。 如果没有 Evictor 的话,窗口中的所有元素会一起交给 UserFunction 进行计算。

总结

关于 Window 相关的大部分内容都有提及到或进行详细的讲解,包括 Window 引入、Window 中的三个核心组件(WindowAssigner、Trigger 和 Evictor),以及 Window 使用方式和实现。后续笔者会结合具体案例进行实战,并深入理解 Window 相关知识点。

参考

  • https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html

  • Stream Processing with Apache+Flink

  • https://ververica.cn/developers/time-window


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK