Lagom: Let's Stream Kafka Messages and Process Them Using Akka Actors

 3 years ago
source link: https://blog.knoldus.com/lagom-lets-stream-kafka-messages-and-process-using-akka-actor/

Lagom is an open-source framework for building reactive applications in Java or Scala. It is built on Akka and Play, well-known technologies that run in production in some of the most performance-sensitive and scalable systems. Lagom has proven itself to be a user-friendly and convenient framework for designing and developing scalable microservices. These microservices can be based on orchestration, choreography, or a hybrid architecture, depending on the use case. Kafka, on the other hand, is a distributed streaming platform that supports streaming out of the box in a scalable and reliable way, and many applications now use it as the backbone of highly distributed systems. Together, Lagom and Kafka can be used to build highly scalable, distributed systems using choreography. In this blog, we are going to look at the basics of building a Lagom application that consumes messages from Kafka and reacts to those messages/events.

Consuming Kafka messages in a Lagom application is a bit different from the traditional way of using a Kafka consumer directly. Lagom provides an abstraction over the Kafka consumer, backed by the rich tooling Lagom offers for development and deployment. It uses the Alpakka Kafka connector to consume messages from one or more Kafka topics.

I will try to explain the Lagom way of consuming messages from Kafka with an example using a sample processor application.

Plugins.sbt:

Here is a plugins.sbt that adds the Lagom sbt plugin to your project, along with a few other plugins such as code coverage and ConductR for deployment.

resolvers += Resolver.sonatypeRepo("public")

// IDE integration plugins
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")

// Use the Play sbt plugin for Play projects
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.6.12")

// The Lagom plugin
addSbtPlugin("com.lightbend.lagom" % "lagom-sbt-plugin" % "1.4.4")

// Code coverage plugin
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1")

// Static code analysis tools
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")

// Conductr
addSbtPlugin("com.lightbend.conductr" % "sbt-conductr" % "2.7.2")

Build.sbt:

So this sample project is divided into 3 modules:

  1. processor-api
  2. processor-impl
  3. external-service

Here is the build.sbt for it:

name := "lagom-kafka-template"

version := "0.1"

scalaVersion := "2.12.6"

import sbt.Keys._
import Dependencies.autoImport._

lazy val `processor` = (project in file("."))
  .aggregate(`processor-api`, `processor-impl`)

lazy val `processor-api` = (project in file("processor-api"))
  .settings(
    libraryDependencies ++= Seq(
      lagomScaladslApi
    )
  )

// An external service as kafka producer service
lazy val `external-Service` = (project in file("external-service"))
  .settings(
    libraryDependencies ++= Seq(
      lagomScaladslApi
    )
  )

lagomCassandraPort in ThisBuild := 9042
lagomKafkaEnabled in ThisBuild := false
lagomKafkaPort in ThisBuild := 9092

lazy val `processor-impl` = (project in file("processor-impl"))
  .enablePlugins(LagomScala)
  .settings(
    libraryDependencies ++= Seq(
      lagomScaladslPersistenceCassandra,
      lagomScaladslKafkaBroker,
      lagomScaladslTestKit,
      macwire
    )
  )
  .settings(lagomForkedTestSettings: _*)
  .dependsOn(`processor-api`)
  .dependsOn(`external-Service`)

Let’s have a look at each of them and their use:

Processor-Api:

This module contains the declaration of the processor API using a Lagom service descriptor.

import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.api.{Descriptor, Service}
import com.lightbend.lagom.scaladsl.api.Service.{ named, topic }

trait ExternalService extends Service {

  def inboundTopic: Topic[KafkaMessage]

  final override def descriptor: Descriptor = {
    named("external-service")
    .withTopics(
    topic("external-messages", this.inboundTopic)
    ).withAutoAcl(true)
  }

}

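A service trait such as ProcessorService extends Service (a trait provided by Lagom) and must implement its descriptor. Apart from the descriptor, the remaining functions should only be declared in the service trait and implemented in the impl module. Note that the listing above shows the ExternalService trait, which is described in the next section; ProcessorService follows the same pattern and declares the getTopicMessage call that processor-impl implements.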
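Since the ProcessorService declaration itself is not listed, here is a minimal sketch of what it might look like. Only the signature of getTopicMessage is taken from ProcessorServiceImpl further down; the service name "processor" and the path "/api/topic/:topicName" are assumptions made for illustration, not the original project's values.

```scala
import akka.NotUsed
import com.lightbend.lagom.scaladsl.api.{Descriptor, Service, ServiceCall}
import com.lightbend.lagom.scaladsl.api.Service.{named, pathCall}

trait ProcessorService extends Service {

  // Declared here, implemented by ProcessorServiceImpl in processor-impl
  def getTopicMessage(topicName: String): ServiceCall[NotUsed, String]

  final override def descriptor: Descriptor = {
    // "processor" and the path below are hypothetical names
    named("processor")
      .withCalls(
        pathCall("/api/topic/:topicName", getTopicMessage _)
      )
      .withAutoAcl(true)
  }
}
```

The descriptor maps the path parameter :topicName onto the method argument, so only the method declaration and this mapping live in the API module.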

External-Service: 

This module contains the declaration of an external service that stands for the Kafka producer service. Lagom is built around the concept of multiple microservices and assumes that all of them are built using Lagom, so the producer service would normally be a Lagom service as well.

If you want to consume a Kafka topic produced by a third-party application, which is a very common scenario, you can do so by declaring an external service, as this application does.

Here the service name is “external-service” and the Kafka topic to consume messages from is “external-messages”. “inboundTopic” declares that Kafka topic, with KafkaMessage as the message type.

import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.api.{Descriptor, Service}
import com.lightbend.lagom.scaladsl.api.Service.{ named, topic }

trait ExternalService extends Service {

  def inboundTopic: Topic[KafkaMessage]

  final override def descriptor: Descriptor = {
    named("external-service")
    .withTopics(
    topic("external-messages", this.inboundTopic)
    ).withAutoAcl(true)
  }

}
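The KafkaMessage type used by inboundTopic is not listed in this post. A minimal sketch, assuming its fields mirror the sample JSON published in the run steps (name, date, message) and that serialization is done with Play JSON, could look like this:

```scala
import play.api.libs.json.{Format, Json}

// Assumed shape, inferred from the sample messages {"name": ..., "date": ..., "message": ...}
final case class KafkaMessage(name: String, date: String, message: String)

object KafkaMessage {
  // Lagom derives the topic's message serializer from an implicit Play JSON Format
  implicit val format: Format[KafkaMessage] = Json.format[KafkaMessage]
}
```

With the implicit Format in the companion object, the topic("external-messages", this.inboundTopic) call in the descriptor can resolve a serializer for KafkaMessage without any further wiring.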

Processor-Impl:

This module contains the actual implementation of the service defined in the processor-api module.

import akka.NotUsed
import com.knoldus.api.api.ProcessorService
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.server.ServerServiceCall

import scala.concurrent.Future

class ProcessorServiceImpl extends ProcessorService {

  override def getTopicMessage(topicName: String): ServiceCall[NotUsed, String] =
    ServerServiceCall { _ =>
      Future.successful("data...")
    }

}

As you can see, ProcessorServiceImpl extends ProcessorService and provides the implementation of getTopicMessage, which was declared in ProcessorService. This is one of the most important classes in a Lagom REST application. To consume messages from a Kafka topic, however, we need to define one more class: KafkaSubscriber.

KafkaSubscriber:

Here is the KafkaSubscriber:

import java.util.concurrent.TimeUnit

import akka.actor.ActorRef

class KafkaSubscriber(
    externalService: ExternalService,
    actorRef1: ActorRef
) extends FlowHelper {

  // Worker actor that FlowHelper forwards each message to
  override val actorRef: ActorRef = actorRef1

  val consumerGroup = "consumer-group-1"

  implicit val timeout: akka.util.Timeout = akka.util.Timeout(5, TimeUnit.SECONDS)

  // Start consuming messages from the Kafka topic:
  //  - inboundTopic is the topic to consume messages from
  //  - subscribe obtains a subscriber to this topic
  //  - withGroupId returns a copy of the subscriber with the given consumer group id
  //  - withMetadata (not used here) would wrap each payload in a Message instance
  //    so that metadata associated with the message can be accessed
  //  - atLeastOnce applies the given flow to the messages; each message is
  //    delivered to the flow at least once
  externalService.inboundTopic.subscribe.withGroupId(consumerGroup).atLeastOnce {
    kafkaMessageFlow
  }
}

As you can see, within KafkaSubscriber we have defined the following subscriber:

externalService.inboundTopic.subscribe.withGroupId(consumerGroup).atLeastOnce {  kafkaMessageFlow }

Here externalService is our external service, which stands for a Lagom or third-party producer service. It exposes a topic called inboundTopic, which we can subscribe to and consume data from. The withGroupId function sets the consumer group for this subscription. The last part, atLeastOnce, defines the delivery strategy; Lagom's Subscriber also offers atMostOnceSource, so we can choose depending on requirements. atLeastOnce requires an Akka Streams Flow that handles the processing logic, which we can define in a separate trait.
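For comparison, here is a rough sketch of at-most-once consumption using the atMostOnceSource method of Lagom's Subscriber. The AtMostOnceExample object, the consumeAtMostOnce helper and its parameters are hypothetical names introduced only for this illustration; they are not part of the sample project.

```scala
import akka.Done
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import com.lightbend.lagom.scaladsl.api.broker.Topic

import scala.concurrent.Future

object AtMostOnceExample {

  // Hypothetical helper: runs `handle` for every message on the topic.
  // With atMostOnceSource a message is not redelivered if processing fails.
  def consumeAtMostOnce[A](topic: Topic[A], groupId: String)(handle: A => Unit)(
      implicit mat: Materializer
  ): Future[Done] =
    topic.subscribe
      .withGroupId(groupId)
      .atMostOnceSource
      .runWith(Sink.foreach(handle))
}
```

It could be called as AtMostOnceExample.consumeAtMostOnce(externalService.inboundTopic, "consumer-group-1")(msg => println(msg)). The trade-off is that a failure while handling a message loses that message, whereas atLeastOnce may deliver it again.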

FlowHelper:

KafkaSubscriber extends FlowHelper, a helper trait in which we define the flow of Kafka messages using Akka Streams.

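import akka.{Done, NotUsed}
import akka.actor.ActorRef
import akka.pattern.ask
import akka.stream.scaladsl.Flow
import akka.util.Timeout

// Assumption: the global execution context runs the Future callbacks below
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

trait FlowHelper {

  // Maximum time to wait for the worker actor's reply
  implicit val timeOut: Timeout = Timeout(5000.milli)
  val actorRef: ActorRef
  val parallelism = 8

  val terminateFlow: Flow[Any, Done, NotUsed] = Flow[Any].map(_ => Done)

  // Asks the worker actor to process each message, up to `parallelism` at a time
  val forwardKafkaMessageToWorker: Flow[KafkaMessage, Done, NotUsed] = Flow[KafkaMessage]
    .mapAsync(parallelism) { kafkaMessageWithMeta =>
      (actorRef ? kafkaMessageWithMeta)
        .map { _ =>
          println("Got response from actor : " + Done)
          Done
        }
        .recover {
          // A failed or timed-out ask is logged and still acknowledged with Done
          case ex: Exception =>
            println("Exception found while waiting for processor response: " + ex)
            Done
        }
    }

  val kafkaMessageFlow: Flow[KafkaMessage, Done, NotUsed] = Flow[KafkaMessage]
    .map { msg =>
      println(s"Got message from kafka : [$msg]")
      msg
    }
    .via(forwardKafkaMessageToWorker)
    .via(terminateFlow)
}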

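Here “forwardKafkaMessageToWorker” is the flow that passes each message to actorRef, which can be a single actor, a router, or a cluster-sharded actor ref. Once the actor has processed the message it sends a reply, and the flow emits Done for that message. If the ask fails or times out, the recover block logs the exception and still emits Done, so the message is acknowledged rather than retried.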
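The worker actor itself is not shown in this post. A minimal sketch of one that fits the flow above, assuming a classic Akka actor that logs the message and replies with Done, might look like this. WorkerActor is a hypothetical name, and KafkaMessage is repeated as a stand-in only so that the snippet compiles on its own.

```scala
import akka.Done
import akka.actor.{Actor, Props}

// Stand-in for the project's KafkaMessage (assumed fields)
final case class KafkaMessage(name: String, date: String, message: String)

// Hypothetical worker: processes one message and acknowledges it
class WorkerActor extends Actor {
  override def receive: Receive = {
    case msg: KafkaMessage =>
      println(s"Message found: $msg")
      // The reply completes the ask (?) issued by forwardKafkaMessageToWorker
      sender() ! Done
  }
}

object WorkerActor {
  def props: Props = Props(new WorkerActor)
}
```

An ActorRef created with actorSystem.actorOf(WorkerActor.props) can then be passed to KafkaSubscriber as actorRef1. Replying to sender() matters here: without a reply the ask would only complete through the 5-second timeout.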

Run the App:

Here are the steps to run the application and consume messages from the Kafka topic:

Step 1. Clone the repo from here.

Step 2. Run sbt from the root directory.

Step 3. Change project to processor-impl

project processor-impl

Step 4. Start embedded Cassandra and Kafka

sbt:processor-impl> lagomKafkaStart
sbt:processor-impl> lagomCassandraStart

Step 5. Run the application

sbt:processor-impl> run

Step 6. In a separate terminal, start the Kafka console producer from the Kafka root directory and publish some sample messages

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic external-messages
>{"name":"girish","date":"267 July 2020","message":"Hello"}
>{"name":"girish","date":"267 July 2020","message":"Hello"}

The output will contain log messages showing that each message was processed:

22:17:06.211 [info] com.lightbend.lagom.internal.persistence.cluster.ClusterStartupTaskActor [sourceThread=processor-impl-application-akka.actor.default-dispatcher-5, akkaTimestamp=16:47:06.211UTC, akkaSource=akka.tcp://[email protected]:34681/user/cassandraOffsetStorePrepare-singleton/singleton/cassandraOffsetStorePrepare, sourceActorSystem=processor-impl-application] - Cluster start task cassandraOffsetStorePrepare done.
Got message from kafka : [KafkaMessage(girish,267 July 2020,Hello)]
Message found: KafkaMessage(girish,267 July 2020,Hello)
Got response from actor : Done

You can experiment further with this template by trying different actor implementations and message flows.

Hope you enjoyed the post. Thanks for reading!

