A Look at ZapKafkaWriter for Go's zap
source link: https://studygolang.com/articles/32089
Preface
This article takes a look at ZapKafkaWriter, a zap WriteSyncer for Go that writes log messages to Kafka.
ZapKafkaWriter
package logger

import (
	"errors"
	"sync"
	"sync/atomic"
	"syscall"
)

// ZapKafkaWriter is a zap WriteSyncer (io.Writer) that writes messages to Kafka
type ZapKafkaWriter struct {
	kp        *KafkaProducer
	ce        *CloudEvents
	closed    int32          // Nonzero if closing, must access atomically
	pendingWg sync.WaitGroup // WaitGroup for pending messages
	closeMut  sync.Mutex
}

// newZapKafkaWriter returns a kafka io.writer instance
func newZapKafkaWriter(
	kpCfg ProducerConfiguration,
	cloudEvents *CloudEvents,
	ceCfg CloudEventsConfiguration) (*ZapKafkaWriter, error) {
	// create an async producer
	kp, err := newKafkaProducer(kpCfg, cloudEvents, ceCfg)
	if err != nil {
		return nil, err
	}
	zw := &ZapKafkaWriter{
		kp: kp,
		ce: cloudEvents,
	}
	return zw, nil
}
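ZapKafkaWriter has five fields: kp (a *KafkaProducer), ce (a *CloudEvents), closed, pendingWg, and closeMut. newZapKafkaWriter first builds a KafkaProducer from the ProducerConfiguration, cloudEvents, and CloudEventsConfiguration arguments, returning the error if that fails, and then wraps the producer in a ZapKafkaWriter.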
zapcore.WriteSyncer
// Sync satisfies zapcore.WriteSyncer interface, zapcore.AddSync works as well
func (zw *ZapKafkaWriter) Sync() error {
	return nil
}

// Write sends byte slices to Kafka ignoring error responses (Thread-safe)
// Write might block if the Input() channel of the AsyncProducer is full
func (zw *ZapKafkaWriter) Write(msg []byte) (int, error) {
	if zw.Closed() {
		return 0, syscall.EINVAL
	}
	if zw.kp.producer == nil {
		return 0, errors.New("No producer defined")
	}
	zw.pendingWg.Add(1)
	defer zw.pendingWg.Done()
	err := zw.kp.sendMessage(msg)
	return len(msg), err
}

// Closed returns true if the writer is closed, false otherwise (Thread-safe)
func (zw *ZapKafkaWriter) Closed() bool {
	return atomic.LoadInt32(&zw.closed) != 0
}

// Close must be called when the writer is no longer needed (Thread-safe)
func (zw *ZapKafkaWriter) Close() (err error) {
	zw.closeMut.Lock()
	defer zw.closeMut.Unlock()
	if zw.Closed() {
		return syscall.EINVAL
	}
	atomic.StoreInt32(&zw.closed, 1)
	zw.pendingWg.Wait()
	return nil
}
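ZapKafkaWriter implements the zapcore.WriteSyncer interface: Write sends the message through the KafkaProducer (returning syscall.EINVAL if the writer is already closed), and Sync is currently a no-op. It also provides a Close method, which sets the closed flag and waits for in-flight writes to finish, so the type satisfies zap's Sink interface as well.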
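The closed-flag plus WaitGroup lifecycle described above can be tried out without Kafka. The following is a minimal sketch using only the standard library: chanWriter, newChanWriter, and errWriterClosed are hypothetical names invented for illustration, and a buffered channel stands in for the producer's input queue. It is not the original implementation, only the same pattern in miniature.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// errWriterClosed is a hypothetical sentinel error for this sketch;
// the original code returns syscall.EINVAL instead.
var errWriterClosed = errors.New("writer is closed")

// chanWriter is a stand-in for ZapKafkaWriter. The buffered channel
// plays the role of the Kafka producer's input queue (an assumption
// made only so the sketch is self-contained).
type chanWriter struct {
	out       chan []byte
	closed    int32          // nonzero once Close has been called
	pendingWg sync.WaitGroup // tracks writes that are still in flight
	closeMut  sync.Mutex     // serializes concurrent Close calls
}

func newChanWriter(size int) *chanWriter {
	return &chanWriter{out: make(chan []byte, size)}
}

// Write rejects messages after Close and otherwise queues a copy.
func (w *chanWriter) Write(msg []byte) (int, error) {
	if w.Closed() {
		return 0, errWriterClosed
	}
	w.pendingWg.Add(1)
	defer w.pendingWg.Done()
	// io.Writer implementations must not retain the caller's slice,
	// so copy it before handing it to the queue.
	buf := append([]byte(nil), msg...)
	w.out <- buf // blocks while the queue is full
	return len(msg), nil
}

// Sync is a no-op, as in the original writer.
func (w *chanWriter) Sync() error { return nil }

// Closed reports whether Close has been called.
func (w *chanWriter) Closed() bool {
	return atomic.LoadInt32(&w.closed) != 0
}

// Close marks the writer closed and waits for in-flight writes.
// A second call returns errWriterClosed.
func (w *chanWriter) Close() error {
	w.closeMut.Lock()
	defer w.closeMut.Unlock()
	if w.Closed() {
		return errWriterClosed
	}
	atomic.StoreInt32(&w.closed, 1)
	w.pendingWg.Wait()
	return nil
}

func main() {
	w := newChanWriter(4)
	n, err := w.Write([]byte("hello"))
	fmt.Println("first write:", n, err)
	fmt.Println("close:", w.Close())
	_, err = w.Write([]byte("late"))
	fmt.Println("write after close rejected:", err == errWriterClosed)
}
```

One thing the sketch makes easy to see: the Closed() check and pendingWg.Add(1) are two separate steps, here as in the original, so a Write racing with Close may still be accepted after the flag is set. Closing that window would likely require Write and Close to share a lock.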
Summary
WriteSyncer embeds the io.Writer interface and adds a Sync method; the Sink interface embeds zapcore.WriteSyncer and io.Closer. ZapKafkaWriter implements both Sink and zapcore.WriteSyncer, and its Write method sends the data straight to Kafka.
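The interface relationship summarized above can be expressed as a compile-time check. The sketch below redeclares WriteSyncer and Sink locally, mirroring the shapes of zapcore.WriteSyncer and zap.Sink so that it needs no zap import; memSink is a hypothetical in-memory type used only for illustration and is not safe for concurrent use.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// WriteSyncer is a local mirror of zapcore.WriteSyncer:
// io.Writer plus a Sync method.
type WriteSyncer interface {
	io.Writer
	Sync() error
}

// Sink is a local mirror of zap.Sink:
// a WriteSyncer that can also be closed.
type Sink interface {
	WriteSyncer
	io.Closer
}

// memSink is a hypothetical in-memory sink used only for this sketch.
type memSink struct {
	buf bytes.Buffer
}

func (m *memSink) Write(p []byte) (int, error) { return m.buf.Write(p) }
func (m *memSink) Sync() error                 { return nil }
func (m *memSink) Close() error                { return nil }

// Compile-time assertions: the build fails if *memSink ever stops
// satisfying either interface.
var (
	_ WriteSyncer = (*memSink)(nil)
	_ Sink        = (*memSink)(nil)
)

func main() {
	m := &memSink{}
	var s Sink = m // any Sink is also usable as a WriteSyncer
	var ws WriteSyncer = s
	if _, err := ws.Write([]byte("log line\n")); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	fmt.Print(m.buf.String())
	fmt.Println("closed without error:", s.Close() == nil)
}
```

With zap imported, the same kind of assertion could be written against the real interfaces for ZapKafkaWriter, which would catch a missing Sync or Close method at build time rather than at runtime.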