3

5种kafka消费端性能优化方法 - 华为云开发者联盟

 1 year ago
source link: https://www.cnblogs.com/huaweiyun/p/16716343.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
摘要:带你了解基于FusionInsight HD&MRS的5种kafka消费端性能优化方法。

本文分享自华为云社区《FusionInsight HD&MRSkafka消费端性能优化方法》,作者: 穿夹克的坏猴子。

kafka消费端性能优化主要从下面几个方面优化:

1.接口使用方面优化:

旧版本highlevel-consumer:偏移量信息存储在zookeeper,最大消费线程数与分区数量相同,不推荐

旧版本simpleconsumer:自行选择存储偏移量的方式,可以实现多线程消费单分区,若无特殊的性能要求,不推荐

新版本highlevel-consumer:偏移量信息存储在kafka指定的topic中,默认情况下最大消费线程数与分区数量相同,可以实现多线程消费单分区,推荐

2.参数调优(以下参数需根据现网环境评估调至合适的值):

2.1 旧版本消费者(kafka old API)参数调优

fetch.message.max.bytes:该参数为一次性从kafka集群中获取的数据块大小。在升级到651版本后这个参数需要调大,否则容易出现获取数据限制的报错。建议调整大小不小于kafka的服务端参数message.max.bytes。

注意如何确认为旧版本:如果生产者的配置方式包含如下这些配置,则为旧版本:group.id/zookeeper.connect

2.2 新版本参数(kafka new API)参数调优

max.poll.records:意味消费者一次poll()操作,能够获取的最大数据量,调整这个值能提升吞吐量,于此同时也需要同步提升max.poll.interval.ms的参数大小。

fetch.max.bytes:意味server端可返回给consumer的最大数据大小,增加可以提升吞吐量,但是在客户端和服务端网络延迟比较大的环境下,建议可以减小该值,防止业务处理数据超时。

heartbeat.interval.ms:消费超时时间,consumer与kafka之间的超时时间,该参数不能超过session.timeout.ms,通常设置为session.timeout.ms的三分之一,默认值:3000。

max.partition.fetch.bytes:限制每个consumer发起fetch请求时候,读到数据(record)的限制,设置过大,consumer本地缓存的数据就会越多,可能影响内存的使用,默认值:1048576。

fetch.max.bytes:server端可返回给consumer的最大数据大小,数值可大于max.partition.fetch.bytes,一般设置为默认值即可,默认值:52428800

session.timeout.ms:使用consumer组管理offset时,consumer与broker之间的心跳超时时间,如果consumer消费数据的频率非常低,建议增大这个参数值,默认值:10000。

auto.offset.reset:消费过程中无法找到数据消费到的offset位置,所选择的消费策略,earliest:从头开始消费,可能会消费到重复数据,latest:从数据末尾开始消费,可能会丢失数据。默认值:earlist。

max.poll.interval.ms:消费者在每一轮poll() (拉取数据之间的最大时间延迟),如果此超时时间期满之前poll()没有被再次调用,则消费者被视为失败,并且分组将触发rebalance,以便将分区重新分配给别的成员。

如果,再两次poll之间需要添加过多复杂的,耗时的逻辑,需要延长这个时间,默认值:300s。

max.poll.records:消费者一次poll()操作,能够获取的最大数据量,增加这个参数值,会增加一次性拉取数据的数据量,确保拉取数据的时间,至少在max.poll.interval.ms规定的范围之内,默认值:500。

2.3 Simpleconsumer参数调优

simpleconsumer在初始化阶段需要传一个fetchsize的参数,比如:consumer=new SimpleConsumer(leaderBroker,a_port,100000,64*1024,clientName)中64*1024,该参数表示simpleconsumer一次性获取的数据大小,如果该值过大则可能会导致request时间过长,使用过程中应该降低这个值,保证消费频率。

使用SimpleConsumer的核心需求是:多线程消费单个分区,以达到提升性能的要求,如果没有这样需求,不建议使用这个这种消费方式

3.消费端频繁rebalance导致性能下降调优:

3.1因业务处理能力不足导致的:

session.timout.ms控制心跳超时时间。

heartbeat.interval.ms控制心跳发送频率,建议该值不超过session.timout.ms的三分之一。

max.poll.interval.ms控制每次poll的间隔,时间=获取数据的时间+处理数据的时间,如果max.poll.records设定的值在max.poll.interval.ms指定的时间内没有处理完成会触发rebalance,这里给出一个相对较为合理的配置,建议在预计的处理时间的基础上再加1分钟。

max.poll.records 每个批次处理的数据条数,默认为500条。如果处理能力较低,建议可以减小这个值。

3.2 非正常消费者频繁的访问kafka集群导致频繁rebalance:

收集kafka-request.log,查看异常的topic有哪些客户端节点在消费,cat kafka-request.* | grep “topic=topicName” | grep “apikey=FETCH” | awk –F’from connection’ ‘{print $2}’ | awk –F’;’ ‘{print $1}’ | awk –F’-’ ‘{print $2}’ | awk –F’:’ ‘{print $1}’ | sort | uniq –c | sort -nr ,找出不应该产生消费行为的节点,停止异常节点上消费者

4.版本引发性能下降优化

FI 8.0.2版本之前kafka SimpleAclAuthorizer鉴权异常导致性能下降,8.0.2版本在使用非安全端口(21005或者9092端口)时会出现集群性能下降的问题,表现:kafka-root.log中出现大量ExitcodeException:id:Default#Principal:no such user报错。

解决办法:升级到FI 8023以上版本。

临时规避办法:业务侧使用21007端口访问kafka,去掉鉴权插件即allow.everyone.if.no.acl.found=true,将以下kafka服务端配置置为空:authorizer.class.name=。

5.FI 6513~6516版本的内核问题引发的性能异常

6513版本在kafka引入社区的的lazy index功能后,在新的segment创建的过程中可能会导致并发创建失败的问题,常见的报错(server.log中)如以下两种类型:

(1)java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code;

(2)java.lang.IllegalArgumentException: requirement failed: Attempt to append to a full index;

当出现以上两种类型的报错的时候可以断定是版本问题导致,问题预警如:https://support.huawei.com/enterprise/zh/bulletins-product/ENEWS2000007844;
解决方案:升级到6517版本以上版本或者打入紧急补丁:https://support.huawei.com/enterprise/zh/cloud-computing/fusioninsight-hd-pid-21110924/software/251482609?idAbsPath=fixnode01%7C7919749%7C7941815%7C19942925%7C250430185%7C21110924;

临时规避方案:重启异常的broker实例。

点击关注,第一时间了解华为云新鲜技术~


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK