Kubernetes Messaging with Java and KubeMQ – Piotr's TechBlog
source link: https://piotrminkowski.wordpress.com/2020/01/17/kubernetes-messaging-with-java-and-kubemq/
Have you ever tried to run a message broker on Kubernetes? KubeMQ is a relatively new solution and is not as popular as competing tools like RabbitMQ, Kafka, or ActiveMQ. However, it has one big advantage over them: it is a Kubernetes-native message broker that can be deployed with a single command, without preparing any additional templates or manifests. This convinced me to take a closer look at KubeMQ.
KubeMQ is an enterprise-grade, scalable, highly available, and secure message broker and message queue, designed as a Kubernetes-native solution delivered in a lightweight container. It is written in Go and is therefore advertised as a very fast solution running inside a small Docker image of about 30 MB. It can easily be integrated with popular third-party observability tools like Zipkin, Prometheus, or Datadog.
When reading the comparison with competing tools like RabbitMQ or Redis available on the KubeMQ site (https://kubemq.io/product-overview/), it looks pretty amazing (for KubeMQ, of course). It seems the authors wanted to merge some useful features of RabbitMQ and Kafka into a single product. In fact, KubeMQ provides many interesting mechanisms, such as delayed delivery, message peeking, and batch sending and receiving for queues, as well as consumer groups, load balancing, and offset support for pub/sub.
Looking at their Java SDK, I can see that it is a new product and there are still some things to do. However, all the features listed above seem very useful. Of course, I won’t be able to demonstrate all of them in this article, but I’m going to show you a simple Java application that uses a message queue with transactions and a pub/sub event store. Let’s begin.
Example
The example application is written in Java 11 and uses Spring Boot. The source code is, as usual, available on GitHub: https://github.com/piomin/sample-java-kubemq.git.
Before start
Before starting with KubeMQ you need a running instance of Minikube. I have tested it on version 1.6.1.

```shell
$ minikube start --vm-driver=virtualbox
```
Running KubeMQ
First, you need to install KubeMQ. On Windows, you just need to download the latest version of the CLI from https://github.com/kubemq-io/kubemqctl/releases/download/latest/kubemqctl.exe and copy it to a directory on your PATH. Before installing KubeMQ on your Minikube instance, you need to register at https://account.kubemq.io/login/register. There you will receive a token required for the installation. Installation is very easy with the CLI: you just need to execute the kubemqctl cluster create command with the registration token.
By default, KubeMQ creates a cluster consisting of three instances (pods). It is deployed as a Kubernetes StatefulSet inside a newly created namespace, kubemq. We can easily check the list of running pods with the kubectl get pod command.
The exact list of pods is not very important for us. We can easily scale the number of instances in the cluster up and down using the kubemqctl cluster scale command. KubeMQ is exposed inside the cluster through several interfaces. The KubeMQ Java SDK uses gRPC for communication, so we use the kubemq-cluster-grpc service, available on port 50000.
Since KubeMQ is a Kubernetes-native message broker, getting started with it on Minikube is very simple. After executing a single command, we can focus on development.
Example Architecture
We have an example application deployed on Kubernetes that integrates with a KubeMQ queue and event store. The diagram below illustrates the architecture of the application. It exposes the REST endpoint POST /orders for creating new orders. Each order signifies a transfer between two in-memory accounts. An incoming order is sent to the orders queue (1). It is then received by the listener (2), which is responsible for updating account balances using the AccountRepository bean (3). When the transaction is completed, an event is sent to the transactions pub/sub topic. Incoming events may be consumed by many subscribers (4). In the example application we have two listeners: TransactionAmountListener and TransactionCountListener (5). They are responsible for adding extra money to the target order’s account based on different criteria: the first criterion is the amount of a given transaction, while the second is the number of processed transactions per account.
Using the example application described above, I’m going to show you the following features of KubeMQ and its Java SDK:
- Sending messages to a queue
- Listening for incoming queue messages and handling transactions
- Sending messages to pub/sub via Channel
- Subscribing to pub/sub events and reading older events from a store
- Using Spring Boot to integrate a standalone Java application with KubeMQ
Let’s proceed to the implementation.
Implementation with Spring Boot and KubeMQ SDK
We begin with the configuration. The address of the KubeMQ gRPC endpoint has been externalized in application.yml.
```yaml
spring:
  application:
    name: sampleapp-kubemq
kubemq:
  address: kubemq-cluster-grpc:50000
```
In the @Configuration class we define all the required KubeMQ resources as Spring beans. Each of them requires the KubeMQ cluster address. We need to declare a queue, a channel for sending events, and a subscriber for receiving pub/sub events and events-store entries.
```java
@Configuration
@ConfigurationProperties("kubemq")
public class KubeMQConfiguration {

    private String address;

    @Bean
    public Queue queue() throws ServerAddressNotSuppliedException, SSLException {
        return new Queue("transactions", "orders", address);
    }

    @Bean
    public Subscriber subscriber() {
        return new Subscriber(address);
    }

    @Bean
    public Channel channel() {
        return new Channel("transactions", "orders", true, address);
    }

    String getAddress() {
        return address;
    }

    void setAddress(String address) {
        this.address = address;
    }
}
```
The first component in our architecture is the controller. It exposes an HTTP endpoint for placing orders. OrderController injects the Queue bean and uses it to send messages to the KubeMQ queue. After receiving confirmation that the message has been delivered, it returns the order with an id and status=ACCEPTED.
```java
@RestController
@RequestMapping("/orders")
public class OrderController {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);

    private Queue queue;

    public OrderController(Queue queue) {
        this.queue = queue;
    }

    @PostMapping
    public Order sendOrder(@RequestBody Order order) {
        try {
            LOGGER.info("Sending: {}", order);
            final SendMessageResult result = queue.SendQueueMessage(new Message()
                    .setBody(Converter.ToByteArray(order)));
            order.setId(result.getMessageID());
            order.setStatus(OrderStatus.ACCEPTED);
            LOGGER.info("Sent: {}", order);
        } catch (ServerAddressNotSuppliedException | IOException e) {
            LOGGER.error("Error sending", e);
            order.setStatus(OrderStatus.ERROR);
        }
        return order;
    }
}
```
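The Order model itself is not listed in the article. Here is a hypothetical sketch of what it might look like, with field names inferred from the JSON response shown later; the real class lives in the GitHub repository and may differ. One thing is certain: it must implement Serializable, because the SDK’s Converter.ToByteArray()/FromByteArray() helpers rely on standard Java serialization. The two static helpers below mimic what Converter does, so the round trip can be demonstrated without the SDK.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Statuses used in the article's flow.
enum OrderStatus { NEW, ACCEPTED, CONFIRMED, REJECTED, ERROR }

// Hypothetical sketch of the Order model; fields inferred from the JSON
// response shown later in the article.
class Order implements Serializable {
    private String id;
    private String type;
    private Integer accountIdFrom;
    private Integer accountIdTo;
    private int amount;
    private OrderStatus status;

    public Order() {}

    public Order(String type, Integer from, Integer to, int amount, OrderStatus status) {
        this.type = type;
        this.accountIdFrom = from;
        this.accountIdTo = to;
        this.amount = amount;
        this.status = status;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Integer getAccountIdFrom() { return accountIdFrom; }
    public Integer getAccountIdTo() { return accountIdTo; }
    public int getAmount() { return amount; }
    public OrderStatus getStatus() { return status; }
    public void setStatus(OrderStatus status) { this.status = status; }

    // Stand-in for Converter.ToByteArray() from the KubeMQ SDK.
    static byte[] toByteArray(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
        }
        return bos.toByteArray();
    }

    // Stand-in for Converter.FromByteArray().
    static Object fromByteArray(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }
}
```

Because plain Java serialization is used, both producer and consumer must have the same Order class on the classpath, which is fine here since both sides live in one application.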
The message is processed asynchronously. Since the current KubeMQ Java SDK does not provide any message listener for asynchronous processing, we use a synchronous method inside an infinite loop. The loop is started in a new thread managed by a Spring TaskExecutor. When a new message is received, we start a KubeMQ transaction, which may be acknowledged or rejected. A transaction is confirmed if the source account has sufficient funds to perform the transfer to the target account. If the transaction is confirmed, an event with information about it is sent to the KubeMQ transactions pub/sub channel using the Channel bean.
```java
@Component
public class OrderListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

    private Queue queue;
    private Channel channel;
    private OrderProcessor orderProcessor;
    private TaskExecutor taskExecutor;

    public OrderListener(Queue queue, Channel channel, OrderProcessor orderProcessor, TaskExecutor taskExecutor) {
        this.queue = queue;
        this.channel = channel;
        this.orderProcessor = orderProcessor;
        this.taskExecutor = taskExecutor;
    }

    @PostConstruct
    public void listen() {
        taskExecutor.execute(() -> {
            while (true) {
                try {
                    Transaction transaction = queue.CreateTransaction();
                    TransactionMessagesResponse response = transaction.Receive(10, 10);
                    if (response.getMessage().getBody().length > 0) {
                        Order order = orderProcessor
                                .process((Order) Converter.FromByteArray(response.getMessage().getBody()));
                        LOGGER.info("Processed: {}", order);
                        if (order.getStatus().equals(OrderStatus.CONFIRMED)) {
                            transaction.AckMessage();
                            Event event = new Event();
                            event.setEventId(response.getMessage().getMessageID());
                            event.setBody(Converter.ToByteArray(order));
                            LOGGER.info("Sending event: id={}", event.getEventId());
                            channel.SendEvent(event);
                        } else {
                            transaction.RejectMessage();
                        }
                    } else {
                        LOGGER.info("No messages");
                    }
                    Thread.sleep(10000);
                } catch (Exception e) {
                    LOGGER.error("Error", e);
                }
            }
        });
    }
}
```
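The OrderProcessor bean used above is not listed in the article either. A minimal sketch of what it might do, assuming it debits the source account, credits the target account, and marks the order CONFIRMED or REJECTED accordingly, could look as follows. Everything here except the process() call quoted in the listener is an assumption; the stub classes replace the real Spring beans only to keep the sketch compilable on its own.

```java
import java.util.HashMap;
import java.util.Map;

// Stubs standing in for the article's domain types.
class InsufficientFundsException extends Exception {}

enum OrderStatus { NEW, CONFIRMED, REJECTED }

class Order {
    Integer accountIdFrom, accountIdTo;
    int amount;
    OrderStatus status = OrderStatus.NEW;
    Order(Integer from, Integer to, int amount) {
        this.accountIdFrom = from; this.accountIdTo = to; this.amount = amount;
    }
}

// Simplified stand-in for the article's AccountRepository.
class Accounts {
    private final Map<Integer, Integer> balances = new HashMap<>();
    Accounts() { balances.put(1, 2000); balances.put(2, 2000); }
    void updateBalance(Integer id, int amount) throws InsufficientFundsException {
        int balance = balances.get(id) + amount;
        if (balance < 0) throw new InsufficientFundsException();
        balances.put(id, balance);
    }
    int balance(Integer id) { return balances.get(id); }
}

// Hypothetical sketch of the OrderProcessor bean; the real implementation
// is in the article's GitHub repository.
class OrderProcessor {
    private final Accounts accounts;
    OrderProcessor(Accounts accounts) { this.accounts = accounts; }

    Order process(Order order) {
        try {
            // Debit the source account first; this throws if the balance would go negative.
            accounts.updateBalance(order.accountIdFrom, -order.amount);
            accounts.updateBalance(order.accountIdTo, order.amount);
            order.status = OrderStatus.CONFIRMED;
        } catch (InsufficientFundsException e) {
            order.status = OrderStatus.REJECTED;
        }
        return order;
    }
}
```

A REJECTED status is what makes OrderListener call RejectMessage(), so the queue message is re-delivered later.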
The OrderListener class uses the AccountRepository bean for account balance management. It is a simple in-memory store, just for demo purposes.
```java
@Repository
public class AccountRepository {

    private List<Account> accounts = new ArrayList<>();

    public Account updateBalance(Integer id, int amount) throws InsufficientFundsException {
        Optional<Account> accOptional = accounts.stream().filter(a -> a.getId().equals(id)).findFirst();
        if (accOptional.isPresent()) {
            Account account = accOptional.get();
            account.setBalance(account.getBalance() + amount);
            if (account.getBalance() < 0)
                throw new InsufficientFundsException();
            int index = accounts.indexOf(account);
            accounts.set(index, account);
            return account;
        }
        return null;
    }

    public Account add(Account account) {
        account.setId(accounts.size() + 1);
        accounts.add(account);
        return account;
    }

    public List<Account> getAccounts() {
        return accounts;
    }

    @PostConstruct
    public void init() {
        add(new Account(null, "123456", 2000));
        add(new Account(null, "123457", 2000));
        add(new Account(null, "123458", 2000));
    }
}
```
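The Account model and InsufficientFundsException referenced by the repository are also not listed in the article. Minimal sketches might look like the following; the constructor argument order (id, number, balance) is inferred from the init() calls in the repository, and everything else is a guess at what the classes in the repository contain.

```java
// Hypothetical sketch: thrown by AccountRepository.updateBalance() when a
// debit would make the balance negative.
class InsufficientFundsException extends Exception {
    InsufficientFundsException() {
        super("Insufficient funds on source account");
    }
}

// Hypothetical sketch of the Account model; constructor argument order
// inferred from AccountRepository.init().
class Account {
    private Integer id;
    private String number;   // e.g. "123456"
    private int balance;

    Account(Integer id, String number, int balance) {
        this.id = id;
        this.number = number;
        this.balance = balance;
    }

    Integer getId() { return id; }
    void setId(Integer id) { this.id = id; }
    String getNumber() { return number; }
    int getBalance() { return balance; }
    void setBalance(int balance) { this.balance = balance; }
}
```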
And the last components in our architecture: the event listeners. Both of them subscribe to the same transactions events store. TransactionAmountListener is the simpler one. It processes only single events, in order to transfer a percentage bonus, calculated from the transaction amount, to the target account. That’s why we have defined that it should listen only for new events (EventsStoreType.StartNewOnly).
```java
@Component
public class TransactionAmountListener implements StreamObserver<EventReceive> {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionAmountListener.class);

    private Subscriber subscriber;
    private AccountRepository accountRepository;

    public TransactionAmountListener(Subscriber subscriber, AccountRepository accountRepository) {
        this.subscriber = subscriber;
        this.accountRepository = accountRepository;
    }

    @Override
    public void onNext(EventReceive eventReceive) {
        try {
            Order order = (Order) Converter.FromByteArray(eventReceive.getBody());
            LOGGER.info("Amount event: {}", order);
            accountRepository.updateBalance(order.getAccountIdTo(), (int) (order.getAmount() * 0.1));
        } catch (IOException | ClassNotFoundException | InsufficientFundsException e) {
            LOGGER.error("Error", e);
        }
    }

    @Override
    public void onError(Throwable throwable) {
    }

    @Override
    public void onCompleted() {
    }

    @PostConstruct
    public void init() {
        SubscribeRequest subscribeRequest = new SubscribeRequest();
        subscribeRequest.setChannel("transactions");
        subscribeRequest.setClientID("amount-listener");
        subscribeRequest.setSubscribeType(SubscribeType.EventsStore);
        subscribeRequest.setEventsStoreType(EventsStoreType.StartNewOnly);
        try {
            subscriber.SubscribeToEvents(subscribeRequest, this);
        } catch (ServerAddressNotSuppliedException | SSLException e) {
            e.printStackTrace();
        }
    }
}
```
The situation is different with TransactionCountListener. It should be able to retrieve the list of all events published on pub/sub after every startup of our application. That’s why we define StartFromFirst as the EventsStoreType for the Subscriber. Also, a clientId needs to be generated dynamically on application startup in order to retrieve all stored events. The listener sends a bonus to a target account after the fifth transaction addressed to that account has been successfully processed by the application.
```java
@Component
public class TransactionCountListener implements StreamObserver<EventReceive> {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionCountListener.class);

    private Map<Integer, Integer> transactionsCount = new HashMap<>();

    private Subscriber subscriber;
    private AccountRepository accountRepository;

    public TransactionCountListener(Subscriber subscriber, AccountRepository accountRepository) {
        this.subscriber = subscriber;
        this.accountRepository = accountRepository;
    }

    @Override
    public void onNext(EventReceive eventReceive) {
        try {
            Order order = (Order) Converter.FromByteArray(eventReceive.getBody());
            LOGGER.info("Count event: {}", order);
            Integer accountIdTo = order.getAccountIdTo();
            Integer noOfTransactions = transactionsCount.get(accountIdTo);
            if (noOfTransactions == null)
                transactionsCount.put(accountIdTo, 1);
            else {
                transactionsCount.put(accountIdTo, ++noOfTransactions);
                if (noOfTransactions > 5) {
                    accountRepository.updateBalance(order.getAccountIdTo(), (int) (order.getAmount() * 0.1));
                    LOGGER.info("Adding extra to: id={}", order.getAccountIdTo());
                }
            }
        } catch (IOException | ClassNotFoundException | InsufficientFundsException e) {
            LOGGER.error("Error", e);
        }
    }

    @Override
    public void onError(Throwable throwable) {
    }

    @Override
    public void onCompleted() {
    }

    @PostConstruct
    public void init() {
        final SubscribeRequest subscribeRequest = new SubscribeRequest();
        subscribeRequest.setChannel("transactions");
        subscribeRequest.setClientID("count-listener-" + System.currentTimeMillis());
        subscribeRequest.setSubscribeType(SubscribeType.EventsStore);
        subscribeRequest.setEventsStoreType(EventsStoreType.StartFromFirst);
        try {
            subscriber.SubscribeToEvents(subscribeRequest, this);
        } catch (ServerAddressNotSuppliedException | SSLException e) {
            e.printStackTrace();
        }
    }
}
```
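Because the subscriber replays the whole events store on every startup, the count map is rebuilt from scratch each time. The counting logic can be illustrated in isolation with a short, self-contained sketch; no KubeMQ types are used, the account ids and amounts are made up, and the method mirrors only the body of onNext() above.

```java
import java.util.HashMap;
import java.util.Map;

// Self-contained illustration of TransactionCountListener's counting logic:
// replaying a stream of confirmed transfers and granting a 10% bonus for
// every transfer after the fifth one per target account.
class CountReplayDemo {
    private final Map<Integer, Integer> transactionsCount = new HashMap<>();
    private final Map<Integer, Integer> bonuses = new HashMap<>();

    // Mirrors onNext(): count transfers per target account, then grant a
    // bonus once the count for that account exceeds 5.
    void onTransfer(int accountIdTo, int amount) {
        Integer noOfTransactions = transactionsCount.get(accountIdTo);
        if (noOfTransactions == null) {
            transactionsCount.put(accountIdTo, 1);
        } else {
            transactionsCount.put(accountIdTo, ++noOfTransactions);
            if (noOfTransactions > 5) {
                bonuses.merge(accountIdTo, (int) (amount * 0.1), Integer::sum);
            }
        }
    }

    int countFor(int accountId) { return transactionsCount.getOrDefault(accountId, 0); }
    int bonusFor(int accountId) { return bonuses.getOrDefault(accountId, 0); }
}
```

Note that the bonus is granted on the sixth and every subsequent transfer, since the check runs against the already incremented counter.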
Running on Minikube
The easiest way to run our sample application on Minikube is with Skaffold and Jib. We don’t have to prepare any Dockerfile, only a single deployment manifest in the k8s directory. Here’s our deployment.yaml file.
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sampleapp-kubemq
  namespace: kubemq
  labels:
    app: sampleapp-kubemq
spec:
  replicas: 1
  selector:
    matchLabels:
      app: sampleapp-kubemq
  template:
    metadata:
      labels:
        app: sampleapp-kubemq
    spec:
      containers:
        - name: sampleapp-kubemq
          image: piomin/sampleapp-kubemq
          ports:
            - containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: sampleapp-kubemq
  namespace: kubemq
  labels:
    app: sampleapp-kubemq
spec:
  ports:
    - port: 8080
      protocol: TCP
  selector:
    app: sampleapp-kubemq
  type: NodePort
```
The source code is already prepared for Skaffold and Jib: there is a skaffold.yaml file in the project root directory.
```yaml
apiVersion: skaffold/v2alpha1
kind: Config
build:
  artifacts:
    - image: piomin/sampleapp-kubemq
      jib: {}
  tagPolicy:
    gitCommit: {}
```
We also need the jib-maven-plugin plugin in our pom.xml.
```xml
<plugin>
    <groupId>com.google.cloud.tools</groupId>
    <artifactId>jib-maven-plugin</artifactId>
    <version>1.8.0</version>
</plugin>
```
Now, we only have to execute the following command.

```shell
$ skaffold dev
```
Since our application is deployed on Minikube, we may perform some test calls. Assuming that the Minikube node is available at 192.168.99.100, here’s an example request and the application’s response.
```shell
$ curl -s http://192.168.99.100:30833/orders \
    -d '{"type":"TRANSFER","accountIdFrom":1,"accountIdTo":2,"amount":300,"status":"NEW"}' \
    -H 'Content-Type: application/json'
{"type":"TRANSFER","accountIdFrom":1,"accountIdTo":2,"date":null,"amount":300,"id":"10","status":"ACCEPTED"}
```
We can check the list of queues created on KubeMQ using the kubemqctl queues list command.
After sending a few more test requests and restarting the application pod a couple of times, we can inspect the events store with the kubemqctl events_store list command. We can see that there are multiple clients with ids matching count-listener* registered, but only the current one is active.
Let’s take a look at the application logs. They are automatically displayed on the screen after running the skaffold dev command. Each message sent to the queue is received by the listener, which performs a transfer between accounts and then sends an event to pub/sub. Finally, both events-store listeners receive the event.
If you restart the application pod, TransactionCountListener receives all the events available in the events store and counts them per target account id. If the total number of transactions for a single account exceeds 5, it sends extra funds to that account.
If a transaction is rejected by OrderListener due to a lack of funds on the source account, the message is re-delivered to the queue.
Conclusion
In this article I showed you a sample application that integrates with KubeMQ to realize standard use cases based on queues and topics (pub/sub). Getting started with KubeMQ on Kubernetes, and managing it afterwards, is extremely easy with the KubeMQ CLI. It has many interesting features described in quite well-prepared documentation, available at https://docs.kubemq.io/. As a modern, cloud-native message broker, KubeMQ is able to transfer billions of messages daily. However, we should bear in mind that it is a relatively new product, and its features are not as refined as those of the competition. For example, you can compare the KubeMQ dashboard (available after executing the kubemqctl cluster dashboard command) with the RabbitMQ web admin. Of course, everything takes a little time, and I will follow the progress of KubeMQ development.