
Learning Flink from Scratch: Data Source & Data Sink

source link: https://www.tuicool.com/articles/RVbiqyb

In the previous article on CEP, I used a custom Source and Sink directly. Looking back through my earlier posts, it seems I never walked through that part, so today let's take the code from last time and talk about Data Source and Data Sink.

At a high level, Flink's programming model can be summarized as: connect a data source, apply transformation operations to the data, and then sink the results out, as shown in the figure below.

[Figure: Flink programming model: data source → transformations → data sink]

This can actually form a perfect closed loop: when the results are sunk into another stream, that sink in turn becomes the source of the next Flink job. Of course, you can also choose to sink to a CSV file, or write to a database via JDBC.
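As a minimal sketch of that model, a job simply chains the three stages together. The class name and the trivial map transformation below are illustrative assumptions, not code from this article:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PipelineSkeleton {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1. Data source: here just a built-in collection source
        DataStream<String> source = env.fromElements("beijing", "shanghai", "shenzhen");
        // 2. Transformation: any operator chain, here a trivial map
        DataStream<String> transformed = source.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        });
        // 3. Data sink: here the console, but it could just as well be Kafka, JDBC or CSV
        transformed.print();
        env.execute("source -> transform -> sink");
    }
}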

Data Source

Sticking with the air-quality example from the previous article, we build a generator to produce data and then write that data into Kafka.

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import wang.datahub.cep.event.AirQualityRecoder;
//import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
public class WriteIntoKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Kafka producer settings, wrapped in a ParameterTool below
        Map<String, String> prop = new HashMap<>();
        prop.put("bootstrap.servers", "localhost:9092");
        prop.put("topic", "test1");
        ParameterTool parameterTool = ParameterTool.fromMap(prop);
        // Attach the custom generator as the data source
        DataStream<AirQualityRecoder> messageStream = env.addSource(new SimpleGenerator());
        // Use the Kafka connector's producer as the default sink, serializing records with the schema below
        DataStreamSink<AirQualityRecoder> airQualityVODataStreamSink = messageStream.addSink(new FlinkKafkaProducer010<>(parameterTool.getRequired("bootstrap.servers"),
                parameterTool.getRequired("topic"),
                new SimpleAirQualityRecoderSchema()));
        messageStream.print();
        env.execute("write to kafka !!!");
    }

    public static class SimpleGenerator implements SourceFunction<AirQualityRecoder>{
        private static final long serialVersionUID = 1L;
        boolean running = true;
        @Override
        public void run(SourceContext<AirQualityRecoder> ctx) throws Exception {
            // Keep emitting generated records until cancel() flips the flag
            while(running) {
                ctx.collect(AirQualityRecoder.createOne());
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

    }

    // Serializes and deserializes AirQualityRecoder with plain Java serialization;
    // the same schema is reused by the Kafka consumers in the sink examples below
    public static class SimpleAirQualityRecoderSchema implements DeserializationSchema<AirQualityRecoder>, SerializationSchema<AirQualityRecoder>{
        @Override
        public AirQualityRecoder deserialize(byte[] message) throws IOException {
            //System.out.println(new String(message));
            ByteArrayInputStream bi = new ByteArrayInputStream(message);
            ObjectInputStream oi = new ObjectInputStream(bi);
            AirQualityRecoder obj = null;
            try {
                obj = (AirQualityRecoder)oi.readObject();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
            bi.close();
            oi.close();
            return obj;
        }

        @Override
        public boolean isEndOfStream(AirQualityRecoder nextElement) {
            return false;
        }

        @Override
        public byte[] serialize(AirQualityRecoder element) {
            byte[] bytes = null;
            try {

                ByteArrayOutputStream bo = new ByteArrayOutputStream();
                ObjectOutputStream oo = new ObjectOutputStream(bo);
                oo.writeObject(element);
                // Flush before reading the buffer, otherwise part of the object data may still be buffered
                oo.flush();
                bytes = bo.toByteArray();
                oo.close();
                bo.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return bytes;
//            return element.toCsvRec().getBytes();
        }

        @Override
        public TypeInformation<AirQualityRecoder> getProducedType() {
            return TypeInformation.of(new TypeHint<AirQualityRecoder>(){});
        }
    }

}

The above is the complete code. We implement a class that implements the SourceFunction interface and overrides its two methods, run and cancel: run tells the source context how to emit data, while cancel defines how to stop the generator. Adding an instance of this class to the current environment with addSource completes the data ingestion. In this example we also use a default sink provided by the Kafka connector to write the simulated data into Kafka.
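AirQualityRecoder itself lives in the wang.datahub.cep.event package from the CEP article and is not listed here. Judging from the getters used in the code above and in the sinks below, it is a serializable POJO roughly like the following sketch; the concrete fields, types and the createOne() factory are assumptions:

import java.io.Serializable;
import java.util.Date;
import java.util.Random;
import java.util.UUID;

// Rough sketch only; the real class is defined in wang.datahub.cep.event
public class AirQualityRecoder implements Serializable {
    private String id;        // record id, read via getId()
    private String city;      // city name, read via getCity()
    private int airQuality;   // air quality index, read via getAirQuality()
    private Date emmit;       // emit time, read via getEmmit()
    private Long et;          // event time in milliseconds, read via getEt()

    // Assumed factory used by the generator source above
    public static AirQualityRecoder createOne() {
        AirQualityRecoder r = new AirQualityRecoder();
        r.id = UUID.randomUUID().toString();
        r.city = "Beijing";
        r.airQuality = new Random().nextInt(500);
        r.emmit = new Date();
        r.et = System.currentTimeMillis();
        return r;
    }

    public String getId() { return id; }
    public String getCity() { return city; }
    public int getAirQuality() { return airQuality; }
    public Date getEmmit() { return emmit; }
    public Long getEt() { return et; }
}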

Data Sink

The Sink part covers two topics: 1. sinking to JDBC, and 2. sinking to CSV via Flink SQL.

Sink to JDBC

First we create a sink class that extends RichSinkFunction and implements its three methods: open, close and invoke. open and close are used to initialize and release resources, while invoke implements the actual sink action.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import wang.datahub.cep.event.AirQualityRecoder;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class MySQLSink extends RichSinkFunction<AirQualityRecoder> {
    PreparedStatement ps;
    private Connection connection;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "insert into sinktable(id, city,airquality,emmit,et) values(?, ?, ?, ?, ?);";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Close the statement first, then the connection, to release resources
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void invoke(AirQualityRecoder value, Context context) throws Exception {

        // Fill in the fields and execute the insert
        ps.setString(1, value.getId());
        ps.setString(2, value.getCity());
        ps.setInt(3, value.getAirQuality());
        ps.setDate(4,new java.sql.Date(value.getEmmit().getTime()));
        ps.setDate(5,new java.sql.Date(value.getEt()));
        ps.executeUpdate();
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://localhost:3306/flinksink?useUnicode=true&characterEncoding=UTF-8", "dafei1288", "dafei1288");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return con;
    }
}
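The sink above assumes that a sinktable already exists in the flinksink database; the article does not show its DDL. A possible one-off setup, with column names taken from the insert statement and column types guessed from the PreparedStatement setters (the types are assumptions):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateSinkTable {
    public static void main(String[] args) throws Exception {
        // Column types are guesses based on the setters in MySQLSink above
        String ddl = "CREATE TABLE IF NOT EXISTS sinktable ("
                + "id VARCHAR(64), "
                + "city VARCHAR(64), "
                + "airquality INT, "
                + "emmit DATE, "
                + "et DATE)";
        try (Connection con = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/flinksink?useUnicode=true&characterEncoding=UTF-8",
                "dafei1288", "dafei1288");
             Statement st = con.createStatement()) {
            st.executeUpdate(ddl);
        }
    }
}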

Next, build a test class that reads the data coming in from Kafka and then sinks it out.

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import wang.datahub.cep.WriteIntoKafka;
import wang.datahub.cep.event.AirQualityRecoder;
import java.util.HashMap;
import java.util.Map;
public class SinkToMySQLApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Kafka consumer settings, wrapped in a ParameterTool below
        Map<String, String> properties = new HashMap<>();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "test");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
//        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("topic", "test1");
        ParameterTool parameterTool = ParameterTool.fromMap(properties);
        FlinkKafkaConsumer010<AirQualityRecoder> consumer010 = new FlinkKafkaConsumer010<>(
                parameterTool.getRequired("topic"), new WriteIntoKafka.SimpleAirQualityRecoderSchema(), parameterTool.getProperties());
        DataStream<AirQualityRecoder> aqrStream = env
                .addSource(consumer010);
        MySQLSink mSink = new MySQLSink();
        aqrStream.addSink(mSink);
        env.execute("write to mysql");
    }
}

The job runs successfully.

[Screenshot: execution result]

Sink to CSV via Flink SQL

This sink is a bit special: the sink is ultimately achieved by executing DML through Flink SQL. In this example we use the CsvTableSink provided by the API.

Here we register the data of the input stream as the AirQualityRecoder table, then register the sink table ss together with its field names and types, and finally use the SQL statement

INSERT INTO ss SELECT id,city,airQuality,emmit,et FROM AirQualityRecoder

to write the data into a CSV file.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import wang.datahub.cep.WriteIntoKafka;
import wang.datahub.cep.event.AirQualityRecoder;
import java.util.HashMap;
import java.util.Map;
public class FlinkSQLSinkToMySQLApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
        Map<String, String> properties = new HashMap<>();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "test");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
//        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("topic", "test1");
        ParameterTool parameterTool = ParameterTool.fromMap(properties);
        FlinkKafkaConsumer010<AirQualityRecoder> consumer010 = new FlinkKafkaConsumer010<>(
                parameterTool.getRequired("topic"), new WriteIntoKafka.SimpleAirQualityRecoderSchema(), parameterTool.getProperties());
        DataStream<AirQualityRecoder> aqrStream = env
                .addSource(consumer010);
        tableEnv.registerDataStreamInternal("AirQualityRecoder",aqrStream);
        TableSink csvSink = new CsvTableSink("E:\\devlop\\sourcespace\\cept\\src\\test\\aa",",");
        String[] fieldNames = {"id", "city","airQuality","emmit","et"};
        TypeInformation[] fieldTypes = {Types.STRING, Types.STRING,Types.INT,Types.SQL_DATE,Types.LONG};
        tableEnv.registerTableSink("ss", fieldNames, fieldTypes, csvSink);
        tableEnv.sqlUpdate(
                "INSERT INTO ss SELECT id,city,airQuality,emmit,et FROM AirQualityRecoder");
        env.execute("write to csv");
    }
}

For ease of demonstration, the emmit field of AirQualityRecoder was changed to the java.sql.Date type. Below is the sink result.

[Screenshot: the CSV sink result]

That's it for Data Source and Data Sink for now. Feel free to get in touch and discuss.

