6

设计基于 Redis 的定时任务系统(ZSET + Scripting)

 2 years ago
source link: https://jysperm.me/2019/08/redis-cronjob-system/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
精子最近在 第一届烧火节 中体验了钻木取火,并制作了第一个 Vlog。
查看源代码

设计基于 Redis 的定时任务系统(ZSET + Scripting)

这篇文章会带领大家实现一个基于 Redis 的分布式高可用定时任务系统,其中的 worker 以 Node.js 为例,但其实可以使用任何语言来实现。这篇文章不会给出完整的代码,更侧重于探索的过程。

高可用设计

首先我们希望让 worker 是无状态的,这样会大幅减少对于 worker 的高可用需求,也不涉及到 worker 之间的数据同步或者选举。我们将所有的状态集中到 Redis 上,Redis 可以用 Master-Slave + Sentinel 的方式达到一个相对较高的可用性。

为什么不使用选举 Master 的方式?

有一种很常见的做法是在单个实例上完成所有的工作,这样甚至连 Redis 都不需要了。但为了保证高可用,往往会同时启动多个实例,然后引入一个「选举」的过程来决定谁是那个完成所有工作的人(称为 master),在 master 失效后,则需要重新进行选举,选出另外一个 master。

简单来说我觉得这种做法不够「分布式」,只有一个 master 在工作,其他实例只是热备而已,同时选举和对 master 失效的监测也是一个非常麻烦的事情。而在我们的方案中所有的 worker 都是平等的,都在执行任务,可以在任意时候创建或移除 worker。

对于一个定时任务系统来说,核心的工作是给定时任务们按时间排序,然后找到并等待下一个需要触发的任务。Redis 刚好为我们提供了 ZSET 这种支持排序的集合类型,ZSET 中存储着若干个互不相同的 member(字符串或二进制数据),每个 member 有一个相关联的 score(数字),整个 ZSET 会按照 score 排序,基于这样的数据结构提供了若干操作。

我们将定时任务的 ID 作为 member 放在一个叫 cronjobs 的 ZSET 里,使用下次触发时间作为 score 来排序,这样便得到了一个以下次触发时间排序的列表。

我们可以这样向系统中添加任务:

redisClient.zadd('cronjobs', nextTriggerAt(cronjobId), cronjobId)

ZRANGE cronjobs 0 -1 WITHSCORES 可以看到已添加的任务:

127.0.0.1:6379> ZRANGE cronjobs 0 -1 WITHSCORES
1) "2"
2) "1565453967832"
3) "1"
4) "1565453998742"

然后我们可以在 worker 中写一个无限循环,不断地检查这个 ZSET 中 score 最小(触发时间最早)的任务是否已经超过了当前时间,如果是的话就执行这个任务,并修改 ZSET 中的 score 为下次触发时间,Node.js 的代码如下:

while (true) {
  const [cronjobId, triggerAt] = await redisClient.zrange('cronjobs', 0, 0, 'WITHSCORES')

  if (parseInt(triggerAt) < Date.now()) {
    // ZADD CH 会返回被修改 member 数量,只有当成功修改了 score 我们才会继续执行,否则说明这个任务已经被其他的 worker 执行了
    if (await redisClient.zadd('cronjobs', 'CH', nextTriggerAt(cronjobId), cronjobId)) {
      // 异步地运行任务,避免「阻塞」核心循环
      runJob(cronjobId)
    }
  } else {
    // 等待下一个任务触发,如果距离下一个任务的触发少于 10 秒,则等待下一个任务执行,否则等待 10 秒后重试。
    await bluebird.delay(triggerAt ? Math.min(parseInt(triggerAt) - Date.now(), 10000) : 10000)
  }
}

上面的循环构成了这个定时任务系统最核心的部分,后面我们会逐渐地完善他。

为什么不用 Keyspace Notifications?

在社区中有很多文章推荐简单地使用 Keyspace Notifications 来实现「定时」,即为一个 key 设置一个过期时间,然后订阅这个 key 过期的事件。但这种方式主要的问题是 Redis 的 Pub/Sub 并不保证送达,如果刚好在这个 key 过期时 worker 不在线,那么这一次触发就不会生效;如果刚好有多个 worker 在线,那么这一次触发的任务也可能被执行多次。

而我们选择的基于 ZSET 的方式,需要 worker 主动修改 ZSET 中的下次触发时间,即使 worker 暂时不可用,在恢复时也会继续执行之前剩余的任务。

这样 Redis 就变成了系统的单点?

是这样的,这个系统中几乎全部的状态都存储于 Redis 上,可以说是系统中的单点。但相比于 worker,Redis(或其他的数据库)是一个更稳定、更标准化的组件。你可以用官方的 Master-Slave + Sentinel 方案来达到一个相对较高的可用性,你也可以使用由云服务厂商提供的托管 Redis 产品,避免自己来维护它。

CRON 表达式

前面的代码中我们并没有实现 nextTriggerAt,你可以用 cron-parser 这样的库去解析 CRON 表达式,计算下次触发时间:

const cronParser = require('cron-parser')

async function nextTriggerAt(cronjobId) {
  // 从 Redis 或其他数据库中根据 cronjobId 拉取定时任务的详情
  const cronjobInfo = await getCronjobInfo(cronjobId)
  return cronParser.parseExpression(cronjobInfo.cron).next().getTime()
}

中断任务处理

如果一个 Worker 意外退出,那么当时正在被它处理的所有任务都会永久性地丢失。为了避免这种情况,我们将正在执行的任务也存储到 Redis 中(一个叫 running 的 ZSET):

if (await redisClient.zadd('cronjobs', 'CH', nextTriggerAt(cronjobId), cronjobId)) {
    // 为每个任务生成一个随机的 uuid 以便能单独地追踪每个任务,例如打印到日志中
  await redisClient.zadd('running', Date.now() + 60000, `${cronjobId}:${uuid.v4()}`)
  runJob(cronjobId).finally( () => {
    // 在一个任务被完成时,我们还需要将它从 running 集合中取出
    redisClient.zrem('running', uniqueId)
  })
 }

如果你有一些为多实例应用编写代码的经验,那么可能会注意到这里存在一个竞态条件:对 cronjobs 和 running 的操作并不是原子的,可能会出现对 cronjobs 的操作成功了,随即 worker 意外退出,没有来得及写入 running 的情况。

因为这里我们需要对 ZADD 的返回值做判断,所以不能简单地使用 Redis 的 Pipeline 功能,而是要用到 Lua Script:

redisClient.defineCommand('startJob', {
  lua: `
    local cronjobId = ARGV[1]
    local jobName = ARGV[2]
    local nextTriggerAt = tonumber(ARGV[3])
    local timeoutAt = tonumber(ARGV[4])

    local changed = redis.call('ZADD', 'cronjobs', 'CH', nextTriggerAt, cronjobId)

    if changed ~= 0 then
      redis.call('ZADD', 'running', timeoutAt, jobName)
    end

    return changed
  `
})

经过修改后的核心循环:

const jobName = `${cronjobId}:${uuid.v4()}`

if (await redisClient.startJob(cronjobId, jobName, nextTriggerAt(cronjobId), Date.now() + 60000)) {
  runJob(cronjobId).finally( () => {
    redisClient.zrem('running', uniqueId)
  })
}

然后我们便可以添加另外一个循环,从 running 中拉取已经超时的任务进行重试或其他处理,这里不再给出具体的代码。

Lua Script

Lua Script 是 Redis 提供的一种类似事务能力,Redis 保证每个 Lua Script 都是串行执行的,中途不会有其他指令被执行,这提供了一种非常强的一致性保证。在实际的开发中,我们可以将需要一致性保证的逻辑写成 Lua Script。

我们不可避免地会对 worker 进程进行新版本的部署或其他维护,因此我们需要一种平滑的方式来关闭 worker 进程,让它继续执行已经收到的任务,但不去接受新的任务,在执行完当前的任务之后,主动退出。

在 Unix 中最正统的方式是实现自定义的 SIGINT 处理器来实现这个功能,即由终端模拟器、进程管理器或容器平台向程序发送 SIGINT 信号,程序即开始进行退出前的清理工作,然后待清理工作结束后,程序主动退出。当然进程管理器也有可能等不及,再发送一个强制结束的 SIGKILL。

所以我们需要将所有正在执行的任务注册到一个全局的 Promise 数组中,然后在受到 SIGINT 时停止接受新任务,并等待所有正在执行的任务完成后主动退出:

let runningJobs = []
let shuttingDown = false

process.on('SIGTERM', () => {
  shuttingDown = true

  // 等待 runningJobs 中所有的任务完成,无论成功还是失败
  Promise.all(runningJobs.map( p => p.catch(() => {}) )).then( () => {
    process.exit(0)
  })
})

修改核心循环,在开始任务时将 runJob 返回的 Promise 存入 runningJobs,然后在任务执行完时取出:

while (true) {
  if (shuttingDown) {
    break
  }

  // ...

  if (await redisClient.zadd('cronjobs', 'CH', nextTriggerAt(cronjobId), cronjobId)) {
    // 异步地运行任务,避免「阻塞」核心循环
    const jobPromise = runJob(cronjobId)

    jobPromise.finally( () => {
      _.pull(runningJobs, jobPromise)
    ))

    runningJobs.push(jobPromise)
  }

  // ...
}

容量的横向拓展

目前这个定时任务系统中的 worker 是可以无限拓展的,但 Redis 却是整个系统中的瓶颈,每个 worker 都需要从 Redis 获取任务来执行。按照我们对于 Redis 通常 70k QPS 的估计,按每个任务需要执行 5 个命令计算,整个系统可以支持每秒 14k 次任务触发,对于绝大部分的场景其实完全够用了。

如果要继续拓展的话,我的建议是根据业务上的一些区分(例如用户、任务类型)将队列分散到不同的 Key 上面(例如 userA:cronjobsuserB:cornjobs),这样便可以利用 Redis Cluster 的分片功能来进行扩展了。

我们用 ZSET 将定时任务按照触发时间排序,然后使用一个无限循环来拉取需要触发的任务,实现了一个分布式定时任务系统的核心部分,读者可以在此基础上根据自己的需要做进一步扩展。

本文甚至没有给出完整的代码,因此并不能直接地复制到你的项目中使用,更多地在于提出和讨论一种解决方案。社区中也有一些类似的开源组件可供选用,例如 Bull 是一个功能完整的任务队列,其中包括了定时任务功能,Bull 使用了和本文类似的 ZSET + Scripting 技术,使用 Redis 作为后端。

精子写了这么多年博客,收到的优秀评论少之又少,在这个属于 SNS 的时代也并不缺少向作者反馈的渠道。因此如果你希望撰写评论,请发邮件至 [email protected] 并注明文章标题,我会挑选对读者有价值的评论附加到文章末尾。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK