5

Node.js 设计模式笔记 —— 消息中间件及其应用模式(任务分发)

 1 year ago
source link: https://rollingstarky.github.io/2023/01/12/node-js-design-patterns-message-broker-pattern-task-distribution/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Node.js 设计模式笔记 —— 消息中间件及其应用模式(任务分发)

2023-01-12

| Program

| 0

|

11k

|

0:11

Distributing tasks to a set of consumers

将高成本的任务委派给多个工作节点,这种类型的应用并不适合由 Pub/Sub 模式实现。因为我们并不想同一个任务被多个消费者收到,相反我们更需要一种类似负载均衡的消息分发模式。在消息系统术语中,也被称为 competing consumersfanout distributionventilator
与 HTTP 负载均衡器不同的是,任务分发系统中的消费者是一种更活跃的角色。绝大多数时候都是消费者连接到任务队列,请求新的任务。这一点在可扩展系统中非常关键,允许我们在不修改生产者部分的情况下,直接平滑地增加工作节点的数量。
此外,在一个通用的消息系统中,我们没有必要强调生产者和消费者之间的请求/响应通信。多数情况下,更优先的选择是使用单向的异步通信,从而获得更优异的并行能力和扩展性。消息基本上总是沿着一个方向流动,这样的管道允许我们构建复杂的信息处理架构,又不必承受同步通信带来的开销。

A messaging pipeline

ZeroMQ Fanout/Fanin 模式

分布式 hashsum 破解器

需要以下组件实现一个标准的并行管线:

  • 一个协调节点负责在多个工作节点间分发任务
  • 多个工作节点承担具体的计算任务
  • 一个用于收集计算结果的节点

The architecture of a typical pipeline with ZeroMQ

即一个节点负责生成所有可能的字符串组合,并将它们分发给不同的工作节点;工作节点则负责计算接收到的字符串,比较 hash 值;最后一个节点负责收集暴力破解的结果。

实现 producer

为了表示所有可能的字符组合,这里使用 N 维索引树。每个节点包含一个当前位置下可能出现的字母,比如只有 ab 两个字母的话,长度为 3 的字符串组合共有图示的以下几种:
Indexed n-ary tree for alphabet (a, b)

indexed-string-variation 包可以帮助我们由索引计算出对应的字符串,这项工作可以在工作节点完成,因此 producer 这里只需要将分好组的索引值分发给工作节点。
generateTasks.js:

export function* generateTasks(searchHash, alphabet,
maxWordLength, batchSize) {
let nVariations = 0
for (let n = 1; n <= maxWordLength; n++) {
nVariations += Math.pow(alphabet.length, n)
}

console.log('Finding the hashsum source string over ' +
`${nVariations} possible variations`)

let batchStart = 1
while (batchStart <= nVariations) {
const batchEnd = Math.min(
batchStart + batchSize - 1, nVariations)
yield {
searchHash,
alphabet: alphabet,
batchStart,
batchEnd
}

batchStart = batchEnd + 1
}
}

producer.js:

import zmq from 'zeromq'
import delay from 'delay'
import { generateTasks } from './generateTasks.js'

const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000

const [, , maxLength, searchHash] = process.argv

async function main() {
const ventilator = new zmq.Push()
await ventilator.bind('tcp://*:5016')
await delay(1000)

const generatorObj = generateTasks(searchHash, ALPHABET, maxLength, BATCH_SIZE)
for (const task of generatorObj) {
await ventilator.send(JSON.stringify(task))
}
}

main().catch(err => console.log(err))
  • 创建一个 PUSH socket 并绑定给本地的 5016 端口,工作节点的 PULL socket 会连接到此端口并接收任务
  • 将每一个生成的任务字符串化,通过 PUSH socket 的 send() 方法发送给工作节点。工作节点以轮询的方式接收不同的任务
实现 worker

process Task.js:

import isv from 'indexed-string-variation'
import { createHash } from 'crypto'

export function processTask(task) {
const variationGen = isv.generator(task.alphabet)
console.log('processing from ' +
`${variationGen(task.batchStart)} (${task.batchStart})` +
`to ${variationGen(task.batchEnd)} (${task.batchEnd}`)

for (let idx = task.batchStart; idx <= task.batchEnd; idx++) {
const word = variationGen(idx)
const shasum = createHash('sha1')
shasum.update(word)
const digest = shasum.digest('hex')

if (digest === task.searchHash) {
return word
}
}
}

processTask() 遍历给定区间内的所有索引值,对每一个索引生成对应的字符串,再计算其 SHA1 值,与传入的 task 对象中的 searchHash 比较。

worker.js:

import zmq from 'zeromq'
import { processTask } from './processTask.js'

async function main() {
const fromVentilator = new zmq.Pull()
const toSink = new zmq.Push()

fromVentilator.connect('tcp://localhost:5016')
toSink.connect('tcp://localhost:5017')

for await (const rawMessage of fromVentilator) {
const found = processTask(JSON.parse(rawMessage.toString()))
if (found) {
console.log(`Found! => ${found}`)
await toSink.send(`Found: $found`)
}
}
}

main().catch(err => console.error(err))

worker.js 创建了两个 socket。PULL socket 负责连接到任务发布方(Ventilator),接收任务;PUSH socket 负责连接到结果收集方(sink),传递任务执行的结果。

实现 results collector

collector.js:

import zmq from 'zeromq'

async function main() {
const sink = new zmq.Pull()
await sink.bind('tcp://*:5017')

for await (const rawMessage of sink) {
console.log('Message from worker: ', rawMessage.toString())
}
}

main().catch(err => console.error(err))

运行以下命令测试结果:

node worker.js
node worker.js
node collector.js
node producer.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

AMQP 实现 pipeline 和 competing consumers

Task distribution architecture using a message queue broker

像前面那样在点对点的模式下,实现 pipeline 是非常直观的。假设我们需要借助 AMQP 这类系统实现任务分配模式,就必须确保每条消息都只会被一个消费者接收到。
可以直接将任务发布到目标 queue,不经过 exchange。避免了 exchange 有可能绑定了多个 queue 的情况。之后,多个消费者同时监听这一个 queue,消息即会以 fanout 的方式均匀地分发给所有的消费者。

hashsum 破解器的 AMQP 实现

producer-amqp.js:

import amqp from 'amqplib'
import { generateTasks } from './generateTasks.js'

const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000

const [, , maxLength, searchHash] = process.argv

async function main() {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createConfirmChannel()
await channel.assertQueue('tasks_queue')

const generatorObj = generateTasks(searchHash, ALPHABET,
maxLength, BATCH_SIZE)
for (const task of generatorObj) {
channel.sendToQueue('tasks_queue', Buffer.from(JSON.stringify(task)))
}

await channel.waitForConfirms()
channel.close()
connection.close()
}

main().catch(err => console.error(err))
  • 此处创建的是一个 confirmChannel,它提供了一个 waitForConfirms() 函数,可以在 broker 确认收到消息前等待,确保应用不会过早地关闭到 broker 的连接
  • channel.sendToQueue() 负责将一条消息直接发送给某个 queue,跳过任何 exchange 或者路由

worker-amqp.js:

import amqp from 'amqplib'
import { processTask } from './processTask.js'

async function main() {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
const { queue } = await channel.assertQueue('tasks_queue')
channel.consume(queue, async (rawMessage) => {
const found = processTask(
JSON.parse(rawMessage.content.toString()))
if (found) {
console.log(`Found! => ${found}`)
await channel.sendToQueue('results_queue',
Buffer.from(`Found: ${found}`))
}
await channel.ack(rawMessage)
})
}

main().catch(err => console.error(err))

collector-amqp.js:

import amqp from 'amqplib'

async function main() {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
const { queue } = await channel.assertQueue('results_queue')
channel.consume(queue, msg => {
console.log(`Message from worker: ${msg.content.toString()}`)
})
}

main().catch(err => console.error(err))

运行如下命令测试效果:

node worker-amqp.js
node worker-amqp.js
node collector-amqp.js
node producer-amqp.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

通过 Redis Streams 实现任务分发

Redis Stream 可以借助一种叫做 consumer groups 的特性实现任务分发模式。Consumer group 是一个有状态的实体,由一组名称标识的消费者组成,组中的消费者会以 round-robin 的方式接收记录。
每条记录都必须被显式地确认,否则该记录会一直处于 pending 状态。每个消费者都只能访问它自己的 pending 记录,假如消费者突然崩溃,在其回到线上后会先尝试获取其 pending 的记录。

A Redis Stream consumer group

Consumer group 也会记录其读取的上一条消息的 ID,因而在连续的读取操作中,consumer group 知道下一条要读取的记录时是哪个。

producer-redis.js:

import Redis from 'ioredis'
import { generateTasks } from './generateTasks.js'

const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000
const redisClient = new Redis()

const [, , maxLength, searchHash] = process.argv

async function main() {
const generatorObj = generateTasks(searchHash, ALPHABET,
maxLength, BATCH_SIZE)
for (const task of generatorObj) {
await redisClient.xadd('tasks_stream', '*',
'task', JSON.stringify(task))
}

redisClient.disconnect()
}

main().catch(err => console.error(err))

worker-redis.js:

import Redis from 'ioredis'
import { processTask } from './processTask.js'

const redisClient = new Redis()
const [, , consumerName] = process.argv

async function main() {
await redisClient.xgroup('CREATE', 'tasks_stream',
'workers_group', '$', 'MKSTREAM')
.catch(() => console.log('Consumer group already exists'))

const [[, records]] = await redisClient.xreadgroup(
'GROUP', 'workers_group', consumerName, 'STREAMS',
'tasks_stream', '0')
for (const [recordId, [, rawTask]] of records) {
await processAndAck(recordId, rawTask)
}

while (true) {
const [[, records]] = await redisClient.xreadgroup(
'GROUP', 'workers_group', consumerName, 'BLOCK', '0',
'COUNT', '1', 'STREAMS', 'tasks_stream', '>')
for (const [recordId, [, rawTask]] of records) {
await processAndAck(recordId, rawTask)
}
}
}

async function processAndAck(recordId, rawTask) {
const found = processTask(JSON.parse(rawTask))
if (found) {
console.log(`Found! => ${found}`)
await redisClient.xadd('results_stream', '*', 'result',
`Found: ${found}`)
}

await redisClient.xack('tasks_stream', 'workers_group', recordId)
}

main().catch(err => console.error(err))
  • xgroup 命令用来确保 consumer group 存在。
    • CREATE 表示我们希望创建一个 consumer group
    • tasks_stream 表示我们想要读取的 stream 的名字
    • workers_group 是 consumer group 的名字
    • 第四个参数表示 consumer group 开始读取的记录的位置。$ 表示当前 stream 中最后一条记录的 ID
    • MKSTREAM 表示如果 stream 不存在则创建它
  • 通过 xreadgroup 命令读取属于当前 consumer 的所有 pending 的记录。
    • 'GROUP''workers_group'consumerName 用来指代 consumer group 和 consumer 的名字
    • STREAMStasks_stream 用来指代我们想要读取的 stream 的名字
    • 0 用来表示我们想要开始读取的记录的位置。这里表示从属于当前 consumer 的第一条记录开始,读取所有 pending 的消息
  • 通过另外一条 xreadgroup 命令读取 stream 里新增加的记录。
    • 'BLOCK''0' 两个参数表示如果没有新的消息,就一直阻塞等待。'0' 具体表示一直等待永不超时
    • 'COUNT''1' 表示一次请求只获取一条记录
    • 特殊 ID > 表示只获取还没有被当前的 consumer group 处理过的消息
  • processAndAck() 函数负责当 xreadgroup() 返回的记录被处理完成时,调用 xack 命令进行确认,将该记录从当前 consumer 的 pending 列表里移除

collector-redis.js:

import Redis from 'ioredis'

const redisClient = new Redis()

async function main() {
let lastRecordId = '$'
while (true) {
const data = await redisClient.xread(
'BLOCK', '0', 'STREAMS', 'results_stream', lastRecordId)
for (const [, logs] of data) {
for (const [recordId, [, message]] of logs) {
console.log(`Message from worker: ${message}`)
lastRecordId = recordId
}
}
}
}

main().catch(err => console.error(err))

运行程序测试效果:

node worker-redis.js workerA
node worker-redis.js workerB
node collector-redis.js
node producer-redis.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

Node.js Design Patterns: Design and implement production-grade Node.js applications using proven patterns and techniques, 3rd Edition


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK