4

如何使用Docker内的kafka服务

 2 years ago
source link: https://blog.51cto.com/zq2599/5512823
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码): https://github.com/zq2599/blog_demos

  • 基于Docker可以很轻松的搭建一个kafka集群,其他机器上的应用如何使用这个kafka集群服务呢?本次实战就来解决这个问题。
  • 整个实战环境一共有三台机器,各自的职责如下图所示:
IP地址 身份 备注
192.168.1.102 消息生产者 这是个spring boot应用,<br>应用名称是kafka01103producer,<br>01103代表kafka版本0.11.0.3
192.168.1.101 Docker server 此机器上安装了Docker,并且运行了两个容器:zookeeper和kafka
192.168.1.104 消息消费者 这是个spring boot应用,<br>应用名称是kafka01103consumer,<br>01103代表kafka版本0.11.0.3
  • 整个环境的部署情况如下图:
    如何使用Docker内的kafka服务_spring
  1. 操作系统:Centos7
  2. docker:17.03.2-ce
  3. docker-compose:1.23.2
  4. kafka:0.11.0.3
  5. zookeeper:3.4.9
  6. JDK:1.8.0_191
  7. spring boot:1.5.9.RELEASE
  8. spring-kafka:1.3.8.RELEASE
  • 本次实战有几处重点需要注意:
  1. spring-kafka和kafka的版本匹配问题,请关注官方文档: https://spring.io/projects/spring-kafka
  2. kafka的kafka的advertised.listeners配置,应用通过此配置来连接broker;
  3. 应用所在服务器要配置host,才能连接到broker;
  • 接下来开始实战吧;

配置host

  • 为了让生产和消费消息的应用能够连接kafka成功,需要配置应用所在服务器的/etc/hosts文件,增加以下一行内容:
192.168.1.101 kafka1
  • 192.168.1.101是docker所在机器的IP地址;

  • 请注意,生产和消费消息的应用所在服务器都要做上述配置;

  • 可能有的读者在此会有疑问:为什么要配置host呢?我把kafka配置的advertised.listeners配置成kafka的IP地址不就行了么?这样的配置我试过,但是用kafka-console-producer.sh和kafka-console-consumer.sh连接kafka的时候会报错"LEADER_NOT_AVAILABLE"。

在docker上部署kafka

  • 在docker机器上编写docker-compose.yml文件,内容如下:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka1:
    image: wurstmeister/kafka:2.11-0.11.0.3
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "topic001:2:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  • 上述配置中有两处需要注意:

  • 第一,KAFKA_ADVERTISED_LISTENERS的配置,这个参数会写到kafka配置的advertised.listeners这一项中,应用会用来连接broker;

  • 第二,KAFKA_CREATE_TOPICS的配置,表示容器启动时会创建名为"topic001"的主题,并且partition等于2,副本为1;

  • 在docker-compose.yml所在目录执行命令docker-compose up -d,启动容器;

  • 执行命令docker ps,可见容器情况,kafka的容器名为temp_kafka1_1:

[root@hedy temp]# docker ps
CONTAINER ID        IMAGE                              COMMAND                  CREATED             STATUS              PORTS                                                NAMES
ba5374d6245c        wurstmeister/zookeeper             "/bin/sh -c '/usr/..."   About an hour ago   Up About an hour    22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   temp_zookeeper_1
2c58f46bb772        wurstmeister/kafka:2.11-0.11.0.3   "start-kafka.sh"         About an hour ago   Up About an hour    0.0.0.0:9092->9092/tcp                               temp_kafka1_1
  • 执行以下命令可以查看topic001的基本情况:
docker exec temp_kafka1_1 \
kafka-topics.sh \
--describe \
--topic topic001 \
--zookeeper zookeeper:2181
  • 看到的信息如下:
Topic:topic001	PartitionCount:2	ReplicationFactor:1	Configs:
	Topic: topic001	Partition: 0	Leader: 1001	Replicas: 1001	Isr: 1001
	Topic: topic001	Partition: 1	Leader: 1001	Replicas: 1001	Isr: 1001
  • 接下来的实战是编写生产消息和消费消息的两个应用的源码,您可以选择直接从GitHub下载这两个工程的源码,地址和链接信息如下表所示:
名称 链接 备注
项目主页  https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https)  https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh)  [email protected]:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
  • 这个git项目中有多个文件夹,本章源码在kafka01103consumer和kafka01103producer这两个文件夹下,如下图红框所示:

    如何使用Docker内的kafka服务_kafka_02
  • 接下来开始编码:

开发生产消息的应用

  • 创建一个maven工程,pom.xml内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.bolingcavalry</groupId>
    <artifactId>kafka01103producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka01103producer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.3.8.RELEASE</version>
        </dependency>
		<dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.28</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
  • 再次强调spring-kafka版本和kafka版本的匹配很重要;
  • 配置文件application.properties内容:
#kafka相关配置
spring.kafka.bootstrap-servers=kafka1:9092
#设置一个默认组
spring.kafka.consumer.group-id=0
#key-value序列化反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#每次批量发送消息的数量
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288
  • 发送消息的业务代码只有一个MessageController类:
package com.bolingcavalry.kafka01103producer.controller;

import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.*;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;

/**
 * @Description: 接收web请求,发送消息到kafka
 * @author: willzhao E-mail: [email protected]
 * @date: 2019/1/1 11:44
 */
@RestController
public class MessageController {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @RequestMapping(value = "/send/{name}/{message}", method = RequestMethod.GET)
    public @ResponseBody
    String send(@PathVariable("name") final String name, @PathVariable("message") final String message) {
        SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String timeStr = simpleDateFormat.format(new Date());

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("name", name);
        jsonObject.put("message", message);
        jsonObject.put("time", timeStr);
        jsonObject.put("timeLong", System.currentTimeMillis());
        jsonObject.put("bizID", UUID.randomUUID());

        String sendMessage = jsonObject.toJSONString();

        ListenableFuture future = kafkaTemplate.send("topic001", sendMessage);
        future.addCallback(o -> System.out.println("send message success : " + sendMessage),
                throwable -> System.out.println("send message fail : " + sendMessage));

        return "send message to [" +  name + "] success (" + timeStr + ")";
    }
}

  • 编码完成后,在pom.xml所在目录执行命令mvn clean package -U -DskipTests,即可在target目录下发现文件kafka01103producer-0.0.1-SNAPSHOT.jar,将此文件复制到192.168.1.102机器上;

  • 登录192.168.1.102,在文件kafka01103producer-0.0.1-SNAPSHOT.jar所在目录执行命令java -jar kafka01103producer-0.0.1-SNAPSHOT.jar,即可启动生产消息的应用;

开发消费消息的应用

  • 创建一个maven工程,pom.xml内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.bolingcavalry</groupId>
    <artifactId>kafka01103consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka01103consumer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.3.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
  • 再次强调spring-kafka版本和kafka版本的匹配很重要;
  • 配置文件application.properties内容:
#kafka相关配置
spring.kafka.bootstrap-servers=192.168.1.101:9092
#设置一个默认组
spring.kafka.consumer.group-id=0
#key-value序列化反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#每次批量发送消息的数量
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288
  • 消费消息的业务代码只有一个Consumer类,收到消息后,会将内容内容和消息的详情打印出来:
@Component
public class Consumer {
    @KafkaListener(topics = {"topic001"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();

            System.out.println("----------------- record =" + record);
            System.out.println("------------------ message =" + message);
        }
    }
}
  • 编码完成后,在pom.xml所在目录执行命令mvn clean package -U -DskipTests,即可在target目录下发现文件kafka01103consumer-0.0.1-SNAPSHOT.jar,将此文件复制到192.168.1.104机器上;

  • 登录192.168.1.104,在文件kafka01103consumer-0.0.1-SNAPSHOT.jar所在目录执行命令java -jar kafka01103consumer-0.0.1-SNAPSHOT.jar,即可启动消费消息的应用,控制台输出如下:

2019-01-01 13:41:41.747  INFO 1422 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.11.0.2
2019-01-01 13:41:41.748  INFO 1422 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 73be1e1168f91ee2
2019-01-01 13:41:41.787  INFO 1422 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 
2019-01-01 13:41:41.912  INFO 1422 --- [           main] c.b.k.Kafka01103consumerApplication      : Started Kafka01103consumerApplication in 11.876 seconds (JVM running for 16.06)
2019-01-01 13:41:42.699  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator kafka1:9092 (id: 2147482646 rack: null) for group 0.
2019-01-01 13:41:42.721  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group 0
2019-01-01 13:41:42.723  INFO 1422 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2019-01-01 13:41:42.724  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group 0
2019-01-01 13:41:42.782  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group 0 with generation 5
2019-01-01 13:41:42.788  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [topic001-1, topic001-0] for group 0
2019-01-01 13:41:42.805  INFO 1422 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[topic001-1, topic001-0]
2019-01-01 13:48:00.938  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [topic001-1, topic001-0] for group 0
2019-01-01 13:48:00.939  INFO 1422 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[topic001-1, topic001-0]
  • 上述内容显示了当前应用消费了两个partition;

  • 再启动一个同样的应用,这样每个应用负责一个parititon的消费,做法是在文件kafka01103consumer-0.0.1-SNAPSHOT.jar所在目录执行命令java -jar kafka01103consumer-0.0.1-SNAPSHOT.jar --server.port=8081,看看控制台的输出:

2019-01-01 13:47:58.068  INFO 1460 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.11.0.2
2019-01-01 13:47:58.069  INFO 1460 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 73be1e1168f91ee2
2019-01-01 13:47:58.103  INFO 1460 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 
2019-01-01 13:47:58.226  INFO 1460 --- [           main] c.b.k.Kafka01103consumerApplication      : Started Kafka01103consumerApplication in 11.513 seconds (JVM running for 14.442)
2019-01-01 13:47:59.007  INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator kafka1:9092 (id: 2147482646 rack: null) for group 0.
2019-01-01 13:47:59.030  INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group 0
2019-01-01 13:47:59.031  INFO 1460 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2019-01-01 13:47:59.032  INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group 0
2019-01-01 13:48:00.967  INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group 0 with generation 6
2019-01-01 13:48:00.985  INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [topic001-0] for group 0
2019-01-01 13:48:01.015  INFO 1460 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[topic001-0]
  • 可见新的进程消费的是0号partition,此时再去看看先启动的进程的控制台,见到了新的日志,显示该进程只消费1号pairtition了:
2019-01-01 13:48:00.955  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group 0 with generation 6
2019-01-01 13:48:00.960  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [topic001-1] for group 0
2019-01-01 13:48:00.967  INFO 1422 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[topic001-1]

验证消息的生产和消费

  • 在浏览器输入以下地址:192.168.1.102:8080/send/Tom/hello
  • 浏览器显示返回的结果是:send message to [Tom] success (2019-01-01 13:58:08),表示操作成功;
  • 去检查两个消费者进程的控制台,发现其中一个成功的消费了消息,如下:
----------------- record =ConsumerRecord(topic = topic001, partition = 0, offset = 0, CreateTime = 1546351226016, serialized key size = -1, serialized value size = 133, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"timeLong":1546351225804,"name":"Tom","bizID":"4f1b6cf6-78d4-455d-b530-3956723a074f","time":"2019-01-01 22:00:25","message":"hello"})
------------------ message ={"timeLong":1546351225804,"name":"Tom","bizID":"4f1b6cf6-78d4-455d-b530-3956723a074f","time":"2019-01-01 22:00:25","message":"hello"}

  • 至此,外部应用使用基于Docker的kafa服务实战就完成了,如果您也在用Docker部署kafka服务,给外部应用使用,希望本文能给您提供一些参考;

欢迎关注51CTO博客:程序员欣宸

 学习路上,你不孤单,欣宸原创一路相伴…


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK