Flink: Join two Data Streams

 3 years ago
source link: https://blog.knoldus.com/flink-join-two-data-streams/

Apache Flink offers a rich set of APIs and operators that make application developers productive when dealing with multiple data streams. Flink provides many multi-stream operations such as Union, Join, and so on. In this blog, we will explore the Window Join operator in Flink with an example. It joins two data streams on a given key and a common window.

Let's say we have one stream that contains the salary information of every individual who belongs to an organization. Each salary record has the id, name, and salary of an individual, separated by single spaces. This stream is available at port 9000 on localhost.

 final DataStream<Tuple3<Integer, String, Double>> salaryStream = executionEnvironment
                    .socketTextStream("localhost", 9000)
                    .map((MapFunction<String, Tuple3<Integer, String, Double>>) salaryTextStream -> {
                        String[] salaryFields = salaryTextStream.split(" ");
                        if (salaryFields.length == 3 &&
                                !(salaryFields[0].isEmpty()
                                        || salaryFields[1].isEmpty()
                                        || salaryFields[2].isEmpty())) {
                            return new Tuple3<>(Integer.parseInt(salaryFields[0]),
                                    salaryFields[1],
                                    Double.parseDouble(salaryFields[2]));
                        } else {
                            throw new Exception("Not valid input passed");
                        }
                    }, TypeInformation.of(new TypeHint<Tuple3<Integer, String, Double>>() {
                    }));
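Note that the map function above throws on malformed input, and an exception thrown from a user function fails the Flink task. As a minimal sketch of a more forgiving alternative, the parsing step could return an empty result for bad lines so that they can be skipped. The class `SalaryLineParser` and its `Salary` holder are hypothetical names used only for illustration; this is plain Java without Flink, not the code from the original application.

```java
import java.util.Optional;

public class SalaryLineParser {

    /** Plain value holder for one parsed salary line (hypothetical helper type). */
    public static final class Salary {
        public final int id;
        public final String name;
        public final double salary;

        Salary(int id, String name, double salary) {
            this.id = id;
            this.name = name;
            this.salary = salary;
        }
    }

    /** Parses "id name salary"; returns empty for any malformed line. */
    public static Optional<Salary> parse(String line) {
        String[] fields = line.trim().split(" ");
        // Same shape check as the article's map function: three non-empty fields.
        if (fields.length != 3 || fields[1].isEmpty()) {
            return Optional.empty();
        }
        try {
            return Optional.of(new Salary(
                    Integer.parseInt(fields[0]),
                    fields[1],
                    Double.parseDouble(fields[2])));
        } catch (NumberFormatException e) {
            // Non-numeric (or empty) id or salary: treat the line as invalid.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("1 John 5000.0").isPresent()); // well-formed line
        System.out.println(parse("1 John").isPresent());        // too few fields
        System.out.println(parse("x John 5000.0").isPresent()); // non-numeric id
    }
}
```

In a Flink job, a helper like this would typically sit inside a flatMap that emits a record only when the result is present.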

We have another stream that contains the department information of every individual who belongs to the same organization. Each department record has the id and department of an individual. This stream is available at port 9001 on localhost.

 final DataStream<Tuple2<Integer, String>> departmentStream = executionEnvironment
                    .socketTextStream("localhost", 9001)
                    .map((MapFunction<String, Tuple2<Integer, String>>) departmentTextStream -> {
                        String[] departmentFields = departmentTextStream.split(" ");
                        if (departmentFields.length == 2 &&
                                !(departmentFields[0].isEmpty()
                                        || departmentFields[1].isEmpty())) {
                            return new Tuple2<>(Integer.parseInt(departmentFields[0]), departmentFields[1]);
                        } else {
                            throw new Exception("Not valid input passed");
                        }
                    }, TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {
                    }));

Now, join the salary data stream and the department data stream on the id of an individual, which is the key common to both streams. After joining, the resulting data stream carries all the information in one record: the id, name, department, and salary of an individual.

final DataStream<Tuple4<Integer, String, String, Double>> joinedStream =
                 salaryStream.join(departmentStream)
                 .where(getSalaryJoinKey -> getSalaryJoinKey.f0, TypeInformation.of(new TypeHint<Integer>() {}))
                 .equalTo((KeySelector<Tuple2<Integer, String>, Integer>) getDepartmentKey -> getDepartmentKey.f0)
                 .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
                 .apply((JoinFunction<Tuple3<Integer, String, Double>,
                         Tuple2<Integer, String>, Tuple4<Integer, String, String, Double>>) (salaryDetail, departmentDetail) ->
                                    new Tuple4<>(salaryDetail.f0, salaryDetail.f1, departmentDetail.f1, salaryDetail.f2),
                                    TypeInformation.of(new TypeHint<Tuple4<Integer, String, String, Double>>() {}));

Here we use a common window for both streams. We want a tumbling window based on processing time, which is why we use the TumblingProcessingTimeWindows class. The window size is 30 seconds, which means all elements from both streams that arrive within the same 30-second window are grouped together. We then apply a JoinFunction to each pair of elements that share a key in that window, which yields the complete joined information for an individual.
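One detail worth spelling out is that tumbling windows are aligned to the epoch rather than to the arrival of the first element, so "within 30 seconds" really means "within the same aligned 30-second slot". The following is a minimal sketch of that arithmetic in plain Java, assuming a zero window offset and non-negative timestamps; `TumblingWindowSketch` and `windowStart` are hypothetical names, not Flink's own implementation.

```java
public class TumblingWindowSketch {

    /**
     * Start (inclusive) of the tumbling window that contains timestampMillis,
     * assuming a zero offset and a non-negative timestamp.
     */
    public static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    public static void main(String[] args) {
        long size = 30_000L; // 30-second windows, as in the article

        long early = 60_000L; // exactly on a window boundary
        long late = 89_999L;  // last millisecond of that same window
        long next = 90_000L;  // first millisecond of the following window

        // early and late are almost 30 s apart but share a window ...
        System.out.println(windowStart(early, size) == windowStart(late, size));
        // ... while late and next are 1 ms apart and fall into different windows.
        System.out.println(windowStart(late, size) == windowStart(next, size));
    }
}
```

The practical consequence for the join is that a salary record and a department record entered a second apart can still miss each other if a window boundary falls between them.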

You can find the complete code here

Join in Action

To run the application, open two terminals with netcat, one listening on port 9000 and the other on port 9001. The streaming application reads its input from these ports.

nc -l 9000
nc -l 9001

Start the Flink local cluster:

./bin/start-cluster.sh

Now run the Flink application and tail the task executor log to see the output.

tail -f log/flink--taskexecutor-.out

Enter messages in both netcat terminals within the same 30-second window so that the two streams are joined. The resulting data stream has the complete information of an individual: the id, name, department, and salary.
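To make the expected behaviour concrete, here is a minimal plain-Java sketch of what the join does with the contents of one window: it is an inner join, so only ids present on both sides produce output. The class `WindowJoinSketch`, the sample names, and the output formatting are all made up for illustration; the real job emits Tuple4 values from Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowJoinSketch {

    /**
     * Inner-joins "id name salary" lines with "id department" lines on id,
     * mimicking what happens to the elements of a single window.
     */
    public static List<String> joinWindow(List<String> salaryLines, List<String> departmentLines) {
        List<String> joined = new ArrayList<>();
        for (String salaryLine : salaryLines) {
            String[] s = salaryLine.split(" ");
            int salaryId = Integer.parseInt(s[0]);
            for (String departmentLine : departmentLines) {
                String[] d = departmentLine.split(" ");
                // Every matching pair is emitted; unmatched records produce nothing.
                if (salaryId == Integer.parseInt(d[0])) {
                    joined.add("(" + salaryId + "," + s[1] + "," + d[1] + ","
                            + Double.parseDouble(s[2]) + ")");
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Hypothetical input typed into the two netcat terminals.
        List<String> salaries = Arrays.asList("1 John 5000.0", "2 Mary 6000.0");
        List<String> departments = Arrays.asList("1 IT", "3 HR");

        // Only id 1 appears on both sides, so a single joined record is printed.
        joinWindow(salaries, departments).forEach(System.out::println);
    }
}
```

Ids 2 and 3 appear in only one of the two inputs, so they are dropped from the result of that window.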

Happy Blogging and keep Learning!!
