A Quick Demo: Kafka to Flink to Cassandra

 3 years ago
source link: https://blog.knoldus.com/a-quick-demo-kafka-to-flink-to-cassandra/

Hi Folks!! In this blog, we are going to learn how we can integrate Flink with Kafka and Cassandra to build a simple streaming data pipeline.

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
Kafka is a scalable, high-performance, low-latency platform that lets you read and write streams of data like a messaging system.
Cassandra is a distributed, wide-column NoSQL data store.

Minimum Requirements and Installations

To start the application, you will need Kafka and Cassandra installed locally on your machine. The minimum requirements for the application:

Java 1.8+, Scala 2.12.2, Flink 1.9.0, sbt 1.3.12, Kafka 2.3.0, Cassandra 3.10.


Add the following dependencies to build.sbt:

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-connector-kafka" % "1.9.0",
  "org.apache.flink" %% "flink-streaming-scala" % "1.9.0",
  "org.json4s" %% "json4s-native" % "3.6.10",

  // cassandra
  "org.apache.flink" %% "flink-connector-cassandra" % "1.9.0"
)

Connecting to Kafka and reading streams.

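import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.json4s._
import org.json4s.native.JsonMethods

implicit lazy val formats = org.json4s.DefaultFormats

val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

// Open the Kafka connection and stream car data through the topic.
val properties: Properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "testKafka")

val kafkaConsumer = new FlinkKafkaConsumer[String]("car.create", new SimpleStringSchema(), properties)

// Parse each JSON string and extract it into the Car case class using json4s.
val carDataStream = streamExecutionEnvironment.addSource(kafkaConsumer)
  .flatMap(raw => JsonMethods.parse(raw).toOption)
  .map(_.extract[Car])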

The snippet above reads JSON data from the Kafka topic "car.create", which contains information about cars. SimpleStringSchema deserializes each message as a string, and json4s then parses that JSON string into a Scala case class. The Car model looks like this:

case class Car(
	Name: String,
	Miles_per_Gallon: Option[Double],
	Cylinders: Option[Long],
	Displacement: Option[Double],
	Horsepower: Option[Long],
	Weight_in_lbs: Option[Long],
	Acceleration: Option[Double],
	Year: String,
	Origin: String)
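
To make the parsing step concrete, here is a minimal sketch of turning one raw message into a Car with json4s, outside of Flink. The helper name parseCar and the use of Try to drop malformed records are assumptions for illustration and are not taken from the original project.

```scala
import org.json4s._
import org.json4s.native.JsonMethods
import scala.util.Try

// Same model as above, repeated so the sketch is self-contained.
case class Car(
  Name: String,
  Miles_per_Gallon: Option[Double],
  Cylinders: Option[Long],
  Displacement: Option[Double],
  Horsepower: Option[Long],
  Weight_in_lbs: Option[Long],
  Acceleration: Option[Double],
  Year: String,
  Origin: String)

object CarParser {
  implicit val formats: Formats = DefaultFormats

  // Hypothetical helper: yields None when the input is not valid JSON
  // or when a required (non-Option) field such as Name is missing.
  def parseCar(raw: String): Option[Car] =
    Try(JsonMethods.parse(raw).extract[Car]).toOption
}
```

A function like this could be passed to flatMap on the Kafka stream, so that records which fail to parse are skipped rather than failing the job.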

Reading the JSON data from the Kafka topic with the Flink streaming engine gives us a DataStream[Car]. You can apply transformations to this stream and then sink the resulting DataStream into the Cassandra database.
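
As one possible transformation, the sketch below keeps only cars at or above a horsepower threshold. The CarPower stand-in class, the threshold of 100, and the isPowerful predicate are all hypothetical, and fromElements replaces the Kafka source so the example runs without a broker.

```scala
import org.apache.flink.streaming.api.scala._

// Trimmed-down stand-in for the Car model, kept small for the sketch.
case class CarPower(name: String, horsepower: Option[Long])

object PowerfulCars {
  // Hypothetical threshold; cars without a horsepower value are dropped.
  val MinHorsepower = 100L

  def isPowerful(car: CarPower): Boolean =
    car.horsepower.exists(_ >= MinHorsepower)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // A fixed set of elements stands in for the Kafka-backed stream.
    val cars: DataStream[CarPower] = env.fromElements(
      CarPower("saab 99e", Some(95L)),
      CarPower("chevy c20", Some(200L)),
      CarPower("unknown", None))

    // Keep the powerful cars and print their names to stdout.
    cars.filter(car => isPowerful(car)).map(car => car.name).print()

    env.execute("powerful-cars-sketch")
  }
}
```

The same filter and map calls would apply unchanged to the DataStream[Car] read from Kafka.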

Writing Flink DataStream to CassandraDB.

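import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.CassandraSink

// Create the car data to sink into Cassandra; missing values are written as null.
val sinkCarDataStream = carDataStream.map(car =>
  (car.Name, car.Cylinders.map(Long.box).orNull, car.Horsepower.map(Long.box).orNull))

// Open the Cassandra connection (local node assumed) and sink the car data.
CassandraSink.addSink(sinkCarDataStream)
  .setHost("127.0.0.1")
  .setQuery("INSERT INTO example.car(Name, Cylinders, Horsepower) values (?, ?, ?);")
  .build()

// Nothing runs until the job is executed.
streamExecutionEnvironment.execute()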
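
Each tuple field is bound to a `?` placeholder in order, so an optional Long needs to become a boxed java.lang.Long before a missing value can be passed as null. The sketch below isolates that conversion; the CarRow stand-in and the toRow helper are hypothetical names used only for illustration.

```scala
// Reduced stand-in for the Car model with just the three columns that are written.
case class CarRow(name: String, cylinders: Option[Long], horsepower: Option[Long])

object CassandraRows {
  // Hypothetical helper: boxes each Option[Long] to java.lang.Long so that
  // a missing value is represented as null in the resulting tuple.
  def toRow(car: CarRow): (String, java.lang.Long, java.lang.Long) =
    (car.name,
     car.cylinders.map(Long.box).orNull,
     car.horsepower.map(Long.box).orNull)
}
```

Boxed Long values correspond to the bigint type in Cassandra, so the target columns would need to be declared accordingly.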

We are all set with our handy code. You can find complete source code here.

Now let's start the Kafka and Cassandra services locally to test it.

Running Cassandra:

Go to the Cassandra bin directory and run the command below to start the Cassandra server:

./cassandra -f

Then open the Cassandra shell by running:

./cqlsh

In the shell, run the commands below to create the keyspace example and the table car in Cassandra:

CREATE KEYSPACE IF NOT EXISTS example WITH replication = {
      'class' : 'SimpleStrategy', 'replication_factor' : 1 };

CREATE TABLE IF NOT EXISTS example.car(Name text PRIMARY KEY, Cylinders bigint, Horsepower bigint);

Running Kafka:

Go inside your kafka directory:

  • Start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
  • Start Kafka server:
bin/kafka-server-start.sh config/server.properties
  • Create Kafka Topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic car.create
  • Start Kafka Producer:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic car.create

Running the Flink application

Go inside the project, open a terminal, and run the commands below:

sbt clean compile
sbt run

Produce some sample messages in the Kafka topic car.create. The console producer sends each line as one message, so enter one JSON document per line:

{"Name":"saab 99e", "Miles_per_Gallon":25, "Cylinders":4, "Displacement":104, "Horsepower":95, "Weight_in_lbs":2375, "Acceleration":17.5, "Year":"1970-01-01", "Origin":"Europe"}
{"Name":"amc gremlin", "Miles_per_Gallon":21, "Cylinders":6, "Displacement":199, "Horsepower":90, "Weight_in_lbs":2648, "Acceleration":15, "Year":"1970-01-01", "Origin":"USA"}
{"Name":"chevy c20", "Miles_per_Gallon":10, "Cylinders":8, "Displacement":307, "Horsepower":200, "Weight_in_lbs":4376, "Acceleration":15, "Year":"1970-01-01", "Origin":"USA"}
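
If you would rather publish the sample messages from code than from the console producer, a small Kafka producer can do it. This is only a sketch: the object name SampleCarProducer is hypothetical, it assumes the kafka-clients library is on the classpath (it normally comes in with the Flink Kafka connector), and it assumes the broker listens on localhost:9092 as in the rest of this post.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SampleCarProducer {
  // One JSON document per message, matching the Car model.
  val sampleMessages: Seq[String] = Seq(
    """{"Name":"saab 99e", "Miles_per_Gallon":25, "Cylinders":4, "Displacement":104, "Horsepower":95, "Weight_in_lbs":2375, "Acceleration":17.5, "Year":"1970-01-01", "Origin":"Europe"}""",
    """{"Name":"amc gremlin", "Miles_per_Gallon":21, "Cylinders":6, "Displacement":199, "Horsepower":90, "Weight_in_lbs":2648, "Acceleration":15, "Year":"1970-01-01", "Origin":"USA"}""",
    """{"Name":"chevy c20", "Miles_per_Gallon":10, "Cylinders":8, "Displacement":307, "Horsepower":200, "Weight_in_lbs":4376, "Acceleration":15, "Year":"1970-01-01", "Origin":"USA"}"""
  )

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try {
      // Send each sample as a value-only record to the car.create topic.
      sampleMessages.foreach { json =>
        producer.send(new ProducerRecord[String, String]("car.create", json))
      }
      producer.flush()
    } finally {
      producer.close()
    }
  }
}
```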


Go to the Cassandra shell and run the command below:

select * from example.car;

You will see the name of each car, its number of cylinders, and its horsepower, streamed from Kafka into the Cassandra database.

Thanks for reading. Stay connected for more future blogs.

