22

JavaScript 自定义限流队列 fetch - rxliuli blog

 3 years ago
source link: https://blog.rxliuli.com/p/df3e7f5aee43475fa81ca34e315e471f/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

为什么需要它

有些时候不得不需要限制并发 fetch 的请求数量,避免请求过快导致 IP 封禁

需要做到什么

  • 允许限制 fetch 请求同时存在的数量
  • 时间过久便认为是超时了

该方法的请求是无序的!

  1. 使用 class 定义默认超时设置和请求数量限制的构造函数
  2. 在请求前判断当前请求的数量,添加请求等待数量
    1. 如果请求数量已满,则进行等待
    2. 如果请求数量未满,则删除一个请求等待数量
  3. 请求完成,删除当前请求数量

等待队列:循环监听

该方法需要使用回调函数

  1. 使用 class 定义默认超时设置和请求数量限制的构造函数
  2. 在请求前将请求 argments 添加到等待队列中
  3. 使用 setInterval 函数持续监听队列和当前执行的请求数
    • 发现请求数量没有到达最大值,且等待队列中还有值,那么就执行一次请求

等待队列:触发钩子

  1. 使用 class 定义默认超时设置和请求数量限制的构造函数
  2. 在请求前将请求 argments 添加到等待队列中
  3. 添加完成,等待当前请求数量未满
  4. 尝试启动等待队列(钩子)

暂停请求实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
* 等待指定的时间/等待指定表达式成立
* @param {Number|Function} param 等待时间/等待条件
* @returns {Promise} Promise 对象
*/
function wait(param) {
return new Promise((resolve) => {
if (typeof param === "number") {
setTimeout(resolve, param);
} else if (typeof param === "function") {
var timer = setInterval(() => {
if (param()) {
clearInterval(timer);
resolve();
}
}, 100);
} else {
resolve();
}
});
}
/**
* 为 fetch 请求添加超时选项
* 注:超时选项并非真正意义上的超时即取消请求,请求依旧正常执行完成,但会提前返回 reject 结果
* @param {Promise} fetchPromise fetch 请求的 Promise
* @param {Number} timeout 超时时间
* @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
*/
function promiseTimeout(fetchPromise, timeout) {
var abortFn = null;
//这是一个可以被reject的promise
var abortPromise = new Promise(function (resolve, reject) {
abortFn = function () {
reject("abort promise");
};
});
var abortablePromise = Promise.race([fetchPromise, abortPromise]);
setTimeout(function () {
abortFn();
}, timeout);

return abortablePromise;
}
/**
* 限制并发请求数量的 fetch 封装
*/
class RequestLimiting {
constructor({ timeout = 10000, limit = 10 }) {
this.timeout = timeout;
this.limit = limit;
this.execCount = 0;
this.waitCount = 0;
}

/**
* 执行一个请求
* 如果到达最大并发限制时就进行等待
* 注:该方法的请求顺序是无序的,与代码里的顺序无关
* @param {RequestInfo} url 请求 url 信息
* @param {RequestInit} init 请求的其他可选项
* @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
*/
async _fetch(url, init) {
this.waitCount++;
await wait(() => this.execCount < this.limit);
this.waitCount--;
this.execCount++;
try {
return await promiseTimeout(fetch(url, init), this.timeout);
} finally {
this.execCount--;
}
}
}
1
2
3
4
5
6
7
const requestLimiting = new RequestLimiting({ timeout: 500, limit: 1 });
new Array(100).fill(0).forEach((i) =>
requestLimiting
._fetch("/")
.then((res) => console.log(res))
.catch((err) => console.log(err))
);

等待队列:循环监听实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* 等待指定的时间/等待指定表达式成立
* @param {Number|Function} param 等待时间/等待条件
* @returns {Promise} Promise 对象
*/
function wait(param) {
return new Promise((resolve) => {
if (typeof param === "number") {
setTimeout(resolve, param);
} else if (typeof param === "function") {
var timer = setInterval(() => {
if (param()) {
clearInterval(timer);
resolve();
}
}, 100);
} else {
resolve();
}
});
}
/**
* 为 fetch 请求添加超时选项
* 注:超时选项并非真正意义上的超时即取消请求,请求依旧正常执行完成,但会提前返回 reject 结果
* @param {Promise} fetchPromise fetch 请求的 Promise
* @param {Number} timeout 超时时间
* @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
*/
function promiseTimeout(fetchPromise, timeout) {
var abortFn = null;

//这是一个可以被reject的promise
var abortPromise = new Promise(function (resolve, reject) {
abortFn = function () {
reject("abort promise");
};
});

var abortablePromise = Promise.race([fetchPromise, abortPromise]);

setTimeout(function () {
abortFn();
}, timeout);

return abortablePromise;
}
/**
* 限制并发请求数量的 fetch 封装
*/
class RequestLimiting {
constructor({ timeout = 10000, limit = 10 }) {
this.timeout = timeout;
this.limit = limit;
this.execCount = 0;
// 等待队列
this.waitArr = [];

// 监视 execCount 的值
setInterval(async () => {
if (this.execCount >= this.limit) {
return;
}
console.debug(
`执行 execCount: ${this.execCount}, waitArr length: ${
this.waitArr.length
}, index: ${JSON.stringify(this.waitArr[0])}`
);
const args = this.waitArr.shift(0);
if (!args) {
return;
}
this.execCount++;
const callback = args[2];
try {
// 如果没有错误就返回 res
callback({ res: await promiseTimeout(fetch(...args), this.timeout) });
} catch (err) {
// 否则返回 err
callback({
err: err,
});
} finally {
this.execCount--;
}
}, 100);
}

/**
* 执行一个请求
* 如果到达最大并发限制时就进行等待
* 注:该方法的请求顺序是无序的,与代码里的顺序无关
* @param {RequestInfo} url 请求 url 信息
* @param {RequestInit} init 请求的其他可选项
* @param {Function} callback 回调函数
* @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
*/
async _fetch(url, init, callback) {
this.waitArr.push(arguments);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const requestLimiting = new RequestLimiting({ timeout: 500, limit: 1 });
new Array(100).fill(0).forEach((item, i) =>
requestLimiting._fetch(
"/",
{
// 这里设置添加时的 index,用于验证是否真的顺序执行了
headers: {
index: i,
},
},
// 这里使用了回调函数,参数使用解构得到
({ res, err }) => {
console.log(`res: ${res}, err: ${err}`);
}
)
);

等待队列:触发钩子实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* 等待指定的时间/等待指定表达式成立
* @param {Number|Function} param 等待时间/等待条件
* @returns {Promise} Promise 对象
*/
function wait(param) {
return new Promise((resolve) => {
if (typeof param === "number") {
setTimeout(resolve, param);
} else if (typeof param === "function") {
var timer = setInterval(() => {
if (param()) {
clearInterval(timer);
resolve();
}
}, 100);
} else {
resolve();
}
});
}
/**
* 为 fetch 请求添加超时选项
* 注:超时选项并非真正意义上的超时即取消请求,请求依旧正常执行完成,但会提前返回 reject 结果
* @param {Promise} fetchPromise fetch 请求的 Promise
* @param {Number} timeout 超时时间
* @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
*/
function promiseTimeout(fetchPromise, timeout) {
var abortFn = null;
//这是一个可以被 reject 的 Promise
var abortPromise = new Promise(function (resolve, reject) {
abortFn = function () {
reject("abort promise");
};
});
// 有一个 Promise 完成就立刻结束
var abortablePromise = Promise.race([fetchPromise, abortPromise]);
setTimeout(function () {
abortFn();
}, timeout);
return abortablePromise;
}
/**
* 限制并发请求数量的 fetch 封装
*/
class RequestLimiting {
constructor({ timeout = 10000, limit = 10 }) {
this.timeout = timeout;
this.limit = limit;
this.execCount = 0;
// 等待队列
this.waitArr = [];
}

/**
* 执行一个请求
* 如果到达最大并发限制时就进行等待
* 注:该方法的请求顺序是无序的,与代码里的顺序无关
* @param {RequestInfo} url 请求 url 信息
* @param {RequestInit} init 请求的其他可选项
* @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
*/
async _fetch(url, init) {
const _innerFetch = async () => {
console.log(
`执行 execCount: ${this.execCount}, waitArr length: ${
this.waitArr.length
}, index: ${JSON.stringify(this.waitArr[0])}`
);
this.execCount++;
const args = this.waitArr.shift(0);
try {
return await promiseTimeout(fetch(...args), this.timeout);
} finally {
this.execCount--;
}
};
this.waitArr.push(arguments);
await wait(() => this.execCount < this.limit);
// 尝试启动等待队列
return _innerFetch();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
const requestLimiting = new RequestLimiting({ timeout: 500, limit: 1 });
new Array(100).fill(0).forEach((item, i) =>
requestLimiting
._fetch("/", {
// 这里设置添加时的 index,用于验证是否真的顺序执行了
headers: {
index: i,
},
})
.then((res) => console.log(res))
.catch((err) => console.log(err))
);

目前而言,最后一种实现是最好的,同时实现了两种规范

  • 返回 Promise,避免使用回调函数
  • 请求执行与添加顺序相同

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK