2

聊聊zerolog的diode.Writer

 3 years ago
source link: https://segmentfault.com/a/1190000038836241
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

本文主要研究一下zerolog的diode.Writer

diode.Writer

github.com/rs/[email protected]/diode/diode.go

// Writer is a io.Writer wrapper that uses a diode to make Write lock-free,
// non-blocking and thread safe.
type Writer struct {
    w    io.Writer
    d    diodeFetcher
    c    context.CancelFunc
    done chan struct{}
}

func NewWriter(w io.Writer, size int, pollInterval time.Duration, f Alerter) Writer {
    ctx, cancel := context.WithCancel(context.Background())
    dw := Writer{
        w:    w,
        c:    cancel,
        done: make(chan struct{}),
    }
    if f == nil {
        f = func(int) {}
    }
    d := diodes.NewManyToOne(size, diodes.AlertFunc(f))
    if pollInterval > 0 {
        dw.d = diodes.NewPoller(d,
            diodes.WithPollingInterval(pollInterval),
            diodes.WithPollingContext(ctx))
    } else {
        dw.d = diodes.NewWaiter(d,
            diodes.WithWaiterContext(ctx))
    }
    go dw.poll()
    return dw
}

diode.Writer是一个lock-free,non-blocking及thread safe的Writer;它借助了diodes来实现;NewWriter会创建diode.Writer,并启动dw.poll()

github.com/rs/[email protected]/diode/diode.go

func (dw Writer) poll() {
    defer close(dw.done)
    for {
        d := dw.d.Next()
        if d == nil {
            return
        }
        p := *(*[]byte)(d)
        dw.w.Write(p)

        // Proper usage of a sync.Pool requires each entry to have approximately
        // the same memory cost. To obtain this property when the stored type
        // contains a variably-sized buffer, we add a hard limit on the maximum buffer
        // to place back in the pool.
        //
        // See https://golang.org/issue/23199
        const maxSize = 1 << 16 // 64KiB
        if cap(p) <= maxSize {
            bufPool.Put(p[:0])
        }
    }
}

poll方法使用for循环执行dw.d.Next()及dw.w.Write(p)

diodeFetcher

github.com/rs/[email protected]/diode/diode.go

type diodeFetcher interface {
    diodes.Diode
    Next() diodes.GenericDataType
}

// Diode is any implementation of a diode.
type Diode interface {
    Set(GenericDataType)
    TryNext() (GenericDataType, bool)
}

diodeFetcher接口内嵌了Diode接口,定义了Next方法

github.com/rs/[email protected]/diode/internal/diodes/poller.go

// Next polls the diode until data is available or until the context is done.
// If the context is done, then nil will be returned.
func (p *Poller) Next() GenericDataType {
    for {
        data, ok := p.Diode.TryNext()
        if !ok {
            if p.isDone() {
                return nil
            }

            time.Sleep(p.interval)
            continue
        }
        return data
    }
}

Poller实现了diodeFetcher接口的Next方法,它使用for循环,不断通过p.Diode.TryNext()来获取data

ManyToOne

github.com/rs/[email protected]/diode/internal/diodes/many_to_one.go

// ManyToOne diode is optimal for many writers (go-routines B-n) and a single
// reader (go-routine A). It is not thread safe for multiple readers.
type ManyToOne struct {
    writeIndex uint64
    readIndex  uint64
    buffer     []unsafe.Pointer
    alerter    Alerter
}

// Set sets the data in the next slot of the ring buffer.
func (d *ManyToOne) Set(data GenericDataType) {
    for {
        writeIndex := atomic.AddUint64(&d.writeIndex, 1)
        idx := writeIndex % uint64(len(d.buffer))
        old := atomic.LoadPointer(&d.buffer[idx])

        if old != nil &&
            (*bucket)(old) != nil &&
            (*bucket)(old).seq > writeIndex-uint64(len(d.buffer)) {
            log.Println("Diode set collision: consider using a larger diode")
            continue
        }

        newBucket := &bucket{
            data: data,
            seq:  writeIndex,
        }

        if !atomic.CompareAndSwapPointer(&d.buffer[idx], old, unsafe.Pointer(newBucket)) {
            log.Println("Diode set collision: consider using a larger diode")
            continue
        }

        return
    }
}

// TryNext will attempt to read from the next slot of the ring buffer.
// If there is not data available, it will return (nil, false).
func (d *ManyToOne) TryNext() (data GenericDataType, ok bool) {
    // Read a value from the ring buffer based on the readIndex.
    idx := d.readIndex % uint64(len(d.buffer))
    result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil))

    // When the result is nil that means the writer has not had the
    // opportunity to write a value into the diode. This value must be ignored
    // and the read head must not increment.
    if result == nil {
        return nil, false
    }

    // When the seq value is less than the current read index that means a
    // value was read from idx that was previously written but has since has
    // been dropped. This value must be ignored and the read head must not
    // increment.
    //
    // The simulation for this scenario assumes the fast forward occurred as
    // detailed below.
    //
    // 5. The reader reads again getting seq 5. It then reads again expecting
    //    seq 6 but gets seq 2. This is a read of a stale value that was
    //    effectively "dropped" so the read fails and the read head stays put.
    //    `| 4 | 5 | 2 | 3 |` r: 7, w: 6
    //
    if result.seq < d.readIndex {
        return nil, false
    }

    // When the seq value is greater than the current read index that means a
    // value was read from idx that overwrote the value that was expected to
    // be at this idx. This happens when the writer has lapped the reader. The
    // reader needs to catch up to the writer so it moves its write head to
    // the new seq, effectively dropping the messages that were not read in
    // between the two values.
    //
    // Here is a simulation of this scenario:
    //
    // 1. Both the read and write heads start at 0.
    //    `| nil | nil | nil | nil |` r: 0, w: 0
    // 2. The writer fills the buffer.
    //    `| 0 | 1 | 2 | 3 |` r: 0, w: 4
    // 3. The writer laps the read head.
    //    `| 4 | 5 | 2 | 3 |` r: 0, w: 6
    // 4. The reader reads the first value, expecting a seq of 0 but reads 4,
    //    this forces the reader to fast forward to 5.
    //    `| 4 | 5 | 2 | 3 |` r: 5, w: 6
    //
    if result.seq > d.readIndex {
        dropped := result.seq - d.readIndex
        d.readIndex = result.seq
        d.alerter.Alert(int(dropped))
    }

    // Only increment read index if a regular read occurred (where seq was
    // equal to readIndex) or a value was read that caused a fast forward
    // (where seq was greater than readIndex).
    //
    d.readIndex++
    return result.data, true
}

ManyToOne实现了Diode接口的Set和TryNext方法

func diodeDemo() {
    wr := diode.NewWriter(os.Stdout, 1000, 10*time.Millisecond, func(missed int) {
        fmt.Printf("Logger Dropped %d messages", missed)
    })
    log := zerolog.New(wr)
    log.Print("test")

    time.Sleep(1 * time.Second)
}
{"level":"debug","message":"test"}

zerolog借助diodes提供了一个lock-free,non-blocking及thread safe的diode.Writer


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK