7

用 Go 語言實現固定大小的 Ring Buffer 資料結構

 1 year ago
source link: https://blog.wu-boy.com/2023/01/ring-buffer-queue-with-fixed-size-in-golang/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

用 Go 語言實現固定大小的 Ring Buffer 資料結構

 Posted on January 24, 2023  |  4 minutes  |  738 words  |  appleboy
logo

Ring Buffer Queue 是固定大小記憶體的 FIFO (first in, first out) 資料結構,用來處理不同 Process 之前的資料交換。工作原理就是在一段連續固定大小區間的記憶體用 (head/tail) 兩個指針來決定現在要將資料放在哪個位置。本篇將帶大家用 Go 語言快速實現 Ring Buffer 資料結構。

由於是固定大小的 Queue,在嵌入式系統領域非常好用,尤其時記憶體非常小的時候,使用上格外小心,因此開發者通常會宣告固定大小來儲存數據,而非使用動態分配,避免系統記憶體被用完。

必須要知道此資料結構也是有限制的,固定大小的 Queue,最終會遇到資料滿的時候該如何處理,有兩種方式,第一種就是將最新的資料拋棄,另一種就是將最新的資料繼續寫入舊的記憶體區段。

Wiki 上面看到底下這張動畫圖

circular buffer

從上圖可以看到在資料結構內需要底下成員

  • buf: 內容儲存格式
  • capacity: 固定容量大小 (length)
  • head: 指向從 Buffer 內取出資料 (start)
  • tail: 指向從 Buffer 內儲存資料 (end)

API 實作細節需要底下四個函示

  • Enqueue: 將資料放入 Queue
  • Dequeue: 從 Queue 讀取資料
  • IsEmpty: 判斷 Queue 內是否有資料
  • IsFull: 判斷 Queue 是否已經滿了

根據上述的資料結構,可以用 Go 語言宣告如下

type T interface{}

type CircularBuffer struct {
  sync.Mutex
  taskQueue []T
  capacity  int
  head      int
  tail      int
}

實作判斷 IsEmpty,是否為空,只要是 headtail 值相同的話,就是空的。資料結構初始化後,head 跟 tail 就都為零。

func (s *CircularBuffer) IsEmpty() bool {
  return s.head == s.tail
}

實作判斷 IsFull 是否已經滿了,無法在寫入資料,當有新資料寫入時,tail 就會 +1,而 head 則是維持不動。等到 Queue 寫入到最後一個空間後,tail + 1 就會等於 head 值,而由於是 Ring 結構,所以需要除以 capacity 取餘數。這邊請注意,由於要計算是否已經滿了,故需要浪費掉一個位置來確認。

func (s *CircularBuffer) IsFull() bool {
  return s.head == (s.tail+1)%s.capacity
}

假設 buffer size 為 4 好了

# 初始化
head: 0, tail: 0
# 寫入資料
head: 0, tail: 1
# 寫入資料
head: 0, tail: 2
# 寫入資料
head: 0, tail: 3
# Queue 裡面只有三個了
# IsFull() 已經為 true

接著看看如何實作 Enqueue 及 Dequeue

func (s *CircularBuffer) Enqueue(task T) error {
  if s.IsFull() {
    return errMaxCapacity
  }

  s.Lock()
  s.taskQueue[s.tail] = task
  s.tail = (s.tail + 1) % s.capacity
  s.Unlock()

  return nil
}

func (s *CircularBuffer) Dequeue() (T, error) {
  if s.IsEmpty() {
    return nil, queue.ErrNoTaskInQueue
  }

  s.Lock()
  data := s.taskQueue[s.head]
  s.head = (s.head + 1) % s.capacity
  s.Unlock()

  return data, nil
}

可以看到上述代碼實作方式相同,Enqueue 就是將 tail+1,而 Dequeue 就是將 head+1。最後補上 New 函式

func NewCircularBuffer(size int) *CircularBuffer {
  w := &CircularBuffer{
    taskQueue: make([]T, size),
    capacity:  size,
  }

  return w
}

驗證上述程式碼,底下是測試代碼

func TestQueue(t *testing.T) {
  size := 100
  q := NewCircularBuffer(size)

  for i := 0; i < (size - 1); i++ {
    assert.NoError(t, q.Enqueue(i+1))
  }
  // can't insert new data.
  assert.Error(t, q.Enqueue(0))
  assert.Equal(t, errFull, q.Enqueue(0))

  for i := 0; i < (size - 1); i++ {
    v, err := q.Dequeue()
    assert.Equal(t, i+1, v.(int))
    assert.NoError(t, err)
  }

  _, err := q.Dequeue()
  // no task
  assert.Error(t, err)
  assert.Equal(t, errNoTask, err)
}

上面可以看到先宣告 100 size 的 Slice,但是只能使用 99 大小,第 100 個要塞入就會出現錯誤。程式碼請參考,測試程式碼請參考這邊

解決浪費單一空間

上面有提到此 Ring buffer 需要浪費一個空間來計算 Empty 或 Full。來看看怎麼解決這問題,其實很簡單,在資料結構內多新增一個 full 變數來確認 Queue 是否為滿。

type CircularBuffer struct {
  capacity  int
  head      int
  tail      int
+ full      bool
}

func (s *CircularBuffer) IsEmpty() bool {
- return s.head == s.tail
+ return s.head == s.tail && !s.full
}

func (s *CircularBuffer) IsFull() bool {
- return s.head == (s.tail+1)%s.capacity
+ return s.full
}

func (s *CircularBuffer) Enqueue(task T) error {
  s.Lock()
  s.taskQueue[s.tail] = task
  s.tail = (s.tail + 1) % s.capacity
+ s.full = s.head == s.tail
  s.Unlock()

  return nil
}

func (s *CircularBuffer) Dequeue() (T, error) {

  s.Lock()
  data := s.taskQueue[s.head]
+ s.full = false
  s.head = (s.head + 1) % s.capacity
  s.Unlock()

測試代碼如下


func BenchmarkCircularBufferEnqueueDequeue(b *testing.B) {
  q := NewCircularBuffer(b.N)

  b.ReportAllocs()
  b.ResetTimer()
  for i := 0; i < b.N; i++ {
    _ = q.Enqueue(i)
    _, _ = q.Dequeue()
  }
}

func BenchmarkCircularBufferEnqueue(b *testing.B) {
  q := NewCircularBuffer(b.N)

  b.ReportAllocs()
  b.ResetTimer()
  for i := 0; i < b.N; i++ {
    _ = q.Enqueue(i)
  }
}

func BenchmarkCircularBufferDequeue(b *testing.B) {
  q := NewCircularBuffer(b.N)

  for i := 0; i < b.N; i++ {
    _ = q.Enqueue(i)
  }

  b.ReportAllocs()
  b.ResetTimer()
  for i := 0; i < b.N; i++ {
    _, _ = q.Dequeue()
  }
}

透過 GitHub Action 測試結果如下

goos: linux
goarch: amd64
pkg: github.com/go-training/training/example52-ring-buffer-queue
cpu: Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
BenchmarkCircularBufferEnqueueDequeue
BenchmarkCircularBufferEnqueueDequeue-2     21792045          57.14 ns/op         7 B/op         0 allocs/op
BenchmarkCircularBufferEnqueue
BenchmarkCircularBufferEnqueue-2            34254517          37.20 ns/op         7 B/op         0 allocs/op
BenchmarkCircularBufferDequeue
BenchmarkCircularBufferDequeue-2            54213112          22.14 ns/op         0 B/op         0 allocs/op

最後程式碼請參考這邊,測試代碼請參考這裡



About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK