33

Hadoop平台进阶之路 | eBay Spark测试框架——Woody

 3 years ago
source link: https://mp.weixin.qq.com/s?__biz=MzA3MDMyNDUzOQ%3D%3D&%3Bmid=2650508767&%3Bidx=1&%3Bsn=d48f285471b735c59888033973032959
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

640?wx_fmt=png

供稿 |  eBay CCOE ADI Hadoop Team

作者 | 王斐

编辑 | 顾欣怡

本文7253字,预计阅读时间23分钟

更多干货请关注“eBay技术荟”公众号

640?wx_fmt=png

导读

新版本的Spark拥有更好的性能和稳定性,对于用户来说,如果长期停留在低版本的Spark,不仅会浪费集群资源,还会进一步加大平台管理团队的工作量。如果进行Spark大版本升级,考虑到版本间可能由于计算行为不一致而导致的数据质量问题,用户就要投入大量的精力去对比重要的job在不同版本下的数据质量,加大了版本升级的困难度。

ADI Hadoop team负责管理eBay的Hadoop集群、 Spark的版本升级和bug修复等事务。 为了提升Spark版本升级的效率,本团队开发了Spark测试框架——Woody。 该测试框架会将线上spark-sql job语句转换为和线上job隔离的测试语句,然后调用不同的Spark版本执行测试语句,最终对比版本间数据质量。Woody不仅可以用于Spark版本升级,也可用于job调优以及job pipeline的端到端测试。本文将分享Spark测试框架Woody的架构,实现以及使用场景,希望能对读者有所帮助。

01

640?wx_fmt=png

背景

Hadoop team目前管理两个大Spark分支,Spark-2.1和Spark-2.3,目前的版本开发均基于Spark-2.3,而对于Spark-2.1分支已经不再进行维护,未来会升级到Spark-3.0。

Hadoop team从两年前就着手进行从Spark-2.1 到Spark-2.3的迁移工作,用了将近两年时间完成了迁移。

为什么会用这么长时间呢?

因为大版本之间可能会存在不兼容问题,计算行为可能发生改变,也就是说两个版本间的计算结果可能不一致。

数据质量是至关重要的,特别是对于金融数据,业务团队需要在升级之前进行两个版本间的计算结果对比。

而这需要用户去手动修改线上代码,然后利用两个Spark版本进行双跑,最后再去手动对比两个版本的计算结果。eBay内部的spark-sql任务数不胜数,大版本升级会消耗大量的资源和人力。

Spark-2.1到Spark-2.3 已经耗费了这么长时间,那么将来升级到Spark-3.0想必也是一个浩大的工程。

为了解决这个问题,Hadoop team开发了一个Spark测试框架,命名为Woody。Woody的名字取自一个卡通啄木鸟,希望可以帮助找出不同Spark版本之间或者Spark job中的bug(虫子)。

Woody可以将线上的SQL语句进行转换,然后分别启动两个Spark版本运行转换后的SQL,从而对比两个版本的计算结果,判断两个版本计算结果是否一致,也可以用于比较两个版本的性能。

640?wx_fmt=png

图1(点击可查看大图)

02

640?wx_fmt=png

Woody的架构

Woody的架构如图2所示,提供restful api,使用mysql存储数据,支持多个集群。用户可以一次提交一批用于测试的job,Woody用一个workflow封装这批job,由workflow调度器进行调度,每个workflow调度时生成一个对应的jobSetManager,进入job调度器,job调度器会限制同时运行job的数量。

640?wx_fmt=png

图2(点击可查看大图)

一个job的生命周期为:

  1. 将job语句转换为测试语句

  2. 测试运行前准备工作

  3. 调用Spark版本1运行测试语句

  4. 计算Spark版本1结果的校验信息

  5. 调用Spark版本2运行测试语句

  6. 计算Spark版本2结果的校验信息

  7. 给出数据质量报告

关于job语句的转换,Woody为各个集群启动多个长运行的conversion executor,这些conversion executor会向Woody Server进行注册并定期汇报自己的心跳,由Conversion Service Manager来管理。

在job需要运行测试语句阶段,Spark App Submit Service会向相应的集群提交Spark任务, Woody会记录其ApplicationId存入mysql。

