6

flink(十):Table&Sql注册表和视图

 3 years ago
source link: https://my.oschina.net/wangzonghui/blog/5082062
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

flink(十):Table&Sql注册表和视图 - 羽落风起 - OSCHINA - 中文开源技术交流社区

  • 本文属于实战,讲解 Flink1.12 版本java代码注册表和视图的实现方法,开发环境搭建,参考上篇文章,这里不再赘述。
  • 代码结构分为5部分,
    • 准备环境 env
    • 数据输入 source
    • 数据处理 transformation
    • 数据输出 sink
    • 启动任务 execute
  • 所有代码基于java1.8
import static org.apache.flink.table.api.Expressions.$;

import java.util.Arrays;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/***
 * 
 * @Description Table API And SQl实例一:将DataStream注册为动态表或视图,再使用sql进行统计查询。
 */
public class DataStreamToTableAndView {
	
	public static void main(String[] args) throws Exception {
		//TODO 1. env环境准备
		StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
		EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,settings);
		
		//TODO 2. source
		DataStream<Order> orderA=env.fromCollection(Arrays.asList(
				new Order(1L,"beer",3),
				new Order(1L,"diaper",4),
				new Order(3L,"rubber",2)
				));
		
		DataStream<Order> orderB=env.fromCollection(Arrays.asList(
				new Order(2L,"beer",3),
				new Order(2L,"diaper",3),
				new Order(3L,"rubber",1)
				));
		
		
		//TODO 3. transformation 将DataStream数转换Table和View,然后查询
		Table tableA=tableEnv.fromDataStream(orderA,$("user"),$("product"),$("amount"));  //注册表
		
		tableEnv.createTemporaryView("tableB",orderB,$("user"),$("product"),$("amount")); //注册视图
		
		/**
		 * 查询:tableA中amount>2 和tableB中amount>1定时护甲并合并。
		 */
		String sql="select * from "+tableA+" where amount>2 union select * from tableB where amount>1 ";
//		sql="select * from "+tableA+" where amount>2 ";
		Table resultTable=tableEnv.sqlQuery(sql);
		System.out.println("结果表约束:");
		resultTable.printSchema();
		System.out.println("表名:"+resultTable);
		
		//将table转为DataStream
//		DataStream<Order> resultDs=tableEnv.toAppendStream(resultTable, Order.class);   //仅insert 操作修改动态表数据
		
		/**
		 * retract 流包含两种类型的 message: add messages 和 retract messages 。通过将INSERT 操作编码为 add message、将 DELETE 操作编码为 retract message、
		 * 将 UPDATE 操作编码为更新(先前)行的 retract message 和更新(新)行的 add message,将动态表转换为 retract 流。下图显示了将动态表转换为 retract 流的过程。
		 */
		DataStream<Tuple2<Boolean, Order>> resultDsTwo=tableEnv.toRetractStream(resultTable, Order.class); //
		
		//TODO 4. sink
		resultDsTwo.print();
		
		//TODO 5. execute
		env.execute("");
	}

	@Data
	@NoArgsConstructor
	@AllArgsConstructor
	public static class Order{
		public Long user;
		public String product;
		public int amount;
	}
}
  • Flink 支持table和视图开发,功能上各有特色,使用上看个人习惯。
  • 个人倾向于Table开发。但整体上table接口相比传统java编码风格有很大不同,使用上需要适应。另外各个版本Flink接口变动较大,旧版本大量接口废弃,希望后期能稳定下来。不然后期版本升级,大量功能接口升级,无异于二次开发。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK