6

Go服务错误堆栈收集降频策略

 2 years ago
source link: http://blog.thankbabe.com/posts/2022-04-04-go%E6%9C%8D%E5%8A%A1%E9%94%99%E8%AF%AF%E6%97%A5%E5%BF%97%E5%A0%86%E6%A0%88%E9%87%87%E9%9B%86%E9%99%8D%E9%A2%91/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Go服务错误堆栈收集降频策略

 2022-04-04  约 1412 字   预计阅读 3 分钟    次阅读  

Go服务在高并发请求下,当服务出现异常,会出现大量的错误日志调用栈跟踪,并上报到UDP日志服务,导致I/O彪高的问题。

多数的情况下,我们只需要通过几条错误日志的分析即可定位问题,也不需要看过多的重复的错误日志,针对这种场景下的问题,对堆栈日志的收集进行时间间隔内限量,超出限量的部分进行概率性的跳过忽略。

当前项目,日志指定的日志级别 >= error 之上的级别都需要输出调用堆栈

// 实例化Zap日志对象
logger = zap.New(core).WithOptions(zap.AddCaller()).WithOptions(zap.AddStacktrace(zapcore.ErrorLevel))

ELK日志:

有调用栈日志:

/imgs/log1.png

/imgs/log2.jpg

无调用栈跟踪

/imgs/log3.png

  • 通过计数器进行时间间隔计数

  • 堆栈日志收集进行概率性跳过策略
    • 通过配置文件配置和启用策略
  • core
    • feature-log-limit-test
      • 时间间隔计数器:
        • /utils/hcounter/timeCounter.go
      • 日志堆栈跳过策略:
        • /log/stackSkip.go

相关代码:

/utils/hcounter/timeCounter.go

  • 计数器计数通过 atomic 原子操作,(原子计数性能比锁高)
  • 间隔重置计数器和时间通过 锁 确保并发不会重复重置
package hcounter

import (
	"sync"
	"sync/atomic"
	"time"
)

var mutex sync.Mutex

// TimeCounter 时间间隔计数器
// 计数器计数通过 atomic 原子操作,(原子计数性能比锁高)
// 间隔重置计数器和时间通过 锁 确保并发不会重复重置
//
type TimeCounter struct {
	counter  uint32        // 计数器,从0开始
	abortTs  int64         // 截止时间,当前时间+间隔时间
	Max      uint32        // 默认限制数量,默认:100
	Delta    uint32        // 计数器累加值,默认:1
	Interval time.Duration // 间隔时间,默认:1分钟
}

type TimeCounterOptions func(tl *TimeCounter)

func NewTimeCounter(options ...TimeCounterOptions) *TimeCounter {
	tl := &TimeCounter{}
	// 初始化
	tl.init()
	// 赋能
	for _, opt := range options {
		opt(tl)
	}
	return tl
}

// 初始化,设置默认值
func (tl *TimeCounter) init() {
	if tl.Max <= 0 {
		tl.Max = 100
	}
	if tl.Delta <= 0 {
		tl.Delta = 1
	}
	if tl.Interval <= 0 {
		tl.Interval = 1 * time.Minute
	}
}

// 初始设置新的截止时间
func (tl *TimeCounter) initAbort() {
	var timestamp int64
	if tl.abortTs <= 0 {
		timestamp = time.Now().Add(tl.Interval).Unix()
	} else {
		timestamp = time.Unix(tl.abortTs, 0).Add(tl.Interval).Unix()
	}
	// 更新截止时间
	atomic.StoreInt64(&tl.abortTs, timestamp)
}

// 初始设置计数器数值
func (tl *TimeCounter) initCounter() {
	// 初始化计数器,从0开始
	atomic.StoreUint32(&tl.counter, 0)
}

// 锁操作
func (tl *TimeCounter) doLockHandle(doFn func()) {
	mutex.Lock()
	defer mutex.Unlock()
	doFn()

}

// CheckPass 校验是否通过
func (tl *TimeCounter) CheckPass() bool {
	nowTs := time.Now().Unix()
	// 校验是否过了截止时间
	if nowTs > tl.abortTs {
		// 重置截止时间和计数器 - 使用锁操作
		tl.doLockHandle(func() {
			// 锁里再检验,避免重复设置
			if nowTs <= tl.abortTs {
				return
			}
			// 初始化截止时间
			tl.initCounter()
			// 获取新的截止时间更新
			tl.initAbort()
		})
	}
	// 获取当前计数器值是否超过
	if tl.counter > tl.Max {
		return false
	}
	// 计数器累加
	newCounter := atomic.AddUint32(&tl.counter, tl.Delta)
	// 判断累加计数器是否超过
	if newCounter > tl.Max {
		return false
	}
	return true
}

func WithMax(max uint32) TimeCounterOptions {
	return func(tl *TimeCounter) {
		tl.Max = max
	}
}

func WithDelta(delta uint32) TimeCounterOptions {
	return func(tl *TimeCounter) {
		tl.Delta = delta
	}
}

func WithInterval(interval time.Duration) TimeCounterOptions {
	return func(tl *TimeCounter) {
		tl.Interval = interval
	}
}

/log/stackSkip.go

  • 堆栈日志跳过策略
package log

import (
	ycfg "github.com/olebedev/config"
	"math/rand"
	"time"
)

const MaxNum = 1000

// StackSkip 堆栈日志限制配置
type StackSkip struct {
	Prob           float64 // 跳过概率,小数不能大于1,支持3位小数(可以根据MaxNum修改,这次多位小数)
	CounterMax     int     // 时间间隔,最大允许堆栈日志数量
	IntervalSecond int     // 时间间隔,*/秒
	skipNum        int     // 跳过数值:随机 MaxNum = n ,n<skipNum,就不进行日志堆栈获取
	hc             *hcounter.TimeCounter
}

func NewStackLimitCfg(logCfg *ycfg.Config) *StackSkip {
	if logCfg == nil {
		return nil
	}
	cfg, _ := logCfg.Get("StackSkip")
	if cfg == nil {
		return nil
	}
	// 配置获取
	sl := &StackSkip{}
	sl.Prob = cfg.UFloat64("Prob", 0.5)
	sl.CounterMax = cfg.UInt("CounterMax", 100)
	sl.IntervalSecond = cfg.UInt("IntervalSecond", 60)
	sl.skipNum = int(MaxNum * sl.Prob)
	if sl.Prob < 0 || sl.CounterMax < 0 || sl.IntervalSecond < 0 || sl.skipNum < 0 || sl.skipNum > MaxNum {
		return nil
	}

	// 初始化时间间隔计数器
	sl.hc = hcounter.NewTimeCounter(
		hcounter.WithMax(uint32(sl.CounterMax)),
		hcounter.WithDelta(1),
		hcounter.WithInterval(time.Duration(sl.IntervalSecond)*time.Second),
	)
	return sl
}

// NeedStack 是否需要堆栈日志
func (sl StackSkip) NeedStack() bool {
	// hc=nil 不限制
	if sl.hc == nil {
		return true
	}

	// 是否达到启动概率性策略
	pass := sl.hc.CheckPass()
	if pass {
		return true
	}

	// 使用概率性策略,根据配置概率,概率性略过一些不获取堆栈日志
	i := rand.Intn(MaxNum)
	if i < sl.skipNum {
		return false
	}
	return true
}
func SyncUDPLog(ls LogStruct) {
	// ...

	// 日志堆栈概率性跳过(解决并发下大量错误日志输出堆栈信息导致I/O过高)
	if ls.NeedStack && logStackSkip != nil {
		ls.NeedStack = logStackSkip.NeedStack()
	}

	if !ls.NeedStack {
		// 无堆栈上报
	} else {
		// 有堆栈上报
	}

项目使用:

增加配置才生效,可以根据需求配置间隔时间,允许最大数量,超出最大数量后,根据配置的概率,跳过忽略掉堆栈日志

config/{{env}}/log.yaml

StackSkip:
  #时间间隔,*/秒
  IntervalSecond: 20
  #时间间隔,最大允许堆栈日志数量
  CounterMax: 10
  #跳过概率,小数不能大于1,支持3位小数
  Prob: 0.8

Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK