3

Fllink实时计算运用(四)Flink Table API & SQL 深入详解

 3 years ago
source link: https://segmentfault.com/a/1190000040133047
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

1. Flink Table API的整体实现流程

主要操作流程如下:

// 创建表的执行环境
val tableEnv = ...     

// 创建一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable")

// 注册一张表,用于把计算结果输出
tableEnv.connect(...).createTemporaryTable("outputTable")

// 通过 Table API 查询算子,得到一张结果表
val result = tableEnv.from("inputTable").select(...)

// 通过 SQL查询语句,得到一张结果表
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM inputTable ...")

// 将结果表写入输出表中
result.insertInto("outputTable")

2. 流处理执行环境的创建配置

  1. 创建表环境

    基于流处理执行环境,调create方法直接创建:

    val tableEnv = StreamTableEnvironment.create(env)

表环境(TableEnvironment)是flink中集成Table API & SQL的核心概念。它主要负责:

  • 注册catalog
  • 在内部 catalog 中注册表
  • 执行 SQL 查询
  • 注册用户自定义函数
  • 将 DataStream 或 DataSet 转换为表
  • 保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
  1. 创建TableEnv的时候,可以通过一些参数来配置 TableEnvironment的特性。

    • 老版本的流式查询配置

      EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useOldPlanner()      // 使用老版本planner
        .inStreamingMode()    // 流处理模式
        .build()
      StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings)
      
  • 老版本的批处理配置

    EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useOldPlanner()      // 使用老版本planner
                    .inBatchMode()    // 使用老版本的流处理模式
                    .build()
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings)    
  • blink版本的流处理配置

    EnvironmentSettings settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
    .inStreamingMode().build()
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings)
  • blink版本的批处理配置

    EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inBatchMode().build();
    TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

3. Catalog的操作使用

1)Catalog的类型:

  1. GenericInMemoryCatalog: 内置Catalog。名为default_catalog,默认数据库名为default_database。默认,如用TableEnvironment#registerTable注册的表,均会注册到这个Catalog中。
  2. User-Defined Catalog: 用户自定义Catalog。如flink-connector-hive中的HiveCatalog。

GenericInMemoryCatalog 中的元数据对象名区分大小写。HiveCatalog以小写存储所有元数据对象名。

默认使用的Catalog: default_catalog;Database: default_database。

2)Catalog的用法:

  1. 获取当前使用的Catalog

    tableEnv.getCurrentCatalog()
  2. 获取当前使用的Database

    tableEnv.getCurrentDatabase()
  3. 注册自定义Catalog

    tableEnv.registerCatalog("custom-catalog",new CustomCatalog("customCatalog"))
  4. 列出所有Catalog

    tableEnv.listCatalogs()
  5. 列出所有Database

    tableEnv.listDatabases()
  6. 切换Catalog

    tableEnv.useCatalog("catalogName")
  7. 切换Database

    tableEnv.useDatabase("databaseName")

4. 文件系统的读取操作实现(csv)

  1. POM依赖

    <!-- 导入csv格式描述器的依赖包-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
  2. public static void main(String[] args) throws Exception {
        //1. 创建流式程序运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. 没有指定EnviromentSettings,默认使用的是老版本的Planner的流式查询
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
    
        //3. 指定读取csv文件的路径
        String filePath = "./data/user.csv";
    
        //4. 读取csv的文件,配置属性类型
        tabEnv.connect(new FileSystem().path(filePath))//读取指定文件目录的文件数据,该对象一定是实现了ConnectorDescriptor的实现类
        .withFormat(new Csv()) //定义从外部文件读取数据的格式化方法,需要传入继承自FormatDescriptor抽象类的实现类
        .withSchema(new Schema()
                .field("id", DataTypes.INT())
                .field("name", DataTypes.STRING())
        )//定义表的结构
        .createTemporaryTable("inputTable");
    
        System.out.println(tabEnv.getCurrentCatalog());
        System.out.println(tabEnv.getCurrentDatabase());
        //5. 将table表的数据转换成table对象
        Table inputTable = tabEnv.from("inputTable");
    
        //6. 打印测试
        tabEnv.toAppendStream(inputTable, TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {})).print().setParallelism(1);
    
        env.execute();
    }

5. 消息队列的读取操作实现(kafka)

  1. POM依赖

    <!-- 导入kafka连接器jar包-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    
    <!-- flink json序列化jar包-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
  2. public static void main(String[] args) throws Exception {
        //创建流式程序运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //没有指定EnviromentSettings,默认使用的是老版本的Planner的流式查询
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
    
        //接入kafka的connect消费数据
        tabEnv.connect(new Kafka() //从kafka中读取数据
                .version("universal") //指定当前环境采用的kafka的版本号:"0.8", "0.9", "0.10", "0.11", and "universal"
                .topic("rate")  //指定消费的topic名字
                .property("zookeeper.connect", "10.10.20.15:2181") //指定zookeeper的集群地址
                .property("bootstrap.servers", "10.10.20.15:9092") //指定消费kafka的集群地址
        ).withFormat(new Csv())
                .withSchema(new Schema()
                        .field("timestamp", DataTypes.BIGINT())
                        .field("type", DataTypes.STRING())
                        .field("rate", DataTypes.INT())
                ).createTemporaryTable("RateInputTable");
    
        Table rateInputTable = tabEnv.from("RateInputTable");
    
        tabEnv.toAppendStream(rateInputTable, Rate.class).print();
    
        env.execute();
    }
  3. 开启kafka消费端:

    bin/kafka-console-producer.sh --broker-list 10.10.20.15:9092 --topic rate

发送数据:

1618388392479, 'REF', 9
1618388392480, 'USD', 4
1618388392580, 'HKD', 9

6. 如何进行条件查询操作

6.1 Table API的实现方式

Table API是集成在Scala和Java语言内的查询API。与SQL不同,Table API的查询不会用字符串表示,而是在宿主语言中一步一步调用完成的。

Table API基于代表一张“表”的Table类,并提供一整套操作处理的方法API。这些方法会返回一个新的Table对象,这个对象就表示对输入表应用转换操作的结果。有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构。例如table.select(…).filter(…),其中select(…)表示选择表中指定的字段,filter(…)表示筛选条件。

代码实现:

...
//基于TableAPI进行数据的查询转换操作,所以要求注册的临时表需要读取出来,赋值给一个Table对象实例才可以操作
Table resultTable = inputTable.filter("id == 1").select("id,name");

//使用TableAPI对Table对象进行聚合计算
Table aggResultTable = inputTable.groupBy("id").select("id,id.count as count");

//打印测试
tabEnv.toAppendStream(resultTable, TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {})).print("resultTable>>>").setParallelism(1) ;
tabEnv.toRetractStream(aggResultTable, TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {})).print("aggResultTable>>>").setParallelism(1) ;
...

输出结果:

resultTable>>>> (1,zhangsan)
aggResultTable>>>> (true,(2,1))
aggResultTable>>>> (false,(2,1))
aggResultTable>>>> (true,(2,2))
aggResultTable>>>> (true,(1,1))

true代表新的数据, false代表已存在历史数据,然后再次打印“true,(2,2)“ 进行累积统计。

6.2 SQL的实现方式

Flink的SQL集成,基于的是ApacheCalcite,它实现了SQL标准。在Flink中,用常规字符串来定义SQL查询语句。SQL 查询的结果,是一个新的 Table。

// 使用SQL对表的数据进行操作
Table resultTableBySQL = tabEnv.sqlQuery("select id,count(id) as cnt from inputTable group by id");
tabEnv.toRetractStream(resultTableBySQL, TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {})).print("sql result>>>").setParallelism(1) ;

7. 实现数据的输出操作

表的输出,是通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列。

具体实现,输出表最直接的方法,就是通过 Table.executeInsert() 方法将一个 Table 写入注册过的 TableSink 中。

7.1 输出到文件

代码实现:

//1. 创建流式程序运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2. 没有指定EnviromentSettings,默认使用的是老版本的Planner的流式查询
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

env.setParallelism(1);

//3. 指定读取csv文件的路径
String filePath = "./data/user.csv";

//4. 读取csv的文件,配置属性类型
tabEnv.connect(new FileSystem().path(filePath))//读取指定文件目录的文件数据,该对象一定是实现了ConnectorDescriptor的实现类
 .withFormat(new Csv()) //定义从外部文件读取数据的格式化方法,需要传入继承自FormatDescriptor抽象类的实现类
 .withSchema(new Schema()
             .field("id", DataTypes.INT())
             .field("name", DataTypes.STRING())
            )//定义表的结构
 .createTemporaryTable("inputTable");

//5. 将table表的数据转换成table对象
Table inputTable = tabEnv.from("inputTable");

Table resultTable = inputTable.select("id,name");

//定义结果表,将文件数据写入到结果文件中
tabEnv.connect(new FileSystem().path("./data/user.log"))
 .withFormat(new Csv())
 .withSchema(new Schema() //这个方法一定要指定
             .field("id", DataTypes.INT())
             .field("name", DataTypes.STRING())
            )
 .createTemporaryTable("outputTable");

resultTable.executeInsert("outputTable");

//6. 打印测试
tabEnv.toAppendStream(inputTable, TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {})).print().setParallelism(1);

env.execute();

7.2 输出到KAFKA

处理的数据可以支持输出到Kafka,结合前面的Kafka作为输入数据, 创建数据管道,再输出至Kafka消息队列:

String kafkaNode = "10.10.20.15:2181";
String kafkaNodeServer = "10.10.20.15:9092";
//创建流式程序运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//没有指定EnviromentSettings,默认使用的是老版本的Planner的流式查询
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

//接入kafka的connect消费数据
tabEnv.connect(new Kafka() //从kafka中读取数据
        .version("universal") //指定当前环境采用的kafka的版本号:"0.8", "0.9", "0.10", "0.11", and "universal"
        .startFromEarliest()
        .topic("rate")  //指定消费的topic名字
        .property("zookeeper.connect", kafkaNode) //指定zookeeper的集群地址
        .property("bootstrap.servers", kafkaNodeServer) //指定消费kafka的集群地址
).withFormat(new Csv())
        .withSchema(new Schema()
                .field("timestamp", DataTypes.BIGINT())
                .field("type", DataTypes.STRING())
                .field("rate", DataTypes.INT())
        ).createTemporaryTable("RateInputTable");

Table rateInputTable = tabEnv.from("RateInputTable");

//接入kafka的connect消费数据
tabEnv.connect(new Kafka() //从kafka中读取数据
        .version("universal") //指定当前环境采用的kafka的版本号:"0.8", "0.9", "0.10", "0.11", and "universal"
        .topic("output_rate")  //指定消费的topic名字
        .property("zookeeper.connect", kafkaNode) //指定zookeeper的集群地址
        .property("bootstrap.servers", kafkaNodeServer) //指定消费kafka的集群地址
).withFormat(new Csv())
        .withSchema(new Schema()
                .field("timestamp", DataTypes.BIGINT())
                .field("type", DataTypes.STRING())
                .field("rate", DataTypes.INT())
        ).createTemporaryTable("RateOutputTable");

// 将table数据写入kafka消息队列
rateInputTable.executeInsert("RateOutputTable");

// 打印数据信息
tabEnv.toAppendStream(rateInputTable, StreamOutputKafkaApplication.Rate.class).print();

env.execute();

本文由mirson创作分享,如需进一步交流,请加QQ群:19310171或访问www.softart.cn


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK