9

为开源项目 go-gin-api 增加后台任务模块

 3 years ago
source link: https://segmentfault.com/a/1190000040633391
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

任务管理界面 (WEB)

01.png

支持在 WEB 界面 中对任务进行管理,例如:新增任务编辑任务启用/禁用任务手动执行任务 等。

任务的属性包括:

    • SHELL
    • HTTP
  • 表达式(/5 *)
  • 超时时间(秒)
  • 重试间隔(秒)
  • 执行结束是否通知

    • 结果关键字匹配通知

当执行方式为 HTTP 时,支持选择请求方式 GETPOST

当设置执行结束通知时,支持选择通知方式 邮件 或 Webhook

当设置邮件通知时,支持输入邮箱地址多个用,分割;

当设置结果关键字匹配通知时,支持输入关键字多个用,分割;

任务增加完成后,会把任务数据持久化到 MySQL 中。

任务调度器

参考了两个开源组件:

最终选择使用 jakecoffman/cron ,后者是在前者的基础上做了一定的补充,例如 AddFunc() 增加了 name 参数,同时还增加了 RemoveJob(name string) 支持删除特定的任务。

// AddFunc adds a func to the Cron to be run on the given schedule.
func (c *Cron) AddFunc(spec string, cmd func(), name string) {
    c.AddJob(spec, FuncJob(cmd), name)
}

...

// RemoveJob removes a Job from the Cron based on name.
func (c *Cron) RemoveJob(name string) {
    if !c.running {
        i := c.entries.pos(name)

        if i == -1 {
            return
        }

        c.entries = c.entries[:i+copy(c.entries[i:], c.entries[i+1:])]
        return
    }

    c.remove <- name
}

对其简单封装下就可以使用了,下面是封装的方法,方法的具体实现与使用从 go-gin-api 中获取。

type Server interface {
    i()

    // Start 启动 cron 服务
    Start()

    // Stop 停止 cron 服务
    Stop()

    // AddTask 增加定时任务
    AddTask(task *cron_task_repo.CronTask)

    // RemoveTask 删除定时任务
    RemoveTask(taskId int)

    // AddJob 增加定时任务执行的工作内容
    AddJob(task *cron_task_repo.CronTask) cron.FuncJob
}

当调用 Start() 启动服务时,会把 MySQL 中的任务列表加载到调度器中。

通过以上方法,当从 WEB 界面 操作 新增、编辑、启用/禁用、手动执行任务时,可以动态的对调度器中的任务进行管理。

任务执行器

任务执行器指的是任务真实执行所在的机器。

我的思路是使用 Kafka 的发布与订阅功能,当调度器发现需要执行的任务时,将任务信息写到 KafkaTopic 中,任务执行器订阅相关的 Topic 获取任务信息然后执行任务。

如果任务的执行方式为 HTTP,那么任务执行器可以为一组集群,专门处理调用 HTTP 任务,这里可以为一个消费组(Consumer Group),也可适具体场景而定。

如果任务的执行方式为 SHELL,那么任务执行器必须在脚本所在的宿主机上,这里可以为一个具体任务的消费者。

如果任务量过多,可以考虑根据业务场景多设置几个 Topic

在项目中为了便于演示,不写入到 Kafka 中,仅记录了日志。

func (s *server) AddJob(task *cron_task_repo.CronTask) cron.FuncJob {
    return func() {
        s.taskCount.Add()
        defer s.taskCount.Done()

        msg := fmt.Sprintf("开始执行任务:(%d)%s [%s]", task.Id, task.Name, task.Spec)
        s.logger.Info(msg)
    }
}

日志目录:/logs/go-gin-api-cron.log

本文纯属抛砖引玉,有问题,欢迎批评指正。

go-gin-api 项目安装简单,开箱即用,创建一个后台任务试试吧。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK