4

系統設計: 處理服務讀取多個任務遇到的問題 (Go 語言)

 1 year ago
source link: https://blog.wu-boy.com/2022/11/simple-scheduler-with-multiple-worker-using-golang/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
system design 02

不同的服務都會有需要處理比較久的任務,這些任務是不能即時執行完成,才回應給前端,這樣使用者體驗會非常的差。將類型的任務存在資料庫或放在消息對列就是一種處理方式,接著啟動另一個服務來消化非即時性的任務,而常見的處理方式就是在服務內啟動多個 Worker Node 來平行消化任務 (如上圖)。

讀取多個任務問題

先看看底下此服務內部的設計,用 Go 語言來當範例解釋當下問題,假設有一個 Task 服務負責存放所有的任務,而 Agent 服務內可以開啟多個 Goroutine 來平行消化任務,步驟也很簡單,第一步就去讀取任務,而第二步就是執行任務。想看看在步驟一的時候,如果目前 Task 服務內沒有任何任務需要執行,就設計每 5 秒才去向 Task 服務詢問是否有新任務需要執行,避免太頻繁發請求給 Task 服務。

先假設有 100 個任務需要等待執行,在 Agent 內開啟 10 個 Worker Node 去消化,這樣每次就會發送 10 個請求,而這 10 個請求有可能對於 Task 服務來說就是 10 個 SQL 指令,如果是 10 台 Agnet 就變成 100 個請求,這樣對於 Task 服務來說會負擔太大。

system design 01

大家可以想看看如何解決上述的問題,底下提供一段 Go 語言實作多個 Worker 的代碼,步驟一就是開啟 Goroutine 來平行處理任務,就是在這邊會發送大量的請求到另一個 Task 服務

type Poller struct {
  routineGroup *routineGroup
  workerNum    int
}

func (p *Poller) Poll(ctx context.Context) error {
  for i := 0; i < p.workerNum; i++ {
    // step01
    p.routineGroup.Run(func() {
      for {
        select {
        case <-ctx.Done():
          return
        default:
          // step02
          task, err := p.fetch(ctx)
          if err != nil {
            log.Println("can't get task", err.Error())
            break
          }

          // step03
          if err := p.execute(ctx, task); err != nil {
            log.Println("execute task error:", err.Error())
          }
        }
      }
    })
  }
  p.routineGroup.Wait()
  return nil
}

改善系統設計

為了解決不要發送大量的請求,我們可以在最前面多設計一個 Scheduler 來確保一次只讀取一個任務後,才繼續執行下一個讀取任務。大家可以看看底下的設計圖

system design 02

從上面的設計圖,我們需要在 Poller 的 struct 內紀錄目前有多少個 Worker Node 正在執行,故新增一個 Metric struct 來記錄這些資訊

type metric struct {
  busyWorkers uint64
}

// newMetric for default metric structure
func newMetric() *metric {
  return &metric{}
}

func (m *metric) IncBusyWorker() uint64 {
  return atomic.AddUint64(&m.busyWorkers, 1)
}

func (m *metric) DecBusyWorker() uint64 {
  return atomic.AddUint64(&m.busyWorkers, ^uint64(0))
}

func (m *metric) BusyWorkers() uint64 {
  return atomic.LoadUint64(&m.busyWorkers)
}

有了上述資訊後,接著在 Poller 內多新增一個 ready channel 用來判斷是否有新的 Worker 可以分配。所以在初始化的 for 迴圈內需要判斷是否有新的 Worker Node 可以執行。

func (p *Poller) schedule() {
  p.Lock()
  defer p.Unlock()
  if int(p.metric.BusyWorkers()) >= p.workerNum {
    return
  }

  select {
  case p.ready <- struct{}{}:
  default:
  }
}

接著改寫整體 Poll 函示,多寫一個 for 迴圈來判斷是否有新的 Worker Node

func (p *Poller) Poll(ctx context.Context) error {
  // scheduler
  for {
    // step01
    p.schedule()

    select {
    case <-p.ready:
    case <-ctx.Done():
      return nil
    }

  LOOP:
    for {
      select {
      case <-ctx.Done():
        break LOOP
      default:
        // step02
        task, err := p.fetch(ctx)
        if err != nil {
          log.Println("fetch task error:", err.Error())
          break
        }
        p.metric.IncBusyWorker()
        // step03
        p.routineGroup.Run(func() {
          if err := p.execute(ctx, task); err != nil {
            log.Println("execute task error:", err.Error())
          }
        })
        break LOOP
      }
    }
  }
}

可以看到流程步驟會變成底下

  1. 判斷使否有新的 Worker Node 可以執行
  2. 單一 Worker Node 讀取是否有新任務
  3. 如果有新的任務,則紀錄 Worker Node + 1
  4. 返回步驟一

只要任何任務完成後,就將 Worker Node 數量再減一,並重新執行 p.schedule(),確保 ready channel 不為空。

func (p *Poller) execute(ctx context.Context, task string) error {
  defer func() {
    p.metric.DecBusyWorker()
    p.schedule()
  }()
  return nil
}

上述測試的代碼可以直接參考這邊,也許大家有其他方式可以解決此問題,像是用 Message Queue 避免大量請求也是一種解決方案,只是如果能不起另一種服務是最好的,畢竟團隊內有時候需要將整套流程打包放到客戶端環境,多起一種服務,這樣要除錯又更不方便了。



About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK