9

Rust中实现 API 速率限制

 6 months ago
source link: https://www.jdon.com/72638.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Rust中实现 API 速率限制

在本教程中,我们将采用 "滑动窗口 "算法,通过一个动态周期来检查请求历史记录,并使用一个基本的内存哈希表来存储用户及其请求时间。我们还将了解如何使用 tower-governor 为您配置速率限制。

1、简单的滑动窗口速率限制器
为了弄清楚如何从头开始实现这一目标,让我们从头开始编写一个基于 IP 的滑动窗口速率限制器。

首先,我们将使用 cargo init 初始化一个常规项目,然后根据提示选择 Axum 作为我们的首选框架。

我们还需要一些额外的依赖项,因此先用这个 shell 片段安装它们:
cargo add [email protected] -F derive
cargo add [email protected] -F serde,clock

我们将声明一个新结构,该结构保存 IpAddr 键的 HashMap,其值为 Vec<DateTime<Utc>>(UTC 时区时间戳向量)。

use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use std::net::IpAddr;
use chrono::{DateTime, Utc};

// 这将是用户访问端点的请求限制(每分钟)
// 如果用户试图超出此限制,我们应返回错误信息
const REQUEST_LIMIT: usize = 120;

#[derive(Clone, Default)]
pub struct RateLimiter {
    requests: Arc<Mutex<HashMap<IpAddr, Vec<DateTime<Utc>>>>>,
}

首先,我们要使用 .lock() 锁定哈希表,这样就有了写入权限。

然后,我们要使用 .entry() 函数检查哈希表中是否包含我们要检查的 IP 地址,然后根据长度是否低于请求限制,通过保留有效时间戳和推送新条目来修改哈希表。

然后,我们检查条目长度是否大于请求限制--如果是,则返回错误;如果不是,则返回 Ok(())。

impl RateLimiter {
    fn check_if_rate_limited(&self, ip_addr: IpAddr) -> Result<(), String> {
        // 我们只保存 60 秒前的时间戳
        let throttle_time_limit = Utc::now() - std::time::Duration::from_secs(60);

let mut requests_hashmap = self.requests.lock().unwrap();

let mut requests_for_ip = requests_hashmap
            // 在此处抓取条目,并允许我们对其进行就地修改
            .entry(ip_addr)
            // 如果条目为空,则插入一个带有当前时间戳的 vec
            .or_insert(Vec::new());

requests_for_ip.retain(|x| x.to_utc() > throttle_time_limit);
        requests_for_ip.push(Utc::now());

if requests_for_ip.len() > REQUEST_LIMIT {
            return Err("IP is rate limited :(".to_string());
        }

Ok(())
    }
}

下面是一个如何使用的基本示例:

use std::net::Ipv4Addr;

fn main() {
let rate_limiter = RateLimiter::default();

let localhost_v4 = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));

// 在此,我们请求 120 次 - 我们的请求上限
    for _ in 1..80 {
    assert!(rate_limiter.check_if_rate_limited(localhost_v4).is_ok())
    }

// wait 30 seconds
    std::thread::sleep(std::time::Duration::from_secs(30));

// 在此再申请 40 次,以满足配额要求
    for _ in 1..40 {
    assert!(rate_limiter.check_if_rate_limited(localhost_v4).is_ok())
    }

// 再等 30 秒
    std::thread::sleep(std::time::Duration::from_secs(30));

// 现在我们可以再提出 80 项请求
    for _ in 1..80 {
    assert!(rate_limiter.check_if_rate_limited(localhost_v4).is_ok())
    }
}

在此基础上,如果我们想将其扩展到与 Axum 一起使用,也是可以的。不过,生产就绪的速率限制系统通常要比这先进得多。下面我们将讨论如何利用板条箱进行速率限制,包括使用基于用户的速率限制。

2、实施基于用户的速率限制
对于没有登录的对外网站,IP 地址是唯一可以用来跟踪用户的信息(除了浏览器信息)。不过,基于已验证用户而非 IP 地址的速率限制可能更有用。在使用 IP 地址时,您可能会遇到以下问题:

  • 多个用户可能拥有相同的 IP 地址
  • 如果你阻止了用户(通过代理或其他方法),用户可以简单地更改他们使用的 IP 地址

使用基于用户的速率限制可以解决这些问题。虽然用户可以拥有多个 IP 地址,但我们可以将其分配给同一个用户。

具体实施
为了初始化我们的网络服务,我们将使用 cargo shuttle init(需要安装 cargo-shuttle)来创建我们的项目,并确保选择 Axum 作为框架。

在添加速率限制器之前,我们要创建一个自定义头密钥!这将用于需要用户身份验证的路由中。我们还可以在稍后为速率限制器实施自定义密钥提取器时使用头。我们首先要添加 axum-extra,并使用键入式头信息功能:

cargo add axum-extra -F typed-header

接下来,我们要创建一个结构来保存字符串,并实现 headers::Header 的 axum_extra 重导出。你可以看到下面的 Header 实现,它通过遍历 HeaderValue 来解码值,并创建 CustomHeader 结构。

我们可以从定义 HeaderName 开始:

static X: HeaderName = HeaderName::from_static("x-custom-key");
static CUSTOM_HEADER: &HeaderName = &X;

pub struct CustomHeader(String);

impl CustomHeader {
    pub fn key(self) -> String {
        self.0
    }
}

既然我们已经定义了自定义头名称(将用作头关键字),我们就可以为 CustomHeader 实现 axum_extra::headers::Header 了:

impl Header for CustomHeader {
    fn name() -> &'static HeaderName {
        CUSTOM_HEADER
    }

fn decode<'i, I>(values: &mut I) -> Result<Self, axum_extra::headers::Error>
    where
        I: Iterator<Item = &'i HeaderValue>,
    {
        let value = values
            .next()
            .ok_or_else(axum_extra::headers::Error::invalid)?;

Ok(CustomHeader(value.to_str().unwrap().to_owned()))
    }

fn encode<E>(&self, values: &mut E)
    where
        E: Extend<HeaderValue>,
    {
        let s = &self.0;

let value = HeaderValue::from_str(s).unwrap();

values.extend(std::iter::once(value));
    }
}

要将 CustomHeader 用作 Axum 提取器,我们需要像这样用 TypedHeader 将其包裹起来:

async fn register(
    TypedHeader(header): TypedHeader<CustomHeader>,
) -> impl IntoResponse {
    // .. your code goes here
}

这一切都很好,但这与费率限制有什么关系呢?

虽然我们可以在中间件中使用它,但更好的替代解决方案是使用 tower_governor。该板块使用通用小区速率算法(GCRA),它是漏斗算法的更复杂版本。有关 GCRA 的更多信息,请点击此处here

要开始使用,我们先将该板块添加到 Rust 程序中:
cargo add tower-governor

当我们要将其添加到主函数时,可以使用 GovernorConfigBuilder,然后将其添加到 GovernorLayer 中。请注意,虽然GovernorConfigBuilder没有实现克隆,但添加塔式服务层需要它实现克隆。这意味着我们需要将配置生成器封装起来,然后再使用 Box::leak 将封装泄露出去,以获得静态生命周期的 GovernorConfig,供我们的 axum::Router 使用:

use auxm::{Router, routing::get};
use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer};

#[shuttle_runtime::main]
async fn main() -> shuttle_axum::ShuttleAxum {
    let governor_conf = Box::new(
        GovernorConfigBuilder::default()
            .per_second(2)
            .burst_size(5)
            .finish()
            .unwrap(),
    );

let router = Router::new()
        .route("/", get(hello_world))
        .layer(GovernorLayer {
            // We can leak this because it is created once and then never needs to be destructed
            config: Box::leak(governor_conf),
        });

Ok(router.into())
}

默认情况下,GovernorConfigBuilder 使用一种名为 PeerIpKeyExtractor 的类型,它试图抓取连接客户端的 IP 密钥。不过,要使用我们的头作为提取的密钥,我们可以实现 tower_governor::key_extractor::KeyExtractor。为此,我们将使用一个单元结构,因为当我们稍后将其添加到 GovernorConfigBuilder 时,目前不需要任何额外的变量:

use tower_governor::GovernorError;
use axum::http::Request;

#[derive(Clone)]
pub struct CustomHeaderExtractor;

impl KeyExtractor for CustomHeaderExtractor {
    type Key = String;

fn extract<T>(&self, req: &Request<T>) -> Result<Self::Key, GovernorError> {
        let headers = req.headers();

match headers.get(CUSTOM_HEADER) {
            Some(res) => {
                let res = res.to_str()
                    .map_err(|_| GovernorError::UnableToExtractKey)?;

Ok(res.to_owned())
            },
            None => Err(GovernorError::UnableToExtractKey)
        }
    }
}

这样,我们就可以在主函数中将 CustomHeaderExtractor 添加到我们的 GovernorConfigBuilder 中。

let governor_conf = Box::new(
    GovernorConfigBuilder::default()
        .per_second(2)
        .burst_size(5)
        .key_extractor(CustomHeaderExtractor)
        .finish()
        .unwrap(),
);

当用户尝试访问任何与治理层分层的路由时,现在它会尝试获取一个标头名称为 x-custom-key 的标头--如果不存在,路由将返回错误。在这里,我们设置了限制,允许用户每 2 秒发送 5 个请求。

请注意,在生成器中,per_second() 函数会告诉我们补充配额的确切间隔时间,而 burst_size 则会告诉我们在 tower-governor 开始阻止来自给定 IP 地址(或 API 密钥,在我们的例子中)的请求之前的配额是多少。

我们还可以额外设置 per_millisecond() 和 per_nanosecond()参数,这样如果想每半秒补充一次配额,就可以在生成器中使用 per_millisecond(500)。

部署
现在我们已经完成了部署,你可以使用 cargo shuttle deploy 进行部署(如果是在脏 Git 分支上,则添加--ad),然后观看奇迹的发生。部署完成后,Shuttle 会在终端输出部署的详细信息。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK