
Deploying a SpringBoot Project in K8S

source link: https://jasonkayzk.github.io/2023/12/19/%E5%9C%A8K8S%E4%B8%AD%E9%83%A8%E7%BD%B2SpringBoot%E9%A1%B9%E7%9B%AE/

In the previous few posts, we deployed StorageClass, StatefulSet, and other resources in Kubernetes.

In this post, we will deploy an actual SpringBoot project that makes use of those stateful services.

Deploying a SpringBoot Project in K8S

Deploying a First SpringBoot Web Project

Writing the Service

To start, we'll deploy the simplest possible HelloWorld-level SpringBoot project.

For deploying a Go project with K8S, see:

The code is very simple:

src/main/java/io/jasonkayzk/github/controller/HelloController.java

import java.net.InetAddress;
import java.net.UnknownHostException;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {
    @GetMapping("/")
    public String index() throws UnknownHostException {
        return "Greetings from Spring Boot on: " + InetAddress.getLocalHost().getHostName() + "\n";
    }
}

Accessing / returns the hostname of the machine the service is running on.
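
The application entry point is not shown in the article; for completeness, here is a minimal sketch assuming the standard SpringBoot layout (the class name Application is an assumption):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        // Starts the embedded web server on port 8080 (the SpringBoot default)
        SpringApplication.run(Application.class, args);
    }
}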

Building the Image

Package the project:

mvn clean package

The packaged artifacts are written to target:

➜  target git:(proj/springboot-deploy-demo) tree -L 1
.
├── ch01-hello-1.0.0.jar
├── ch01-hello-1.0.0.jar.original
├── classes
├── generated-sources
├── maven-archiver
└── maven-status

5 directories, 2 files

Here ch01-hello-1.0.0.jar is the executable fat jar repackaged by the spring-boot-maven-plugin, while ch01-hello-1.0.0.jar.original is the plain archive. Next, write the Dockerfile:

FROM openjdk:8-jre-slim
MAINTAINER [email protected]
RUN mkdir /app
COPY target/*.jar /app/app.jar
EXPOSE 8080
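# The sh -c form is used so that $JAVA_OPTS is expanded at container start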
ENTRYPOINT [ "sh", "-c", "java $JAVA_OPTS -jar /app/app.jar" ]

In the Dockerfile, we base the container image on the official OpenJDK 8 JRE image.

We then create the /app directory and copy the packaged jar into the image as /app/app.jar.

Next, we expose service port 8080 (note: this is the port the Pod serves on inside the cluster, not the port K8S exposes externally!).

Finally, the service is started with java -jar.

With the Dockerfile ready, build and push the image:

docker build -t jasonkay/java-deploy-app:v1.0.0 .

docker push jasonkay/java-deploy-app:v1.0.0

In real-world development, these steps are usually handled by a CI component such as GitHub Actions, Jenkins, or Spinnaker.

Deploying the Service to K8S

Write the YAML configuration:

deploy/deployment.yaml

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: java-deploy-app
  namespace: workspace # declare the namespace; defaults to default
spec:
  replicas: 3
  selector:
    matchLabels:
      name: java-deploy-app
  template:
    metadata:
      labels:
        name: java-deploy-app
    spec:
      containers:
        - name: java-deploy-container
          image: jasonkay/java-deploy-app:v1.0.0
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 8080 # containerPort declares the port inside the container

---
apiVersion: v1
kind: Service
metadata:
  name: java-deploy-app-service
  namespace: workspace # declare the namespace; defaults to default
spec:
  type: NodePort
  ports:
    - name: http
      port: 18888 # the Service's port on its cluster-ip; services inside the cluster reach each other via <cluster-ip>:port
      targetPort: 8080 # the Pod-side port; traffic from port and nodePort is forwarded here, where the container's port is mapped
      nodePort: 32080 # the port opened on every Node; <node-ip>:nodePort is the entry point for clients outside the cluster
  selector:
    name: java-deploy-app

The file has two main parts: a Deployment and a Service.

The Deployment defines:

  • the deployment name;
  • the image to deploy;
  • the image pull policy;
  • the port exposed by the container.

The Service defines:

  • how the service is exposed: NodePort;
  • the port configuration:
    • port: 18888: the Service's port on its cluster-ip; services inside the cluster reach each other via <cluster-ip>:port;
    • targetPort: 8080: the Pod-side port; traffic from port and nodePort is forwarded here, where the container's port is mapped;
    • nodePort: 32080: the port opened on every Node; <node-ip>:nodePort is the entry point for clients outside the cluster.

There are quite a few ports involved, so don't mix them up! External traffic flows <node-ip>:32080 (nodePort) → Service port 18888 → targetPort 8080 on one of the Pods.

Once this is defined, deploy it:

kubectl apply -f deploy/deployment.yaml

Check the result:

➜  kubernetes-learn git:(proj/springboot-deploy-demo) k get pods -n workspace

NAME                               READY   STATUS    RESTARTS      AGE
java-deploy-app-7475c6f558-mzcbq   1/1     Running   0 (95m ago)   16h
java-deploy-app-7475c6f558-x7prr   1/1     Running   0 (96m ago)   16h
java-deploy-app-7475c6f558-zvcc7   1/1     Running   0 (94m ago)   16h

Testing the Service

Finally, test the service with curl; note how repeated requests are answered by different Pods, showing the Service's load balancing:

# curl <k8s-node-ip>:32080

curl 192.168.31.201:32080
Greetings from Spring Boot on: java-deploy-app-7475c6f558-zvcc7

curl 192.168.31.202:32080
Greetings from Spring Boot on: java-deploy-app-7475c6f558-zvcc7

curl 192.168.31.203:32080
Greetings from Spring Boot on: java-deploy-app-7475c6f558-zvcc7

curl 192.168.31.203:32080
Greetings from Spring Boot on: java-deploy-app-7475c6f558-mzcbq

curl 192.168.31.203:32080
Greetings from Spring Boot on: java-deploy-app-7475c6f558-x7prr

Connecting to the Stateful StatefulSet Service

In the previous section, we deployed a very simple service in K8S.

Next, we will connect our service to the Kafka cluster we deployed earlier.

EmbeddedKafka

Before we start: spring-kafka-test provides the @EmbeddedKafka annotation for testing.

During development, we can use it to test our code against an in-process Kafka cluster.

ch02-kafka-integrate/src/test/java/ApplicationTests.java

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.IOException;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 3, ports = {9092, 9093, 9094})
public class ApplicationTests {
    @Test
    public void contextLoads() throws IOException {
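        // Block here so the embedded brokers stay up for manual testing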
        System.in.read();
    }
}

All it takes is the declaration @EmbeddedKafka(count = 3, ports = {9092, 9093, 9094})!

Running the test spins up a 3-broker Kafka cluster!
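
To see the embedded cluster in action, a test can produce and consume a message through it. Below is a minimal round-trip sketch, assuming spring-kafka-test is on the classpath; the topic name demo-topic, the group demo-group, and the class name are all hypothetical:

import static org.junit.Assert.assertEquals;

import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = EmbeddedKafkaRoundTripTest.class)
@EmbeddedKafka(count = 1, partitions = 1, topics = "demo-topic")
public class EmbeddedKafkaRoundTripTest {

    // The broker started by @EmbeddedKafka is registered as a bean
    @Autowired
    private EmbeddedKafkaBroker broker;

    @Test
    public void roundTrip() {
        // Produce one record against the embedded broker (Integer key / String value defaults)
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
        try (Producer<Integer, String> producer =
                     new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer()) {
            producer.send(new ProducerRecord<>("demo-topic", 0, "hello"));
            producer.flush();
        }

        // Consume it back from the same embedded topic
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("demo-group", "false", broker);
        try (Consumer<Integer, String> consumer =
                     new DefaultKafkaConsumerFactory<Integer, String>(consumerProps).createConsumer()) {
            broker.consumeFromAnEmbeddedTopic(consumer, "demo-topic");
            ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(consumer, "demo-topic");
            assertEquals("hello", record.value());
        }
    }
}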

Creating the Environment Config Files

In SpringBoot projects, application-{env} files are almost always used to separate environments.

The top-level entry configuration:

ch02-kafka-integrate/src/main/resources/application.yaml

spring:
  profiles:
    active: dev

The dev environment:

server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 'localhost:9092'
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      auto-offset-reset: earliest

# custom kafka topic config
kafka:
  sasl-enable: false
  topic:
    my-topic: my-topic
    my-topic2: my-topic2
  topics:
    - name: topic0
      num-partitions: 3
      replication-factor: 1
    - name: topic1
      num-partitions: 1
      replication-factor: 1
    - name: topic2
      num-partitions: 2
      replication-factor: 1

The configuration file above declares the development settings:

  • Service port: 8080 (the default);
  • Spring Kafka configuration:
    • bootstrap-servers: 'localhost:9092': the broker address;
    • Producer:
      • value-serializer: the Value serialization format;
    • Consumer:
      • auto-offset-reset: earliest: start consuming from the earliest offset.

The custom kafka section defines our own Topics; we will create them using two different methods (shown below):

  • sasl-enable indicates whether the Kafka connection uses SASL authentication; it is not needed for the dev connection, but it is required when connecting to the Kafka cluster deployed in K8S.

The prod environment:

server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 'kafka-broker-0.kafka-broker-headless.workspace.svc.cluster.local:9092,kafka-broker-1.kafka-broker-headless.workspace.svc.cluster.local:9092,kafka-broker-2.kafka-broker-headless.workspace.svc.cluster.local:9092'

# custom kafka topic config
kafka:
  sasl-enable: true
  topic:
    my-topic: my-topic
    my-topic2: my-topic2
  topics:
    - name: topic0
      num-partitions: 3
      replication-factor: 1
    - name: topic1
      num-partitions: 1
      replication-factor: 1
    - name: topic2
      num-partitions: 2
      replication-factor: 1

In production, the connection details are applied mostly through the Spring configuration classes shown next.

The YAML only configures the Kafka servers' in-cluster addresses.

We can use the Kafka service names here because our application and the Kafka service run in the same K8S cluster; the addresses follow the headless-service DNS pattern <pod-name>.<headless-service>.<namespace>.svc.cluster.local.

We also switch sasl-enable on.

Creating the Kafka Configuration Classes

The overall Kafka configuration:

ch02-kafka-integrate/src/main/java/io/jasonkayzk/github/configure/kafka/KafkaConfigure.java

package io.jasonkayzk.github.configure.kafka;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author zk
 */
@Configuration
public class KafkaConfigure {

    @Value("${spring.kafka.bootstrap-servers}")
    private List<String> bootstrapAddresses;

    @Value("${kafka.sasl-enable}")
    private boolean saslEnable;

    @Value("${KAFKA_USER}")
    private String kafkaUsername;

    @Value("${KAFKA_PASSWORD}")
    private String kafkaPassword;

    /**
     * Use Config in application.yaml
     */
    @Value("${kafka.topic.my-topic}")
    String myTopic;
    @Value("${kafka.topic.my-topic2}")
    String myTopic2;

    /**
     * Kafka connection config
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>(8);
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);

        if (saslEnable) {
            configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            configs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

            configs.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                    "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), kafkaUsername, kafkaPassword
            ));
        }

        return new KafkaAdmin(configs);
    }

    @Bean
    public ProducerFactory<Object, Object> producerFactory() {
        Map<String, Object> configs = new HashMap<>(8);
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        if (saslEnable) {
            configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");

            configs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            configs.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                    "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), kafkaUsername, kafkaPassword
            ));
        }

        return new DefaultKafkaProducerFactory<>(configs);
    }

    @Bean(name = "bookContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> configs = new HashMap<>(8);
        configs.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        if (saslEnable) {
            configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            configs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            configs.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                    "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), kafkaUsername, kafkaPassword
            ));
        }

        return new DefaultKafkaConsumerFactory<>(configs);
    }

    /**
     * JSON message converter
     */
    @Bean
    public RecordMessageConverter jsonConverter() {
        return new StringJsonMessageConverter();
    }

    /**
     * Create a topic by registering a NewTopic bean; if the topic already exists, it is ignored.
     */
    @Bean
    public NewTopic myTopic() {
        return new NewTopic(myTopic, 2, (short) 1);
    }

    @Bean
    public NewTopic myTopic2() {
        return new NewTopic(myTopic2, 1, (short) 1);
    }
}

The configured properties:

  • bootstrapAddresses: the Kafka server addresses, provided by the YAML config;
  • saslEnable: whether the Kafka connection uses SASL, provided by the YAML config;
  • myTopic, myTopic2: custom topic names, provided by the YAML config;
  • kafkaUsername: the Kafka connection username, provided by an environment variable;
  • kafkaPassword: the Kafka connection password, provided by an environment variable.

Note: once Kafka is deployed in K8S, the cluster's connection password is stored in a Secret.

At deployment time, we can mount values from that Secret into the Pod's environment variables (shown in the Deployment below)!

The class also configures:

  • kafkaAdmin: the Kafka admin configuration;
  • producerFactory: the producer configuration;
  • consumerFactory: the consumer configuration;
  • kafkaListenerContainerFactory: the Kafka listener configuration;
  • jsonConverter: the JSON message converter.

In addition, two beans are created: myTopic and myTopic2.

Registering a bean of type NewTopic creates the topic directly; if the topic already exists, it is ignored.

The Kafka Topic configuration:

Besides registering individual NewTopic beans, topics can also be created from configuration via a configuration class:

ch02-kafka-integrate/src/main/java/io/jasonkayzk/github/configure/kafka/KafkaTopicConfigure.java

package io.jasonkayzk.github.configure.kafka;

import lombok.Data;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.support.GenericWebApplicationContext;

import javax.annotation.PostConstruct;
import java.util.List;

@Configuration
public class KafkaTopicConfigure {

    private final TopicConfiguration configuration;

    private final GenericWebApplicationContext context;

    public KafkaTopicConfigure(TopicConfiguration configuration, GenericWebApplicationContext genericContext) {
        this.configuration = configuration;
        this.context = genericContext;
    }

    @PostConstruct
    public void init() {
        initializeBeans(configuration.getTopics());
    }

    private void initializeBeans(List<TopicConfiguration.Topic> topics) {
        topics.forEach(t -> context.registerBean(t.name, NewTopic.class, t::toNewTopic));
    }
}

@Data
@Configuration
@ConfigurationProperties(prefix = "kafka")
class TopicConfiguration {
    private List<Topic> topics;

    @Data
    static class Topic {
        String name;
        Integer numPartitions = 3;
        Short replicationFactor = 1;

        NewTopic toNewTopic() {
            return new NewTopic(this.name, this.numPartitions, this.replicationFactor);
        }
    }
}

The TopicConfiguration class parses the kafka.topics configuration.

In KafkaTopicConfigure, @PostConstruct registers a NewTopic bean for each configured topic; KafkaAdmin then creates any that don't exist yet when the context starts.
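
As an aside (not used in this project), spring-kafka 2.7+ can declare several topics in a single bean, which avoids registering beans by hand; a sketch under that version assumption:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
class TopicDeclarations {

    // One bean declaring all three topics from the YAML above
    @Bean
    public KafkaAdmin.NewTopics topics() {
        return new KafkaAdmin.NewTopics(
                TopicBuilder.name("topic0").partitions(3).replicas(1).build(),
                TopicBuilder.name("topic1").partitions(1).replicas(1).build(),
                TopicBuilder.name("topic2").partitions(2).replicas(1).build());
    }
}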

Writing the Producer and Consumer

Define the Book model:

ch02-kafka-integrate/src/main/java/io/jasonkayzk/github/entity/Book.java

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Book {
    private Long id;

    private String name;
}

ch02-kafka-integrate/src/main/java/io/jasonkayzk/github/kafka/BookProducer.java

import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

@Service
public class BookProducer {

    private static final Logger logger = LoggerFactory.getLogger(BookProducer.class);

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public BookProducer(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, Object o) {
        // Leave the partition null so Kafka assigns it itself
        ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, null, System.currentTimeMillis(), String.valueOf(o.hashCode()), o);

        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(producerRecord);
        future.addCallback(result -> {
                    if (result != null) {
                        logger.info("Producer sent message to topic: {} partition: {}", result.getRecordMetadata().topic(), result.getRecordMetadata().partition());
                    }
                },
                ex -> logger.error("Producer failed to send message, cause: {}", ex.getMessage()));
    }
}

ch02-kafka-integrate/src/main/java/io/jasonkayzk/github/kafka/BookConsumer.java

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.jasonkayzk.github.entity.Book;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class BookConsumer {

    @Value("${kafka.topic.my-topic}")
    private String myTopic;
    @Value("${kafka.topic.my-topic2}")
    private String myTopic2;

    private final Logger logger = LoggerFactory.getLogger(BookConsumer.class);

    private final ObjectMapper objectMapper = new ObjectMapper();

    @KafkaListener(topics = {"${kafka.topic.my-topic}"}, groupId = "group1", containerFactory = "bookContainerFactory")
    public void consumeMessage(ConsumerRecord<String, String> bookConsumerRecord) {
        try {
            Book book = objectMapper.readValue(bookConsumerRecord.value(), Book.class);
            logger.info("Consumer received message from topic: {} partition: {} -> {}", bookConsumerRecord.topic(), bookConsumerRecord.partition(), book.toString());
        } catch (JsonProcessingException e) {
            logger.error(e.toString());
        }
    }

    @KafkaListener(topics = {"${kafka.topic.my-topic2}"}, groupId = "group2", containerFactory = "bookContainerFactory")
    public void consumeMessage2(Book book) {
        logger.info("Consumer received message from topic: {} -> {}", myTopic2, book.toString());
    }
}

The logic is straightforward: the first listener receives the raw JSON String and deserializes it with ObjectMapper, while the second receives a Book directly, with the payload converted by the jsonConverter bean.

Writing the Service

Finally, write a web endpoint:

ch02-kafka-integrate/src/main/java/io/jasonkayzk/github/controller/BookController.java

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.jasonkayzk.github.entity.Book;
import io.jasonkayzk.github.kafka.BookProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.atomic.AtomicLong;

@RestController
@RequestMapping(value = "/book")
public class BookController {

    @Value("${kafka.topic.my-topic}")
    String myTopic;
    @Value("${kafka.topic.my-topic2}")
    String myTopic2;

    private final ObjectMapper objectMapper = new ObjectMapper();

    private final BookProducer producer;

    private final AtomicLong atomicLong = new AtomicLong();

    BookController(BookProducer producer) {
        this.producer = producer;
    }

    @PostMapping
    public void sendMessageToKafkaTopic(@RequestParam("name") String name) throws JsonProcessingException {
        this.producer.sendMessage(myTopic, objectMapper.writeValueAsString(new Book(atomicLong.addAndGet(1), name)));
        this.producer.sendMessage(myTopic2, new Book(atomicLong.addAndGet(1), name));
    }
}

On receiving a POST request to /book, the controller sends one message to myTopic and one to myTopic2; note that the first is sent as a pre-serialized JSON String and the second as a Book object, matching the two consumer styles above.

Building the Image

Create the Dockerfile:

ch02-kafka-integrate/Dockerfile

FROM openjdk:8-jre-slim
MAINTAINER [email protected]
RUN mkdir /app
COPY target/*.jar /app/app.jar
EXPOSE 8080
ENTRYPOINT [ "sh", "-c", "java $JAVA_OPTS -jar /app/app.jar --spring.profiles.active=prod" ]

Note that here we must activate the prod Spring profile (--spring.profiles.active=prod overrides the active: dev default in application.yaml) so that the production configuration is used!

Build and push the image:

docker build -t jasonkay/java-deploy-app:v1.0.1 .

docker push jasonkay/java-deploy-app:v1.0.1

Deploying the Service to the K8S Cluster

Write the Deployment:

ch02-kafka-integrate/deploy/deployment.yaml

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: java-deploy-app
  namespace: workspace # declare the namespace; defaults to default
spec:
  replicas: 3
  selector:
    matchLabels:
      name: java-deploy-app
  template:
    metadata:
      labels:
        name: java-deploy-app
    spec:
      containers:
        - name: java-deploy-container
          image: jasonkay/java-deploy-app:v1.0.1
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 8080 # containerPort declares the port inside the container
          env: # mount Secrets as environment variables
            - name: KAFKA_USER
              value: 'user1'
            - name: KAFKA_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: kafka-user-passwords
                  key: client-passwords

---
apiVersion: v1
kind: Service
metadata:
  name: java-deploy-app-service
  namespace: workspace # declare the namespace; defaults to default
spec:
  type: NodePort
  ports:
    - name: http
      port: 18888 # the Service's port on its cluster-ip; services inside the cluster reach each other via <cluster-ip>:port
      targetPort: 8080 # the Pod-side port; traffic from port and nodePort is forwarded here, where the container's port is mapped
      nodePort: 32080 # the port opened on every Node; <node-ip>:nodePort is the entry point for clients outside the cluster
  selector:
    name: java-deploy-app

The Service part is unchanged.

In the Deployment part, we mount the Kafka connection Secret into the container's environment variables via env, so the Kafka configuration class above can pick up the credentials and connect correctly!

kubectl apply -f ch02-kafka-integrate/deploy/deployment.yaml

Testing the Service

Test the service with the curl command:

curl -X POST -F 'name=Java' http://<k8s-node-ip>:32080/book

# view the logs
k logs -n workspace java-deploy-app-578668888d-5m5dm | tail -n 20

2023-12-20 00:35:23.090  INFO 7 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group1: partitions assigned: [my-topic-0]
2023-12-20 00:35:23.102  INFO 7 --- [ntainer#1-0-C-1] io.jasonkayzk.github.kafka.BookConsumer  : Consumer received message from topic: my-topic partition: 0 -> Book(id=1, name=Java)
2023-12-20 00:35:23.102  INFO 7 --- [ntainer#1-0-C-1] io.jasonkayzk.github.kafka.BookConsumer  : Consumer received message from topic: my-topic partition: 0 -> Book(id=1, name=Java)
2023-12-20 00:35:23.102  INFO 7 --- [ntainer#1-0-C-1] io.jasonkayzk.github.kafka.BookConsumer  : Consumer received message from topic: my-topic partition: 0 -> Book(id=1, name=Java)


k logs -n workspace java-deploy-app-578668888d-7hv9r | tail -n 20

2023-12-20 00:35:10.977  INFO 7 --- [ad | producer-1] io.jasonkayzk.github.kafka.BookProducer  : Producer sent message to topic: my-topic2 partition: 0
