7

使用benjamin-batchly实现Rust异步批处理 - alexheretic

 2 years ago
source link: https://www.jdon.com/60977
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

使用benjamin-batchly实现Rust异步批处理 - alexheretic
有时,与其同时做很多小事,不如将它们捆绑在一起,一次完成,作为一个批处理。所以在星期四早上的一个银行假期里,我很早就醒了(主要是因为我 1 岁男孩的尖叫声)并且(在尖叫声停止后)写了一个crate 来帮助做到这一点:benjamin_batchly

示例:插入数据库
数据库优化的一个反复出现的主题是减少对数据库的“旅行”。发送 1 条包含 N 项的胖消息通常比发送 N 条仅包含 1 项的瘦消息要快。

考虑一个我们想要插入数据库的 crud 风格的创建请求。

async fn handle_create_foo(db: &Db, data: CreateFooRequest) -> Result<(), Error> {
    let db_item = DbItem::from(data);
    db.insert(db_item).await?;
    Ok(())
}

因此,对于每个CreateFooRequest,我们将在我们的数据库中插入一个项目。我们可以使用BatchMutex对它们进行批处理。

use benjamin_batchly::{BatchMutex, BatchResult};

async fn handle_create_foo(
    batch_mutex: &BatchMutex<(), DbItem>,
    db: &Db,
    data: CreateFooRequest,
) -> Result<(), Error> {
    let db_item = DbItem::from(data);
    match batch_mutex.submit((), db_item).await {
        BatchResult::Work(mut batch) => {
            db.bulk_insert(&batch.items).await?;
            batch.notify_all_done();
            Ok(())
        }
        BatchResult::Done(_) => Ok(()),
        BatchResult::Failed => Err(Error::BatchFailed),
    }
}

提交到BatchMutex提供 3 个异步结果。

  • Work :一批 1 个或多个项目,包括提交的项目,应处理。
  • Done:提交的项目由另一个提交者批量处理并通知完成。
  • Failed:提交的项目由另一个提交者批量处理,但在通知完成之前被丢弃。

因此,如果在批处理进行时有 100 个CreateFooRequest请求进入,它们将在submit处等待。上一批完成后,所有等待提交的内容将成为下一批。1 次调用将返回BatchResult::Work并且在batch.notify_all_done()调用后 99 次将返回BatchResult::Done。

注意:在示例中,我使用单位类型 () 作为submit的第一个参数。这是可用于分区批处理的“批处理密钥”。只有使用相同批次密钥提交的项目才会捆绑在一起,这在更一般的情况下很有用。

注意(2):也支持单独的项目返回值,查看文档以获取更多信息。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK