5

Go定时任务源码 - robfig/cron

 1 year ago
source link: https://studygolang.com/articles/35970?fr=sidebar
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Go定时任务源码 - robfig/cron

uuid · 大约21小时之前 · 269 次点击 · 预计阅读时间 3 分钟 · 大约8小时之前 开始浏览    

robfig/cron是Go语言实现的开源定时任务调度框架,核心代码是巧妙的使用chan + select + for实现了一个轻量级调度协程,不但语法简洁,而且具有很好的性能。

任务抽象(业务隔离):任务抽象成一个Job接口,业务逻辑类只需实现该接口

type Job interface {
  Run()
}

计划接口:通过当前时间计算任务的下次执行执行时间,具体实现类可以根据实际需求实现

type Schedule interface {
    Next(time.Time) time.Time
}

定时任务对象:保存执行的任务Job、计算执行时间

type Entry struct {
    ID       EntryID   // id
    Schedule Schedule  // 计划
    Next     time.Time // 下次执行时间
    Job      Job       // 任务
}

任务调度管理:保存定时任务对象(Entry),调度任务执行,提供新增、删除接口(涉及关联资源竞争)

// 任务管理类
type Cron struct {
    nextID  int64        // 生成entry自增ID
    entries []*Entry     // 保存Entry
    add     chan *Entry  // 添加
    remove  chan EntryID // 删除
}
// 删除
func (c *Cron) Remove(id EntryID) { 
    c.remove <- id
}
// 新增
func (c *Cron) Add(spec string, cmd Job) EntryID  { 
    entry := &Entry{
        ID:         EntryID(atomic.AddInt64(&c.nextID, 1)),
        Schedule:   ParseStandard(spec),
        Job:        cmd,
    }
    c.add <- entry
    return entry.ID
}

核心调度:计算下次执行时间 -> 排序 -> 取最早执行数据 -> timer 等待,因为只有一个协程在执行这个run的调度,所以不存在资源竞争,不需要加锁,另外考虑到执行任务可能涉及阻塞,例如:IO操作,所以一般startJob方法会开启协程执行

func (c *Cron) run() {
    now := time.Now()
    for _, entry := range c.entries {
        entry.Next = entry.Schedule.Next(now) // 计算下次执行时间
    }
    for {
        sort.Sort(byTime(c.entries)) // 时间排序
        timer := time.NewTimer(c.entries[0].Next.Sub(now))
        select {
        case now = <-timer.C:
            for _, e := range c.entries {
                if e.Next.After(now) || e.Next.IsZero() {
                    break
                }
                c.startJob(e.Job) // 开协程执行
                e.Next = e.Schedule.Next(now) // 计算下次执行时间
            }
        case newEntry := <-c.add: // 新增
            timer.Stop()
            newEntry.Next = newEntry.Schedule.Next(now)
            c.entries = append(c.entries, newEntry)
        }
    ...
    }
}
// 执行任务
func (c *Cron) startJob(j Job) {
    go func() {
        j.Run()
    }()
}

启动时会开启唯一协程执行run方法,计算任务执行时间,执行,任务管理等

func New() *Cron {
    c := &Cron{
        entries: nil,
        add:     make(chan *Entry),
        remove:  make(chan EntryID),
    }
    return c
}
func (c *Cron) Start() {
    go c.run()
}
  1. 共享资源(定时任务)的管理和调度由唯一协程管理
  2. 通过for + select + channel来循环计算执行时间,监听任务到期、增删事件
  3. 执行任务会新启协程执行,不阻塞调度
  4. 采用扇入/扇出原理,多协程添加、增删任务调度协程(Fan In),调度启动新协程执行任务(Fan Out)
  5. 调度协程使用的是CSP并发模型思想

我的博客:https://itart.cn

原文地址:https://itart.cn/blogs/2022/explore/cron-source-code.html


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK