聊聊klog的Flush
source link: https://studygolang.com/articles/32372
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
本文主要研究一下klog的Flush
Flush
k8s.io/klog/v2@v2.4.0/klog.go
// Flush flushes all pending log I/O.
// It delegates to lockAndFlushAll on the package-level logging value and
// returns nothing, so flush or sync failures are not reported to the caller.
func Flush() {
logging.lockAndFlushAll()
}
Flush方法执行的是logging.lockAndFlushAll()
k8s.io/klog/v2@v2.4.0/klog.go
// init sets up the defaults and runs flushDaemon.
// It assigns initial values to fields of the package-level logging value and
// then starts flushDaemon in its own goroutine.
func init() {
logging.stderrThreshold = errorLog // Default stderrThreshold is ERROR.
logging.setVState(0, nil, false)
// Empty strings for the directory and file name.
// NOTE(review): presumably "no explicit destination configured" — confirm
// against the code that reads these fields.
logging.logDir = ""
logging.logFile = ""
logging.logFileMaxSizeMB = 1800 // presumably megabytes, going by the field name — confirm
logging.toStderr = true
logging.alsoToStderr = false
logging.skipHeaders = false
logging.addDirHeader = false
logging.skipLogHeaders = false
logging.oneOutput = false
// flushDaemon loops on a ticker and never returns, so this goroutine lives
// for as long as the process does.
go logging.flushDaemon()
}
klog的init方法异步协程执行logging.flushDaemon()
logging.flushDaemon()
k8s.io/klog/v2@v2.4.0/klog.go
// flushDaemon flushes the log file buffers once per flushInterval.
// It creates a single ticker and then blocks on it in an endless loop, calling
// lockAndFlushAll after every tick. The function never returns and the ticker
// is never stopped, so it is meant to be started in its own goroutine.
func (l *loggingT) flushDaemon() {
	ticker := time.NewTicker(flushInterval)
	for {
		<-ticker.C
		l.lockAndFlushAll()
	}
}
flushDaemon方法range新建ticker的channel,然后执行l.lockAndFlushAll()
lockAndFlushAll
k8s.io/klog/v2@v2.4.0/klog.go
// lockAndFlushAll is like flushAll but locks l.mu first.
// The unlock is deferred so that l.mu is released on every exit path,
// including a panic raised inside flushAll; without the defer such a panic
// would leave the mutex held and block every later caller that needs it.
func (l *loggingT) lockAndFlushAll() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.flushAll()
}
lockAndFlushAll使用lock执行flushAll
flushAll
k8s.io/klog/v2@v2.4.0/klog.go
// Severity levels in increasing order. infoLog is 0 and each following name
// is one higher, so fatalLog is 3.
const (
infoLog severity = iota
warningLog
errorLog
fatalLog
// numSeverity is written as the literal 4 rather than derived from iota; it
// matches the number of levels declared above and must be kept in step with
// them by hand.
numSeverity = 4
)
// flushAll walks the per-severity destinations in l.file from fatalLog down to
// infoLog and, for every non-nil entry, calls Flush followed by Sync.
// The highest severity goes first in case there's trouble flushing. Errors
// from both calls are deliberately discarded: this is a best-effort pass.
// l.mu is held.
func (l *loggingT) flushAll() {
	for sev := fatalLog; sev >= infoLog; sev-- {
		dest := l.file[sev]
		if dest == nil {
			continue
		}
		_ = dest.Flush() // ignore error
		_ = dest.Sync()  // ignore error
	}
}
flushAll方法从fatalLog开始递减到infoLog级别挨个执行l.file[s]的Flush及Sync方法
flushSyncWriter
k8s.io/klog/v2@v2.4.0/klog.go
// flushSyncWriter is the interface satisfied by logging destinations.
// It extends io.Writer with Flush and Sync, the two methods that flushAll
// calls on every non-nil destination.
type flushSyncWriter interface {
Flush() error
Sync() error
io.Writer
}
// Writer declares a single Write method with the same signature as io.Writer.
// NOTE(review): presumably quoted from the standard io package to show what
// flushSyncWriter embeds — confirm.
type Writer interface {
Write(p []byte) (n int, err error)
}
flushSyncWriter接口定义了Flush、Sync方法,内嵌了io.Writer接口
redirectBuffer
k8s.io/klog/v2@v2.4.0/klog.go
// redirectBuffer sends log output to an alternate destination. It holds the
// destination in the named field w and does no buffering of its own.
type redirectBuffer struct {
	w io.Writer
}

// Write hands the bytes straight to the wrapped writer and returns whatever
// that writer reports.
func (rb *redirectBuffer) Write(bytes []byte) (n int, err error) {
	n, err = rb.w.Write(bytes)
	return n, err
}

// Flush is a no-op that always returns nil; there is nothing buffered here.
func (rb *redirectBuffer) Flush() error {
	return nil
}

// Sync is a no-op that always returns nil.
func (rb *redirectBuffer) Sync() error {
	return nil
}
redirectBuffer持有一个io.Writer类型的字段w(是具名字段,并非内嵌),其Write方法委托给该字段的Write来写;其Sync及Flush方法都为空操作,直接返回nil
syncBuffer
k8s.io/klog/v2@v2.4.0/klog.go
// syncBuffer pairs a bufio.Writer with the file underneath it. The embedded
// writer supplies buffering and Flush, the file field gives access to Sync,
// and the Write wrapper below adds log file rotation. The file is a named
// field rather than an embedded one because its methods would conflict with
// the writer's.
// l.mu is held for all its methods.
type syncBuffer struct {
	logger *loggingT
	*bufio.Writer
	file     *os.File
	sev      severity
	nbytes   uint64 // bytes written to the current file so far
	maxbytes uint64 // threshold at which Write rotates to a new file
}

// Sync calls Sync on the underlying file. It does not flush the embedded
// buffered writer first.
func (sb *syncBuffer) Sync() error {
	f := sb.file
	return f.Sync()
}

// Write appends p through the embedded buffered writer and adds the number of
// bytes accepted to nbytes. If the write would bring the file to maxbytes or
// beyond, the file is rotated first. A failure from either the rotation or the
// write is passed to sb.logger.exit; the write error is also returned.
func (sb *syncBuffer) Write(p []byte) (n int, err error) {
	if projected := sb.nbytes + uint64(len(p)); projected >= sb.maxbytes {
		rotateErr := sb.rotateFile(time.Now(), false)
		if rotateErr != nil {
			sb.logger.exit(rotateErr)
		}
	}

	n, err = sb.Writer.Write(p)
	sb.nbytes += uint64(n)
	if err != nil {
		sb.logger.exit(err)
	}
	return n, err
}
syncBuffer定义了logger、file、sev、nbytes、maxbytes属性,内嵌了*bufio.Writer;其Sync方法执行的是*os.File.Sync;其Flush方法没有单独定义,而是由内嵌的*bufio.Writer提供,执行的是*bufio.Writer.Flush
Flush
/usr/local/go/src/bufio/bufio.go
// Writer buffers output destined for an underlying io.Writer.
type Writer struct {
	err error     // first write error seen; once set, Flush keeps returning it
	buf []byte    // buffer storage
	n   int       // number of bytes in buf waiting to be written
	wr  io.Writer // destination that Flush writes to
}

// Flush writes any buffered data to the underlying io.Writer.
// A previously recorded error is returned immediately without touching the
// destination, and an empty buffer is a successful no-op. If the destination
// accepts fewer bytes than were buffered without reporting an error, the
// result is io.ErrShortWrite. On any failure the unwritten bytes are moved to
// the front of the buffer and the error is remembered.
func (b *Writer) Flush() error {
	switch {
	case b.err != nil:
		return b.err
	case b.n == 0:
		return nil
	}

	written, werr := b.wr.Write(b.buf[:b.n])
	if werr == nil && written < b.n {
		werr = io.ErrShortWrite
	}
	if werr == nil {
		b.n = 0
		return nil
	}

	// Partial write: keep only the bytes the destination did not take.
	if written > 0 && written < b.n {
		copy(b.buf[:b.n-written], b.buf[written:b.n])
	}
	b.n -= written
	b.err = werr
	return werr
}
*bufio.Writer.Flush方法执行的是底层io.Writer的Write方法,把缓冲区中尚未写出的b.n个字节写出去
rotateFile
// rotateFile closes the syncBuffer's current file, if there is one, and
// switches to a newly created one.
//
// now is passed to create and is also used for the timestamp in the header.
// startup reports whether this is the initial startup of klog; when it is
// true, existing files are opened for appending instead of truncated, and
// nbytes starts from the size reported by Stat rather than zero.
//
// Unless skipLogHeaders is set on the logger, a four-line header is written
// directly to the file (not through the buffered writer) and its length is
// added to nbytes. The returned error comes from create, Stat, or that header
// write. Errors from flushing and closing the previous file are discarded.
func (sb *syncBuffer) rotateFile(now time.Time, startup bool) error {
	if old := sb.file; old != nil {
		_ = sb.Flush()
		_ = old.Close()
	}

	f, _, createErr := create(severityName[sb.sev], now, startup)
	sb.file = f
	if createErr != nil {
		return createErr
	}

	// Only a startup rotation begins from the file's existing size; a regular
	// rotation begins from zero.
	var initial uint64
	if startup {
		info, statErr := f.Stat()
		if statErr != nil {
			return fmt.Errorf("file stat could not get fileinfo: %v", statErr)
		}
		initial = uint64(info.Size())
	}
	sb.nbytes = initial

	sb.Writer = bufio.NewWriterSize(f, bufferSize)
	if sb.logger.skipLogHeaders {
		return nil
	}

	// Assemble the header in memory so it reaches the file in a single write.
	var header bytes.Buffer
	fmt.Fprintf(&header, "Log file created at: %s\n", now.Format("2006/01/02 15:04:05"))
	fmt.Fprintf(&header, "Running on machine: %s\n", host)
	fmt.Fprintf(&header, "Binary: Built with %s %s for %s/%s\n", runtime.Compiler, runtime.Version(), runtime.GOOS, runtime.GOARCH)
	fmt.Fprintf(&header, "Log line format: [IWEF]mmdd hh:mm:ss.uuuuuu threadid file:line] msg\n")

	written, writeErr := f.Write(header.Bytes())
	sb.nbytes += uint64(written)
	return writeErr
}
syncBuffer.rotateFile方法会设置其Writer为bufio.NewWriterSize(sb.file, bufferSize),底层writer为syncBuffer的file
klog的init方法异步协程执行logging.flushDaemon(),它内部执行的是l.lockAndFlushAll();Flush方法是执行l.lockAndFlushAll();l.lockAndFlushAll()方法使用lock执行flushAll;flushAll方法从fatalLog开始递减到infoLog级别挨个执行l.file[s]的Flush及Sync方法;对于redirectBuffer,其Flush及Sync方法为空操作;对于syncBuffer,其Sync方法执行的是*os.File.Sync;其Flush方法执行的是*bufio.Writer.Flush,*bufio.Writer.Flush方法执行的是底层io.Writer的Write方法,即syncBuffer的file的Write方法。
有疑问加站长微信联系(非本文作者)
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK