26

open-falcon 聚合器aggregator代码解析

 4 years ago
source link: https://studygolang.com/articles/29677
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

总结:aggregator聚合器就是从falcon_portal.cluster表中取出用户在页面上配置的表达式,然后解析后,通过api拿到对应机器组的所有机器,通过api查询graph数据算出一个值重新打回transfer作为一个新的点。

  • 定时从db中拿出所有的聚合器配置放到一个map中
  • 第一次启动时遍历聚合器map生成workers map 这两个map的key都是id+updatetime
  • 同时下一次拿出db生成map 对workers这个map进行增量更新 和删除操作删除是通过 worker.Quit chan通信的
  • workers这个map 通过 ticker跑cron 运行WorkerRun这个方法
  • WorkerRun这个方法解析分子分母的配置
  • 调用api 根据grp_id拿出所有机器列表
  • 调用graph的last接口拿出所有endpoint的counter 的值然后进行计算
  • 计算后重新打回 一个线程安全的双向链表队列
  • 另外一个goroutine异步pop队列中的值发生给 transfer的http接口(不是给agent用的rpc接口)
  • 机器量很多时获取机器列表和查询最新的值都是瓶颈
  • 我在想如果直接在transfer中直接做数据的聚合速度上不存在瓶颈

下面我们来看下代码:

  1. main.go中核心的两个地方
//查询db 调api算值 push 到push的队列中
    go cron.UpdateItems()
    //从push队列push到transfer
    sender.StartSender()

2.看下go cron.UpdateItems()

func updateItems() {
        //从db中查询出结果
    items, err := db.ReadClusterMonitorItems()
    if err != nil {
        return
    }
        //对比key(id+uptime),将已经变更的项删除 
    deleteNoUseWorker(items)
    //启动新的worker
    createWorkerIfNeed(items)
}
//看下这个读db的func
func ReadClusterMonitorItems() (M map[string]*g.Cluster, err error){
   ......
   /*看到这个funcreturn的是个map key是 每个聚合项的id和他更新时间的字符串
   value 就是Cluster结构体指针
   type Cluster struct {
    Id          int64
    GroupId     int64
    Numerator   string
    Denominator string
    Endpoint    string
    Metric      string
    Tags        string
    DsType      string
    Step        int
    LastUpdate  time.Time
   }
   */
   M[fmt.Sprintf("%d%v", c.Id, c.LastUpdate)] = &c
   return M, err
}

3.看下 deleteNoUseWorker 和createWorkerIfNeed 这两个func都是围绕 Worker这个struct的进行增删

func deleteNoUseWorker(m map[string]*g.Cluster) {
    del := []string{}
    for key, worker := range Workers {
            //遍历已经创建的work,如果key在新的map中没有了说明这条记录在db中被更改或删除了
        //所以删掉它 给Workers这个map缩容
        if _, ok := m[key]; !ok {
               //将worker 中的Quit chan关闭 会调用ticker.stop 真正关闭 
            worker.Drop()
            del = append(del, key)
        }
    }

    for _, key := range del {
        delete(Workers, key)
    }
}

func createWorkerIfNeed(m map[string]*g.Cluster) {
 
    for key, item := range m {
        if _, ok := Workers[key]; !ok {
                //如果配置中step小于0 丢弃这条
            if item.Step <= 0 {
                log.Println("[W] invalid cluster(step <= 0):", item)
                continue
            }
                        //初始化worker     
            worker := NewWorker(item)
            Workers[key] = worker
            worker.Start()
        }
    }
}

4. 看下Worker这个结构体包含三个域

  • ticker作为一个计时器实现类似cron的功能每隔一段时间执行一次Start 中的func
  • ClusterItem作为每个聚合器的配置
  • Quit是一个chan用来外部关闭 key在新的map中没有了说明这条记录在db中被更改或删除了
type Worker struct {
    Ticker      *time.Ticker
    ClusterItem *g.Cluster
    Quit        chan struct{}
}

func NewWorker(ci *g.Cluster) Worker {
    w := Worker{}
    w.Ticker = time.NewTicker(time.Duration(ci.Step) * time.Second)
    w.Quit = make(chan struct{})
    w.ClusterItem = ci
    return w
}

func (this Worker) Start() {
    go func() {
        for {
            select {
            case <-this.Ticker.C:
                WorkerRun(this.ClusterItem)
            case <-this.Quit:
                if g.Config().Debug {
                    log.Println("[I] drop worker", this.ClusterItem)
                }
                this.Ticker.Stop()
                return
            }
        }
    }()
}

func (this Worker) Drop() {
    close(this.Quit)
}

var Workers = make(map[string]Worker)

到这里我们已经看明白聚合器的流程了:

  • 定时从db中拿出所有的聚合器配置放到一个map中
  • 第一次启动时遍历聚合器map生成workers map 这两个map的key都是id+updatetime
  • 同时下一次拿出db生成map 对workers这个map进行增量更新 和删除操作删除是通过 worker.Quit chan通信的
  • workers这个map 通过 ticker跑cron 运行WorkerRun这个方法

5.下面看下最重要的方法 WorkerRun

func WorkerRun(item *g.Cluster) {
    debug := g.Config().Debug
    /*
    Numerator代表分子    例如 $(cpu.user)+$(cpu.system) 代表求cpu.user和cpu.system的和
    Denominator代表分母  例如 $# 代表所有机器
    */
        //cleanParam去除\r等字符
    numeratorStr := cleanParam(item.Numerator)
    denominatorStr := cleanParam(item.Denominator)
        //判断分子分母是否合法
    if !expressionValid(numeratorStr) || !expressionValid(denominatorStr) {
        log.Println("[W] invalid numerator or denominator", item)
        return
    }
        //判断分子分母是否需要计算
      needComputeNumerator := needCompute(numeratorStr)
    needComputeDenominator := needCompute(denominatorStr)
    //如果分子分母都不需要计算就不需要用到聚合器了
    if !needComputeNumerator && !needComputeDenominator {
        log.Println("[W] no need compute", item)
        return
    }
        //比如分子是这样的: "($(cpu.busy)+$(cpu.idle)-$(cpu.nice))>80"
    //那么parse的返回值为 [cpu.busy cpu.idle cpu.nice] [+ -] >80
    numeratorOperands, numeratorOperators, numeratorComputeMode := parse(numeratorStr, needComputeNumerator)
    denominatorOperands, denominatorOperators, denominatorComputeMode := parse(denominatorStr, needComputeDenominator)

    if !operatorsValid(numeratorOperators) || !operatorsValid(denominatorOperators) {
        log.Println("[W] operators invalid", item)
        return
    }
    /*add retry for gethostname bygid
    这里源码是动过sdk根据group_id查找组里面机器列表
    这里我进行了两点优化:
    1.sdk调用时没有加重试,http失败导致这次没有get到机器所以这个点就不算了导致断点
    2.原来的接口在机器量超过1k时就效率就会很慢 2w+机器需要8s,看了代码是用orm进行了多次查询而且附带了很多别的信息
    这里我只需要group_id对应endpoint_list所以我写了一个新的接口用一条raw_sql进行查询
    测试2w+的机器0.2s就能返回
    */
    retry_limit :=3
    r_s :=0
    var hostnames []string
    for r_s <retry_limit{
        hostnames_tmp, err_tmp := sdk.HostnamesByID(item.GroupId)
        if err_tmp != nil {
            log.Println("[E] get hostlist err",err_tmp)
            r_s+=1
            time.Sleep(time.Second)
        }else{
            hostnames = hostnames_tmp
            break
        }
    }
    //没有机器当然不用算了
    if len(hostnames)==0{
        log.Println("[E] get 0 record hostname item:",item)
        return
    }

    now := time.Now().Unix()

    /*这里是调用graph/lastpoint这个api 查询最近一个点的数据
    1.机器是上面查到的主机列表
    2.counter这里做了合并 把所有要查的metirc都放在一个请求里面查询了
    3.查询的时候在api那边做了for循环 逐个item查询 估计这里也会拖慢速度
    4.查完之后计算下值推到发送队列
    */
    valueMap, err := queryCounterLast(numeratorOperands, denominatorOperands, hostnames, now-int64(item.Step*2), now)
    if err != nil {
        log.Println("[E] get queryCounterLast", err, item)
        return
    }

    ..........
    sender.Push(item.Endpoint, item.Metric, item.Tags, numerator/denominator, item.DsType, int64(item.Step))
}

6.最后看下发送的代码

  • MetaDataQueue是个线程安全的双向链表
  • 上面说的WorkerRun方法中会将转化好的监控项数据PushFront入链表
  • startSender这个goroutine 每200毫秒会将队列中的数据取出发送到transfer的http接口
func Push(endpoint, metric, tags string, val interface{}, counterType string, step_and_ts ...int64) {
    md := MakeMetaData(endpoint, metric, tags, val, counterType, step_and_ts...)
    MetaDataQueue.PushFront(md)
}

const LIMIT = 200

var MetaDataQueue = NewSafeLinkedList()
var PostPushUrl string
var Debug bool

func StartSender() {
    go startSender()
}

func startSender() {
    for {
        L := MetaDataQueue.PopBack(LIMIT)
        if len(L) == 0 {
            time.Sleep(time.Millisecond * 200)
            continue
        }

        err := PostPush(L)
        if err != nil {
            log.Println("[E] push to transfer fail", err)
        }
    }
}

欢迎关注我们的微信公众号,每天学习Go知识

FveQFjN.jpg!web

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK