7

聊聊golang的zap的ZapKafkaWriter

 3 years ago
source link: https://studygolang.com/articles/32089
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

本文主要研究一下golang的zap的ZapKafkaWriter

ZapKafkaWriter

package logger

import (
    "errors"
    "sync"
    "sync/atomic"
    "syscall"
)

// ZapKafkaWriter is a zap WriteSyncer (io.Writer) that writes messages to Kafka
type ZapKafkaWriter struct {
    kp        *KafkaProducer
    ce        *CloudEvents
    closed    int32          // Nonzero if closing, must access atomically
    pendingWg sync.WaitGroup // WaitGroup for pending messages
    closeMut  sync.Mutex
}

// newZapKafkaWriter returns a kafka io.writer instance
func newZapKafkaWriter(
    kpCfg ProducerConfiguration, cloudEvents *CloudEvents,
    ceCfg CloudEventsConfiguration) (*ZapKafkaWriter, error) {

    // create an async producer
    kp, err := newKafkaProducer(kpCfg, cloudEvents, ceCfg)
    if err != nil {
        return nil, err
    }

    zw := &ZapKafkaWriter{
        kp: kp,
        ce: cloudEvents,
    }
    return zw, nil
}

ZapKafkaWriter定义了KafkaProducer、CloudEvents、closed、pendingWg、closeMut属性,其newZapKafkaWriter方法根据ProducerConfiguration、cloudEvents、CloudEventsConfiguration来创建KafkaProducer,然后根据KafkaProducer来创建ZapKafkaWriter

zapcore.WriteSyncer

// Sync satisfies zapcore.WriteSyncer interface, zapcore.AddSync works as well
func (zw *ZapKafkaWriter) Sync() error {
    return nil
}

// Write sends byte slices to Kafka ignoring error responses (Thread-safe)
// Write might block if the Input() channel of the AsyncProducer is full
func (zw *ZapKafkaWriter) Write(msg []byte) (int, error) {
    if zw.Closed() {
        return 0, syscall.EINVAL
    }

    if zw.kp.producer == nil {
        return 0, errors.New("No producer defined")
    }

    zw.pendingWg.Add(1)
    defer zw.pendingWg.Done()

    err := zw.kp.sendMessage(msg)
    return len(msg), err
}

// Closed returns true if the writer is closed, false otherwise (Thread-safe)
func (zw *ZapKafkaWriter) Closed() bool {
    return atomic.LoadInt32(&zw.closed) != 0
}

// Close must be called when the writer is no longer needed (Thread-safe)
func (zw *ZapKafkaWriter) Close() (err error) {
    zw.closeMut.Lock()
    defer zw.closeMut.Unlock()

    if zw.Closed() {
        return syscall.EINVAL
    }

    atomic.StoreInt32(&zw.closed, 1)

    zw.pendingWg.Wait()
    return nil
}

ZapKafkaWriter实现了zapcore.WriteSyncer接口,其Write方法使用KafkaProducer发送消息,其Sync方法目前不做任何操作,它还提供了Close方法,也就是也实现了Sink接口

小结

WriteSyncer内嵌了io.Writer接口,定义了Sync方法;Sink接口内嵌了zapcore.WriteSyncer及io.Closer接口;ZapKafkaWriter实现Sink接口及zapcore.WriteSyncer接口,其Write方法直接将data通过kafka发送出去。

doc

有疑问加站长微信联系(非本文作者)

eUjI7rn.png!mobile

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK