2

Simple Scheduling Library (Go) [译]

 2 years ago
source link: https://ijayer.github.io/post/tech/code/golang/20200308_simple_scheduler_lib/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Simple Scheduling Library (Go) [译]

2020-03-08 844 words 2 mins read 48 times read

现在的应用程序或多或少都有这样得需求,既 在给定的某段时间重复执行一段代码。这段代码可能是从其他数据源获取数据进行解析,亦或是发送一些数据到其他地方,如 MQ、Handler 等。

这种问题或挑战一定会在你的项目中遇到,现在或将来,难道不是吗?

我们团队在前一个项目开发期间就遇到了这样的问题。随着应用程序不断迭代、某些功能上就需要在某段时间内能够持续的执行一个任务,当然,任务的执行频率是可以提前设置好的。因此,就有了下面提到的这个库,只有不到 50 行代码,用一种更优雅的方式来调度某种类型的任务。

我们来深入看看怎么实现的

其背后主要思想是:每个添加的任务都启动一个 goroutine,在指定的时间间隔内重复执行该任务, 并创建对应的 context.CancelFunc,以便能够在后台取消该 goroutine。

Scheduler 包对外表现的能力使用了 Scheduler 接口进行封装, 定义如下:

// Scheduler
type Scheduler interface {
	Add(ctx context.Context, job Job, interval time.Duration)
	Stop()
}

下面是 Schedule 的数据结构定义,它拥有的两个方法 Add()Stop()。Schedule 由 sync.WaitGroup() 和由 contex.CancelFunc 构成的切片组成。WaitGroup 用来在应用程序停止的时候平滑的关闭所有 goroutine。

// Schedule implements the Scheduler interface
type Schedule struct {
	wg        sync.WaitGroup
	cancelOps []context.CancelFunc
}

// NewSchedule create an instance of Schedule
func NewScheduler() Scheduler {
	s := Schedule{
		wg:        sync.WaitGroup{},
		cancelOps: make([]context.CancelFunc, 0),
	}
	return &s
}

Schedule 的两个方法实现如下,Stop 将会停止所有正在运行的 Job,并等待所有 goroutine 执行结束。

// Add starts a goroutine which constantly calls provided job
// with interval delay
func (s *Schedule) Add(ctx context.Context, job Job, interval time.Duration) {
	ctx, cancel := context.WithCancel(ctx)
	s.cancelOps = append(s.cancelOps, cancel)

	s.wg.Add(1)
	go s.process(ctx, job, interval)
}

// Stop cancels all running jobs
func (s *Schedule) Stop() {
	for _, cancel := range s.cancelOps {
		cancel()
	}
	s.wg.Wait()
}

Job 定义为函数类型,表示要执行的任务

// Job is a defined type which is just a simple func with context
type Job func(ctx context.Context)

当调用 Add() 方法添加一个任务时,启动一个 goroutine 来执行,该 goroutine 内部会监听两个信号:1. 打点器触发,执行任务;2. context.CancelFunc 触发,停止任务并退出。

// process executes the job by every ticker time, and stop job
// when context.Cancel called.
func (s *Schedule) process(ctx context.Context, job Job, interval time.Duration) {
	ticker := time.NewTicker(interval)

	for {
		select {
		case <-ticker.C:
			job(ctx)
		case <-ctx.Done():
			s.wg.Done()
			return
		}
	}
}

最后,切记在应用程序结束的时候要调用 Stop() 保证平滑退出。

See Also

Thanks to the authors 🙂


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK