2

Rust Runtime 设计与实现-组件篇

 2 years ago
source link: https://www.ihcblog.com/rust-runtime-design-4/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

本系列文章主要介绍如何设计和实现一个基于 io-uring 的 Thread-per-core 模型的 Runtime。

我们的 Runtime 最终产品 Monoio 现已开源,你可以在 github.com/bytedance/monoio 找到它。

本文是系列的第四篇,前面该讲的设计基本讲完了,这里主要说说 channel 等组件。

Channel in Go and in Rust

在我们熟悉的 Golang 中,所有 Task 一定是 goroutine,通信基本是靠 channel 的。但是在 Rust 中,Future 和 Task 不一定是一个东西,多个 Future 可以通过 select 等嵌套,然后其本质上还是单个 Task,性能上也相比 Golang 的模型要好。

但是即便是如此,在 Rust 中我们还是有很强的使用 channel 的需求。在 thread-per-core 模型下,我们可以使用仅本线程的 channel 来提升性能。

这部分的内容已经开源为 local-sync crate,它并不依赖我们的 Runtime,你可以在合适的场景独立使用它。

https://github.com/monoio-rs/local-sync

MPSC Channel

MPSC = Multiple Producer Single Consumer。

我们的 mpsc channel 需要提供两种模式,一个是 bounded 一个是 unbounded。

在 bounded 模式下,我们的 send 可能会 await,因为可能容量不够写入,需要等消费。

在 unbounded 模式下,send 不需要等待。因为不限制存储空间,所以即便消费端卡住,我们在空间不够时也可以随意地开辟新的空间来存储。

当然,因为 channel 里可能没有数据,所以两种模式下的 recv 都是异步接口。

最终实现效果:

#[monoio::test]
async fn tets_unbounded_channel() {
let (tx, mut rx) = channel();
tx.send(1).unwrap();
assert_eq!(rx.recv().await.unwrap(), 1);


drop(tx);
assert_eq!(rx.recv().await, None);
}

存储数据结构

我们需要先设计底层存储数据结构。为了能够低成本地扩容,并且不产生大量内存申请和释放,我们设计出了两种方案可以选择。

链表 + 固定大小 Block

storage1

这种设计下每个 block 大小一致,block 之间使用单向链表维护。

读数据时从 head 的 begin 位置读,写数据时向 tail 的 end 位置写。当一块数据被度完时,它会被插入到 tail 之后继而可以循环使用,避免重复释放和分配。
这样当缓存数据量增多时会付出 数据峰值 / BLOCK_SIZE 次的内存分配。

一个额外的好处是,我们可以全局缓存 Block 继而进一步优化性能。

链表 + 指数扩大 Block

storage1

这个方案也是使用链表维护 blocks,理解起来可以认为平滑扩容,即扩容时仅写入新的 ring,旧 ring 在消费完时再释放。

每次 ring 扩容即可扩容为原有的两倍大小,这样可以在峰值流量下只需要付出 log(数据峰值) 次内存分配。

结合实现复杂度,我们最后决定采用方案1:链表 + 固定大小 Block。Tokio 的 mpsc 也是这种结构,不过它这么做更多的是为了无锁并发考虑。

多 Block 通过单向链表组成 Queue,Queue 提供 push 和 pop 能力(无大小限制)。

Queue 在 push 和 pop 时会保证指针指向内容的合法性,在适当的时机会移动 block 指针。

我们的等待需要两种:

  1. Bounded 实现下,发送者需要等待空槽
  2. 接收者需要等待数据

第二种等待非常易与实现:因为 mpsc 本身就只有一个 consumer,当它需要等待数据时,只需要将自己对应 waker 设置在 channel 中,当有数据写入时,会额外检查这个 waker 并 wake 它。

第一种等待的实现略复杂,复杂度在于可以有一堆等待者。并且为了避免饥饿,还需要先到先得地唤醒。那么显然这个结构可以直接利用 VecDeque 或链表存储。当等待者被 drop 的时候,还需要将自己摘掉,这样一来就只有类似 Slab 或者链表可以使用了。

我们这里参考 Tokio 的做法,使用链表存储。将这个结构抽象为 Semaphore,既可以满足 mpsc 之用,又可以单独暴露给用户使用,还可以拿来实现其他同步结构。

layers

前面有了 Block,我们在 Block 的基础上构建了与异步无关的同步存储 Queue,它是无限容量的。

前面也提到有两种等待,但发送方的等待只有在 bounded 的情况下使用。

所以我们在 Queue 的基础上抽象出一个底层结构 Chan,提供大家的共用的逻辑,只实现第二种等待:

  1. 暴露 recv 接口,允许等待。
  2. 暴露同步 send 接口,上层调用者需要自己维护容量限制和 sender 等待。

Chan 的基础上我们要封装出 bounded 和 unbounded 两种实现。unbounded 下这个约等于原样转发,因为我们的 Chan 本身就可以理解为一个 unbounded channel。

对于 bounded channel,我们需要实现容量限制和发送方的唤醒机制。这部分就要引入前一小节提到的 semaphore。我们利用 semaphore 管理和等待空槽数量(同时还顺便拿来管理接收者状态)。

所以我们在 Chan 层面集成 semaphore。由于上层需求不同,所以这里 Semaphore 其实是个 trait。对于 bounded channel,我们直接使用 Semaphore 实现;对于 unbounded,我们可以实现一个空的 Semaphore。

pub trait Semaphore {
fn add_permits(&self, n: usize);
fn close(&self);
fn is_closed(&self) -> bool;
}


impl Semaphore for crate::semaphore::Inner {
fn add_permits(&self, n: usize) {
self.release(n);
}


fn close(&self) {
crate::semaphore::Inner::close(self);
}


fn is_closed(&self) -> bool {
crate::semaphore::Inner::is_closed(self)
}
}


pub struct Unlimited {
closed: UnsafeCell<bool>,
}


impl Unlimited {
pub fn new() -> Self {
Self{
closed: UnsafeCell::new(false),
}
}
}


impl Semaphore for Unlimited {
fn add_permits(&self, _: usize) {}


fn close(&self) {
unsafe {
*self.closed.get() = true;
}
}


fn is_closed(&self) -> bool {
unsafe { *self.closed.get() }
}
}

Oneshot Channel

Once Cell

Semaphore

Q:都 thread local 了为啥要搞同步能力?

A:并发 != 并行,所以即便是单线程,也会同时执行多个 task,而 task 之间需要通信,有依赖关系,所以需要一个组件来支持它们异步等待。

Q:为啥不用开源库?

A:目前并没有基本能用的同类开源组件。目前主流 Rust Runtime 是 Tokio,非 thread per core 模型,所以使用自带跨线程同步能力的数据结构。

有一个还算可用的是 local-channel 但是其实现上一来性能不佳(使用 Vec 存储,扩容时会产生数据拷贝),二来功能上有所欠缺(只有 unbounded mpsc 实现,无法做 back pressure 等),所以我们在考虑之后决定自己造轮子。

本系列文章到此结束。文章里难免会有一些笔误或者我理解上的错误。如果你想与我讨论,可以直接邮件我(在关于页有联系方式);如果是 Monoio 相关相关的问题,也可以直接在 Github 上提 Issue 或 Discussion。

Monoio 目前仍处于非常不完善的阶段,期待你的贡献:)

另外,我们还搭建了一个国内的 crates.io 和 rustup 的镜像,欢迎使用 RsProxy

如果你想转载本博客的文章,麻烦注明出处谢谢。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK