25

分布式延时任务调度系统设计与 Golang 实现

 3 years ago
source link: https://mp.weixin.qq.com/s/bDb1xY2CFT0bIgOUWpoROA
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

延时应用场景

之前的文章分享了分布式任务调度系统负载均衡方案: 分布式任务调度系统分发及负载均衡实现方案

一个完整的任务调度系统,对 延时 任务的支持必不可少。延时任务、延迟消息、延迟队列基本语境和实现类似,那么它有哪些适用场景呢?最常见的如:用户下单xx分钟内未付款订单自动取消,释放库存;订单发货后xx天自动确认收货;订单结束后xx天自动评价;用户注册后1min内触发xx动作等。

延时解决方案

延时作为常见的需求自然有众多解决方案,数据库轮询是最容易想到的一个方案,时间轮,小顶堆,有序链表,延时队列以及各类开源项目也是琳琅满目。了解每种解决方案的原理以及优缺点,可以帮助在生产中做好技术选型。

1.数据库轮询

最简单且容易想到的方案是后台启动 定时脚本 ,定时轮询扫描数据库获取满足条件数据并处理, 这种方案实现简单有效

时间处理精度问题,linux系统crontab最小是 1分钟 ,如果需要更细时间粒度可以通过脚本 for{} 无限循环轮询数据库,总执行时间为50秒,每次轮询后sleep10秒 ,类似操作可达成更小时间粒度。

此方案项目初级比较有效,但也有较多 弊端:

  • 轮询粒度不好把控,轮询间隔时间过长影响精准度,过短又会产生大量不必要的数据库扫描,增加数据库压力;

  • 随着数据量增大此方案存在较大性能瓶颈;

  • 延时任务过多也会造成定时脚本不易维护。

2.延迟消息队列

2.1RabbitMQ队列

RabbitMQ本身不支持延时消息,但可通过 死信队列及死信路由 设置间接达成。

uEBVRnB.png!mobile

TTL(Time to live)分消息TTL和队列TTL,控制消息超时时间,消息在队列中生存时间一旦超过TTL设置时间即成为dead letter(死信),然后通过Dead letter exchange死信路由交换机来重新路由消息。

uENriia.png!mobile

方案分析

利用成熟RabbitMQ消息组件,稳定、易扩展、支持分布式,消息支持持久化可靠性好。但消息的延时时间需要保持一致,死信队列还是先进先出,如果先进的队列由于未到执行时间会阻塞所有后入消息,因此一种延时时间需要建一套路由。

除死信队列方案外还有一些RabbitMQ的插件可以实现延时,具体可下载插件:

rabbitmq_delayed_message_exchange

2.2 RocketMQ

RocketMQ是支持延时消息的,且足够高效可靠,但 延迟消息的时间不是任意时间,而是仅支持18个固定的时间段 ,这里不再赘述。

3.时间轮算法

时间轮算法是实现延时最常用的算法,这里重点介绍它的实现方案。

3.1实现原理

可以想象一个时钟的表盘,有一个指针绕着转动,每走一个格子称为一个 刻度 (时间间隔interval),表盘每个格子上挂载待执行 任务列表 (任务桶buckets),指针转动一圈 长度 (bucketSize),这些元素构成一个时间轮。

ZnaEZ3q.png!mobile

如果刻度是1s,总长度是60s,那么转一圈就是1分钟,可以实现1分钟内的延时。要实现更长时间跨度,可将总长度设置更大,但这会造成占用内存过大,更多空转浪费资源。有两种优化方案,使用 多层时间轮 多级时间轮

  • 多层时间轮就是增加 圈数circle ,一圈代表60s,那么10圈就是10分钟。

  • 多级时间轮可以想象成时钟的 时针、分针、秒针 ,一级到达后执行二级,再到三级,直到满足执行任务。

iUVVBj3.png!mobile

3.2具体代码

定义时间轮结构如下:

type TimeWheel struct {
ticker *time.Ticker //ticker
interval time.Duration //time duration of moving one slot.
buckets []*list.List //bucket list
bucketSize int //total size of bucket
currentPos int //current position in buckets
callbackFunc func(interface{}) //execute func
stopChannel chan bool //stop the ticker channel
}

定时器触发使用 time.Ticker ,它是Go自身实现的内置定时器,基于 最小堆 结构实现。 Buckets存放任务列表, 使用双向链表 container/list 结构,注意它非线程安全。

新建一个时间轮实例:

//create timewheel instance
func New(interval time.Duration, bucketSize int, callbackFunc func(interface{})) (*TimeWheel, e
rror) {
if interval <= 0 || bucketSize <= 0 || callbackFunc == nil {
return nil, errors.New("create timewheel instance fail")
}
tw := &TimeWheel{
interval: interval,
buckets: make([]*list.List, bucketSize),
bucketSize: bucketSize,
currentPos: 0,
callbackFunc: callbackFunc,
stopChannel: make(chan bool),
}
//init bucket,every bucket will have a list
for i := 0; i < bucketSize; i++ {
tw.buckets[i] = list.New()
}
return tw, nil
}

定义任务Task结构体,并添加任务。为了构造多层时间轮,给任务添加circle代表该任务在第几圈。pos代表任务在当前表盘上的位置。

//define task
type Task struct {
Id interface{} //task id global uniqueness
Data interface{} //data of task
Delay time.Duration //delay time, 30 means after 30 second
Circle int //task position in timewheel
}
//add task
func (tw *TimeWheel) AddTask(task *Task) {
delaySeconds := int(task.Delay.Seconds())
intervalSeconds := int(tw.interval.Seconds())
circle := int(delaySeconds / intervalSeconds / tw.bucketSize)
pos := int(tw.currentPos+delaySeconds/intervalSeconds) % tw.bucketSize
task.Circle = circle
tw.buckets[pos].PushBack(task)
}

启动时间轮,每经过一刻度(这个刻度可以是1s、5s任意),做一次检查,如果当前格里有任务则取出执行,碰到多圈任务将circle-1。当指针走到末尾代表走完一圈,会重置再从头执行。

//start timewheel
func (tw *TimeWheel) Start() {
//add ticker
tw.ticker = time.NewTicker(tw.interval)
//receive chan
go func() {
for {
select {
case <-tw.ticker.C: //reach a tick
log.Println("1 tick")
tw.tickHandler()
case <-tw.stopChannel: //true
tw.ticker.Stop() //stop the ticker
return
}
}
}()
}
//1 tick handler
func (tw *TimeWheel) tickHandler() {
bucket := tw.buckets[tw.currentPos]
for e := bucket.Front(); e != nil; {
task := e.Value.(*Task) //e.value is a task
if task.Circle > 0 {
task.Circle--
e = e.Next()
continue
}
//do task
go tw.callbackFunc(task.Data)
//remove e
next := e.Next()
bucket.Remove(e)
e = next
}
//finish 1 circle,reset
if tw.currentPos == tw.bucketSize-1 {
log.Println("new circle")
tw.currentPos = 0
} else {
tw.currentPos++
}
}

测试时间轮一圈10s,间隔刻度1s,添加延时12s的延时任务,第13s后执行任务。

func TestTimeWheel(t *testing.T) {
tw, err := New(1*time.Second, 10, func(data interface{}) {
log.Println("do task", data)
})
if err != nil {
t.Error(err)
}
log.Println("start timewheel...")
tw.Start()
task := Task{Id: 1, Data: "test1", Delay: 12 * time.Second}
tw.AddTask(&task)
time.Sleep(20 * time.Second)
}

执行效果:

ZnQZ7vU.png!mobile

3.3 更多细节考虑

3.3.1 长时间跨度的解决方案

由于时间跨度越大轮子越大,会占用更多内存,所以可以考虑采用 磁盘文件+内存时间轮 相结合的方案。内存时间轮只加载1小时的任务,磁盘文件可以时间命名(2020101721代表2020年10月17日21:00-21:59:59所有延时任务),每小时一个文件,一天24个,一般情况不会保存太多文件。

3.3.2 内存时间轮的高可用性

因为采用内存时间轮,如果程序崩溃会导致数据丢失。将时间轮持久化保存成文件存储,到达时间后预加载到内存,程序崩溃、重启后也可以重新加载,文件保存可保障数据不会丢失,当然也可保存在redis或其他持久化存储中。

除内存时间轮外也可以直接使用 redis的list结构 替代container/list, redis的string 结构保存时间轮当前指针。

考虑恢复时间轮后需要确认哪些未执行,那么可以在执行的时候记录 成功执行日志 记录执行位置偏移

考虑是否执行成功,按at least once语义可以再发送/执行一次,需要下游保障幂等。

3.3.3 任务执行方式

call back如果仅是发送消息等毫秒级完成还可以,如果是 执行http/rpc调用且较慢将会拖垮整 个延时任务系统,所以不要在callback做重任务,可以将到达延时的任务统一放到待发送MQ中,异步执行。

3.3.4 分布式集群任务分发

单个时间轮处理任务能力有限,任务量大可以对任务数据分片处理,开启 多个时间轮并行处理 。在任务添加时,根据Id取模或hash分片,保存在不同的时间轮文件中。

2020101721_0
2020101721_1
2020101721_2
...
2020101721_9

每小时再分10个任务片,分别由10个时间轮加载。

3.4 方案分析

时间轮方案执行效率高,时间精度高,但内存时间轮重启或宕机后需要考虑持久化和消费标记,集群扩展实现也较复杂。

4.排序链表算法

要使用排序链表数据结构,最先想到的就是redis的sorted set结构,这里以redis有序集合为基础来实现延时。

4.1 实现原理

redis有序集合zset结构是一个有序链表,可以通过 zadd 向链表添加元素,并将其 score 设置为延时任务执行的时间戳,值设为任务id。然后通过 zrange 获取链表第一个元素( 认是score最小元素 ),通过判断score和当前时间大小,决定是否到达执行时间。

4.2 具体代码

按时间轮设计思想定义一个带定时器的结构体:

//define bucket ticker
type BucketTicker struct {
Ticker *time.Ticker
Interval time.Duration
Name string
CallbackFunc func(interface{}) bool
}
//new ticker
func New(interval time.Duration, bucketName string, callbackFunc func(interface{}) bool) (*Buck
etTicker, error) {
if interval <= 0 || callbackFunc == nil {
return nil, errors.New("create bucket ticker instance fail")
}
bucket := &BucketTicker{
Interval: interval,
Name: bucketName,
CallbackFunc: callbackFunc,
}
return bucket, nil
}

定义任务及添加方法,将任务的执行时间(当前时间+延时时间)和任务唯一Id存到zset结构中,将任务主体序列化存到kv结构(string)中。

//define task
type Task struct {
Id string //task id global uniqueness
Data interface{} //data of task
Delay time.Duration //delay time, 30 means after 30 second
Timestamp int
}
//add task
func (bucket *BucketTicker) AddTask(task *Task) error {
//task id and delay time in redis zset
timestamp := time.Now().Add(task.Delay).Unix()
err := redisclient.ZAdd(bucket.Name, int(timestamp), task.Id)
if err != nil {
return err
}
//task body in redis string
data, err := json.Marshal(task)
if err != nil {
return err
}
err = redisclient.Set(task.Id, string(data))
if err != nil {
return err
}
return nil
}

启动定时器,每隔一个刻度,检查是否有满足执行时间的任务。间隔时间越长,可以减少与redis查询频率,但 延时任务处理精度 会降低。

func (bucket *BucketTicker) Start() {
timer := time.NewTicker(bucket.Interval) //interval
go func() {
for {
select {
case t := <-timer.C:
log.Println("1 tick")
bucket.tickHandler(t, bucket.Name)
}
}
}()
}
//tick handler
func (bucket *BucketTicker) tickHandler(currentTime time.Time, bucketName string) {
for {
task, err := getTask(bucketName)
if err != nil {
log.Println("error happen!", err)
return
}
if task == nil { //no task
return
}
//not arrival execution time
if task.Timestamp > int(currentTime.Unix()) {
return
}
//do task
taskDetail, err := getTaskDetail(task.Id)
if err != nil { //retry
log.Println("error happen!", err)
continue
}
//if callback success, remove finish task
if ok := bucket.CallbackFunc(taskDetail.Data); ok {
err = removeTask(bucketName, task.Id)
if err != nil {
continue
}
} else {
log.Println("error happen!", errors.New("callback error"))
continue //retry
}
return
}
}

getTask(),getTaskDetail()和removeTask()分别执行Redis操作。

//get task from redis zset
func getTask(bucketName string) (*Task, error) {
value, err := redisclient.ZRangeFirst(bucketName) //ZRANGE key 0 0 WITHSCORES
if err != nil {
return nil, err
}
if value == nil {
return nil, nil
}
timestamp := int(value[0].(float64))
taskId := value[1].(string)
task := Task{
Id: taskId,
Timestamp: timestamp,
}
return &task, nil
}
//get task detail by taskId
func getTaskDetail(taskId string) (*Task, error) {
v, err := redisclient.Get(taskId)
if err != nil {
return nil, err
}
if v == "" {
return nil, nil
}
task := Task{}
err = json.Unmarshal([]byte(v), &task)
if err != nil {
return nil, err
}
return &task, nil
}
//remove the task
func removeTask(bucketName string, taskId string) error {
err := redisclient.ZRem(bucketName, taskId)
if err != nil {
return err
}
err = redisclient.Del(taskId)
if err != nil {
return err
}
return nil
}

编写测试用例测试,添加2个延时任务分别是延时5秒和延时8秒。

func TestRedisDelay(t *testing.T) {
delay, err := New(1*time.Second, "test", func(data interface{}) bool {
log.Println("do task ", data)
return true
})
if err != nil {
t.Error(err)
}
log.Println("start ticker...")
delay.Start()


task1 := Task{Id: "1", Data: "task1", Delay: 5 * time.Second}
task2 := Task{Id: "2", Data: "task2", Delay: 8 * time.Second}
delay.AddTask(&task1)
delay.AddTask(&task2)
time.Sleep(10 * time.Second)
}

执行效果如下:

VrMB3a.png!mobile

4.3  分布式集群任务分片

当有更多延时任务时,考虑存储多个bucket,每个bucket有自己的定时器,执行自己的任务列表。当有任务添加时,轮询加入不同 bucket中。

4.4 方案分析

由于依赖比较成熟的组件redis,高可用程序挂掉重启后仍可继续处理,集群分片拓展也容易。但由于每次都取出数据比对score,会有频繁Redis IO操作,造成较大的资源浪费。

5.总结

延时方案方案除上述几种外还有最小堆的形式,文中提到的Go内置定时器即采用四叉堆结构,其实现原理与排序链表大同小异。

选择何种方案根据 业务场景和业务规模 而定。数据库轮询方案简单实用,在业务初期非常合适。延时队列方案实现简单,可以结合队列一起使用。当这些都不能满足业务时,再考虑自建延时系统,可以采用时间轮方案或有序链表方案。

文章相关代码请关注公众号 “ 技术岁月 ,发送关键字“ 延时任务 ”获取。

fuaQny.png!mobile


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK