2

Node.js 设计模式笔记 —— 由 Promises 和 Async、Await 实现的异步模式

 1 year ago
source link: https://rollingstarky.github.io/2022/11/12/node-js-design-patterns-promises-and-async/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Node.js 设计模式笔记 —— 由 Promises 和 Async、Await 实现的异步模式

2022-11-12

| Program

| 0

|

12k

|

0:12

回调函数(Callbacks)是 Node.js 中异步编程的底层构件,但它们远远达不到对用户友好的程度。对于实现代码中最常见的串行控制流,一个未经训练的开发者很容易陷入到 callback hell 问题中。即便实现是正确的,该串行控制流也会显得不必要的复杂和脆弱。

为了获得更好的异步编程体验,第一个出现的就是 promise,一种保存了异步操作的状态和最终结果的对象。Promise 可以轻易地被串联起来,实现串行控制流,可以像其他任何对象一样自由地转移。Pormise 大大简化了异步代码,后来在此基础上又有了 asyncawait,能够令异步代码看起来就像是同步代码一样。

Promises

Promises 是 ECMAScript 2015 标准(ES6)的一部分,为传递异步结果提供了一种健壮的解决方案,替代原本的 CPS 样式的回调函数。Promise 能够令所有主要的异步控制流更加易读、简洁和健壮。

Promise 是一种用来代表异步操作的最终结果(或错误)的对象。在专业术语中,当异步操作未完成时,我们称 Promise 是 pending 的;当异步操作成功结束时,Promise 是 fulfilled 的;当异步操作因为错误终止时,Promise 是 rejected 的;当 Promise 或者是 fulfilled 或者是 rejected,则将其认定为 settled

Promise 对象的 then() 方法可以获取成功执行后的结果或者终止时报出的错误:

promise.then(onFulfilled, onRejected)

其中 onFulfilled 是一个回调函数,最终会接收到 Promise 成功时的值;onRejected是另一个回调函数,最终会接收 Promise 异常终止时的值(如果有的话)。

基于回调函数的如下代码:

asyncOperation(arg, (err, result) => {
if (err) {
// handle the error
}
// do stuff with the result
})

Promise 实现上述同样的功能,则更加优雅、结构化:

asyncOperationPromise(arg)
.then(result => {
// do stuff with result
}, err => {
// handle the error
})

asyncOperationPromise() 会返回一个 Promise,可以被用来获取最终结果的值或者失败的原因。但最为关键的属性是,then() 方法会同步地返回另一个 Promise。
更进一步地,如果 onFulfilled 或者 onRejected 函数返回一个值 x,那么 then() 方法返回的 Promise 会有以下行为:

  • x 是一个值,则 then() 返回的 Promise 使用 x 作为自身完成时的值
  • x 是一个 Promise 且成功完成,则 x 完成时返回的值作为 then() 返回的 Promise 完成时的值
  • x 是一个 Promise 且因为错误终止,则 x 终止的原因作为 then() 返回的 Promise 终止的原因

上述行为能够令我们将多个 promise 连接成链,轻松地将异步操作聚合在一起。如果我们没有指定一个 onFulfilled 或者 onRejected handler,Promise 完成时的值或者终止时的原因都会自动地传递给链条中的下一个 Promise。通过 Promise 链,任务的执行顺序突然变得很简单。

asyncOperationPromise(arg)
.then(result1 => {
// return another promise
return asyncOperationPromise(arg2)
})
.then(result2 => {
// return a value
return 'done'
})
.then(undefined, err => {
// any error in the chain is caught here
})

promise API

Promise 构造函数(new Promise((resolve, reject) => {}))会创建一个新的 Promise 实例,其完成还是终止取决于作为参数传入的函数的行为。
作为参数传入的函数接收如下两个参数:

  • resolve(obj):resolve 是一个函数,在调用时为 Promise 提供完成时的值。当 obj 是值时,则 obj 本身作为 Promise 完成时的值;当 obj 是另一个 Promise 时,则 obj 完成时的值作为当前 Promise 完成时的值
  • reject(err):Promise 因为 err 终止
function delay(milliseconds) {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve(new Date())
}, milliseconds)
})
}

console.log(`${new Date().getSeconds()}s\nDelaying...`)
delay(1000)
.then(newDate => {
console.log(`${newDate.getSeconds()}s`)
})

Promise 最重要的静态方法:

  • Promise.resolve(obj):从另一个 Promise、thenable 对象或者值创建一个新的 Promise
  • Promise.reject(err):创建一个 Promise,该 Promise 会因为 err 终止
  • Promise.all(iterable):从一个可迭代对象创建 Promise,若该 iterable 中的每一项都提供了一个 fulfill 值,则 Promise 最终以包含这些值的列表作为 fulfill 值;若其中有任意一项 reject,则 Promise.all() 返回的 Promise 以第一个 reject 的 err 终止
  • Promise.allSettled(iterable):此方法会等待所有输入的 Promise 或者 fulfill 或者 reject,之后返回一个包含所有 fulfill 值和 reject 原因的列表
  • Promise.race(iterable):返回可迭代对象中第一个 fulfill 或 reject 的 Promise

Promise 关键的实例方法:

  • promise.catch(onRejected):实际上就是 promise.then(undefined, onRejected) 的语法糖
  • promise.finally(onFinally):允许我们设置一个 onFinally 回调函数,在 promise fulfill 或者 reject 时调用

顺序执行意味着,每次只执行一系列任务中的一个,完成后再依次执行后面的任务。这一系列任务的先后顺序必须是预先定义好的,因为一个任务的结果有可能影响后续任务的执行。

An example of sequential execution flow with three tasks

上述执行流程有着不同形式的变种:

  • 顺序执行一系列已知的任务,不需要在它们之间传递数据
  • 前一个任务的输出作为后一个任务的输入(chainpipelinewaterfall
  • 迭代任务集合,同时在每个元素上一个接一个地运行异步任务

package.json

{
"name": "03-promises-web-spider-v2",
"version": "1.0.0",
"private": true,
"type": "module",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"dependencies": {
"cheerio": "^1.0.0-rc.3",
"mkdirp": "^0.5.1",
"superagent": "^5.2.2",
"slug": "^1.1.0"
},
"engines": {
"node": ">=14"
},
"engineStrict": true
}

spider.js

import {promises as fsPromises} from 'fs'
import {dirname} from 'path'
import superagent from 'superagent'
import mkdirp from 'mkdirp'
import {urlToFilename, getPageLinks} from './utils.js'
import {promisify} from 'util'

const mkdirpPromises = promisify(mkdirp)

function download(url, filename) {
console.log(`Downloading ${url}`)
let content
return superagent.get(url)
.then((res) => {
content = res.text
return mkdirpPromises(dirname(filename))
})
.then(() => fsPromises.writeFile(filename, content))
.then(() => {
console.log(`Downloaded and saved: ${url}`)
return content
})
}

function spiderLinks(currentUrl, content, nesting) {
let promise = Promise.resolve()
if (nesting === 0) {
return promise
}
const links = getPageLinks(currentUrl, content)
for (const link of links) {
promise = promise.then(() => spider(link, nesting - 1))
}

return promise
}

export function spider(url, nesting) {
const filename = urlToFilename(url)
return fsPromises.readFile(filename, 'utf8')
.catch((err) => {
if (err.code !== 'ENOENT') {
throw err
}

// The file doesn't exist, so let’s download it
return download(url, filename)
})
.then(content => spiderLinks(url, content, nesting))
}

spider-cli.js

import {spider} from './spider.js'

const url = process.argv[2]
const nesting = Number.parseInt(process.argv[3], 10) || 1

spider(url, nesting)
.then(() => console.log('Download complete'))
.catch(err => console.error(err))

utils.js

import {join, extname} from 'path'
import {URL} from 'url'
import slug from 'slug'
import cheerio from 'cheerio'

function getLinkUrl(currentUrl, element) {
const parsedLink = new URL(element.attribs.href || '', currentUrl)
const currentParsedUrl = new URL(currentUrl)
if (parsedLink.hostname !== currentParsedUrl.hostname ||
!parsedLink.pathname) {
return null
}
return parsedLink.toString()
}

export function urlToFilename(url) {
const parsedUrl = new URL(url)
const urlPath = parsedUrl.pathname.split('/')
.filter(function (component) {
return component !== ''
})
.map(function (component) {
return slug(component, {remove: null})
})
.join('/')
let filename = join(parsedUrl.hostname, urlPath)
if (!extname(filename).match(/htm/)) {
filename += '.html'
}

return filename
}

export function getPageLinks(currentUrl, body) {
return Array.from(cheerio.load(body)('a'))
.map(function (element) {
return getLinkUrl(currentUrl, element)
})
.filter(Boolean)
}

node spider-cli.js http://www.baidu.com 2

其中的 spiderLinks() 函数通过循环动态地构建了一条 Promise 链:

  • 先定义一个“空的” Promise 对象(resovle 到 undefined),这个空 Promise 只是作为链条的起点
  • 在循环中,不断将 promise 变量更新为新的 Promise 对象(通过调用上一个 Promise 的 then() 方法得到)。这就是 Promise 的异步遍历模式

for 循环的最后,promise 变量会是最后一个 then() 方法返回的 Promise,因而只有当链条中的所有 Promise 都 resolve 时,promise 才会 resolve。

纵观所有代码,我们可以不需要像使用 callback 那样,强制地包含众多错误传递逻辑。因而大大减少了代码量和出错的机会。

在某些情况下,一系列异步任务的执行顺序并不重要,我们需要的只是当所有的任务都完成后能收到通知。

An example of parallel execution with three tasks

虽然 Node.js 是单线程的,但得益于其 non-blocking nature,我们仍可以实现并发行为。

An example of how asynchronous tasks run in parallel

比如我们有一个 Main 函数需要执行两个异步任务:

  • Main 函数首先触发异步任务 Task1 和 Task2 的执行。异步任务触发后,会将程序控制权立即交还给 Main 函数,再转交给 event loop
  • 当 Task1 中的异步任务结束时,event loop 调用 Task1 的回调函数,将控制权交给 Task1。Task1 执行完成自身内部的同步指令,通知 Main 函数并返还控制权
  • 当 Task2 中的异步任务结束时,event loop 调用 Task2 的回调函数,将控制权交给 Task2。在 Task2 的终点,Main 函数再次被通知。Main 函数得知 Task1 和 Task2 全部结束,继续执行或者返回结果

简单来说,在 Node.js 中,我们只能并发地执行异步操作,因为它们的并发行为是由内部的非阻塞 API 控制的。同步(阻塞)操作无法并发地执行,除非它们的执行与异步操作交织在一起,或者由 setTimeout()setImmediate() 包裹。

Promise 实现并发执行流,可以借助内置的 Promise.all() 方法。该方法会返回一个新的 Promise,只有当所有传入的 Promise 都 fulfill 时,新 Promise 才会 fulfill。如果传入的 Promise 之间没有因果关系,这些 Promise 就会并发地执行。

对于前面的 spider 应用,只需要将 spiderLinks() 函数改为如下形式:

function spiderLinks(currentUrl, content, nesting) {
if (nesting === 0) {
return Promise.resolve()
}
const links = getPageLinks(currentUrl, content)
const promises = links.map(link => spider(link, nesting - 1))
return Promise.all(promises)
}

Async/await

Promise 链相对于 callback hell 来说肯定是要好太多的,但是我们仍然需要调用 then() 方法,以及为链条中的每一个任务创建新的函数,对于日常编程中非常普遍的控制流来说还是比较麻烦。而 Async/await 可以帮助我们写出像同步代码一样可读性强、容易理解的异步代码。
Async 函数是一种特殊的函数,在函数体里面可以使用 await 表达式“暂停”任意一个 Promise 的执行,将控制权交还给 async 函数的调用者,等该 Promise revolve 后再返回到暂停的地方继续执行。

function delay(milliseconds) {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve(new Date())
}, milliseconds)
})
}

async function playingWithDelays() {
console.log('Initial date: ', new Date())
const dateAfterOneSecond = await delay(1000)
console.log('Date after one second: ', dateAfterOneSecond)

const dateAfterThreeSeconds = await delay(3000)
console.log('Date after 3 secnods: ', dateAfterThreeSeconds)
return 'done'
}

playingWithDelays()
.then(result => {
console.log(`After 4 seconds: ${result}`)
})

Async/await 的另一个巨大的优势在于,它能够标准化 try...catch 代码块的行为,不管是针对同步代码中的 throw,抑或是异步代码中的 Promise reject。

function delayError(milliseconds) {
return new Promise((resolve, reject) => {
setTimeout(() => {
reject(new Error(`Error after ${milliseconds}ms`))
})
})
}

async function playingWithErrors(throwSyncError) {
try {
if (throwSyncError) {
throw new Error('This is a synchronous error')
}
await delayError(1000)
} catch (err) {
console.log(`We have an error: ${err.message}`)
} finally {
console.log('Done')
}
}

// playingWithErrors(true)
playingWithErrors(false)

借助 Async/await,可以对之前的 spider 应用实现很多优化。比如 download() 函数:

async function download(url, filename) {
console.log(`Downloading ${url}`)
const {text: content} = await superagent.get(url)
await mkdirpPromises(dirname(filename))
await fsPromises.writeFile(filename, content)
console.log(`Downloaded and saved: ${url}`)
return content
}

整段代码行数大大减少,看起来也很“平整”,没有任何层级和缩进。

接下来是 spiderLinks() 函数,使用 async/await 异步地遍历一个列表:

async function spiderLinks(currentUrl, content, nesting) {
if (nesting === 0) {
return
}
const links = getPageLinks(currentUrl, content)
for (const link of links) {
await spider(link, nesting - 1)
}
}

然后是 spider() 函数,如何简单地通过 try...catch 处理错误,令异步代码更加易读:

export async function spider(url, nesting) {
const filename = urlToFilename(url)
let content
try {
content = await fsPromises.readFile(filename, 'utf8')
} catch (err) {
if (err.code !== 'ENOENT') {
throw err
}
content = await download(url, filename)
}
return spiderLinks(url, content, nesting)
}

使用纯 async/await 实现并行的异步执行流程,可以参考如下代码:

async function spiderLinks(currentUrl, content, nesting) {
if (nesting === 0) {
return
}
const links = getPageLinks(currentUrl, content)
const promises = links.map(link => spider(link, nesting - 1))
for (const promise of promises) {
await promise
}
}

然而上述代码存在一定的问题。如果列表中有一个 Promise reject 了,我们不得不等待列表中其他所有的 Promise 都 resolve,spiderLinks() 函数返回的 Promise 才会 reject。这种行为在多数情况下都是不理想的。
我们通常都会想要在操作发生错误的第一时间捕获错误信息。因而并行执行异步操作,最后仍建议使用下面形式的代码:

async function spiderLinks(currentUrl, content, nesting) {
if (nesting === 0) {
return
}
const links = getPageLinks(currentUrl, content)
const promises = links.map(link => spider(link, nesting - 1))
return Promise.all(promises)
}

Node.js Design Patterns: Design and implement production-grade Node.js applications using proven patterns and techniques, 3rd Edition


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK