7

开源交流丨批流一体数据集成框架ChunJun数据传输模块详解分享

 2 years ago
source link: https://blog.51cto.com/u_15137832/5614954
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

开源交流丨批流一体数据集成框架ChunJun数据传输模块详解分享

精选 原创

数栈DTinsight 2022-08-24 10:49:46 ©著作权

文章标签 序列化 数据 数据传输 文章分类 Hadoop 大数据 阅读数148

课件获取:关注公众号 “ChunJun”,后台私信 “课件” 获得直播课件

视频回放:​ ​点击这里​

ChunJun 开源项目地址:​ ​github​​ 丨 ​ ​gitee​​ 喜欢我们的项目给我们点个__ STAR!STAR!!STAR!!!(重要的事情说三遍)__

技术交流钉钉 qun:30537511

本期我们带大家回顾一下六六同学的直播分享《ChunJun 数据传输模块介绍》。

一、ChunJun 数据类型转换

1、类型转换解决的问题

大家一听到「ChunJun 数据类型转换」这个概念,可能会联想到上下游之间进行数据交互时会涉及到的隐式转换。如果上游和下游数据类型一致,则不需要对数据进行任何干预,直接进行下发即可。

但是大多数情况下会涉及到两个问题,一是上游的数据源类型和下游的数据源类型不一致。比如 MySql 的 varchar 类型要写到 HdfsOrc 文件里的 string 类型的话,在上游的表示是 varchar,在下游的表示是 string,但实际上中间段 java 的类型都是 string。

另外一种情况则是,上下游之间不止数据源类型不一样,数据类型也不一样,除了要做类型的映射之外,还需要对数据本身进行改动。比如,MySql 的 date 类型要写到下游 timestamp 类型,我们需要进行的操作是把 date 中的毫秒级的时间戳拿出来,转换成 timestamp 的类型,再往下游去写。

这样就引出了一个问题,如何建立所有数据源类型之间的映射 / 转换关系?下面将为大家解答这个问题。

开源交流丨批流一体数据集成框架ChunJun数据传输模块详解分享_数据

2、类型映射概览

・client 端:在 Factory 类中通过 RawConverter 类建立映射关系

・source 端:将数据封装成 AbstractBaseColumn

・sink 端:通过 AbstractBaseColumn 中的转换方法将数据转换成对应类型

开源交流丨批流一体数据集成框架ChunJun数据传输模块详解分享_数据_02

ChunJun 目前支持的数据类型映射关系图如下:

开源交流丨批流一体数据集成框架ChunJun数据传输模块详解分享_数据_03

3、类型映射详解

以 Timestamp 为例,如果要写入到 Long 类型的话,根据上文展示的 ChunJun 数据类型映射关系图,最终映射到 TimestampColumn 中,具体流程如下图:

开源交流丨批流一体数据集成框架ChunJun数据传输模块详解分享_数据_04

上面这个例子描述的是一个单独的字段,正常情况下,会处理多个字段,这时的类型映射详解情况如下图:

开源交流丨批流一体数据集成框架ChunJun数据传输模块详解分享_数据传输_05

 as 方法就是数据类型转换的方法。使用这个机制之后,在下游可以只关心需要的数据类型,增加开发效率。

二、ChunJun 数据传输过程

了解完 ChunJun 数据类型转换后,我们来为大家分享 ChunJun 的数据传输过程。

1、上下游数据传输方式

在 ChunJun 中进行同步作业,有两种情况,一是算子链打开的情况,上游的 Source 和下游的 Sink 会被合并成一个 task,有同一个线程去做调度;二是把算子链进行关闭,Source 和 Sink 各自形成一个 task,也有各自的线程去进行调度。

在算子链打开的情况下,上下游数据传输方式可分为两种,对象重用和拷贝。

● 对象重用

・上下游数据传输使用方法调用的形式,将上游产生的数据的对象引用直接交给下游

・上下游算子需要形成算子链,作业开启对象重用

· env.getConfig().enableObjectReuse();

・上游传输给下游的数据,需要经过一次深拷贝

・上下游算子需要形成算子链

算子链的好处是可以减少序列化的操作,那么为什么我们还要引入序列化呢?因为 ChunJun 的特殊性。ChunJun 同步作业的话,只有上下游两个算子,且都对接了正式的数据源,读写的时候会导致线程堵塞。因此上限由网络 io 决定,如果断开算子链,cpu 会在一端线程阻塞的时候切换到另外一端。在序列化的性能较高时,线程上下文切换带来的性能下降完全可以被弥补。

经过测试,序列化的性能比对象重用和拷贝高 30% 左右。

・上下游数据传输依赖于网络传输。上游数据进行序列化成 byte 数组后进行网络传输,下游收到数据后需要进行反序列化

・上下游之间不形成算子链

开源交流丨批流一体数据集成框架ChunJun数据传输模块详解分享_序列化_06

知道要做序列化后,会产生一些思考,带着这些疑问,接着往下看。

・序列化和反序列化在什么时候发生?

・Flink 支持哪些序列化?

・序列化是怎么做的?

・怎么找到适合的序列化方式?

・如何实现自定义的序列化?

2、序列化传输过程

下图是 ChunJun 在进行序列化操作时的数据传输链路图:

开源交流丨批流一体数据集成框架ChunJun数据传输模块详解分享_数据_07

3、DataOutView

开源交流丨批流一体数据集成框架ChunJun数据传输模块详解分享_数据传输_08

4、TypeInformation 介绍

开源交流丨批流一体数据集成框架ChunJun数据传输模块详解分享_数据_09

5、kryo 序列化 & BaseSerializer

同样是序列化一个 int 对象,对 kryo 来说,首先需要知道它的类型,然后从高位到低位依次去写入。

DataOutputView 则是直接调用一个 writeInt 的方法,写一句关键代码即可:

UNSAFE.putInt(

this.buffer,

BASE_OFFSET + this.position, v);

开源交流丨批流一体数据集成框架ChunJun数据传输模块详解分享_序列化_10

三、ChunJun 序列化实现

1、ColumnRowData 序列化过程

ColumnRowData 序列化过程采取标志位 + 实际数据的方式,具体流程如下图:

开源交流丨批流一体数据集成框架ChunJun数据传输模块详解分享_数据传输_11

相对于 kryo 的序列化来说:

・实现了更密集的存储

・兼容 null 值

・减少了不必要的数据传输

2、BinaryRowData 结构

开源交流丨批流一体数据集成框架ChunJun数据传输模块详解分享_数据传输_12

 因为数据区一格只占 8 个字节,且每个 index 只能占到一位,所以肯定存在一些没法存储在 8 字节范围之内的数据,可变长度部分就是用来存放数据区无法存放的数据。

3、BinaryRowData-setNull 操作

看到上文的 null 值判断区,有些同学可能会好奇这是什么,又是怎么进行操作的。下图将对一个下标为 11 的数据去做 setnull 操作,进行简单介绍:

开源交流丨批流一体数据集成框架ChunJun数据传输模块详解分享_序列化_13

4、BinaryRowData 数据存储方式

开源交流丨批流一体数据集成框架ChunJun数据传输模块详解分享_数据_14
  • 打赏
  • 收藏
  • 评论
  • 分享
  • 举报

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK