23

kafka可插拔增强如何实现?

 4 years ago
source link: http://www.cnblogs.com/snidget/p/12836056.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

nU7JJr3.png!web

导弹拦截,精准防御。

背景

拦截器:在不修改应用程序业务逻辑的情况下,一组基于事件的可插拔的逻辑处理链;

类比springMVC的拦截器:

FRVbuaI.png!web

N7jymuU.jpg!web

这些都是通过配置拦截器,插入到应用程序中,实现可插拔的修改业务逻辑;

kafka在0.10.0.0版本中开始引入拦截器。分为生产者拦截器和消费者拦截器,类似责任链的方式编排多个拦截器为一个大拦截器。

配置方法:配置参数

Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
……

eE3eem7.jpg!web

注意: 配置拦截器需要制定拦截器的全限定名,并且保证生产者或者消费者客户端能够正确加载到配置的拦截器;

zQreAzN.jpg!web

通过拦截器实现,强制让所有的生产者,消费者配置该拦截器,实现消息审计的功能; |

生产者拦截器

拦截器需要实现org.apache.kafka.clients.producer.ProducerInterceptor

AfyMv2v.jpg!web

消费者拦截器

org.apache.kafka.clients.consumer.ConsumerInterceptor

eqQFjiA.jpg!web

实操

实现端到端的性能监控:

处理过程:

eEJBb2v.jpg!web

生产者代码:

public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {


    private Jedis jedis; // 省略Jedis初始化


    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        jedis.incr("totalSentMessage");
        return record;
    }


    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }


    @Override
    public void close() {
    }


    @Override
    public void configure(Map<java.lang.String, ?> configs) {
    }

消费者代码:

public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {


    private Jedis jedis; //省略Jedis初始化


    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        long lantency = 0L;
        for (ConsumerRecord<String, String> record : records) {
            lantency += (System.currentTimeMillis() - record.timestamp());
        }
        jedis.incrBy("totalLatency", lantency);
        long totalLatency = Long.parseLong(jedis.get("totalLatency"));
        long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
        jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
        return records;
    }


    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }


    @Override
    public void close() {
    }


    @Override
    public void configure(Map<String, ?> configs)

配置到拦截器到对应的生产者和消费者对象,即简单的实现了平均消息延时的端到端性能统计。

小结

类比AOP是Spring提供的核心功能,即面向切面编程,可以把跟业务逻辑无关的安全,审计,性能相关功能放到切面增强中实现。

对Kafka进行一些可插拔的功能增强可以通过拦截器实现。

本篇介绍了kafka的拦截器的使用方法,以及通过实例展示了具体的用法,希望对团队使用的kafka做一些增强功能的时候可以利用这个点去扩展。

NRrYFfQ.png!web

原创不易,关注诚可贵,转发价更高!转载请注明出处,让我们互通有无,共同进步,欢迎沟通交流。

我会持续分享Java软件编程知识和程序员发展职业之路,欢迎关注,我整理了这些年编程学习的各种资源,关注公众号‘李福春持续输出’,发送'学习资料'分享给你!

jaEb63b.jpg!web

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK