8

技术分享| 消息队列Kafka群集部署

 1 year ago
source link: https://blog.51cto.com/u_15232255/5767140
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

技术分享| 消息队列Kafka群集部署

推荐 原创

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

2、主要应用场景是
  • 日志收集:可以用kafka收集各种服务的日志 ,通过已统一接口的形式开放给各种消费者。

  • 消息系统:解耦生产和消费者,缓存消息。

  • 用户活动追踪:kafka可以记录webapp或app用户的各种活动,如浏览网页,点击等活动,这些活动可以发送到kafka,然后订阅者通过订阅这些消息来做监控。

  • 运营指标:可以用于监控各种数据。

3、Kafka主要设计目标如下:
  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持线水平扩展
4、基本概念

kafka是一个分布式的分区的消息,提供消息系统应该具备的功能。

名称 解释
broker 消息中间件处理节点,一个broker就是一个kafka节点,多个broker构成一个kafka集群。
topic kafka根据消息进行分类,发布到kafka的每个消息都有一个对应的topic
producer 消息生产(发布)者
consumer 消息消费(订阅)者
consumergroup 消息订阅集群,一个消息可以被多个consumergroup消费,但是一个consumergroup只有一个consumer可以消费消息。

二、环境准备

当前环境:centos7.9三台
软件版本:kafka_2.13-3.0.0
环境目录:/usr/local/kafka

下载kafka;包含了zookeeper(三台机器都要操作)

[root@localhost opt]# wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz
[root@localhost opt]# tar zxvf kafka_2.13-3.0.0.tgz
[root@localhost opt]# mv kafka_2.13-3.0.0 /usr/local/kafka

配置环境变量(三台机器都要操作)

[root@localhost opt]# vim /etc/profile
## 末尾添加
export ZOOKEEPER_HOME=/usr/local/kafka
export PATH=$PATH:$ZOOKEEPER_HOME/bin

## 加载环境变量
[root@localhost opt]# source /etc/profile

三、Zookeeper的安装

由于Kafka分区位置和主题配置之类的元数据都存储在ZooKeeper群集中,所以要搭建kafka集群环境必须先安装zookeeper群集

zookeeper配置文件修改(三台配置一样)

[root@localhost ~]# cd /usr/local/kafka/
[root@localhost kafka]# vim config/zookeeper.properties

dataDir=/tmp/zookeeper				## 主要用来配置zookeeper server数据的存放路径
clientPort=2181						## zookeeper服务端口
tickTime=2000						## zookeeper客户端与服务器之间的心跳时间。默认值为2000毫秒,即2秒
initLimit=10						## Follower连接到Leader并同步数据的最大时间
syncLimit=5							## Follower同步Leader的最大时间
maxClientCnxns=0					## 客户端最大连接数,设置0或者不设置表示取消连接数限制
admin.enableServer=false			## 禁用 Admin Server
server.0=192.168.1.13:2888:3888		## 配置zookeeper群集
server.1=192.168.1.108:2888:3888
server.2=192.168.1.143:2888:3888

创建myid文件(三台机器都要操作)

192.168.1.13机器上

[root@localhost kafka]# mkdir -p /opt/zookeeper;echo "0" > /tmp/zookeeper/myid

192.168.1.108机器上

[root@localhost kafka]# mkdir -p /opt/zookeeper;echo "1" > /tmp/zookeeper/myid

192.168.1.143机器上

[root@localhost kafka]# mkdir -p /opt/zookeeper;echo "2" > /tmp/zookeeper/myid

创建system启动文件

[root@localhost kafka]# vim /usr/lib/systemd/system/zookeeper.service

[Unit]
Description=zookeeper
After=network.target
Wants=network.target

[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/app/idk/bin"
User=root
Group=root
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
Restart=always

[Install]
WantedBy=multi-user.target

启动zookeeper

[root@localhost kafka]# systemctl enable zookeeper.service
[root@localhost kafka]# systemctl start zookeeper.service

四、Kafka的安装

kafka配置文件修改

[root@localhost ~]# cd /usr/local/kafka/
[root@localhost kafka]# vim config/server.properties (直接复制到另外两台)

broker.id=1												## 第一台1,第二台为2 第三台为3
listeners=PLAINTEXT://:9092								## 监听IP地址和端口
advertised.listeners=PLAINTEXT://192.168.1.13:9092		## 配置kafka的broker ip
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=404857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.13:2181,192.168.1.108:2181,192.168.1.143:2181	##连接zookeeper
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
## 修改192.168.1.108的配置
broker.id=2
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.1.108:9092

修改192.168.1.143的配置
broker.id=3
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.1.143:9092
## 修改JVM参数 (根据需求修改)
kafka_heap_opts:指定堆大小,默认是1GB

[root@localhost kafka]# vim bin/kafka-server-start.sh 
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G -XX:MaxDirectMemorySize=1G"			## 修改这一行即可
fi

创建system启动文件

[root@localhost kafka]# vim /usr/lib/systemd/system/kafka.service

[Unit]
Description=kafka
After=network.target
Wants=network.target

[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/app/idk/bin"
User=root
Group=root
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
Restart=always

[Install]
WantedBy=multi-user.target

启动zookeeper
[root@localhost kafka]# systemctl enable kafka.service
[root@localhost kafka]# systemctl start kafka.service

服务启动不起来 找不到Java

/usr/local/kafka/bin/kafka-run-class.sh: line 309: exec: java: not found

第一种: 修改配置文件中的Java环境

## 修改kafka-run-class.sh 配置文件
[root@localhost kafka]# vim bin/kafka-run-class.sh
## 找到如下:
# Which java to use
if [ -z "$JAVA_HOME" ]; then
  JAVA="/usr/local/jdk/bin/java"   				## 修改到绝对路径
else
  JAVA="$JAVA_HOME/bin/java"
fi

第二种: 做个软连接

[root@localhost kafka]# echo $PATH
/usr/local/bin:/usr/local/sbin:/usr/bin:/usr/sbin:/bin:/sbin:/home/laserx/.local/bin:/home/laserx/bin

[root@localhost kafka]# ln -s /usr/local/jdk1.8.0_251/bin/java /usr/local/bin/java

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK