11

Flink流式处理百万数据量CSV文件

 3 years ago
source link: https://mp.weixin.qq.com/s?__biz=MzI1NjEzMjg3NQ%3D%3D&%3Bmid=2247492431&%3Bidx=1&%3Bsn=07eaf46ed07d3a3349df712aae5b4162
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

前言

最近公司让做一个'没有必要'的需求

需求针对的对象

这是同一个csv文件的不同展示形式

  • Excel展示形式

JvaYn2q.jpg!mobile
  • 文本展示形式

这个csv文件中可能有数百万条数据

需求

将所有的异常数据检测出来

  • 什么样的数据是异常数据

圈红的数据我手动添加了一个a

原本是数字类型

现在变成了一个字符串类型

那么程序中将字符串类型转换为数字类型的话

就会报错

那这个值就是异常数据

  • 为什么我说是 '没有必要的需求'

    • 百万级别的数据量的csv一般都是由数据库导出来的

    • 数据库的列字段都是定义好的 比如是decimal类型的数据类型 导出来的话 那么也肯定是数字而不会是字符串

    • 而出现数字成字符串 是由于人工手动录入的情况下 才会出现 而这种情况又比较少

    • 该csv数据集用于跑python算法 比如通过pandas读取csv 统计某一个列数据的和 若全是数字则可以统计 若某一行数据是字符串 则会出现异常 那么可以通过pandas的方式做异常值数据处理 比如剔除这一行即可

  • 综上 花费人力物力去处理这一个没有必要的需求真的有些'没必要'

但领导发话了呀 这是客户的需求

所以do it

大致实现思路

读取该csv文件

解析csv每一行数据

检验每一行数据是否是异常数据

实现方式

  • 普通方式

通过java读取csv 然后一行一行处理

这种方式 若单机内存太小 很容易造成内存溢出

而且方式很low 没有多大挑战性

对个人技术能力没有提升

所以这种方式pass

  • Flink 流式处理

刚好头段时间 自己学习到了Flink

之前一直是纸上谈兵

现在终于有了用武之地

选好了技术方案 let's do it!

业务逻辑图

EBZZraV.png!mobile

接下来简要说说此流程上的核心技术的实现原理

rabbitmq

DEMO源码

https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/rabbitmq

发送消息

ZRnmEnn.png!mobile3Yvaumb.png!mobile

重点配置说明

  • durable

    是否持久化,是否将队列持久化到mnesia数据库中,有专门的表保存队列声明

  • exclusive

    ①当前定义的队列是connection的channel是共享的,其他的connection是访问不到的

    ②当connection关闭的时候,队列将被删除

  • autoDelete

    当最后一个consumer(消费者)断开之后,队列将自动删除

监听消息

方式一

zumeEn.jpg!mobile3a67zyE.png!mobile

重要参数说明

  • autoack

jqAJvaR.jpg!mobile
autoAck(同no-ack)为true的时候
消息发送到操作系统的套接字缓冲区时即任务消息已经被消费
但如果此时套接字缓冲区崩溃
消息在未被消费者应用程序消费的情况下就被队列删除

所以,如果想要保证消息可靠的达到消费者端
建议将autoAck字段设置为false
这样当上面套接字缓冲区崩溃的情况同样出现
仍然能保证消息被重新消费

方式二 注解方式

ZZRFNjY.jpg!mobile
  • 对类添加@RabbitListener(queues = "${java.flink.queue}")注解

    • 指定队列名称 可从配置文件中读取

  • 对方法添加 @RabbitHandler 注解

三个参数

  • Object message

任意类型的消息

# 解析mq消息
String messageString=JsonUtils.toJson(message);
Message message1=JsonUtils.fromJsonObject(messageString,Message.class);
String message2 = new String(message1.getBody(), "UTF-8");
  • Message msg

手动确认

//如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = msg.getMessageProperties().getDeliveryTag();

//通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(deliveryTag, false);

  • Channel channel

// 处理失败,重新压入MQ
channel.basicRecover();

线程池

源码

https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/thread
ZNVjyy6.jpg!mobile

spring线程相关注解

  • @EnableAsync

    使用多线程

  • @Async

加在线程任务的方法上(需要异步执行的任务)
定义一个线程任务
通过spring提供的ThreadPoolTaskExecutor就可以使用线程池

重要参数

  • corePoolSize

    核心线程数

  • maxPoolSize

    最大线程数

  • queueCapacity

    队列容量

  • keepAliveSeconds

    活跃时间

  • waitForTasksToCompleteOnShutdown

    设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean

  • rejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy())

    • setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务

    • CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行

使用线程池中的线程去执行异步任务

BbyUZvF.jpg!mobile

分布式内存文件系统Alluxio

环境搭建

  • 自定义dokcer网络

docker network create alluxio_nw
  • 安装alluxio master

docker run -d  --rm \
-p 19999:19999 \
-p 19998:19998 \
--net=alluxio_nw \
--name=alluxio-master \
-e ALLUXIO_JAVA_OPTS=" \
-Dalluxio.master.hostname=alluxio-master"
\
alluxio/alluxio master
  • 安装alluxio worker

docker run -d --rm \
-p 29999:29999 \
-p 30000:30000 \
--net=alluxio_nw \
--name=alluxio-worker \
--shm-size=3971.64MB \
-e ALLUXIO_JAVA_OPTS=" \
-Dalluxio.worker.memory.size=3971.64MB \
-Dalluxio.master.hostname=alluxio-master \
-Dalluxio.worker.hostname=alluxio-worker"
\
alluxio/alluxio worker

域名转发配置

sudo vim /etc/hosts

127.0.0.1 alluxio-worker

上传alluxio文件

7RFBNj.jpg!mobile

下载alluxio文件

Vnuuyy7.jpg!mobile

将文件流写入本地

ZfArE37.jpg!mobile

源码

https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/alluxio

Flink流式处理数据

N3yyM37.png!mobile

结合当前业务梳理流程

来源数据源:数百万数据量的CSV文件
结果保存数据:CSV或Mysql
nuqI3qV.png!mobile

读取目标数据

nIZ3Ufe.jpg!mobile
  • 略过表头

  • 在已知几列的情况下 执行上图代码

    比如有6列
    那么读取csv的时候
    flink均认为是String类型去读取(人为指定的类型)

筛选异常数据

VBFVne3.jpg!mobile

异常数据的判断标准

比如输入数据源CSV中一行数据为

若认定圈红的那一列是数字类型

那么此时因为是字符串 无法转换为数字类型

那么这一行是异常数据

将异常数据保存

根据业务灵活处理

  • 第一个全红的 2: 表示第二行

  • 第二个圈红的部分 表示 当前列数据应为Double类型但实际上是非数字类型 所以该行是异常数据

在方法内部对于全局变量的使用仅限于在方法内部使用 不会对方法之后的作用域有效

比如

iErmYv7.jpg!mobile

过滤函数

filter 是过滤函数 
对每一行数据进行过滤
返回false则会被过滤掉

全局变量

List<Integer> rowList=new ArrayList<>();
在filter函数作用域之外
可以在filter函数中使用
仅限于filter函数之间才有效
在filter函数之后 则无法使用filter对该变量的修改
  • 保存到CSV

yE7NNn.jpg!mobile
  • 缺陷

需要指定Tuple类

比如生成的csv文件一行有25列 那么就使用Tuple25

还需要定义25个泛型 比如Tuple25<String,String,....>

最多可支持25列

如果是超过25列那么就不支持

所以使用起来非常不方便 而且使用范围有限

我当时在这块费了时间,因为csv列数超过了25列 比如26列,我想着在增加一个Tuple26或TupleN 尝试了之后 不可以 后来找到了国内Flink钉钉群 请教了下里面的大佬 说是建议保存到Mysql中

  • 保存到Mysql

ZfeQvii.jpg!mobile
配置mysql信息和要执行的sql语句

局限性

假如我有1000个列 那么需要建立一个表有1000个列吗

如果有5000个列呢 所以这种方式 也不太好

此时已经到了项目的最后期限了 很多同事在等着我的结果 我的压力也倍增 差点准备放弃flink 用low的方式实现 最后灵机一动看到了保存到txt文本文件的方法

  • 保存到Text

这种方式简单有效

DEMO源码

https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/flink

Flink国内钉钉群号

群号 : 23138101

后记

上面这点东西 忙活了我3-4天时间 

自我感觉 真是太笨了

国内相关的资料目前还比较少

写下这点心得和经验给需要的朋友们

避免走弯路

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK