59

真实案例 | Flink实时计算处理脏数据问题

 4 years ago
source link: https://www.tuicool.com/articles/3eiUjif
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

场景描述:Flink在处理实时数据时,假如其中一条数据是脏数据,例如格式错误,字段缺少等会报错,这时候该怎么处理呢?

关键词:Flink 脏数据

声明: 本文不含工作纪律要求的保密信息,严格遵循公司关于数据资产的保密规定,图文都做过脱敏处理。

这是我最近在调试一个Flink任务中出现的问题。

问题描述

我们线上的一个任务今天报错,业务场景是:

Flink消费消息队列中的消息然后做简单的维表联合查询。今天报警发现类似如下错误:

ZfMNrmb.jpg!web

任务fail-over重试几次,然后失败。

报错很明显,出现了NumberFormatException,null不能转为Long。

解决办法

解决办法更简单。

这个问题在Spark和Flink中都会存在,最直接的办法就是过滤掉。

阿里云上的Blink同样给出了文档,如下:

使用:

或者:

在数据源头直接过滤掉,不要参与计算。

自定义一个UDF

按照上面的处理办法,在SQL中处理当然没有问题,但是我们在实际环境中会遇到非常多的这种情况,我个人建议自定义一个UDF,这个UDF的作用就是专门处理null或者空串或者其他各种异常情况的。

官方给出的一个经典的UDF案例如下:

public class HashCode extends ScalarFunction {
  private int factor = 12;
  
  public HashCode(int factor) {
      this.factor = factor;
  }
  
  public int eval(String s) {
      return s.hashCode() * factor;
  }
}

BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

// register the function
tableEnv.registerFunction("hashCode", new HashCode(10));

// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL API
tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");

照着实现一个自己的处理逻辑即可!

欢迎点赞+收藏

AbuMfeA.jpg!web

VFRR7j6.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK