
Spark Stream-Stream Join


Reading Time: 2 minutes

Spark 2.3 added support for stream-stream joins, i.e., we can now join two streaming Datasets/DataFrames. In this blog we are going to learn about Spark Stream-Stream Join and see how elegantly Spark now supports joining two streaming DataFrames.

In this example, I am going to use:

Apache Spark 2.3.0
Apache Kafka 0.11.0.1
Scala 2.11.8

The build.sbt looks like the following:

scalaVersion := "2.11.8"

libraryDependencies ++= Seq("org.apache.spark" %% "spark-sql" % "2.3.0",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0",
  "org.apache.kafka" % "kafka-clients" % "0.11.0.1")

To create the two streaming DataFrames, I am going to send data to Kafka at a regular time interval on two separate topics, named "dataTopic1" and "dataTopic2".

For sending the data, I first build a list of integers and then send each integer from the list to the Kafka topic at a regular interval, as follows:

// Send the integers 1 to 10 to "dataTopic1", one every 200 ms
val records1 = (1 to 10).toList
records1.foreach { record =>
  val producerRecord = new ProducerRecord[String, String]("dataTopic1", record.toString)
  producer.send(producerRecord)
  Thread.sleep(200)
}

In the same way, I send data to the second Kafka topic, "dataTopic2":

// Send the integers 5 to 15 to "dataTopic2", one every 200 ms
val records2 = (5 to 15).toList
records2.foreach { record =>
  val producerRecord = new ProducerRecord[String, String]("dataTopic2", record.toString)
  producer.send(producerRecord)
  Thread.sleep(200)
}
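
Both snippets assume a KafkaProducer named producer is already in scope; the post doesn't show its construction, but a minimal sketch (with a hypothetical localhost broker address) looks like this:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical broker address; replace with your own bootstrap servers.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)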

After sending the data to Kafka, I start reading a streaming DataFrame from each topic. I read dataFrame1 from Kafka topic "dataTopic1" as follows:

import java.sql.Timestamp
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import spark.implicits._

val dataFrame1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootStrapServers)
  .option("subscribe", "dataTopic1")
  .option("includeTimestamp", value = true)
  .load()

val streamingDf1 = dataFrame1
  .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(col("value").cast("Integer").as("data"), col("timestamp").as("timestamp1"))

In the same way, I read streamingDf2 from Kafka topic "dataTopic2".
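
The second reader isn't shown in the post, but it is symmetric to the first; a sketch, assuming the same SparkSession and the bootStrapServers value used above:

val dataFrame2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootStrapServers)
  .option("subscribe", "dataTopic2")
  .option("includeTimestamp", value = true)
  .load()

val streamingDf2 = dataFrame2
  .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(col("value").cast("Integer").as("data"), col("timestamp").as("timestamp2"))

After that, apply the equi-join on the common column "data" as follows: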

val streamingDfAfterJoin: DataFrame = streamingDf1.join(streamingDf2, "data")

This joins the two streaming DataFrames into one, whose schema is as follows:

root
 |-- data: integer (nullable = true)
 |-- timestamp1: timestamp (nullable = true)
 |-- timestamp2: timestamp (nullable = true)

Wherever the same value of "data" is found in both DataFrames, the joined row carries two timestamps, one from each side of the join. Since records1 covers 1 to 10 and records2 covers 5 to 15, only the overlapping values 5 to 10 appear in the result. The final output is as follows:

[Image: console output of the joined stream, showing the data column alongside timestamp1 and timestamp2]

If you want to run this code on your own, you can find the complete example in the GitHub repo. Before running it, make sure your Kafka server is already running.
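
Note that a streaming join produces output only once the result is attached to a sink and the query is started; the post doesn't show that step, but a minimal sketch using the console sink (assuming the streamingDfAfterJoin defined above) would be:

// Print each micro-batch of joined rows to the console
val query = streamingDfAfterJoin.writeStream
  .format("console")
  .option("truncate", value = false)
  .start()

query.awaitTermination()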
