37

深入浅出 Rust 异步编程之 Tokio

 4 years ago
source link: https://mp.weixin.qq.com/s/FzJAV8Ah8DqDeNSee9WnPA
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

本文以tokio为例简单介绍Rust异步编程相关的一些知识。

首先让我们看看为什么使用rust来进行异步编程。 这里tokio官方给出了一个性能测试的对比,可以看到tokio是性能最好,实际上运行这个基准测试的时候,tokio性能更好的2.0版本尚未发布,否则估计性能还有很大提升。 因此,我们可以认为需要非常极致性能的时候,我们可以选择rust+tokio来实现。

6VZVryv.png!web

Rust网络编程

Rust实际上并不跟一定的网络编程模型强绑定,实际rust可以实现阻塞IO+多线程,非阻塞IO+回调,用户态线程等多种模型。 这里着重介绍Rust实现的用户态线程。

  • 首先,Rust的用户态线程是一种基于Future的用户态线程,关于Future本身,本文后续部分有详细论述。

  • 其次,由于是Rust实现,因此可以做到零成本抽象,并且更容易做到安全。

  • 最后,由于没有运行时大量内存分配,没有动态逻辑分派,也没有GC开销,所以该实现的效率非常高。

Rust异步编程是构建在操作系统相关API上,MIO库类似Java的Nio库,针对多种操作系统的不同API做了统一封装。 Future库类似Java的Future库,提供了相关接口和常用的组合能力。 Tokio构建于两者之上,在MIO和future的基础上实现了用户态线程。 使用Tokio进行异步编程的技术栈如下,需要注意的是,应用程序会同时接触到Tokio和future的API。

2QRzuqa.jpg!web

Futures

future是rust异步编程的核心。 首先我们介绍什么是future。 future是一段异步计算程序,可以在将来获取产生的数据。 举例来说,获取数据库查询结果,RPC调用这些实际上都可以使用future来实现。 通常实现future有两种模式,一种基于推模式,也被称为基于完成的模式,一种基于拉模式,也被称为基于就绪的模式。 Rust的future库实现了基于拉模式的future。

rust的future选择拉模式来实现。

接口定义如下:

pub trait Future {


type Item;


type Error;



fn poll(&mut self) -> Poll<Self::Item, Self::Error>;

}


假设一个future要做这样的功能,从TCP数据流读取数据并计算自己读了多少个字节并进行回调。那用代码表示:

struct MyTcpStream {

socket: TcpStream,

nread: u64,

}


impl Future for MyTcpStream {

type Item =u64;

type Error = io::Error;


fn poll(&mut self) -> Poll<Item, io::Error> {

let mut buf = [0;10];

loop {

match self.socket.read(&mut buf) {

Async::Ready(0) => return Async::Ready(self.nread),

Async::Ready(n) => self.nread += n,

Async::NotReady => return Async::NotReady,

}

}

}


}

每次调用poll方法,MyTcpStream都会调用socket的read方法(这里的TcpStream本身也是一个future,read内部也是调用poll方法),当read返回为Async::NotReady的时候,调度器会将当前的Task休眠,如果返回Async::Read(n)表示读到了数据,则给计数器加对应的数,如果返回Async::Ready(0),则表示TcpStream里有的数据已经读完,就将计数器返回。

为了方便大家使用,future库包提供了很多组合子,以AndThen组合子为例:

enum AndThen<A,F> {

First(A, F),

}


fn poll(&mut self) -> Async<Item> {

match fut_a.poll() {

Async::Ready(v) => Async::Ready(f(v)),

Async::NotReady => Async::NotReady,

}

}

这里AndThen枚举,First有两个值,其中A是一个future,F是一个闭包,AndThen实现的poll方法,就是假如调用future_a的poll方法有返回值,那么就调用闭包,并将其返回值包装为Async::Ready返回,如果poll的返回值是Async::NotReady则同样返回Async::NotReady。 有了这个AndThen方法,通过组合子函数(比如and_then实际上是将上一个future和闭包传入生成一个AndThen future),我们就可以实现一些复杂逻辑:

let f=MyTcpStream::connect(&remote_addr)

.and_then(|num| {println!("already read %d",num);

return num;}).and_then(|num| {

process(num)

});

tokio::spawn(f);

上面的代码就是建立Tcp连接,然后每次读数据,都通过第一个and_then打印日志,然后再通过第二个and_then做其他处理,tokio::spawn用于执行最终的future,用图形来表示:

VNNvuiV.jpg!web

如果没有数据:

7fyQNjb.jpg!web

如果有数据:

2yqu2eN.jpg!web

如果将MyTcpStream的poll实现改为:

fn poll(&mut self) -> Poll<Item, io::Error> {

let mut buf = [0;1024];

let mut bytes = bytesMut::new();

loop {

match self.socket.read(&mut buf) {

Async::Ready(0) => return Async::Ready(bytes.to_vec()),

Async::Ready(n) => bytes.put(buf[0..n]),

Async::NotReady => return Async::NotReady,

}

}

}

这段代码主要是将socket中数据读出,然后包装为Async::Ready或者Async::NotReady供下一个future使用,我们就可以实现更复杂的逻辑,比如:

MyTcpStream::connect(&remote_addr)

.and_then(|sock| io::write(sock, handshake)) //这里发送handshake

.and_then(|sock| io::read_exact(sock, 10)) // 这里读handshake的响应,假设handeshake很短

.and_then(|(sock, handshake)| { // 这个future做验证并发送请求

validate(handshake);

io::write(sock, request)

})

.and_then(|sock| io::read_exact(sock, 10))// 这里读取响应

.and_then(|(sock, response)| { // 这里处理响应

process(response)

})

我们上面解释了future和组合子,漏掉一个重要的API,就是:

tokio::spawn(future)

当我们使用spawn方法的时候,tokio会将传入的future生成一个task,由于future内部包含了另外的future,所以就组成了如下所示结构,其中task就是轻量级线程。

IrYRviF.jpg!web

Tokio

上面我们介绍了future相关的内容,接下来我们先看看tokio如何使用,我们这里先用taokio启动一个服务器,代码如下:

let listener = TcpListener::bind(&addr).unwrap();


let server = listener.incoming().for_each(move |socket| {

tokio::spawn(process(socket));

Ok(())

}).map_err(|err| {

println!("accept error = {:?}", err);

});


tokio::run(server);

上面的代码首先生成一个TcpListener,listener的incomming和foreach会将连进来的tcp连接生成TcpStream(即代码中的socket),针对每一个连接启动一个用户态线程处理。

Tokio本身是基于Mio和future库来实现的,其主要包含两个主要的大功能部分(本文不是对源码进行分析,Tokio不同版本之间的差异也较大,只是进行原理说明),reactor和scheduler。

scheduler负责对task进行调度,上文所展示的task调度部分功能就是由scheduler负责,reactor部分主要是负责事件触发,比如网络事件,文件系统事件,定时器等等。 用图展示如下:

JRRfMzn.jpg!web

当有事件触发的时候,reactor会通过task的api通知scheduler运行该任务。

M3IfmmQ.jpg!web

对于Reactor来说,其中最重要的结构是Poll和io_dispatch,在linux上Poll是对Epoll实例的封装(在其他操作系统上也类似),io_dispatch其中记录了调度相关的信息,具体来说主要是记录了task的id和fd的对应关系。 当通过Poll获取到FD事件的时候,通过io_dispatch找到task,然后再通知调度器。

EFvaea3.png!web

TcpListner实际并非rust std库中的TcpListner,tokio对其进行了包装,每次有新连接到来的时候都会生成一个新的TcpStream。

TcpStream也是tokio包装后的TcpStream,可以看到其中包含一个PollEvented,而PollEvented内部包含实际的TcpSteam。

PollEvented构造之后,会调用io_dispatch中的注册接口,然后在第一次调用poll的时候,将fd和task关联。

Async/await

通过上面的文章可以看到,直接使用tokio相关API还是有些难度的,然而在rust 1.39.0之后的版本,我们可以使用async/awai特性来简化代码,使得代码更容易理解。 使用async/await后,上面的代码可以简化为:

#[tokio::main]

pub async fn main() -> Result<(), Box<dyn Error>> {

let mut stream = TcpStream::connect("127.0.0.1:6142").await?;

println!("created stream");

let result = stream.write(b"hello world\n").await;

println!("wrote to stream; success={:?}", result.is_ok());

Ok(())

}

要点在于对于需要异步的函数使用async修饰,在调用async函数的时候使用await获取返回结果。 实际上async函数是由编辑器生成的future,await也是由编译器生成代码调用future的poll方法。 因此真正用好async/await也需要对上面的内容了解清楚。

Tips

最后,使用tokio有一些需要注意地方:

  1. 生命周期的问题。 声明周期的问题是一直贯穿rust的,具体到tokio使用上来说,最主要的是self的生命周期问题,主要是因为runtime要求借用是静态的,这个跟对象本身的声明周期是有矛盾的。 我们推荐的主要做法是使用actor模型,这样可以消除掉对于静态生命周期的要求。

  2. 注意兼容问题,最主要是需要注意future01和future03的兼容性问题,future官方提供了兼容包,来做版本之间的兼容,如果要使用async/await,推荐尽量使用future03库。

  3. runtime,tokio的一个runtime对应一个线程池,因此推荐对不同业务使用不同线程池,减少业务之间相互影响。

  4. 使用TaskExecutor/Handle来spawm一个task。 上面代码里经常使用的tokio::spawn是针对默认runtime的,如果使用了不同的runtime,那么就不能使用tokio::spawn。 另外,TaskExecutor/Handle支持clone,可以解决一些生命周期带来的问题。

  5. 可以在代码中通过api通知task运行。

近期文章推荐

技术原创及架构实践文章,欢迎通过公众号菜单「联系我们」进行投稿。

高可用架构

改变互联网的构建方式

vEjQNvu.jpg!web 长按二维码 关注「高可用架构」公众号


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK