3

AlanWang0202的个人空间

 3 years ago
source link: https://my.oschina.net/u/5084709/blog/5209796
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Prometheus源码 - memSeries

Prolegomena

Prometheus无疑是一款优秀的开源系统监控报警框架,在云原生的时代发挥着重要作用。它提供近实时的、基于动态云环境和容器微服务、服务以及应用程序的内省监控。同时也用于监控传统架构的资源。Fortunately,笔者每天的工作都会与Prometheus打交道,在使用过程中它体现的高效无不让人着迷。同时,笔者对于这款CNCF设计思想产生了浓厚的兴趣,这个框架是如何做到单一节点可以处理数以百万的监控指标,每秒处理数十万的数据点? 带着这种疑惑与兴趣,开始了prometheus设计思想和工程实践的探索。它的高效必然离不开一款高效的时序数据库的支持TSDB。本文会对TSDB的其中一个memSeries模块进行源码层面的剖析。借此机会,笔者也分享下看源码的心得,对于这种体量级别的应用,看其源码无异于自己置身扁舟漂泊在太平洋,无头绪,无方向,充满挑战。Efficiently,我们可以找一篇写的很有总结性质的好文章(Prometheus时序数据库-内存中的存储结构),然后阅读下该模块所涉及到的技术论文(Gorilla: A Fast, Scalable, In-Memory Time Series Database),这样再去撸源码就事半功倍了。

Introduction

笔者主要是围绕memSeries源码(prometheus/prometheus-main/tsdb/head.go)进行,顺序性的解读。因为还没有系统性的将整个工程源码都读完,而且这类resource很少,无法站在更高的角度将memSeries的方法具体作用和Prometheus实际使用完美的串联起来。Hence,本文主要讲memSeries源码实现过程和方法的效果。后续笔者会不断的解读Prometheus其他模块,这样一步步的将其各个模块串联起来,读者可以关注后续的update。

MemSeries Attributes

type memSeries struct {
   sync.RWMutex

   ref           uint64  
   lset          labels.Labels
   mmappedChunks []*mmappedChunk 
   headChunk     *memChunk
   chunkRange    int64
   firstChunkID  int

   nextAt        int64 // Timestamp at which to cu? the next chunk.
   sampleBuf     [4]sample
   pendingCommit bool // Whether there are samples waiting to be commi?ted to~ this series.

   app chunkenc.Appender // Current appender for the chunk.

   memChunkPool *sync.Pool

   txs *txRing
}

  1. ref :每接受一个新的时间序列(e.g. ht?p_request??_total{path="/", method="GET"},NTC. 时间序列=指标(e.g. h?tp_request_total) + 标签(k,v))

  2. lset:这个是识别这个series的标签集合。

    // 源码
    lset labels.Labels
    	| -> type Labels []Label
    		| -> type Label struct { Name, Value string }
    
  3. mmappedChunks

    type mmappedChunk struct {
       ref              uint64
       numSamples       uint16
       minTime, maxTime int64
    }
    

sample(t,v)是这个时间序列某个时间的采集数据经过gorilla压缩后的数据。sample是被存储到memchunk中,其中达到120个(默认15s采集一次数据,生成一个sample。Hence,120*15s=30min Full chunk),或者

chunkRange的两个小时(memSeries -> chunkRange 参数决定) 都可以叫做这个chunk是full(ref. Once the chunk fills till 120 samples (or) spans upto chunk/block range (let's call it chunkRange), which is 2h by default, a new chunk is cut and the old chunk is said to be "full". Link: https://ganeshvernekar.com/blog/prometheus-tsdb-the-head-block/)

这个也可以在源码中考究:

// tsdb/head.go
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {

->   const samplesPerChunk = 120

当 active chunk 写满sample后,就会使用 chunks.ChunkDiskMapper -> chunkDiskMapper.WriteChunk() 写入到磁盘中,同时生成 chunkRef,这个值 represent :该时间序列磁盘中的chunk 在内存中的映射mmappedChunks []*mmappedChunk 维护着该时间序列的所有的chunk。

源码探索:

func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) {

    ...
   // 将full chunk 写入到磁盘
   chunkRef, err := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk)

   // 建立内存映射 
   s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{
      ...
   })
   ...
}
  1. headChunk:

    type memChunk struct {
       chunk            chunkenc.Chunk
       minTime, maxTime int64
    }
    

headChunk 一般都是active chunk 一直有samples写入。

Relevant Methods:

// 将目前的headchunk 写入磁盘同时在内存中建立映射保存元数据,memSeries的指针在指向新的headChunk
func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk{ ... }

// 将headChunk 写入磁盘 建立M-map映射
func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper){ ... }

那我们会疑惑一个问题:headChunk 满足什么条件才会触发memSeries指针指向新的headChunk,以及落盘和创建内存映射呢?这里,笔者画了一个flow chart. 关注两个指标即可

memSeries.chunkRange and const samplesPerChunk = 120

  1. chunkRange

    type memSeries struct {
     	...
       chunkRange    int64
       ...
    }
    

    它的作用可以看下该源码:

    func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
    
       | -> s.nextAt = rangeForTimestamp(mint, s.chunkRange)
       		| -> func rangeForTimestamp(t int64, width int64) (maxt int64) {
    				return (t/width)*width + width}
    }
    

​ 这个chunkRange的大小决定着 nextAt的值(default. 2H),这个值决定了新的headChunk什么时候被创建,也就是curChunk的什么时候full。

  1. firstChunkID : 因为每个memSeries中会有属于这个时间序列chunk的映射表mmappedChunks []*mmappedChunk,这个主要是是为了找到对应chunk(一般是on-disk chunk)的metadata

    type memSeries struct {
        ...
       mmappedChunks []*mmappedChunk
          |-> type mmappedChunk struct {
                    ...
    					}
    	...
       firstChunkID  int
        ...
    }
    

    实际应用:ix 表示chunk在mmappedChunks的索引,chunkID 是从1开始逐渐增长的。Hence,(id- s.firstChunkID)就是这个chunk在 mmappendChunks的索引。

    func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) {
    
       ix := id - s.firstChunkID  // chunk在mmappendChunks的索引
       ...
    }
    
  2. nextAt :下一个headChunk的开始时间。 可以看下面这段源码, 设置一个最低的限制,下一个chunk必须创建的时间,其实是受 const samplesPerChunk = 120(当sample 达到120个的时候回cutHeadChunk)chunkRange(default:2h) 影响的。

func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
	...
   s.nextAt = rangeForTimestamp(mint, s.chunkRange)
    		| -> func rangeForTimestamp(t int64, width int64) (maxt int64) {
					return (t/width)*width + width
				}
    ...
}
  1. sampleBuf

    type memSeries struct {
    	...
    	sampleBuf [4]sample
    	...
    }
    

通过源码查看 其实就是headChunk 保存最新的4个sample,对于有什么用处,目前看memSeries源码还没发现

func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
	...
   s.sampleBuf[0] = s.sampleBuf[1]
   s.sampleBuf[1] = s.sampleBuf[2]
   s.sampleBuf[2] = s.sampleBuf[3]
   s.sampleBuf[3] = sample{t: t, v: v}
   	...

}
  1. pendingCommit : OFF: Whether there are samples waiting to be committed to this series.

    这个字段在 func (a *headAppender) Commit() 这个方法中会涉及到,目前不展开讲,等下一个Head系列会提到。

  2. app : 其实是一个空接口,但是当初始化memSeries的时候,会实例化它。这个设计模式值得学习(其实一个工厂模型,但是在源码中比较难找。。。),

    // Appender adds sample pairs to a chunk.
    type Appender interface {
       Append(int64, float64)
    }
    

而它真正的实现是XOR算法模块,这个算法可以参考笔者另一篇文章:Gorilla Encoding. 在以后的Prometheus系列中,笔者会出一期关于XOR算法实现的分析锻炼下工程化落地能力。Anyway,memSeries如何使用这个接口的呢? 可以参考下面的源码片段

// XORChunk 结构体的这个方法返回一个xorAppender结构体
func (c *XORChunk) Appender() (Appender, error) {
   ...
   
   a := &xorAppender{
	...
   }
   ... 
  return a, err
}
// xorAppender 这个结构体 有一个method Append 这样就具体实现了 Appender interface{} 这个接口
func (a *xorAppender) Append(t int64, v float64) {
  	...
}

// 初始化chunk的时候 chunk 为 XORChunk,并使用XORChunk中的Appender()方法 返回一个xorAppender 结
// 构体,这个结构体具体实现了Append方法,并赋值给 s.app。
func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
  ...
   	s.headChunk = &memChunk{
      chunk:   chunkenc.NewXORChunk(), // 实例化 chunk为 XORChunk()
	  ...
   	}
  	...
  	app, err := s.headChunk.chunk.Appender()  // 实例化
	if err != nil {
		panic(err)
	}
	s.app = app  // 实例化 s.app
	return s.headChunk
  
}

所以它才会有如下这个操作:

// s.app.Append(t,v) 
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
  ...
   s.app.Append(t, v)
  ...
}
  1. memChunkPool: 由于golang内建的GC机制会影响应用的性能,为了减少GC,golang提供了对象重用的机制,也就是sync.Pool对象池。 sync.Pool是可伸缩的,并发安全的。其大小仅受限于内存的大小,可以被看作是一个存放可重用对象的值的容器。 设计的目的是存放已经分配的但是暂时不用的对象,在需要用到的时候直接从pool中取。

    
    type memSeries struct {
    	...
    
    	memChunkPool *sync.Pool
    
    	...
    }
    
  2. txs: 事务ID记录的一个结构体. 这个又是另一个模块(isolation)的知识体系,本文不细说。

    type memSeries struct {
    	...
    
    	txs *txRing
         | —> type txRing struct {
    			txIDs     []uint64
    			txIDFirst int // Position of the first id in the ring.
    			txIDCount int // How many ids in the ring.
    		}
    
    	...
    }
    

MemSeries methods

当介绍memSeries结构体元素的时候,其实都穿插着讲了它的方法,所以这个版本会简单的介绍下几个方法的实现和作用。

  1. func (s *memSeries) cutNewHeadChunk(...){...} :当前的headChunk达到full chunk条件的时候,会使用该方法重新初始化一个新的headChunk,并将memSeries的headChunk指向该chunk。

  2. func (s *memSeries) mmapCurrentHeadChunk(...){...} : 将目前的headChunk 写入到磁盘中,同时会建立内存映射。内存映射主要是 mmappedChunks []*mmappedChunk 进行内存中存储。

  3. func (s *memSeries) chunk(id int, ...){...} : 根据id,找到想查找的chunk在内存映射中的索引,从而找到该chunk。可以看下源码,这里面还是有一些trick

    func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) {
    
       ix := id - s.firstChunkID  // ix是 该chunk在m-map中的索引. chunk的id 是从1开始逐渐增加的1,2,3,...
       if ix < 0 || ix > len(s.mmappedChunks) {
          return nil, false, storage.ErrNotFound
       }
       if ix == len(s.mmappedChunks) { // 查找的是active chunk 正在执行写入的headChunk
          if s.headChunk == nil {
             return nil, false, errors.New("invalid head chunk")
          }
          return s.headChunk, false, nil  // 由于是active chunk 所以不能被GC 回收。
       }
       chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref) // 查找的chunk已经落盘,根据m-map的ref 在磁盘中查找对应的chunk 这个落盘的chunk是没有mint,maxt 
       if err != nil {
          if _, ok := err.(*chunks.CorruptionErr); ok {
             panic(err)
          }
          return nil, false, err
       }
       mc := s.memChunkPool.Get().(*memChunk) // 这里是一个trick,从池子里拿一个memChunk内存空间 然后进行初始化,这个是并发安全的。
       mc.chunk = chk
       mc.minTime = s.mmappedChunks[ix].minTime
       mc.maxTime = s.mmappedChunks[ix].maxTime
        return mc, true, nil  // true: 表示该chunk使用后,可以被回收。
    }
    
  4. func (s *memSeries) truncateChunksBefore(mint int64) (removed int): 这个函数的作用是给一个时间,把这个时间点之前的chunk都从series中剔除掉。

    func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
       	// headchunk的最大时间小于给的这个时间段 说明 把这个series中所有的chunk都要清零。
        if s.headChunk != nil && s.headChunk.maxTime < mint { 
          removed = 1 + len(s.mmappedChunks)
          s.firstChunkID += removed
          s.headChunk = nil  // 清空
          s.mmappedChunks = nil // 清空
          return removed
       }
        // 判断每一个落盘的chunk中的时间 与 截断时间对比
       if len(s.mmappedChunks) > 0 {
          for i, c := range s.mmappedChunks { // 这个c其实不是chunk 只是磁盘某个chunk中在内存中的元数据
             if c.maxTime >= mint {
                break
             }
             removed = i + 1
          }
          s.mmappedChunks = append(s.mmappedChunks[:0], s.mmappedChunks[removed:]...)
          s.firstChunkID += removed
       }
       return removed
    }
    
  5. func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (...) :

    func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
    
       // 这个就是定义了 一个full chunk的一个条件,当有120个samples 就证明full chunk,如果设置的采集时间是每15s采集一次,那么一个full chunk(每一个时间序列的)需要15s * 120 = 30min
       const samplesPerChunk = 120
    
       c := s.head()
    
       if c == nil {
          if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t {  // 该headChunk的最大时间都大于插入的时间t了,所以这个sample无法被append
    
             return false, false
          }
    
          // len(s.mmappedChunks) = 0 and headChunk == nil 那么就创建一个新的HeadChunk 起始时       //间是t
          //
          c = s.cutNewHeadChunk(t, chunkDiskMapper) 
          chunkCreated = true
       }
       numSamples := c.chunk.NumSamples()
    
    
       if c.maxTime >= t {
          return false, chunkCreated
       }
    
       if numSamples == samplesPerChunk/4 {
          s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt) // 推断并设置下一个headChunk创建时间。这个函数有一个分母有一个+1操作,是为了防止分母为0。
       }
       if t >= s.nextAt {
          c = s.cutNewHeadChunk(t, chunkDiskMapper)
          chunkCreated = true
       }
       s.app.Append(t, v)
    
       c.maxTime = t
    
       s.sampleBuf[0] = s.sampleBuf[1]
       s.sampleBuf[1] = s.sampleBuf[2]
       s.sampleBuf[2] = s.sampleBuf[3]
       s.sampleBuf[3] = sample{t: t, v: v}
    
       if appendID > 0 { // 是否需要被隔离? 如果是appendID == 0 则不需要。 主要是在查询和插入的时候。
          s.txs.add(appendID)
       }
    
       return true, chunkCreated
    }
    
  6. func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator{...}

    func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator {
       c, garbageCollect, err := s.chunk(id, chunkDiskMapper)  // 根据id 从 memseries中找到对应的c -> memchunk
       // TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a
       // series's chunk, which got then garbage collected before it got
       // accessed.  We must ensure to not garbage collect as long as any
       // readers still hold a reference.
       if err != nil {
          return chunkenc.NewNopIterator()
       }
       defer func() {
          if garbageCollect {
             // Set this to nil so that Go GC can collect it after it has been used.
             // This should be done always at the end.
             c.chunk = nil
             s.memChunkPool.Put(c)
          }
       }()
    
       ix := id - s.firstChunkID  // ix: c 在 memseries中的索引值
    
       numSamples := c.chunk.NumSamples()  // c 有多少个numSamples
       stopAfter := numSamples
    
       if isoState != nil {
          totalSamples := 0    // Total samples in this series.
          previousSamples := 0 // Samples before this chunk.
    
          for j, d := range s.mmappedChunks {
             totalSamples += int(d.numSamples)
             if j < ix {
                previousSamples += int(d.numSamples)
             }
          }
    
          if s.headChunk != nil {
             totalSamples += s.headChunk.chunk.NumSamples()
          }
    
          // Removing the extra transactionIDs that are relevant for samples that
          // come after this chunk, from the total transactionIDs.
          appendIDsToConsider := s.txs.txIDCount - (totalSamples - (previousSamples + numSamples))  // 这个chunk 和它之前的所有 samples数量
    
          // Iterate over the appendIDs, find the first one that the isolation state says not
          // to return.
          it := s.txs.iterator()
          for index := 0; index < appendIDsToConsider; index++ {
             appendID := it.At()  // 没有初始化txs 所以就是postion=0  第一个位置的appendID
             if appendID <= isoState.maxAppendID { // Easy check first.
                if _, ok := isoState.incompleteAppends[appendID]; !ok {  // 没有检测到这个某个sample的完成操作
                   it.Next()
                   continue
                }
             }
             // Eq. index - previousSamples, 当小于0说明 index
             // 还没到目前chunk的上一个chunk或者到了上一个chunk但是没遍历完呢。 大于0 说明 index目前
             //curChunk上
             stopAfter = numSamples - (appendIDsToConsider - index)
    
             if stopAfter < 0 { // index还没遍历到 curChunk上
                stopAfter = 0 // Stopped in a previous chunk.
             }
             break
          }
       }
    
       if stopAfter == 0 {
          return chunkenc.NewNopIterator()  //NewNopIterator returns a new chunk iterator that does not hold any data.  目前index指针到了previous chunk末尾 或者curChunk开头,也就是stopAfter的pos所在,start = end ? 这样肯定返回一个没有数据的迭代器
       }
    
    
        // 以下代码是根据 传入的参数 it chunkenc.Iterator  来选择返回哪种类型的Iterator
       if id-s.firstChunkID < len(s.mmappedChunks) {
          if stopAfter == numSamples {
             return c.chunk.Iterator(it)  // index 到了curChunk的末尾,其中it 传入的object可以将Iterator实例化,
          }
    
           // 不同种类的Iterator
          if msIter, ok := it.(*stopIterator); ok {
             msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
             msIter.i = -1
             msIter.stopAfter = stopAfter
             return msIter
          }
          return &stopIterator{
             Iterator:  c.chunk.Iterator(it),
             i:         -1,
             stopAfter: stopAfter,
          }
       }
       // Serve the last 4 samples for the last chunk from the sample buffer
       // as their compressed bytes may be mutated by added samples.
       if msIter, ok := it.(*memSafeIterator); ok {
          msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
          msIter.i = -1
          msIter.total = numSamples
          msIter.stopAfter = stopAfter
          msIter.buf = s.sampleBuf
          return msIter
       }
       return &memSafeIterator{
          stopIterator: stopIterator{
             Iterator:  c.chunk.Iterator(it),
             i:         -1,
             stopAfter: stopAfter,
          },
          total: numSamples,
          buf:   s.sampleBuf,
       }
    }
    

上述Iterator的关系

// Iterator interface
type Chunk interface {
   ...
   Iterator(Iterator) Iterator
    | ->  type Iterator interface {Next() bool,Seek(t int64) bool,
    	At() (int64, float64), Err() error}
}
type stopIterator struct {
	chunkenc.Iterator  
	i, stopAfter int
}
type memSafeIterator struct {
	stopIterator 
	total int 
	buf [4]sample}
}

Conclusion

通过阅读源码,笔者不仅学习到了这些大神的代码风格,体会了prometheus TSDB的设计思想(这种设计风格建议总结下来,以后可以尝试造轮子),其中笔者更是对memSeries模块在脑海里有自己的认识和理解。后续笔者将会不断解剖Prometheus源码,大家可以关注下。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK