Go 每日一库之 cron
source link: https://studygolang.com/articles/29537
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
简介
cron
一个用于管理定时任务的库,用 Go 实现 Linux 中 crontab
这个命令的效果。之前我们也介绍过一个类似的 Go 库—— gron
。 gron
代码小巧,用于学习是比较好的。但是它功能相对简单些,并且已经不维护了。如果有定时任务需求,还是建议使用 cron
。
快速使用
文本代码使用 Go Modules。
创建目录并初始化:
$ mkdir cron && cd cron $ go mod init github.com/darjun/go-daily-lib/cron
安装 cron
,目前最新稳定版本为 v3:
$ go get -u github.com/robfig/cron/v3
使用:
package main import ( "fmt" "time" "github.com/robfig/cron/v3" ) func main() { c := cron.New() c.AddFunc("@every 1s", func() { fmt.Println("tick every 1 second") }) c.Start() time.Sleep(time.Second * 5) }
使用非常简单,创建 cron
对象,这个对象用于管理定时任务。
调用 cron
对象的 AddFunc()
方法向管理器中添加定时任务。 AddFunc()
接受两个参数,参数 1 以字符串形式指定触发时间规则,参数 2 是一个无参的函数,每次触发时调用。 @every 1s
表示每秒触发一次, @every
后加一个时间间隔,表示每隔多长时间触发一次。例如 @every 1h
表示每小时触发一次, @every 1m2s
表示每隔 1 分 2 秒触发一次。 time.ParseDuration()
支持的格式都可以用在这里。
调用 c.Start()
启动定时循环。
注意一点,因为 c.Start()
启动一个新的 goroutine 做循环检测,我们在代码最后加了一行 time.Sleep(time.Second * 5)
防止主 goroutine 退出。
运行效果,每隔 1s 输出一行字符串:
$ go run main.go tick every 1 second tick every 1 second tick every 1 second tick every 1 second tick every 1 second
时间格式
与Linux 中 crontab
命令相似, cron
库支持用 5 个空格分隔的域来表示时间。这 5 个域含义依次为:
-
Minutes
:分钟,取值范围[0-59]
,支持特殊字符* / , -
; -
Hours
:小时,取值范围[0-23]
,支持特殊字符* / , -
; -
Day of month
:每月的第几天,取值范围[1-31]
,支持特殊字符* / , - ?
; -
Month
:月,取值范围[1-12]
或者使用月份名字缩写[JAN-DEC]
,支持特殊字符* / , -
; -
Day of week
:周历,取值范围[0-6]
或名字缩写[JUN-SAT]
,支持特殊字符* / , - ?
。
注意,月份和周历名称都是不区分大小写的,也就是说 SUN/Sun/sun
表示同样的含义(都是周日)。
特殊字符含义如下:
-
*
:使用*
的域可以匹配任何值,例如将月份域(第 4 个)设置为*
,表示每个月; -
/
:用来指定范围的 步长 ,例如将小时域(第 2 个)设置为3-59/15
表示第 3 分钟触发,以后每隔 15 分钟触发一次,因此第 2 次触发为第 18 分钟,第 3 次为 33 分钟。。。直到分钟大于 59; -
,
:用来列举一些离散的值和多个范围,例如将周历的域(第 5 个)设置为MON,WED,FRI
表示周一、三和五; -
-
:用来表示范围,例如将小时的域(第 1 个)设置为9-17
表示上午 9 点到下午 17 点(包括 9 和 17); -
?
:只能用在月历和周历的域中,用来代替*
,表示每月/周的任意一天。
了解规则之后,我们可以定义任意时间:
-
30 * * * *
:分钟域为 30,其他域都是*
表示任意。每小时的 30 分触发; -
30 3-6,20-23 * * *
:分钟域为 30,小时域的3-6,20-23
表示 3 点到 6 点和 20 点到 23 点。3,4,5,6,20,21,22,23 时的 30 分触发; -
0 0 1 1 *
:1(第 4 个) 月 1(第 3 个) 号的 0(第 2 个) 时 0(第 1 个) 分触发。
记熟了这几个域的顺序,再多练习几次很容易就能掌握格式。熟悉规则了之后,就能熟练使用 crontab
命令了。
func main() { c := cron.New() c.AddFunc("30 * * * *", func() { fmt.Println("Every hour on the half hour") }) c.AddFunc("30 3-6,20-23 * * *", func() { fmt.Println("On the half hour of 3-6am, 8-11pm") }) c.AddFunc("0 0 1 1 *", func() { fmt.Println("Jun 1 every year") }) c.Start() for { time.Sleep(time.Second) } }
预定义时间规则
为了方便使用, cron
预定义了一些时间规则:
-
@yearly
:也可以写作@annually
,表示每年第一天的 0 点。等价于0 0 1 1 *
; -
@monthly
:表示每月第一天的 0 点。等价于0 0 1 * *
; -
@weekly
:表示每周第一天的 0 点,注意第一天为周日,即周六结束,周日开始的那个 0 点。等价于0 0 * * 0
; -
@daily
:也可以写作@midnight
,表示每天 0 点。等价于0 0 * * *
; -
@hourly
:表示每小时的开始。等价于0 * * * *
。
例如:
func main() { c := cron.New() c.AddFunc("@hourly", func() { fmt.Println("Every hour") }) c.AddFunc("@daily", func() { fmt.Println("Every day on midnight") }) c.AddFunc("@weekly", func() { fmt.Println("Every week") }) c.Start() for { time.Sleep(time.Second) } }
上面代码只是演示用法,实际运行可能要等待非常长的时间才能有输出。
固定时间间隔
cron
支持固定时间间隔,格式为:
@every <duration>
含义为每隔 duration
触发一次。 <duration>
会调用 time.ParseDuration()
函数解析,所以 ParseDuration
支持的格式都可以。例如 1h30m10s
。在快速开始部分,我们已经演示了 @every
的用法了,这里就不赘述了。
时区
默认情况下,所有时间都是基于当前时区的。当然我们也可以指定时区,有 2 两种方式:
- 在时间字符串前面添加一个
CRON_TZ=
+ 具体时区,具体时区的格式在之前carbon
的文章中有详细介绍。东京时区为Asia/Tokyo
,纽约时区为America/New_York
; - 创建
cron
对象时增加一个时区选项cron.WithLocation(location)
,location
为time.LoadLocation(zone)
加载的时区对象,zone
为具体的时区格式。或者调用已创建好的cron
对象的SetLocation()
方法设置时区。
示例:
func main() { nyc, _ := time.LoadLocation("America/New_York") c := cron.New(cron.WithLocation(nyc)) c.AddFunc("0 6 * * ?", func() { fmt.Println("Every 6 o'clock at New York") }) c.AddFunc("CRON_TZ=Asia/Tokyo 0 6 * * ?", func() { fmt.Println("Every 6 o'clock at Tokyo") }) c.Start() for { time.Sleep(time.Second) } }
Job
接口
除了直接将无参函数作为回调外, cron
还支持 Job
接口:
// cron.go type Job interface { Run() }
我们定义一个实现接口 Job
的结构:
type GreetingJob struct { Name string } func (g GreetingJob) Run() { fmt.Println("Hello ", g.Name) }
调用 cron
对象的 AddJob()
方法将 GreetingJob
对象添加到定时管理器中:
func main() { c := cron.New() c.AddJob("@every 1s", GreetingJob{"dj"}) c.Start() time.Sleep(5 * time.Second) }
运行效果:
$ go run main.go Hello dj Hello dj Hello dj Hello dj Hello dj
使用自定义的结构可以让任务携带状态( Name
字段)。
实际上 AddFunc()
方法内部也调用了 AddJob()
方法。首先, cron
基于 func()
类型定义一个新的类型 FuncJob
:
// cron.go type FuncJob func()
然后让 FuncJob
实现 Job
接口:
// cron.go func (f FuncJob) Run() { f() }
在 AddFunc()
方法中,将传入的回调转为 FuncJob
类型,然后调用 AddJob()
方法:
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) { return c.AddJob(spec, FuncJob(cmd)) }
线程安全
cron
会创建一个新的 goroutine 来执行触发回调。如果这些回调需要并发访问一些资源、数据,我们需要显式地做同步。
自定义时间格式
cron
支持灵活的时间格式,如果默认的格式不能满足要求,我们可以自己定义时间格式。时间规则字符串需要 cron.Parser
对象来解析。我们先来看看默认的解析器是如何工作的。
首先定义各个域:
// parser.go const ( Second ParseOption = 1 << iota SecondOptional Minute Hour Dom Month Dow DowOptional Descriptor )
除了 Minute/Hour/Dom(Day of month)/Month/Dow(Day of week)
外,还可以支持 Second
。相对顺序都是固定的:
// parser.go var places = []ParseOption{ Second, Minute, Hour, Dom, Month, Dow, } var defaults = []string{ "0", "0", "0", "*", "*", "*", }
默认的时间格式使用 5 个域。
我们可以调用 cron.NewParser()
创建自己的 Parser
对象,以位格式传入使用哪些域,例如下面的 Parser
使用 6 个域,支持 Second
(秒):
parser := cron.NewParser( cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor, )
调用 cron.WithParser(parser)
创建一个选项传入构造函数 cron.New()
,使用时就可以指定秒了:
c := cron.New(cron.WithParser(parser)) c.AddFunc("1 * * * * *", func () { fmt.Println("every 1 second") }) c.Start()
这里时间格式必须使用 6 个域,顺序与上面的 const
定义一致。
因为上面的时间格式太常见了, cron
定义了一个便捷的函数:
// option.go func WithSeconds() Option { return WithParser(NewParser( Second | Minute | Hour | Dom | Month | Dow | Descriptor, )) }
注意 Descriptor
表示对 @every/@hour
等的支持。有了 WithSeconds()
,我们不用手动创建 Parser
对象了:
c := cron.New(cron.WithSeconds())
选项
cron
对象创建使用了选项模式,我们前面已经介绍了 3 个选项:
-
WithLocation
:指定时区; -
WithParser
:使用自定义的解析器; -
WithSeconds
:让时间格式支持秒,实际上内部调用了WithParser
。
cron
还提供了另外两种选项:
-
WithLogger
:自定义Logger
; -
WithChain
:Job 包装器。
WithLogger
WithLogger
可以设置 cron
内部使用我们自定义的 Logger
:
func main() { c := cron.New( cron.WithLogger( cron.VerbosePrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags)))) c.AddFunc("@every 1s", func() { fmt.Println("hello world") }) c.Start() time.Sleep(5 * time.Second) }
上面调用 cron.VerbosPrintfLogger()
包装 log.Logger
,这个 logger
会详细记录 cron
内部的调度过程:
$ go run main.go cron: 2020/06/26 07:09:14 start cron: 2020/06/26 07:09:14 schedule, now=2020-06-26T07:09:14+08:00, entry=1, next=2020-06-26T07:09:15+08:00 cron: 2020/06/26 07:09:15 wake, now=2020-06-26T07:09:15+08:00 cron: 2020/06/26 07:09:15 run, now=2020-06-26T07:09:15+08:00, entry=1, next=2020-06-26T07:09:16+08:00 hello world cron: 2020/06/26 07:09:16 wake, now=2020-06-26T07:09:16+08:00 cron: 2020/06/26 07:09:16 run, now=2020-06-26T07:09:16+08:00, entry=1, next=2020-06-26T07:09:17+08:00 hello world cron: 2020/06/26 07:09:17 wake, now=2020-06-26T07:09:17+08:00 cron: 2020/06/26 07:09:17 run, now=2020-06-26T07:09:17+08:00, entry=1, next=2020-06-26T07:09:18+08:00 hello world cron: 2020/06/26 07:09:18 wake, now=2020-06-26T07:09:18+08:00 hello world cron: 2020/06/26 07:09:18 run, now=2020-06-26T07:09:18+08:00, entry=1, next=2020-06-26T07:09:19+08:00 cron: 2020/06/26 07:09:19 wake, now=2020-06-26T07:09:19+08:00 hello world cron: 2020/06/26 07:09:19 run, now=2020-06-26T07:09:19+08:00, entry=1, next=2020-06-26T07:09:20+08:0
我们看看默认的 Logger
是什么样的:
// logger.go var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags)) func PrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger { return printfLogger{l, false} } func VerbosePrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger { return printfLogger{l, true} } type printfLogger struct { logger interface{ Printf(string, ...interface{}) } logInfo bool }
WithChain
Job 包装器可以在执行实际的 Job
前后添加一些逻辑:
panic Job Job Job
我们可以将 Chain
类比为 Web 处理器的中间件。实际上就是在 Job
的执行逻辑外在封装一层逻辑。我们的封装逻辑需要写成一个函数,传入一个 Job
类型,返回封装后的 Job
。 cron
为这种函数定义了一个类型 JobWrapper
:
// chain.go type JobWrapper func(Job) Job
然后使用一个 Chain
对象将这些 JobWrapper
组合到一起:
type Chain struct { wrappers []JobWrapper } func NewChain(c ...JobWrapper) Chain { return Chain{c} }
调用 Chain
对象的 Then(job)
方法应用这些 JobWrapper
,返回最终的`Job:
func (c Chain) Then(j Job) Job { for i := range c.wrappers { j = c.wrappers[len(c.wrappers)-i-1](j) } return j }
注意应用 JobWrapper
的顺序。
内置 JobWrapper
cron
内置了 3 个用得比较多的 JobWrapper
:
-
Recover
:捕获内部Job
产生的 panic; -
DelayIfStillRunning
:触发时,如果上一次任务还未执行完成(耗时太长),则等待上一次任务完成之后再执行; -
SkipIfStillRunning
:触发时,如果上一次任务还未完成,则跳过此次执行。
下面分别介绍。
Recover
先看看如何使用:
type panicJob struct { count int } func (p *panicJob) Run() { p.count++ if p.count == 1 { panic("oooooooooooooops!!!") } fmt.Println("hello world") } func main() { c := cron.New() c.AddJob("@every 1s", cron.NewChain(cron.Recover(cron.DefaultLogger)).Then(&panicJob{})) c.Start() time.Sleep(5 * time.Second) }
panicJob
在第一次触发时,触发了 panic
。因为有 cron.Recover()
保护,后续任务还能执行:
go run main.go cron: 2020/06/27 14:02:00 panic, error=oooooooooooooops!!!, stack=... goroutine 18 [running]: github.com/robfig/cron/v3.Recover.func1.1.1(0x514ee0, 0xc0000044a0) D:/code/golang/pkg/mod/github.com/robfig/cron/[email protected]/chain.go:45 +0xbc panic(0x4cf380, 0x513280) C:/Go/src/runtime/panic.go:969 +0x174 main.(*panicJob).Run(0xc0000140e8) D:/code/golang/src/github.com/darjun/go-daily-lib/cron/recover/main.go:17 +0xba github.com/robfig/cron/v3.Recover.func1.1() D:/code/golang/pkg/mod/github.com/robfig/cron/[email protected]/chain.go:53 +0x6f github.com/robfig/cron/v3.FuncJob.Run(0xc000070390) D:/code/golang/pkg/mod/github.com/robfig/cron/[email protected]/cron.go:136 +0x2c github.com/robfig/cron/v3.(*Cron).startJob.func1(0xc00005c0a0, 0x514d20, 0xc000070390) D:/code/golang/pkg/mod/github.com/robfig/cron/[email protected]/cron.go:312 +0x68 created by github.com/robfig/cron/v3.(*Cron).startJob D:/code/golang/pkg/mod/github.com/robfig/cron/[email protected]/cron.go:310 +0x7a hello world hello world hello world hello world
我们看看 cron.Recover()
的实现,很简单:
// cron.go func Recover(logger Logger) JobWrapper { return func(j Job) Job { return FuncJob(func() { defer func() { if r := recover(); r != nil { const size = 64 << 10 buf := make([]byte, size) buf = buf[:runtime.Stack(buf, false)] err, ok := r.(error) if !ok { err = fmt.Errorf("%v", r) } logger.Error(err, "panic", "stack", "...\n"+string(buf)) } }() j.Run() }) } }
就是在执行内层的 Job
逻辑前,添加 recover()
调用。如果 Job.Run()
执行过程中有 panic
。这里的 recover()
会捕获到,输出调用堆栈。
DelayIfStillRunning
还是先看如何使用:
type delayJob struct { count int } func (d *delayJob) Run() { time.Sleep(2 * time.Second) d.count++ log.Printf("%d: hello world\n", d.count) } func main() { c := cron.New() c.AddJob("@every 1s", cron.NewChain(cron.DelayIfStillRunning(cron.DefaultLogger)).Then(&delayJob{})) c.Start() time.Sleep(10 * time.Second) }
上面我们在 Run()
中增加了一个 2s 的延迟,输出中间隔变为 2s,而不是定时的 1s:
$ go run main.go 2020/06/27 14:11:16 1: hello world 2020/06/27 14:11:18 2: hello world 2020/06/27 14:11:20 3: hello world 2020/06/27 14:11:22 4: hello world
看看源码:
// chain.go func DelayIfStillRunning(logger Logger) JobWrapper { return func(j Job) Job { var mu sync.Mutex return FuncJob(func() { start := time.Now() mu.Lock() defer mu.Unlock() if dur := time.Since(start); dur > time.Minute { logger.Info("delay", "duration", dur) } j.Run() }) } }
首先定义一个该任务共用的互斥锁 sync.Mutex
,每次执行任务前获取锁,执行结束之后释放锁。所以在上一个任务结束前,下一个任务获取锁是无法成功的,从而保证的任务的串行执行。
SkipIfStillRunning
还是先看看如何使用:
type skipJob struct { count int32 } func (d *skipJob) Run() { atomic.AddInt32(&d.count, 1) log.Printf("%d: hello world\n", d.count) if atomic.LoadInt32(&d.count) == 1 { time.Sleep(2 * time.Second) } } func main() { c := cron.New() c.AddJob("@every 1s", cron.NewChain(cron.SkipIfStillRunning(cron.DefaultLogger)).Then(&skipJob{})) c.Start() time.Sleep(10 * time.Second) }
输出:
$ go run main.go 2020/06/27 14:22:07 1: hello world 2020/06/27 14:22:10 2: hello world 2020/06/27 14:22:11 3: hello world 2020/06/27 14:22:12 4: hello world 2020/06/27 14:22:13 5: hello world 2020/06/27 14:22:14 6: hello world 2020/06/27 14:22:15 7: hello world 2020/06/27 14:22:16 8: hello world
注意观察时间,第一个与第二个输出之间相差 3s,因为跳过了两次执行。
注意 DelayIfStillRunning
与 SkipIfStillRunning
是有本质上的区别的,前者 DelayIfStillRunning
只要时间足够长,所有的任务都会按部就班地完成,只是可能前一个任务耗时过长,导致后一个任务的执行时间推迟了一点。 SkipIfStillRunning
会跳过一些执行。
看看源码:
func SkipIfStillRunning(logger Logger) JobWrapper { return func(j Job) Job { var ch = make(chan struct{}, 1) ch <- struct{}{} return FuncJob(func() { select { case v := <-ch: j.Run() ch <- v default: logger.Info("skip") } }) } }
定义一个该任务共用的缓存大小为 1 的通道 chan struct{}
。执行任务时,从通道中取值,如果成功,执行,否则跳过。执行完成之后再向通道中发送一个值,确保下一个任务能执行。初始发送一个值到通道中,保证第一个任务的执行。
总结
cron
实现比较小巧,且优雅,代码行数也不多,非常值得一看!
大家如果发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue:smile:
参考
- cron GitHub: https://github.com/robfig/cron
- Go 每日一库之 carbon: https://darjun.github.io/2020/02/14/godailylib/carbon/
- Go 每日一库之 gron: https://darjun.github.io/2020/04/20/godailylib/gron/
- Go 每日一库 GitHub: https://github.com/darjun/go-daily-lib
我
我的博客: https://darjun.github.io
欢迎关注我的微信公众号【GoUpUp】,共同学习,一起进步~
欢迎关注我们的微信公众号,每天学习Go知识
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK