1

《关于我因为flink成为spark源码贡献者这件小事》 - 是奉壹呀

 1 year ago
source link: https://www.cnblogs.com/eryuan/p/17119805.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

各位读者老爷请放下手上的板砖,我可真没有标题党,且容老弟慢慢道来。

600147-20230213172537950-2114284960.jpg

spark和flink本身相信我不用做过多的介绍,后端同学不管搞没搞过大数据,应该都多多少少听过。
如果没听过,简单说,spark和flink之于大数据,就好比vue和react之于前端,就好比spring家族之于java。

从2015年开始接触大数据,2016年开始使用spark,到2022年初能够为spark社区贡献一点微博的贡献,成为spark项目的contributor,对我来说是一段奇特的经历。

这段经历来源于一次spark窗口计算,由于我觉得并不能完全满足要求,想通过源码改造一下。
这一下子进了源码就误入歧途了。

什么要求之类的按不表,进入正题。

什么是窗口计算(可跳过)

在大数据领域,基于窗口的计算是非常常见的场景,特别在于流式计算(flink只不叫实时和离线,只区分有界无界)。

如果这说起来还是有点抽象,那举个例子,相信你很快就能明白。

比如微博热搜,我需要每分钟计算过去半小时的热搜词取top50;
比如新能源车,行驶过程中每秒或每两秒钟上报各信号项,如果30秒钟内没有收到该车的信号项,我们认为该车出现故障,便进行预警(假设场景);
等等。

前者微博热搜是一个典型的时间窗口,后者新能源车是一个典型的会话窗口。

时间窗口(timewindow)

时间窗口又分为滑动窗口(sliding window)和滚动窗口(tumbling window)。反正意思就是这么个意思,在不同的大数据引擎里叫法略有不同,在同一个引擎里不同的API里叫法也略有区别(比如,flink 滑动窗口在DataSet&DataStream api和Table(sql) api里分别叫作sliding window 和HOP window)。

总之时间窗口有一个长度距离(m)和滑动距离(n),当m=n时,这就是一个滚动窗口,相邻窗口两两并不相交重叠。

600147-20230215113944827-115407813.png

当m>n时,称为滑动窗口,这时相邻的两个窗口就有了重叠部份。

600147-20230215114026516-1839242546.png

在多数场景下,m为n的正整数倍。即m%n=0;除非产品经理认为我们应该每61秒统计过去7.3分钟的微博热搜(???)。
这个例子可能极端了些,但m%n != 0的实际应用场景肯定是有的。

会话窗口(sessionwindow)

session窗口相对抽象一点。大家可以把session对应到web应用上,理解为一个连接session。
当大数据引擎接收到一条数据相当于一个连接session,当在设定的时间范围内连续没有接收到数据,相当于session会话已断开,这里触发窗口结束。
因此会话窗口长度是不固定的,没有固定的开始和结束。而且相邻的窗口也不会相交重叠。

600147-20230215114818800-1526395610.png

到这里,大家对大数据的窗口计算应该有了一个简单的感性认识,我们今天讨论的重点是时间窗口,而且只是时间窗口下的一个小小的切点。

今天的主题

大数据引擎是怎样划分窗口的,当接收到一条数据的时候,数据的时间戳会落到哪些窗口?

先来简单看一点源码。不多,就一点点。

spark获取窗口的主要代码逻辑:

600147-20230214153424923-1995345579.png

一时看不懂没关系,我第一次看到spark这段代码的时候也有点懵。借助spark的注释来梳理一下。

为了不水字数把spark源代码注释折叠

* The windows are calculated as below:
   * maxNumOverlapping <- ceil(windowDuration / slideDuration)
   * for (i <- 0 until maxNumOverlapping)
   *   windowId <- ceil((timestamp - startTime) / slideDuration)
   *   windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
   *   windowEnd <- windowStart + windowDuration
   *   return windowStart, windowEnd

然后,我们再假设一个简单的场景,将原伪代码进行微调,并配合注释讲解一下。

假设窗口长度(windowDuration )为10滑动距离(slideDuration)为5,即每5分钟计算过去10分钟的数据。简单化流程,窗口偏移时间为0。
现在spark集群收到一条数据,它的事件时间戳为13,然后需要计算13会落到哪些窗口里面。

// `获取窗口个数,窗口长度(m)/滑动长度(n),当两者相等时,就1个窗口;
// 当m%n=0时,窗口长度为除数;当m%n!=0时,窗口长度为除数向下的最小整数
// 这里为2个窗口
maxNumOverlapping <- ceil(windowDuration / slideDuration)
// 循环获取当前时间戳在每个窗口的边界,即开始时间和结束时间
for (i <- 0 until maxNumOverlapping)
  // 13/5 -> 2.6 通过ceil向下取整得到2,再+1 = 3
  windowId <- ceil(timestamp / slideDuration)
   // 第1次循环时,计算第1个窗口开始时间 :3 * 5 +(0 - 2)* 5 = 5
   // 第2次循环时,计算第2个窗口开始时间: 3 * 5 + (1 - 2) * 5 = 10
  windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration 
   // 第1次循环时,计算第1个窗口结束时间:5+10 = 15
   // 第2次循环时,计算第2个窗口结束时间:10+10 = 20
  windowEnd <- windowStart + windowDuration
  return windowStart, windowEnd

通过上面的代码,我们知道,时间戳13最终会落到[5-15],[10-20]两个窗口区间。

我们再来看看flink的实现逻辑。

600147-20230215144357137-1476412242.png
600147-20230214153709638-1273052681.png

可以看到其实原理类似,先求得窗口个数,略有区别的是,spark是先求得窗口编号windowId,再根据窗口编号求得每一个窗口的开始结束时间。
而spark是直接得到一个窗口开始时间lastWindowStart ,然后根据窗口开始时间+滑动距离=窗口结束时间。
再然后,窗口开始时间-窗口长度=另一个窗口的开始时间,再求得窗口的结束时间。

而不管是哪种方法,都有一个线头。
spark是windowId

windowId <- ceil((timestamp - startTime) / slideDuration)

flink是lastWindowStart .

timestamp - (timestamp - offset + windowSize) % windowSize;

大家发现上面两边代码对比有问题了吗?

spark的两个问题

===========================================5秒钟思考线

600147-20230215120253475-1578076010.png

点击查看问题答案

问题1:重复计算。`windowId`只需要计算一次就够了。
      乃至于`windowStart`也只需要计算一次,根据它,可以计算出当次windowEnd,同样也可以计算出其它的窗口边界。
问题2:ceil和mod(%模运算)的差异。

这两个问题都不是BUG,是性能问题。
第1个问题,直接观察代码就可以得出结论。
第2个问题,需要通过代码测试一下。
因为scala本身也是JVM生态语言,底层都一样。所以我直接使用java写了一个基准测试,内容为ceil和求模的性能差异。

@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations = 3, time = 1)
@Measurement(iterations = 3, time = 4)
@Threads(1)
@Fork(1)
@State(value = Scope.Benchmark)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class MathTest {
    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                .include(MathTest.class.getSimpleName())
                .mode(Mode.All)
                .result("MathTest.json")
                .resultFormat(ResultFormatType.JSON).build();

        new Runner(opt).run();
    }
    @Benchmark
    public void ceil() {
        Math.ceil((double)1000/20);
        Math.ceil((double)1000/34);
    }

    @Benchmark
    public void mod() {
        int a = 1000 % 20;
        int b = 1000 % 34;
    }
}

大家感兴趣的话,也可以将代码复制到本地,引入JMH就可以运行。
得到的结果:

600147-20230215150451229-758867328.png

上图表示的是执行耗时,柱图越高性能越低。更多可以参考我的另一篇文章hashmap的一些性能测试

那么可以从图中明显看到使用ceil的spark比使用mod的flink在这获取窗口这块功能的性能上肯定要差一些。

不管是第1还是第2个问题,都会随着窗口长度放大。如果我需要每1分钟计算过去60分钟的数据

那么每1条数据进来,它都会进行这样的60次无效计算。

如果一个窗口批次有一万条数据,它就会进行60万次无效计算。

大数据场景下,它的性能损耗是多少呢?

是吧?


既然有性能损耗,它必然可以优化。对于我这种开源爱(投)好(机)者来说,这都是一个大好的机会啊。
原创是不可能原创的,这辈子都不可能,只能靠抄抄flink代码才能成为spark contributor什么的。

好了,我们现在已经学会了两个主流大数据引擎的窗口计算基本原理了,现在我们来写一个大数据引擎吧。

600147-20230215214207195-36765253.png

不是,来重(抄)构(袭)spark获取时间窗口的代码吧。

第一个PR

怀着忐忑的心情,给spark社区提了一个PR。想想还有点小激动。

https://github.com/apache/spark/pull/35362

第一次给这种级别的开源社区提交PR。两眼一摸黑,比如PR要怎样写才规范,要不要写test case。要不要@社区大佬等等。

等了两天后,终于有大佬理我了。

600147-20230216144318565-2022793485.png
600147-20230216144527401-895677386.png

我这点渣渣英语,借助翻译软件才完成了PR描述。自然不懂cc是啥,FYI是啥,nit.是啥?看到不认识的自然复制粘贴到翻译软件。

600147-20230216144621126-616018681.png

嗯?
现在翻译软件都这样先进了吗?它居然看穿了我是个废物!

第1次提交犯了很多小错误,这是没看源码贡献指南的后果。(其实是看不太懂)

https://spark.apache.org/contributing.html

这里面很详细,怎样拉下源码编译,PR标题格式怎样,PR描述规范,代码stylecheck插件等等,事无巨细,是立志于成为spark社区大佬的新手启航必备。

如果英语老师没有骗我, could you please表示的应该是委婉,客气。
社区大佬都很有礼貌,说话又好听,我超喜欢的。

If you don't mind, could you please

600147-20230216145728927-1002838493.png

在这位大佬发现我没做性能测试后,(真的,现在想想,性能改进的代码没有基准测试你敢信),温柔提醒我,在看穿我是个新(废)手(物)后,亲自写了benchmark基准测试代码。并得出新的计算逻辑比原有的性能提升30%到60%。

600147-20230216145852741-2066402067.png

然后经过一番沟通修改代码注释什么的,最终合并到了master.

600147-20230216153127433-225978476.png

以上就是我的第一次spark PR之旅了。

如果你要问我感受的话。短暂的兴奋过后就是空虚。
600147-20230216081724840-1633487247.png

不玩梗地说,spark社区氛围真的很好。在后面陆陆续续又给sparkT和flink提交了几个PR。没有对比就没有伤害,比起flink,spark真的对新手非常友好了。
这过程中,踩了很多坑。也收获很多。比如,这次30%到60%的性能提高,对于一个比较成熟的大数据产品来说,应该算是比较大的提升了吧?
但在后面的PR中,我做出远超此次的性能提升,而且不是借助flink的既有逻辑,完全独立完成。

后面有时间也可以把这些写出来,水水文章。

本来到这里就结束。但是,偶然在flink社区PR区看到一个熟悉timewindow,哟呵,这个我熟啊。

https://github.com/apache/flink/pull/18982

点进去一看,尴尬了。
大家看一下,这次PR提交的主要代码更改逻辑就知道了。

600147-20230216154722448-2130562141.png

没错,就是之前我给spark 提交的代码的借(抄)鉴(袭)来源。flink时间窗口分配窗口的核心代码。而且这不是优化,而是修复BUG。

哦,原来这特么的是彩蛋,这特么的是惊喜啊!

这就好比,照着隔壁班里第一名抄作业,老师给了个高分,然后被高手自爆,老师,我写得有问题。

600147-20230216155637882-1648387471.png

它是怎炸的呢?

原文已经说得非常清楚,我在这里长话短说。
简单画了个图:

600147-20230215103846036-731418199.png

假设时间戳13在一个长度15,滑动长度5的窗口逻辑里,我们要知道它会分配到哪2个窗口里,只需求得最后一个窗口开始的长度即可。

它最后一个窗口的开始长度为 13%5 = 3,为时间戳对滑动距离求模。即把上图中红色部份减去,或者向左偏移余数部份,就是它最后一个窗口的开始长度。

不管怎样,时间戳必须得落到开始时间后面,窗口必须包含时间戳。

好,很好,很有精神。没有问题!no problem!


但是!如果时间戳是负数呢?比如-1呢?

我们开始求它的最后一个窗口开始时间,时间戳对滑动距离求模,即-1%5 = -1

600147-20230216164721790-797597562.png

-1 - (-1) = 0

600147-20230216165151388-1696363156.png

这样就导致不管是-1还是13都应该向左偏移的,结果跑向右边了。

13:???
开始时间大于了时间戳本身,时间戳跑到窗口外面去了,这肯定是不正常的。

其实不仅仅是负的时间戳,是(timestamp - window.starttime)% window.slideduration <0的情况下都会有这种问题。
只说负的时间戳有问题,就显得我的上个PR很无脑。显得我无脑没关系,这其实也是小看了spark,flink这种大范围流行的开源框架。

通过spark的测试案例也能很清楚的看到,肯定是考虑到了时间戳落到1970-01-01之前的。
随手截一个测试案例

600147-20230216170536424-2116221059.png

只不过它的时间戳都在1970-01-01前后几秒钟范围,落在了滑动距离之内。所以这个问题没有及时暴露出来。

而且在我提交优化的PR之前,spark本身的代码是不会出现这种问题的。所以这个锅完全是我的,必须背了。

然后给社区提交了一个fix

https://github.com/apache/spark/pull/36737

由于种种原因,我倒是放了鸽子了。后来,被另一哥们重新提交合并。

https://github.com/apache/spark/pull/39843#issuecomment-1418436041


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK