6

聊聊klog的Flush

 3 years ago
source link: https://studygolang.com/articles/32372
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

本文主要研究一下klog的Flush

Flush

k8s.io/klog/[email protected]/klog.go

// Flush flushes all pending log I/O.
func Flush() {
    logging.lockAndFlushAll()
}

Flush方法执行的是logging.lockAndFlushAll()

k8s.io/klog/[email protected]/klog.go

// init sets up the defaults and runs flushDaemon.
func init() {
    logging.stderrThreshold = errorLog // Default stderrThreshold is ERROR.
    logging.setVState(0, nil, false)
    logging.logDir = ""
    logging.logFile = ""
    logging.logFileMaxSizeMB = 1800
    logging.toStderr = true
    logging.alsoToStderr = false
    logging.skipHeaders = false
    logging.addDirHeader = false
    logging.skipLogHeaders = false
    logging.oneOutput = false
    go logging.flushDaemon()
}

klog的init方法异步协程执行logging.flushDaemon()

logging.flushDaemon()

k8s.io/klog/[email protected]/klog.go

// flushDaemon periodically flushes the log file buffers.
func (l *loggingT) flushDaemon() {
    for range time.NewTicker(flushInterval).C {
        l.lockAndFlushAll()
    }
}

flushDaemon方法range新建ticker的channel,然后执行l.lockAndFlushAll()

lockAndFlushAll

k8s.io/klog/[email protected]/klog.go

// lockAndFlushAll is like flushAll but locks l.mu first.
func (l *loggingT) lockAndFlushAll() {
    l.mu.Lock()
    l.flushAll()
    l.mu.Unlock()
}

lockAndFlushAll使用lock执行flushAll

flushAll

k8s.io/klog/[email protected]/klog.go

const (
    infoLog severity = iota
    warningLog
    errorLog
    fatalLog
    numSeverity = 4
)

// flushAll flushes all the logs and attempts to "sync" their data to disk.
// l.mu is held.
func (l *loggingT) flushAll() {
    // Flush from fatal down, in case there's trouble flushing.
    for s := fatalLog; s >= infoLog; s-- {
        file := l.file[s]
        if file != nil {
            file.Flush() // ignore error
            file.Sync()  // ignore error
        }
    }
}

flushAll方法从fatalLog开始递减到infoLog级别挨个执行l.file[s]的Flush及Sync方法

flushSyncWriter

k8s.io/klog/[email protected]/klog.go

// flushSyncWriter is the interface satisfied by logging destinations.
type flushSyncWriter interface {
    Flush() error
    Sync() error
    io.Writer
}

type Writer interface {
    Write(p []byte) (n int, err error)
}

flushSyncWriter接口定义了Flush、Sync方法,内嵌了io.Writer接口

redirectBuffer

k8s.io/klog/[email protected]/klog.go

// redirectBuffer is used to set an alternate destination for the logs
type redirectBuffer struct {
    w io.Writer
}

func (rb *redirectBuffer) Sync() error {
    return nil
}

func (rb *redirectBuffer) Flush() error {
    return nil
}

func (rb *redirectBuffer) Write(bytes []byte) (n int, err error) {
    return rb.w.Write(bytes)
}

redirectBuffer内嵌了io.Writer,其Write方法通过io.Writer来写;其Sync及Flush方法都为空操作

syncBuffer

k8s.io/klog/[email protected]/klog.go

// syncBuffer joins a bufio.Writer to its underlying file, providing access to the
// file's Sync method and providing a wrapper for the Write method that provides log
// file rotation. There are conflicting methods, so the file cannot be embedded.
// l.mu is held for all its methods.
type syncBuffer struct {
    logger *loggingT
    *bufio.Writer
    file     *os.File
    sev      severity
    nbytes   uint64 // The number of bytes written to this file
    maxbytes uint64 // The max number of bytes this syncBuffer.file can hold before cleaning up.
}

func (sb *syncBuffer) Sync() error {
    return sb.file.Sync()
}

func (sb *syncBuffer) Write(p []byte) (n int, err error) {
    if sb.nbytes+uint64(len(p)) >= sb.maxbytes {
        if err := sb.rotateFile(time.Now(), false); err != nil {
            sb.logger.exit(err)
        }
    }
    n, err = sb.Writer.Write(p)
    sb.nbytes += uint64(n)
    if err != nil {
        sb.logger.exit(err)
    }
    return
}

syncBuffer定义了logger、file、sev、nbytes、maxbytes属性,内嵌了*bufio.Writer;其Sync方法执行的是*os.File.Sync;其Flush方法执行的是*bufio.Writer.Flush

Flush

/usr/local/go/src/bufio/bufio.go

type Writer struct {
    err error
    buf []byte
    n   int
    wr  io.Writer
}

// Flush writes any buffered data to the underlying io.Writer.
func (b *Writer) Flush() error {
    if b.err != nil {
        return b.err
    }
    if b.n == 0 {
        return nil
    }
    n, err := b.wr.Write(b.buf[0:b.n])
    if n < b.n && err == nil {
        err = io.ErrShortWrite
    }
    if err != nil {
        if n > 0 && n < b.n {
            copy(b.buf[0:b.n-n], b.buf[n:b.n])
        }
        b.n -= n
        b.err = err
        return err
    }
    b.n = 0
    return nil
}

*bufio.Writer.Flush方法执行的是底层io.Writer的Write方法

rotateFile

// rotateFile closes the syncBuffer's file and starts a new one.
// The startup argument indicates whether this is the initial startup of klog.
// If startup is true, existing files are opened for appending instead of truncated.
func (sb *syncBuffer) rotateFile(now time.Time, startup bool) error {
    if sb.file != nil {
        sb.Flush()
        sb.file.Close()
    }
    var err error
    sb.file, _, err = create(severityName[sb.sev], now, startup)
    if err != nil {
        return err
    }
    if startup {
        fileInfo, err := sb.file.Stat()
        if err != nil {
            return fmt.Errorf("file stat could not get fileinfo: %v", err)
        }
        // init file size
        sb.nbytes = uint64(fileInfo.Size())
    } else {
        sb.nbytes = 0
    }
    sb.Writer = bufio.NewWriterSize(sb.file, bufferSize)

    if sb.logger.skipLogHeaders {
        return nil
    }

    // Write header.
    var buf bytes.Buffer
    fmt.Fprintf(&buf, "Log file created at: %s\n", now.Format("2006/01/02 15:04:05"))
    fmt.Fprintf(&buf, "Running on machine: %s\n", host)
    fmt.Fprintf(&buf, "Binary: Built with %s %s for %s/%s\n", runtime.Compiler, runtime.Version(), runtime.GOOS, runtime.GOARCH)
    fmt.Fprintf(&buf, "Log line format: [IWEF]mmdd hh:mm:ss.uuuuuu threadid file:line] msg\n")
    n, err := sb.file.Write(buf.Bytes())
    sb.nbytes += uint64(n)
    return err
}

syncBuffer.rotateFile方法会设置其Writer为bufio.NewWriterSize(sb.file, bufferSize),底层writer为syncBuffer的file

klog的init方法异步协程执行logging.flushDaemon(),它内部执行的是l.lockAndFlushAll();Flush方法是执行l.lockAndFlushAll();l.lockAndFlushAll()方法使用lock执行flushAll;flushAll方法从fatalLog开始递减到infoLog级别挨个执行l.file[s]的Flush及Sync方法;对于redirectBuffer,其Flush及Sync方法为空操作;对于syncBuffer,其Sync方法执行的是*os.File.Sync;其Flush方法执行的是*bufio.Writer.Flush,*bufio.Writer.Flush方法执行的是底层io.Writer的Write方法,即syncBuffer的file的Write方法。


有疑问加站长微信联系(非本文作者)

280

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK