A Quick Demo: Kafka to Flink to Cassandra

 3 years ago
source link: https://blog.knoldus.com/a-quick-demo-kafka-to-flink-to-cassandra/

Hi Folks!! In this blog, we are going to learn how we can integrate Flink with Kafka and Cassandra to build a simple streaming data pipeline.

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
Kafka is a scalable, high-performance, low-latency platform for reading and writing streams of data, much like a messaging system.
Cassandra is a distributed, wide-column NoSQL data store.

Minimum Requirements and Installations

To run the application, you will need Kafka and Cassandra installed locally on your machine. The minimum requirements for the application:

Java 1.8+, Scala 2.12.2, Flink 1.9.0, sbt 1.3.12, Kafka 2.3.0, Cassandra 3.10.

Dependencies

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-connector-kafka" % "1.9.0",
  "org.apache.flink" %% "flink-streaming-scala" % "1.9.0" ,
  
  "org.json4s" %% "json4s-native" % "3.6.10",

  // cassandra
  "org.apache.flink" %% "flink-connector-cassandra" % "1.9.0"
)

Connecting to Kafka and reading streams.

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.json4s.native.JsonMethods

implicit lazy val formats = org.json4s.DefaultFormats

val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

// Open the Kafka connection and stream car data from the topic.
val properties: Properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "testKafka")

val kafkaConsumer = new FlinkKafkaConsumer[String]("car.create", new SimpleStringSchema(), properties)

// Parse each JSON string into the Car case class using json4s.
val carDataStream = streamExecutionEnvironment.addSource(kafkaConsumer)
  .flatMap(raw => JsonMethods.parse(raw).toOption)
  .map(_.extract[Car])

The snippet above reads JSON data about cars from the Kafka topic “car.create”. SimpleStringSchema deserializes each message as a string, and json4s then parses that JSON string into a Scala case class. The Car model looks like this:

case class Car(
	Name: String,
	Miles_per_Gallon: Option[Double],
	Cylinders: Option[Long],
	Displacement: Option[Double],
	Horsepower: Option[Long],
	Weight_in_lbs: Option[Long],
	Acceleration: Option[Double],
	Year: String,
	Origin: String)

Reading the JSON data from the Kafka topic with the Flink streaming engine gives us a DataStream[Car]. You can apply transformations to this stream and then sink the resulting DataStream to the Cassandra database.
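As a rough sketch of what such transformations could look like, the plain Scala functions below filter cars by horsepower and tidy up two fields. The names CarTransformations, hasHorsepowerAtLeast, and normalize are made up for this illustration and are not part of the original project. The Car model is repeated only so the snippet compiles on its own.

```scala
// Same Car model as above, repeated so the sketch compiles on its own.
case class Car(
  Name: String,
  Miles_per_Gallon: Option[Double],
  Cylinders: Option[Long],
  Displacement: Option[Double],
  Horsepower: Option[Long],
  Weight_in_lbs: Option[Long],
  Acceleration: Option[Double],
  Year: String,
  Origin: String)

object CarTransformations {
  // Hypothetical filter: keep cars whose horsepower is known and at least `min`.
  // A car with Horsepower = None never passes.
  def hasHorsepowerAtLeast(min: Long)(car: Car): Boolean =
    car.Horsepower.exists(_ >= min)

  // Hypothetical cleanup: trim whitespace from the name and upper-case the origin.
  def normalize(car: Car): Car =
    car.copy(Name = car.Name.trim, Origin = car.Origin.toUpperCase)
}
```

On the stream, these could be plugged in as carDataStream.filter(car => CarTransformations.hasHorsepowerAtLeast(100)(car)).map(car => CarTransformations.normalize(car)). The threshold of 100 is an arbitrary example value.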

Writing Flink DataStream to CassandraDB.

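import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.CassandraSink

// sinkCarStream is the DataStream[Car] left after any transformations
// (carDataStream itself if none were applied).
// Build the tuple to sink into Cassandra. Option[Long] is boxed to
// java.lang.Long first so that a missing value can be written as null.
val sinkCarDataStream = sinkCarStream.map(car =>
  (car.Name, car.Cylinders.map(Long.box).orNull, car.Horsepower.map(Long.box).orNull))

// Open the Cassandra connection and sink the car data into Cassandra.
CassandraSink.addSink(sinkCarDataStream)
  .setHost("127.0.0.1")
  .setQuery("INSERT INTO example.car(Name, Cylinders, Horsepower) values (?, ?, ?);")
  .build()

// Nothing runs until the job is executed.
streamExecutionEnvironment.execute()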
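
One detail worth spelling out is how missing values reach Cassandra. Cylinders and Horsepower are Option[Long], and a Scala Long is a primitive value that cannot be null, so it has to be boxed to java.lang.Long before a None can become null. A minimal sketch, where CassandraRows, nullable, and toRow are hypothetical helper names rather than part of the original code:

```scala
object CassandraRows {
  // Option[Long].orNull does not compile because Long is a value type.
  // Boxing to java.lang.Long first makes null a valid result for None.
  def nullable(value: Option[Long]): java.lang.Long =
    value.map(Long.box).orNull

  // Builds the (Name, Cylinders, Horsepower) tuple in the column order
  // used by the INSERT statement; missing numbers become null.
  def toRow(
      name: String,
      cylinders: Option[Long],
      horsepower: Option[Long]): (String, java.lang.Long, java.lang.Long) =
    (name, nullable(cylinders), nullable(horsepower))
}
```

With a helper like this, the mapping step could read sinkCarStream.map(car => CassandraRows.toRow(car.Name, car.Cylinders, car.Horsepower)).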

We are all set with our handy code. You can find the complete source code here.

Now let's start the Kafka and Cassandra services locally to test it.

Running Cassandra:

Go to the Cassandra bin directory and run the command below to start the Cassandra server:

./cassandra 

Then open the Cassandra shell by running:

./cqlsh

In the shell, run the commands below to create the keyspace example and the table car in Cassandra:

CREATE KEYSPACE IF NOT EXISTS example
   WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

CREATE TABLE IF NOT EXISTS example.car(name text PRIMARY KEY, cylinders bigint, horsepower bigint);

Running Kafka:

Go to your Kafka directory:

  • Start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
  • Start Kafka server:
bin/kafka-server-start.sh config/server.properties
  • Create Kafka Topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic car.create
  • Start Kafka Producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic car.create

Running the Flink application

Go to the project directory, open a terminal, and run the commands below:

sbt clean compile
sbt run

Produce some sample messages to the Kafka topic car.create, one JSON object per line:

{"Name":"saab 99e", "Miles_per_Gallon":25, "Cylinders":4, "Displacement":104, "Horsepower":95, "Weight_in_lbs":2375, "Acceleration":17.5, "Year":"1970-01-01", "Origin":"Europe"}
{"Name":"amc gremlin", "Miles_per_Gallon":21, "Cylinders":6, "Displacement":199, "Horsepower":90, "Weight_in_lbs":2648, "Acceleration":15, "Year":"1970-01-01", "Origin":"USA"}
{"Name":"chevy c20", "Miles_per_Gallon":10, "Cylinders":8, "Displacement":307, "Horsepower":200, "Weight_in_lbs":4376, "Acceleration":15, "Year":"1970-01-01", "Origin":"USA"}

Result

Go to the Cassandra shell and run the command below:

select * from example.car;

You will see the name, number of cylinders, and horsepower of each car that was streamed from Kafka into the Cassandra database.

Thanks for reading. Stay connected for more future blogs.