Woody使用mysql共享状态数据,支持HA, 是cloud-native的服务。在一台Woody服务关闭时会将其正在运行的workflow标记为游离状态,这些游离状态的workflow会被其他正在运行的Woody服务接管,或者由当前Woody服务重启后重新接管。

03

640?wx_fmt=png

Spark-sql Job

首先,介绍一下本文中对source表,working表和target表的定义:

  • source表是作为输入的表,是被select from的表;

  • working表是在job运行中被创建的表,包括(temporary)view;

  • target表是被写入数据的表,比如被load数据,或者被insert数据等等。

前面提到了用户在测试版本间数据质量的时候,需要手动对两个Spark版本间的计算进行对比,这一操作有以下三个要点:

  1. 更改SQL语句,至少需要更改insert数据的表名,避免影响线上job;

  2. 保持source表的数据一致;

  3. 手动检查两个Spark版本的计算结果。

Woody需要自动化完成以上三方面的工作。

首先,对于如何去自动地更改线上job语句,请参考以下这组简单的Spark-sql语句:

create or replace temporary view working_table as select c1, c2, …, cn from src_table;

insert overwrite target_table select working_table.c1 from working_table;

在这组语句里面,由src_table 作为job的输入,而working_table是在job运行时候生成的,target_table则作为job的输出。

如果要更改线上job语句,不被改变的src_table是不用更改的,作为输出的target_table是必须修改的,临时(temporary)的working表(temporary view)是不用更改的,非临时的working表则必须更改。

那么如何才能找出这些src, working和target表呢?

04

640?wx_fmt=png

Logical Plan

Catalyst是Spark-sql可扩展的SQL优化器。它会将一条SQL语句或者dataframe操作转换为logical plan,然后进行优化,最后转换成可执行的物理计划运行,因篇幅限制,此处不做过多展开。

640?wx_fmt=png

图3(点击可查看大图)

如上图所示,Woody会使用其中的unresolved logical plan 来进行分析,来找出其中的source表,working表,target表和一些location等信息。

比如,下面的这句SQL会转换为一个logical plan,可以从叶子节点拿到该语句的source表信息;而对于create table语句,它对应一个CreateTable类型的logical plan,可以从该plan拿到working表信息;相应的insert语句对应于一个InsertIntoTable类型的logical plan,可以从中获得target表等等。

640?wx_fmt=png

图4(点击可查看大图)

前文提到可以从一个LogicalPlan的叶子节点拿到其中的source表信息。每个logical plan对应一个抽象语法树,而这颗抽象语法树上面的每个节点也是一个logical plan,所以一条语句的logical plan对应的其实是一个森林,Woody要找到的就是这个森林里面的所有叶子节点。

比如说下面的语句:

SELECT c1, udf(c2) from (SELECT current_timestamp as c1, (SELECT count(1) from ta) as c2) tb;

其中subQuery的Project列表里面的第二列,包含source表信息,Woody需要将这个logical plan对应的森林各个分支都进行遍历,找到所有叶子节点,从中提取出表信息。

当然在一个sql job中是有很多条sql语句的,上一条语句的target表可能是下条语句的source表,前面创建的working表,可能是后面用到的source表,以及语句中会有AlterTableSetLocation等等语句,Woody会把解析过程中的context保存起来,用于串联前后表之间的依赖关系,从而进行语句转换。

不只是表信息,对于location的信息,Woody也要进行提取,用于后面转为测试环境中的location等等。

05

640?wx_fmt=png

ParserRuleContext

即使找到了语句中的表信息和location信息,那又要如何利用这些信息对原有的sql语句进行转换呢?单纯的利用logical plan是无法做到的,即使编辑了这个logical plan也无法将其映射为sql文本。

Spark使用antlr4 进行语法分析,每个sql文本初次转换之后会获得一个ParserRuleContext,然后catalyst会基于它生成相应的logical plan。

而ParserRuleContext包含一个inputStream对应原始的SQL文本,而且ParserRuleContext也是一颗树,其各个节点也都有自己的类型,每个节点有字段代表其在该inputStream上面的偏移量。

比如说表名对应TableIdentifierContext,location 对应LocationSpecContext。如果要替换一些表名或者location,只需要找到这些表名所在的ParserRuleContext节点,然后将其对应的文本进行替换,再和前后的文本进行拼接,即可对语句进行转换。

举个例子:

SELECT * FROM ta a JOIN tb b ON a.id=b.id;

对于这个语句,如果我们要转换它的source表ta和tb,就要先拿到对应的ParserRuleContext,找到表名节点,得到其在ParserRuleContext inputStream之上的偏移量;假设ta对应的TableIdentifierContext偏移量是50-52,而tb对应的偏移量是70-72。

那么替换的结果就是:

originText(0, 49) + replace(ta) + originText(52, 69) + replace(tb) + originText(72, length).

接下来讲Woody的转换规则。

06

640?wx_fmt=png

语句转换

640?wx_fmt=png

1. 隔离

首先,转换后的语句必定要与线上环境隔离开来,不能影响线上job和数据。Woody的策略是创建一个数据库专门用于对这个job的测试,然后将该job中要输出的数据全部保存在该数据库下面。

假设这个数据库命名为 WOODY_DB_${UNIQUE_ID}。

对于job中只读的source表,Woody不会去转换这些表的名字。对于job中的输出表,其原有的名字是dbName.tblName, Woody会将其表名转换为WOODY_DB_${UNIQUE_ID}.dbName__tblName, 也就是说把数据输出到前面提及 到的数据库中,原有的数据库名和表名用两条下划线拼接作为新的表名。

举个例子:

insert into gdw_tables.tgt_tbl select * from gdw_tables.src_tbl;

会被转换为

insert into WOODY_DB_{UNIQUE_ID}.gdw_tables__tgt_tbl select * from gdw_tables.src_tbl;

640?wx_fmt=png

2. Context

Woody保存转换过程中提取到的信息以及当前的上下文,包含有:

  1. source表以及这些表是否是global的表;

  2. 后续当做target表的source表;

  3. 语句中创建的working表;

  4. 被后续当做source表的working表;

  5. 被后续当做target表的working表;

  6. target表以及其被写入的partition信息;

  7. 在job中被写入数据的dir信息;

  8. 当前的数据库是什么,use database语句可更改当前数据库;

  9. 当前被alter(包括被写入数据,alter location等等)的表的信息等。

接下来介绍Woody如何提取信息,并将这些信息存入该context,串联整个job。

640?wx_fmt=png

3. 提取

Woody会按照job语句顺序,提取当前语句中的信息,存入context中,然后转换当前语句,如此循环。

对于一条语句,首先得到对应的unresolved logical plan。

此处解释下为什么一定是unresolved logical plan, 不能是resolved logical plan或者optimized logical plan。

因为这是发生在Woody提取信息过程中,并没有真正地运行sql,前置的sql都没有运行,而当前语句可能依赖前置sql的结果,比如依赖前面创建的temporary view,自然无法resolve logical plan 和后续的optimized logical plan(而且optimized logical plan经过catalyst优化,可能会丢失一些需要用于转换原语句的信息)。

拿到unresolved logical plan之后,Woody会去匹配该plan的类型,看其是DDL,DML还是DQL,首先保存不属于source表的信息(working表,target表, 需转换的location信息)。

  • CreateViewCommand/CreateViewUsing/CreateTable等创建表语句,将被创建的working表信息存入context,同时记录指定的location信息;

  • LoadDataCommand和insert语句,将被insert数据的表以及partition信息存入context;

  • AlterTable语句,将alter信息加入context...

之后,Woody会遍历该logical plan的每一个分支(logical plan的children方法不能覆盖所有分支,Woody参考了catalyst中的LogicalPlanVistor类),找到所有叶子节点,包括:

  • Project的project list的每一个元素,

  • Join条件的left表达式,right表达式,以及condition表达式,

  • Aggregate的grouping 表达式,aggregate表达式,

  • Sort的order表达式……

从这些叶子节点过滤得到 UnresolvedRelation类型的节点,这些节点对应的都是source表。

而接下来需要替换的target表,working表,source表或者location信息,均已从当前语句中提取出来,结合当前的context即可进行转换。

640?wx_fmt=png

4. 转换

接下来需要依据提取到的信息结合当前context进行转换。前面提到ParserRuleContext用于替换,只需要找到需要转换的节点。

那么,哪些节点是可能被替换的呢?

  1. 表名信息;

  2. 包含base和field信息的节点,例如语句‘select ta.id from ta’的‘ta.id’, 可能需要替换其中的表名ta部分;

  3. 表property节点,比如spark data source表property 里面的path就是表的路径,Woody可能需要转换这个路径;

  4. Location节点。

第三条和第四条,都是为了转换location信息,只需要做一个路径映射即可。

关于第一条:

  • 如果该表是source表且没有被alter过,则继续用原表;

  • 如果该表是source表,但是被alter过,则使用转换后的表名(参考第一小节:隔离)。

关于第二条包含base 和field信息的节点:

  • 这个base表可能是别名不需要转换;

  • 这个base表可能是With语句中的temp view,不用转换;

  • 参考第一条的转换规则。

除此之外,还有一些场景,无法在一次替换中解决, 比如,一个表可能既是source表又是target表。

insert into ta select ta.* from ta join tb on …

Woody会在完成第一次替换之后才将表ta标记为target表,然后再次替换被insert表的表名为测试表。

而对于以下语句,Woody可以直接跳过该转换部分。

  • use database, Woody会在context中改变当前database;

  • set/reset/clear cache语句,保留原语句。

此外,Woody在完成job中所有语句转换之后,会去校验context中的所有source表是否含有global view。

由于view只是将一个查询进行封装,底层仍然可能含有source表信息,Woody为了将所有表信息更好地串联起来,会去查询该view的创建语句,然后转换该view的创建语句,直到所有的source表不存在global view为止。

640?wx_fmt=png

5. 准备

经过前面的语句转换,Woody会直接读取线上表的数据,然后将计算结果输出到测试表中。

Woody会查询metastore, 获取这些测试表对应的线上表的建表语句,然后转换为测试表的建表语句,用于后续的测试前准备阶段。

07

640?wx_fmt=png

数据质量校验

进行完测试前准备之后,Woody按照要对比的两个Spark版本,顺序启动两个Spark 应用来运行转换后的语句。在每个版本的Spark运行结束之后去拿到job输出的信息(table count和checksum以及sample),然后对这些输出数据清空复原。在两个版本的Spark应用均运行结束之后,对比其结果,如果结果一致,则代表两个Spark版本的数据是一致的,是兼容的。

640?wx_fmt=png

1. 检验什么

首先,Woody校验数据有以下几种:

  1. insert 语句写入的表

  2. job中创建但是后续没有被使用到的working表

  3. select语句的结果

前两种场景很好理解,Woody会在语法分析的时候记录这些表,然后在Spark计算任务完成之后,去校验这些表。

640?wx_fmt=png

2. 关于select

经典的tpc-ds和tpc-h基准测试,都是使用select语句进行,Woody也支持对select语句进行结果校验。

Spark中在缓存(cache)某个RDD的时候,不会立即缓存这个RDD,而是先进行缓存标记,当RDD链触发到这个RDD时才会真正触发缓存操作,Woody对select语句的转换校验也是如此。

首先,Woody在进行语句转换时会识别出哪些语句是select语句,然后用一个计数器计数,先给这个select语句要存入的表分配一个表名,比如WOODY_SELECT_RESULT_{INDEX}。

Woody这样做是因为这个select语句会调用job中其他语句的结果,在转换分析时由于没有真正运行无法拿到该select语句 AnalyzedLogicalPlan, 而无法创建保存这个select结果的表。

在Spark执行这些select语句的时候,Woody会再次识别出这些select语句;然后拿到该select语句的AnalyzedLogicalPlan,从而拿到select结果的schema信息;再根据schema信息,创建用于保存select结果的表;之后再把select语句的结果保存到这个表中,用于后续的结果校验。

640?wx_fmt=png

3.如何校验

crc32是一种简单快速的校验算法,通常用于数据传输,它也被用于Hadoop中。Spark中提供了内置函数crc32。该函数的值是Long类型,最大值不会超过10^19。

Decimal是数据库中的一种数据类型,不属于浮点数类型,可以在定义时划定整数部分以及小数部分的位数。对于一个Decimal类型,scale表示其小数部分的位数,precision表示整数部分位数和小数部分位数之和。

一个Decimal类型可以表示为Decimal(precision, scale),在Spark中,precision和scale的上限都是38。因此可以将crc32的值转换为Decimal(19, 0)。

Woody将表中每行的各列数据concat起来,然后计算其crc32,将该crc32转换为Decimal(19, 0)。由于表的count也是Long类型,不会超过10^19,所以每行的crc32值之和不会超过10^19*10^19=10^38,也就是说不会超过Decimal(38, 0)可表示的范围。

在concat表中一行数据的每列数据时,使用‘\\u0000’来代表值为空的列,并且每列之间用‘\\u0001’隔开。

这样count和checksum就可以有效地表示一个表的检验信息,在获得这个校验信息之后,就可以把当前测试产生的数据清空。

值得注意的是,有些表中含有一些用于审计的列,比如使用current_timestamp来表示更新时间或者current_user来表示是由哪些用户来维护,这些列值会动态变化而且与数据质量无关,业务方提供了这些用于审计的列名,Woody在做checksum的时候会把这些列过滤掉。

08

640?wx_fmt=png

使用场景

640?wx_fmt=png

1. Spark版本升级测试

开发Woody的初衷就是为了让Spark版本升级更加流畅,以减少任务迁移过程中的工作量。Woody支持输入集群名称和成功过的Spark ApplicationId,从而自动拉取job的语句和配置,然后选择需要对比的版本即可进行数据质量对比。

后续会支持账号级别的升级测试,只需要选择用户的账号,Woody即可对该账号需要升级的所有job进行升级测试,测试通过即可将该账号迁移至新版本Spark。

640?wx_fmt=png

2. Spark-sql job调优

Spark-sql job调优通常分为:

  1. 参数调优;

  2. Sql语句调优。

Woody支持在选择Spark版本运行测试时,指定conf或者修改job语句来进行对比测试,用户可以利用Woody来进行job 调优。

640?wx_fmt=png

3. 端到端测试

前面章节提到的测试都是针对单个job进行的测试。然而线上的Spark-sql job通常是一个job pipeline,包含多个有前后依赖的job。在上线新的job pipleline时,需要进行充足的测试。不仅仅需要benchmark测试,也需要进行仿真测试,用线上的数据来进行测试是一个很好的选择。

但是这需要开发者手动修改job的语句,避免影响线上环境。而人工操作的引入又难免造成误操作,从而导致破坏线上数据,环境等。

针对以上问题,Woody的自动转换线上语句功能就有了用武之地。Hadoop team基于此继续开发,使Woody支持端到端的测试,帮助用户安全的使用线上数据进行测试,而不需要进行任何手动修改工作。

1)串联Job

640?wx_fmt=png

图5(点击可查看大图)

对于单个job来说,转换的context只针对当前job,而针对job pipeline, Woody要将这个转换context扩展为整个 job plan, 要让这个job plan 里面的job联系起来,后面的job要拿前面job的输出作为输入,而不是每次都读取线上的数据作为输入。

eBay内部使用的job pipeline调度框架,对于每个触发的job plan,会分配一个planId传给里面的每个Job, Woody会基于planId 将job pipeline里面的job串联起来。

对于端到端的测试,Woody基于用户的账号来创建一个数据库用于存放job的输出结果,数据库命名规则为pre_prod_{preprod_account}_db;其location为/preprod/{preprod_account}/analysis.db。

将job pipeline进行串联,也就是将前面job的输出作为后面job的输入。

一个job的输出表达有两种:

  • 数据写到了哪些表

  • 数据写到了哪些路径

关于数据写到了哪些表:

假设表gdw_tables.tba_merge的location 是‘/sys/edw/gdw_tables/tba_merge’,使用b_woody_sub进行测试。如下两条语句:

insert into gdw_tables.tba_merge partition(dt=’20201019‘) select…

alter table gdw_tables.tba set location ‘/sys/edw/gdw_tables/tba_merge/dt=20201019’;

将会被转换为:

insert into preprod_b_woody_sub_db.gdw_tables__tba_merge partition(dt=‘20201019’) select …

alter table preprod_b_woody_sub_db.gdw_tables__tba set location ‘/preprod/b_woody_sub/analysis.db/sys/edw/gdw_tables/tba_merge/dt=20201019’;

Woody在转换第一条insert 语句时候,会将要写入的表名转换为

preprod_b_woody_sub_db.gdw_tables__tba_merge, 

而这个转换后的表的location也会被转换为 

‘/preprod/b_woody_sub/analysis.db/sys/edw/gdw_tables/tba_merge’,

从而Woody得知该insert语句实际写入的location在测试环境里面是 

‘/preprod/b_woody_sub/analysis.db/sys/edw/gdw_tables/tba_merge/dt=20201019’,

Woody会将这个写入数据的路径记录下来。

在转换第二条AlterTableSetLocation语句时,由于这个被set的location是前面写入过数据的路径,因此,Woody也会知道测试表 preprod_b_woody_sub_db.gdw_tables__tba 被set到了一个被写入过数据的路径,也是一个被写入数据的表。

另外一种情况,如果第一条insert 语句和第二条AlterTableSetLocation语句分别属于两个job,但是有前后关系。

在第一条Insert 语句运行完之后, ‘/preprod/b_woody_sub/analysis.db/sys/edw/gdw_tables/tba_merge/dt=20201019’会被创建出来;

对于第二条AlterTableSetLocation语句,Woody判断得到:

‘/sys/edw/gdw_tables/tba_merge/dt=20201019’ 对应的测试location ‘/preprod/b_woody_sub/analysis.db/sys/edw/gdw_tables/tba_merge/dt=20201019’ 已经存在,也会将 preprod_b_woody_sub_db.gdw_tables__tba 标记为被写入数据的表。

最终Woody会将这个job中所有被写入数据的表跟这个jobPlan 的planId关联起来,存储到mysql数据库中,作为这个jobPlan的context。

在这个jobPlan中的其他job运行时,就可以优先读取这些上游产生的测试数据,而不是去读取线上数据,这样,一条pipeline就可以串联起来。

2)转换详情

为Woody开发了conversion history server以及runtime的conversions页面。图6是一个runtime的转换详情页面。

640?wx_fmt=png

图6(点击可查看大图)

09

640?wx_fmt=png

总结

eBay Hadoop team开发的Spark测试框架Woody能够利用LogicalPlan提取job语句中的有效信息,并利用ParserRuleContext进行转换,以线上数据作为输入,将结果输出到测试目录;支持使用不同的Spark版本,conf,甚至修改job sql语句来运行,在运行之后得到数据质量结果和性能比较。总的来说,测试框架Woody的功能主要有以下两个方面:一是用于Spark版本升级及Spark sql job调优,二是用于端到端测试,帮助用户在job pipeline上线之前安全的利用真实的线上数据进行端到端的测试,验证新feature和bug修复,而无需修改任何代码。未来,Hadoop team将进一步优化Woody,以期更好的使用体验。

640?wx_fmt=png

作者简介

王斐, eBay ADI Hadoop团队Spark平台开发工程师,大数据领域技术爱好者,对Spark有浓厚兴趣和一定的了解,目前主要专注于Spark和Woody的开发工作。

如对本文有任何问题或见解,欢迎移步评论区,作者大大在线回复,机不可失失不再来

您可能还感兴趣:

平台迁移那些事 | eBay百亿级流量迁移策略

数据之道 | Akka Actor及其在商业智能数据服务中的应用

eBay云计算“网”事|网络重传篇

分享 | eBay TESS,我心中的那朵“云”

前沿 | BERT在eBay推荐系统中的实践

eBay云计算“网”事|网络丢包篇

eBay云计算“网”事 | 网络超时篇

数据之道 | 属性图在增强分析平台中的实践

数据之道 | SLA/SLE监控与告警

数据之道 | 进阶版Spark执行计划图

重磅 | eBay提出强大的轻量级推荐算法——洛伦兹因子分解机

实战 | 利用Delta Lake使Spark SQL支持跨表CRUD操作

640?wx_fmt=png

:point_down:点击 阅读原文 ,一键投递 

eBay大量优质职位,等的就是你


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK