Connecting to Kafka Cluster running on Kubernetes from your Local Machine : CLI...
source link: https://dev.to/ksingh7/connecting-to-kafka-cluster-running-on-kubernetes-from-your-local-machine-cli-programatic-access-37ld
Introduction
Why do you need this?
- For local development, you may want to connect to a remote Kafka cluster that runs on OpenShift and was deployed using the Strimzi Operator.
Prerequisites
- OpenShift Container Platform or OKD
- Strimzi Operator deployed
Deploy Kafka Cluster
- Create a YAML file with these contents (only for dev/test clusters)
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: nestjs-testing
spec:
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafka:
    config:
      inter.broker.protocol.version: "2.8"
      log.message.format.version: "2.8"
      offsets.topic.replication.factor: 3
      transaction.state.log.min.isr: 2
      transaction.state.log.replication.factor: 3
    listeners:
      - name: plain
        port: 9092
        tls: false
        type: internal
      - name: tls
        port: 9093
        tls: true
        type: internal
      - name: route
        port: 9094
        tls: true
        type: route
    replicas: 3
    storage:
      type: ephemeral
    version: 2.8.0
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
Preparing to Connect
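# Extract the cluster CA certificate from the secret created by Strimzi
oc get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
# Import the certificate into a Java truststore for the Kafka CLI tools
keytool -import -trustcacerts -alias root -file ca.crt -keystore truststore.jks -storepass password -noprompt
# This should create two files in the current directory: ca.crt and truststore.jks
ls -l *.crt *.jks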
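The decoding step can also be done in Python when the setup is scripted. This is a minimal sketch, assuming the secret was fetched as JSON (for example with `oc get secret my-cluster-cluster-ca-cert -o json`). The helper name `extract_ca_cert` is hypothetical, and the sample secret is dummy data rather than a real certificate.

```python
import base64
import json


def extract_ca_cert(secret_json: str) -> bytes:
    """Return the decoded ca.crt entry of a Kubernetes Secret given as JSON text."""
    secret = json.loads(secret_json)
    # Secret values are stored base64-encoded under .data, keyed by file name.
    return base64.b64decode(secret["data"]["ca.crt"])


if __name__ == "__main__":
    # Dummy secret for illustration only; a real one comes from `oc get secret ... -o json`.
    sample = json.dumps(
        {"data": {"ca.crt": base64.b64encode(b"dummy-pem-bytes").decode("ascii")}}
    )
    print(extract_ca_cert(sample))
```

The returned bytes can be written to `ca.crt` and used in the same way as the file produced by the `oc` command.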
Grab Kafka Endpoint
KAFKA_ENDPOINT=$(oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.type=="route")].bootstrapServers}{"\n"}')
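The JSONPath filter selects the entry in `.status.listeners` whose `type` is `route` and prints its `bootstrapServers` field. Here is a minimal Python sketch of the same lookup, assuming the resource was fetched with `oc get kafka my-cluster -o json`. The helper `route_bootstrap` is hypothetical, and the sample status is made-up data that contains only the fields the JSONPath expression reads.

```python
import json
from typing import Optional


def route_bootstrap(kafka_json: str) -> Optional[str]:
    """Return bootstrapServers of the first listener with type 'route', or None."""
    resource = json.loads(kafka_json)
    for listener in resource.get("status", {}).get("listeners", []):
        if listener.get("type") == "route":
            return listener.get("bootstrapServers")
    return None


if __name__ == "__main__":
    # Illustrative status only; host names here are placeholders.
    sample = json.dumps(
        {
            "status": {
                "listeners": [
                    {"type": "plain", "bootstrapServers": "internal.example:9092"},
                    {"type": "route", "bootstrapServers": "route.example:443"},
                ]
            }
        }
    )
    print(route_bootstrap(sample))
```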
Connecting from CLI (Kafka Console Producer/Consumer)
- Get Kafka Console Producer & Consumer script files
wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz ; tar -xvf kafka_2.13-3.0.0.tgz
- Console Producer
kafka_2.13-3.0.0/bin/kafka-console-producer.sh --bootstrap-server $KAFKA_ENDPOINT --producer-property security.protocol=SSL --producer-property ssl.truststore.password=password --producer-property ssl.truststore.location=truststore.jks --topic my-topic
- Console Consumer
kafka_2.13-3.0.0/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_ENDPOINT --topic my-topic --from-beginning --consumer-property security.protocol=SSL --consumer-property ssl.truststore.password=password --consumer-property ssl.truststore.location=truststore.jks
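Both commands repeat the same three SSL settings. As a sketch, they could be written once to a properties file and passed to the console tools through their `--producer.config` and `--consumer.config` options. The helper `render_client_properties` and the file name `client.properties` are hypothetical; the property keys and values are the ones used in the commands above.

```python
def render_client_properties(
    truststore: str = "truststore.jks", password: str = "password"
) -> str:
    """Render the SSL client settings as Java properties text, one key=value per line."""
    settings = {
        "security.protocol": "SSL",
        "ssl.truststore.location": truststore,
        "ssl.truststore.password": password,
    }
    return "".join(f"{key}={value}\n" for key, value in settings.items())


if __name__ == "__main__":
    # Print the text; redirect it to client.properties to use it with the CLI tools.
    print(render_client_properties(), end="")
```

With the output saved as `client.properties`, the consumer could then be started with `--consumer.config client.properties` in place of the three `--consumer-property` flags.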
Connecting from Python Client (running locally)
import json

from bson import json_util
from kafka import KafkaConsumer, KafkaProducer

# Route listener bootstrap address; replace with the value of $KAFKA_ENDPOINT for your cluster
bootstrap_server = 'my-cluster-kafka-route-bootstrap-nestjs-testing.apps.ocp.ceph-s3.com:443'

print("Producing messages to Kafka topic ...")
producer = KafkaProducer(bootstrap_servers=bootstrap_server, ssl_cafile='ca.crt', security_protocol="SSL")
for i in range(10):
    message = {'value': i}
    producer.send('my-topic', json.dumps(message, default=json_util.default).encode('utf-8'))
# send() is asynchronous; flush() blocks until the buffered messages are delivered
producer.flush()

print("Consuming messages from Kafka topic ...")
# auto_offset_reset='earliest' lets a new consumer group read the messages produced above
consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=bootstrap_server, ssl_cafile='ca.crt', security_protocol="SSL", consumer_timeout_ms=10000, enable_auto_commit=True, auto_offset_reset='earliest')
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: value=%s" % (message.topic, message.partition, message.offset, message.value))
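The bootstrap address in the script is hard-coded. Here is a small sketch of reading it from the `KAFKA_ENDPOINT` variable set earlier (it has to be exported for Python to see it) and building the keyword arguments that `KafkaProducer` and `KafkaConsumer` share. The helper `ssl_client_kwargs` is hypothetical.

```python
import os
from typing import Mapping


def ssl_client_kwargs(env: Mapping[str, str] = os.environ, cafile: str = "ca.crt") -> dict:
    """Build the SSL connection arguments shared by the producer and the consumer."""
    # Strip whitespace in case the variable was captured with a trailing newline.
    endpoint = env.get("KAFKA_ENDPOINT", "").strip()
    if not endpoint:
        raise ValueError("KAFKA_ENDPOINT is not set; export it before running")
    return {
        "bootstrap_servers": endpoint,
        "security_protocol": "SSL",
        "ssl_cafile": cafile,
    }


if __name__ == "__main__":
    # Placeholder endpoint for illustration only.
    print(ssl_client_kwargs({"KAFKA_ENDPOINT": "route.example:443"}))
```

The result can be unpacked into either client, for example `KafkaProducer(**ssl_client_kwargs())`.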
This is how you can connect to a remote Kafka cluster from your local machine. This is handy when you develop an application locally and eventually deploy it to your OpenShift environment.