4

在kubernetes上通过Knative服务和FastAPI消费使用 Kafka事件

 1 year ago
source link: https://www.jdon.com/62555
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

在kubernetes上通过Knative服务和FastAPI消费使用 Kafka事件


分享我使用Knative设置事件驱动架构的经验和工作流程。

我现在构建的大多数最近的应用程序都严重依赖于 Kafka 和 Kubernetes,长话短说,这最终会产生一个向主题发送事件的生产者和一个消费该事件的while循环。在 Kafka 中,您可以配置消息的自动提交偏移量,也可以在使用消息后手动提交。在大多数情况下,这些 while 循环是消耗同步/异步消息的大型进程,并且很难扩展。为了改进这个过程,我想介绍和讨论 Knative。

在本文中,我将重点介绍使用 Kafka 作为事件源、FastAPI (Python) 作为 Web 服务的 Knative Broker(Kafka 源和接收器)设置,当然还有 Kubernetes 作为跨多个主机管理容器化应用程序的系统(本文假设您已经对 Kafka 和 Kubernetes 有基本的了解)。

Knative
Knative 是一个用于构建无服务器和事件驱动应用程序的开源解决方案。由 Google 创建并移交给一个不断改进它的优秀团队。受 vmware、Google、RedHat、IBM 等公司信赖的软件。考虑到微服务、Kubernetes 和事件驱动方法的当前趋势,Knative 可能是您的完美选择。

项目分为两个主要模块:事件和服务。

Knative事件
它是一组工具,允许您使用事件驱动的架构来处理您的应用程序。多亏了有许多 API,它创建了将事件生产者路由到事件消费者(称为后来的接收器)的组件。它使用标准的 HTTP POST 请求在生产者和接收器之间发送这些事件。

稍后,这些接收器可以用作“桶”,事件将从这些桶中通过 HTTP 推送到您的应用程序。启用它的组件称为触发器,它们可以将给定的服务订阅到接收器,事件将由您的应用程序生成和使用。

我们所指的应用程序可能是一个简单的 Web 服务器,例如 k8s Deployment + Service,或者在本例中为 Knative Service。

Knative服务
它是一组对象(Kubernetes 自定义资源定义),用于定义和控制无服务器工作负载在集群上的行为方式。它全权负责为您设置、管理流量、Pod、扩展和修订 Kubernetes。主要用于避免耗时的操作资源、快速开发、自动缩放(包括缩放到零个 pod 以节省资源)和无聊的 Kafka 消费者循环。

v2-06e67ea62d65a62f7da4a7b2508cd6d0_720w.jpg?source=d16d100b

设置

Knative 服务

Kubernetes 资源
1、为服务模块安装 CRD:

kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.7.1/serving-crds.yaml

安装核心服务组件:

kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.7.1/serving-core.yaml

安装网络层(在这种情况下,我使用的是 kourier)——它需要连接到 kservice:

kubectl apply -f https://github.com/knative/net-kourier/releases/download/knative-v1.7.0/kourier.yaml

通过运行以下命令将 Knative Serving 配置为默认使用 Kourier:

kubectl patch configmap/config-network \
  --namespace knative-serving \
  --type merge \
  --patch '{"data":{"ingress-class":"kourier.ingress.networking.knative.dev"}}'

验证您的外部 IP:

kubectl --namespace kourier-system get service kourier

为了使服务对公共流量关闭并仅启用它私有,我们需要使用特殊设置标记我们的服务:networking.knative.dev/visibility: cluster-local

组件
Knative Service 运行一个普通的 web 应用程序,当然,我将使用一个简单的 FastAPI Python 应用程序来记录我们的通知事件:

# main.py
import logging
from typing import Dict
from fastapi import FastAPI, Request

app = FastAPI()


@app.post("/events/notifications")
async def root(request: Request) -> Dict[str, str]:
    event_data = await request.json()
    logging.info(event_data)
    return {"message": "ok"}

为 Knative Service 定义一个 Kubernetes 资源:

# my-service.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: my-notifications
  labels:
    networking.knative.dev/visibility: cluster-local
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/max-scale: "3"
        autoscaling.knative.dev/scale-to-zero: "false"
      labels:
        app: my-notifications
    spec:
      containers:
        - name: my-notifications
          resources:
            requests:
              memory: "200Mi"
              cpu: "200m"
            limits:
              memory: "400Mi"
              cpu: "400m"
          image: my-notifications-image
          imagePullPolicy: Always
          args: [ 'python', 'main.py' ]

记住要注释服务,autoscaling.knative.dev/scale-to-zero: "false"否则在没有流量的情况下,Knative 会杀死所有的 pod,你不会看到它正在运行。


请注意networking.knative.dev/visibility仅定义内部集群连接的标签。要验证您的网络是内部运行命令并检查它是否以svc.cluster.local[ docs ] 结尾:

kubectl get kservice my-notifications

NAME               URL                                                 LATESTCREATED            LATESTREADY              READY   REASON
my-notifications   http://my-notifications.default.svc.cluster.local   my-notifications-1b2ce   my-notifications-1b2ce   True

我还始终建议为您的应用程序设置准备就绪和活跃性探测器,以 ping 服务。由于时间关系,我这里略过。
将您的配置应用到 k8s 中:

kubectl apply -f my-service.yaml

验证服务器是否运行:

kubectl logs -l app=my-notifications

Kubernetes 资源
为事件模块安装 CRD:

kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.7.1/eventing-crds.yaml

安装核心事件组件:

kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.7.1/eventing-core.yaml

安装 apache Kafka 代理(负责事件路由):

kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/knative-v1.7.0/eventing-kafka-controller.yaml
kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/knative-v1.7.0/eventing-kafka-broker.yaml

组件
第一个配置是一个Broker k8s 自定义资源,它定义了一个用于收集事件池的事件网格。我们的 Knative Broker 类是Kafka.

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: default
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config

正如您在上面可能注意到的,我们使用默认的 ConfigMap 进行定义,我们可以覆盖默认的或使用新的。最重要的是设置bootstrap.servers数据,确保它与运行 Kafka 的 URL 相同:

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: default
data:
  default.topic.partitions: '10'
  default.topic.replication.factor: '1'
  bootstrap.servers: '...kafka.svc.cluster.local:29092'

非常重要的通知是 Kafka 代理将创建一个默认主题(在我们的例子中knative-broker-default-default),来自我们的 Kafka 源的所有事件都将被复制到该主题。如果我们的代理路由来自多个主题的事件,则所有这些事件都将复制到这一主题。确保您的分区号符合您的需要。


如上所述,我们还需要一个KafkaSource资源。它将映射我们已经构建的 Kafka 主题中的事件并将它们推送到我们的代理接收器。

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
  consumerGroup: knative-group
  bootstrapServers:
    - ...kafka.svc.cluster.local:29092 # note the kafka namespace
  topics:
    - notifications
    - loggings
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default

最后一步是配置一种机制,该机制将订阅通知服务并(仅通过 HTTP)从代理推送通知事件:

# triggers.yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: notification-trigger
spec:
  broker: default
  filter:
    attributes:
      source: /apis/v1/namespaces/default/kafkasources/kafka-sourcenotifications
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: my-notifications
    uri: /events/notifications

最好为多个主题使用一个代理(KafkaSource 配置)。您可以使用filter配置根据类型或来源过滤特定事件,并订阅不同的服务或不同的 API 路由。归根结底,经纪人就是经纪人!


如果您关心每个分区的事件顺序,分区中的所有事件在成功消息后按顺序处理,请使用 config 注释您的触发器kafka.eventing.knative.dev/delivery.order: ordered。knative 文档中的更多详细信息


应用触发器资源:

kubectl apply -f triggers.yaml

验证触发器准备情况:

kubectl get Trigger

NAME                            BROKER    SUBSCRIBER_URI                                                                                    AGE     READY   REASON
notification-trigger            default   http://my-notification.default.svc.cluster.local/v1/user-notifications                            0d1h    True

确保 URI 与我们的服务 URI 相同,结尾为: svc.cluster.local。据我所知,您还需要集成像 istio 或 kourier 这样的网络层以供内部使用。

测试
最好的方法是简单地为您的 Kafka 主题生成事件。它们应该被汇集到我们定义的接收器中,然后通过触发器推送到 Knative 服务。如果显示事件日志,只需检查日志。
如果你使用的是 Confluent Kafka,你可以做简单的 Producer:

import uuid, json
from confluent_kafka import Producer


producer = Producer()
producer.produce(
    topic="notifications",
    value=json.dumps({"message": "test"}),
    key=str(uuid.uuid4()),
)
producer.poll(0)

然后,记录您的应用程序以查看事件是否到达:

kubectl logs -l app=my-notifications

如果您在事件流程中发现任何问题,请应用日志记录 ConfigMap 以获取更多调试信息Knative 文档

概括
您可以决定此工具是否适合您。在我看来,如果您需要快速开发应用程序并且没有太多时间维护 k8s 基础架构,这是一个很好的解决方案。
这是引入“无服务器”并自己维护它的好方法。下一个美好的未来是按需基础设施(自动扩展),包括归零,这可以节省一些钱,特别是对于初创公司和业余爱好者项目。可以肯定的是,它使部署和部署变得更轻松。开箱即用的 Knative 创建具有滚动部署的修订版,允许使用 3 行代码拆分流量/回​​滚。
 


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK