9

【译】理解Rust中的Futures(二)

 3 years ago
source link: http://www.cnblogs.com/praying/p/14179397.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

原文标题:Understanding Futures in Rust -- Part 2

原文链接:https://www.viget.com/articles/understanding-futures-is-rust-part-2/

公众号: Rust 碎碎念

翻译 by: Praying

背景

如果你还没有看前面的内容,可以在 这里 [1] 查看(译注:已有译文,可在公众号查看)。

在第一部分,我们介绍了 Future trait,了解了 future 是如何被创建和运行的,并且开始知道它们如何能被链接到一起。

上次内容的代码可以在 这个 playground 链接 [2] 查看,并且本文中所有示例代码将会以这段代码为基础。

注意:所有的代码示例都有对应的 playground 链接,其中一些用于解释说明但无法编译的代码会有相应的标记。

目标

如果你熟悉 JavaScript 中的 promise 并且阅读了最新的博客,你可能会对先前文章中提到的组合子( thencatchfinally )感到困惑。

你将会在本文章找到与它们对等的东西,并且在最后,下面这段代码将能够编译。你将会理解使得 future 能够运作的类型,trait 和底层概念。

// This does not compile, yet

fn main() {
let my_future = future::ready(1)
.map(|x| x + 3)
.map(Ok)
.map_err(|e: ()| format!("Error: {:?}", e))
.and_then(|x| future::ready(Ok(x - 3)))
.then(|res| {
future::ready(match res {
Ok(val) => Ok(val + 3),
err => err,
})
});

let val = block_on(my_future);
assert_eq!(val, Ok(4));
}

工具函数

首先,我们需要一些工具函数, future::readyblock_on 。这些函数能够让我们很容易地创建和运行 future 直到它们完成,这些函数虽然有用,但是在生产环境的代码中并不常见。

在开始之前,我们先把我们的 Future trait 和 Context 结构体整合到模块里以免和标准库冲突。

mod task {
use crate::NOTIFY;

pub struct Context<'a> {
waker: &'a Waker,
}

impl<'a> Context<'a> {
pub fn from_waker(waker: &'a Waker) -> Self {
Context { waker }
}

pub fn waker(&self) -> &'a Waker {
&self.waker
}
}

pub struct Waker;

impl Waker {
pub fn wake(&self) {
NOTIFY.with(|f| *f.borrow_mut() = true)
}
}

}
use crate::task::*;

mod future {
use crate::task::*;

pub enum Poll<T> {
Ready(T),
Pending,
}

pub trait Future {
type Output;

fn poll(&mut self, cx: &Context) -> Poll<Self::Output>;
}
}
use crate::future::*;

Playground 链接 [3]

这里唯一需要注意的就是,只有将模块,类型和函数公开,才能在代码中使用它们。这可以通过 pub 关键字来完成。

工具函数实现

Future::Ready

future::ready 创建了一个 future,该 future 带有传入值并且是立即就绪(ready)的。当你有一个已经不是 future 的值的时候,这个函数可以用于开启一个 future 链,就像前一个示例那样。

mod future {
// ...

pub struct Ready<T>(Option<T>);

impl<T> Future for Ready<T> {
type Output = T;

fn poll(&mut self, _: &Context) -> Poll<Self::Output> {
Poll::Ready(self.0.take().unwrap())
}
}

pub fn ready<T>(val: T) -> Ready<T> {
Ready(Some(val))
}
}

fn main() {
let my_future = future::ready(1);
println!("Output: {}", run(my_future));
}

Playground 链接 [4]

我们创建了一个类型为 Ready<T> 的泛型结构体,该结构体包装了一个 Option 。这里我们使用 Option 枚举以保证 poll 函数只被调用一次。在 executor 的实现中,在返回一个 Poll::Ready 之后调用 poll 将会报错。

BLOCK_ON

为了我们的目标,我们把我们的 run 函数重命名为 block_on 。在 future-preview 这个 crate 中,该函数使用内部的 LocalPool 来运行一个 future 直到完成,同时会阻塞当前线程。我们的函数也做了相似的事情。

fn block_on<F>(mut f: F) -> F::Output
where
F: Future,
{
NOTIFY.with(|n| loop {
if *n.borrow() {
*n.borrow_mut() = false;
let ctx = Context::from_waker(&Waker);
if let Poll::Ready(val) = f.poll(&ctx) {
return val;
}
}
})
}

fn main() {
let my_future = future::ready(1);
println!("Output: {}", block_on(my_future));
}

Playground 链接 [5]

组合子(Combinators)

首先,让我们从一些能够让你直接作用于另一个 Future 的 Output 值的一些组合子开始。在本文中,我们使用非正式的但是比较流行的组合子定义,即能够允许你对某种类型执行操作,并与其他类型结合起来的函数。例如,一个嵌套的 future 可以由一个组合子函数函数创建,它可以有一个复杂的类型 Future< Output = Future < Output = i32>> 。这可以被称为一个 future,该 future 的输出(Output)是另一个 future,新的 future 的输出是 i32 类型。这样的组合子中,最简单的一个就是 map

Map

如果你熟悉 Result 或者 Option 类型的 map 函数,那么对它应该不陌生。map 组合子持有一个函数并将其应用到 future 的 Output 值上,返回一个新的 future,这个新 future 把函数的结果(Result)作为它的 Output 。Future 中的 map 组合子甚至比 Result 或者 Option 中更简单,因为不需要考虑 failure 的情况。map 就是简单的 Future->Future

下面是函数签名:

// does not compile
fn map<U, F>(self: Sized, f: F) -> Map<Self, F>
where
F: FnOnce(Self::Output) -> U,
Self: Sized,

map 是一个泛型函数,它接收一个闭包,返回一个实现了 Future 的 Map 结构体。不是每当我们在值上进行链接都需要实现 Future trait,正如我们在最后一部分做的那样,我们可以使用这些函数来为我们完成这些工作。

让我们来分析一下:

  • Map<Self, F> 声明了 map 函数的(返回)类型,包括当前的 future,以及传入函数的 future。

  • where 是一个能够让我们添加类型约束的关键字。对于 F 类型参数,我们可以在内部定义约束 map<U, F: FnOnce(Self::Output) -> U ,但是使用 where 语句可读性会更好。

  • FnOnce(Self::Output) -> U 是一个函数的类型定义,该函数接收当前类型的 Output 并返回任意类型 UFnOnce 是函数 trait 中的一个,其他还包括 FnMutFnFnOnce 是用起来最简单的,因为编译器可以保证这个函数只被调用一次。它使用环境中用到的值并获取其所有权。 FnFnMut 分别以不可变和可变的方式借用环境中值的引用。所有的闭包都实现了 FnOnce trait,并且其中一些没有移动值的闭包还实现了 FnMutFn trait。这是 Rust 做的最酷的事情之一,允许对闭包和第一类函数参数进行真正有表达力的使用。 Rust book 中的相关内容 [6] 值得一读。

  • Self: Sized 是一个约束,允许 map 只能被 Sized 的 trait 实现者调用。你不必考虑这个问题,但是确实有些类型不是 Sized 。例如, [i32] 是一个不确定大小的数组。因为我们不知道它多长。如果我们想要为它实现我们的 Future trait,那么我们就不能对它调用 map

大多数组合子都遵循这个模式,因此接下来的文章我们就不需要分析的这么仔细了。

下面是一个 map 的完整实现,它的 Map 类型以及它对 Future 的实现

mod future {
trait Future {
// ...

fn map<U, F>(self, f: F) -> Map<Self, F>
where
F: FnOnce(Self::Output) -> U,
Self: Sized,
{
Map {
future: self,
f: Some(f),
}
}
}

// ...

pub struct Map<Fut, F> {
future: Fut,
f: Option<F>,
}

impl<Fut, F, T> Future for Map<Fut, F>
where
Fut: Future,
F: FnOnce(Fut::Output) -> T,
{
type Output = T;

fn poll(&mut self, cx: &Context) -> Poll<T> {
match self.future.poll(cx) {
Poll::Ready(val) => {
let f = self.f.take().unwrap();
Poll::Ready(f(val))
}
Poll::Pending => Poll::Pending,
}
}
}
}

fn main() {
let my_future = future::ready(1).map(|val| val + 1);
println!("Output: {}", block_on(my_future));
}

Playground 链接 [7]

从高层次来讲,当我们调用一个 future 上的 map 时,我们构造了一个 Map 类型,该类型持有当前 future 的引用以及我们传入的闭包。 Map 对象自身也是一个 Future。当它被轮询时,它依次轮询底层的 future。当底层的 future 就绪后,它获取那个 future 的 Output 的值并且把它传入闭包,对 Poll::Ready 中的闭包返回的值进行包装(wrapping)并且把新值向上传递。

如果你阅读了最新的博客,你对在这里看到的东西应该感到很熟悉,但是在我们继续之前,我会快速地讲解作为一个复习。

  • pub struct Map<Fut, F> 是一个关于 future—— Fut 和函数 F 的泛型。

  • f: Option<F> 是一个包装了闭包了 Option 类型。这里是个小技巧,以保证闭包只被调用一次。当你获取一个 Option 的值,它会用 None 替换内部的值并且返回里面包含的值。如果在返回一个 Poll::Ready 之后被轮询,这个函数会 panic。在实际中,future 的 executor 不会允许这种情况发生。

  • type Output = T; 定义了 map future 的输出和我们的闭包的返回值是将会是相同的。

  • Poll::Read(f(val)) 返回带有闭包返回结果的就绪(ready)状态。

  • Poll::Pending => Poll::Pending 如果底层的 future 返回 pending,继续传递。

  • future::ready(1).map(|val| val + 1); 这对就绪(ready)future 的输出进行了 map,并对其加 1。它返回了一个 map future,其中带有对原先的 future 的一个引用。map future 在运行期间轮询原先的 future 是否就绪(ready)。这和我们的 AddOneFuture 做的是相同的事情。

这真的很酷,主要有以下几个原因。首先,你不必对每一个你想要进行的计算都实现一个新的 future,它们可以被包装(wrap)进组合子。事实上,除非你正在实现你自己的异步操作,否则你可能从来都不需要自己去实现 Future trait。

Then

现在我们有了 map ,我们可以把任何我们想要的计算链接起来,对么?答案是对的,但是对此还有一个相当大的警告。

想象一下,当你有一些函数,这些函数返回你想要链接起来的 future。对于这个例子,我们可以想象,它们是下面的 api 调用,这些调用返回包装(wrap)在 future 中的结果, get_userget_files_for_user

// does not compile
fn main() {
let files_future = get_user(1).map(|user| get_files_for_user(user));
println!("User Files: {}", block_on(files_future));
}

这段代码无法编译,但是你可以想象你在这里构建的类型,看起来应该像这样: Future<Output = Future<Output= FileList>> 。这在使用 ResultOption 类型的时候也是一个常见问题。使用 map 函数经常会导致嵌套的输出和对这些嵌套的繁琐处理。在这种情况下,你不得不去跟踪到底嵌套了多少层并且对每一个嵌套的 future 都调用 block_on

幸运地是, ResultOption 有一个被称为 and_then 的解决方案。 Optionand_then 通过对 T 应用一个函数来映射(map) Some(T) -> Some(U) ,并且返回闭包所返回的 Option 。对于 future,它是通过一个称为 then 的函数来实现的,该函数看起来很像映射(map),但是这个闭包应该它自己的 future。在一些语言中,这被称为 flatmap 。这里值得注意的是,传递给 then 的闭包返回的值必须是实现了 Future ,否则你将会得到一个编译器错误。

这里是我们的对于 thenThen 结构体和它的对 Future trait 的实现。其中的大部分内容和我们在 map 中做的很像。

mod future {
trait Future {
// ...
fn then<Fut, F>(self, f: F) -> Then<Self, F>
where
F: FnOnce(Self::Output) -> Fut,
Fut: Future,
Self: Sized,
{
Then {
future: self,
f: Some(f),
}
}
}

// ...

pub struct Then<Fut, F> {
future: Fut,
f: Option<F>,
}

impl<Fut, NextFut, F> Future for Then<Fut, F>
where
Fut: Future,
NextFut: Future,
F: FnOnce(Fut::Output) -> NextFut,
{
type Output = NextFut::Output;

fn poll(&mut self, cx: &Context) -> Poll<Self::Output> {
match self.future.poll(cx) {
Poll::Ready(val) => {
let f = self.f.take().unwrap();
f(val).poll(cx)
}
Poll::Pending => Poll::Pending,
}
}
}
}

fn main() {
let my_future = future::ready(1)
.map(|val| val + 1)
.then(|val| future::ready(val + 1));
println!("Output: {}", block_on(my_future));
}

Playground 链接 [8]

这里面没见过的代码可能是 f(val).poll(cx) 。它调用了带有先前 future 的闭包并且直接返回给你 poll 的值。

聪明的你可能会意识到,我们的 Then::poll 函数可能会 panic。如果第一个 future 返回就绪(ready)但是第二个 future 返回 Poll::Pending ,接着 let f = self.f.take().unwrap(); 这行代码就会在下次被轮询(poll)的时候 panic 并退出程序。在 future-preview 中,这种情况会通过一个称为 Chain [9] 的类型来处理。Chain 通过 unsafe 代码块来实现,并且使用了新类型—— Pin 。这些内容超出了本文的范围。目前来讲,我们可以假定任何通过 then 闭包返回的 future 都绝不会返回 Poll::Pending 。总体来讲,这不是个安全的假设。

Result 组合子

在 futures-rs 库的 0.1 版本中, Future trait 和 Result 类型紧密关联。 Future trait 的定义如下:

// does not compile
trait Future {
type Item;
type Error;

fn poll(self) -> Poll<Self::Item, Self::Error>;
}

Poll 类型里定义了成功状态、失败状态和未就绪状态。这意味着像 map 这种函数只有当 Poll 是就绪并且不是错误的情况下才能执行。尽管这会产生一些困扰,但是它在链接组合子并且根据成功或失败状态做决定的时候,会产生一些非常好的人体工程学(ergonomics )。

这与 std::future 的实现方式有所不同。现在 future 要么是就绪或者是未就绪,对于成功或失败语义是不可知的。它们可以包含任何值,包括一个 Result 。为了得到便利的组合子,比如像 map_err 能够让你只改变一个嵌套的 Result 中的错误类型,或者想 and_then 这样,允许你只改变嵌套 Result 中的值类型,我们需要实现一个新的 trait。下面是 TryFuture 的定义:

mod future {
//...
pub trait TryFuture {
type Ok;
type Error;

fn try_poll(self, cx: &mut Context) -> Poll<Result<Self::Ok, Self::Error>>;
}

impl<F, T, E> TryFuture for F
where
F: Future<Output = Result<T, E>>,
{
type Ok = T;
type Error = E;

fn try_poll(&mut self, cx: &Context) -> Poll<F::Output> {
self.poll(cx)
}
}
}

Playground 链接 [10]

TryFuture 是一个 trait,我们可以为任意的类型 <F, T, E> 实现这个 trait,其中 F 实现了 Future trait,它的 Output 类型是 Result<T,E> 。它只有一个实现者。那个实现者定义了一个 try_poll 函数,该函数与 Future trait 上的 poll 有相同的签名,它只是简单地调用了 poll 方法。

这意味着任何一个拥有 Result 的 Output 类型的 future 也能够访问它的成功/错误(success/error)状态。这也使得我们能够定义一些非常方便的组合子来处理这些内部 Result 类型,而不必在一个 mapand_then 组合子内显示地匹配 OkErr 类型。下面是一些能够阐述这个概念的实现。

AndThen

让我们回顾之前想象到的 API 函数。假定它们现在处于会发生网络分区和服务器中断的现实世界中,不会总是能返回一个值。这些 API 方法实际上会返回一个嵌有 result 的 future 以表明它已经完成,并且是要么是成功完成,要么是带有错误的完成。我们需要去处理这些结果,下面是我们可能是根据现有工具处理它的方式。

// does not compile
fn main() {
let files_future = get_user(1).then(|result| {
match result {
Ok(user) => get_files_for_user(user),
Err(err) => future::ready(Err(err)),
}
});

match block_on(files_future) {
Ok(files) => println!("User Files: {}", files),
Err(err) => println!("There was an error: {}", err),:w
};
}

情况还不算太坏,但是假定你想要链接更多的 future,事情很快就会变得一团糟。幸运的是,我们可以定义一个组合子—— and_then ,该组合子将会把类型 Future<Output = Result<T, E>> 映射到 Future<Output = Result<U, E>> ,其中我们把 T 变为了 U

下面是我们定义它的方式:

mod future {
pub trait TryFuture {
// ...

fn and_then<Fut, F>(self, f: F) -> AndThen<Self, F>
where
F: FnOnce(Self::Ok) -> Fut,
Fut: Future,
Self: Sized,
{
AndThen {
future: self,
f: Some(f),
}
}
}

// ...
pub struct AndThen<Fut, F> {
future: Fut,
f: Option<F>,
}

impl<Fut, NextFut, F> Future for AndThen<Fut, F>
where
Fut: TryFuture,
NextFut: TryFuture<Error = Fut::Error>,
F: FnOnce(Fut::Ok) -> NextFut,
{
type Output = Result<NextFut::Ok, Fut::Error>;

fn poll(&mut self, cx: &Context) -> Poll<Self::Output> {
match self.future.try_poll(cx) {
Poll::Ready(Ok(val)) => {
let f = self.f.take().unwrap();
f(val).try_poll(cx)
}
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Pending => Poll::Pending,
}
}
}
}

fn main() {
let my_future = future::ready(1)
.map(|val| val + 1)
.then(|val| future::ready(val + 1))
.map(Ok::<i32, ()>)
.and_then(|val| future::ready(Ok(val + 1)));

println!("Output: {:?}", block_on(my_future));
}

Playground 链接 [11]

你对此应该较为熟悉。事实上,这和 then 组合子的实现基本一致。只有一些关键的区别需要注意:

  • 函数定义在 TryFuture trait 中

  • type Output = Result<NextFut::Ok, Fut::Error>; 表明 AndThen future 的输出拥有新的 future 的值类型,以及在它之前的 future 的错误类型。换句话说,如果先前的 future 的输出包含一个错误类型,那么这个闭包将不会被执行。

  • 我们调用的是 try_poll 而不是 poll

值得注意的是,当你像这样来链接组合子的时候,它们的类型前面可能会变得很长且在编译错误信息中难以阅读。 and_then 函数要求 future 调用时的错误类型和由闭包返回的类型必须是相同的。

MapErr

回到我们的想象的 api 调用。假定调用的 api 都返回带有同一类错误的 future,但是你需要在调用之间进行额外的步骤。假定你必须解析第一个 api 结果然后把它传递给第二个。

// 无法编译
fn main() {
let files_future = get_user(1)
.and_then(|user_string| parse::<User>())
.and_then(|user| get_files_for_user(user));

match block_on(files_future) {
Ok(files) => println!("User Files: {}", files),
Err(err) => println!("There was an error: {}", err),:w
};
}

这看起来很好,但是无法编译,并且会有个晦涩的错误信息说它期望得到像 ApiError 的东西但是却找到了一个 ParseError 。你可以在解析返回的 Result 上使用过 map_err 组合子,但是对于 future 应该如何处理呢?如果我们为 TryFuture 实现一个 map_err ,那么我们可以重写成下面这样:

// 无法编译
fn main() {
let files_future = get_user(1)
.map_err(|e| format!("Api Error: {}", e))
.and_then(|user_string| parse::<User>())
.map_err(|e| format!("Parse Error: {}", e))
.and_then(|user| get_files_for_user(user))
.map_err(|e| format!("Api Error: {}", e));

match block_on(files_future) {
Ok(files) => println!("User Files: {}", files),
Err(err) => println!("There was an error: {}", err),:w
};
}

如果这让你看着比较混乱,请继续关注本系列的第三部分,我将谈谈如何处理这个问题和你可能会在使用 future 时遇到的其他问题。

下面是我们实现 map_err 的方式

mod future {
pub trait TryFuture {
// ...

fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
where
F: FnOnce(Self::Error) -> E,
Self: Sized,
{
MapErr {
future: self,
f: Some(f),
}
}
}

// ...
pub struct MapErr<Fut, F> {
future: Fut,
f: Option<F>,
}

impl<Fut, F, E> Future for MapErr<Fut, F>
where
Fut: TryFuture,
F: FnOnce(Fut::Error) -> E,
{
type Output = Result<Fut::Ok, E>;

fn poll(&mut self, cx: &Context) -> Poll<Self::Output> {
match self.future.try_poll(cx) {
Poll::Ready(result) => {
let f = self.f.take().unwrap();
Poll::Ready(result.map_err(f))
}
Poll::Pending => Poll::Pending,
}
}
}
}

fn main() {
let my_future = future::ready(1)
.map(|val| val + 1)
.then(|val| future::ready(val + 1))
.map(Ok)
.and_then(|val| future::ready(Ok(val + 1)))
.map_err(|_: ()| 5);

println!("Output: {:?}", block_on(my_future));
}

Playground 链接 [12]

唯一比较陌生的地方是 Poll::Ready(result.map_err(f)) 。在这段代码里,我们传递我们的闭包到 Result 类型的 map_err 函数里。

包装 (Wrap Up)

现在,文章开头的代码可以运行了!比较酷的是这些全都是我们自己实现的。还有很多其他用途的组合子,但是它们几乎都是相同的方式构建的。读者可以自己练习一下,试试实现一个 map_ok 组合子,行为类似于 TryFuture 上的 map_err 但是适用于成功的结果。

Playground 链接 [13]

概要重述(Recap)

  • Rust 中的 Future 之所以如此强大,是因为有一套可以用于链接计算和异步调用的组合子。

  • 我们也学习了 Rust 强大的函数指针 trait, FnOnceFnMutFn

  • 我们已经了解了如何使用嵌入在 future 中的 Result 类型。

接下来

在第三部分中,我们将会介绍使错误处理没有那么痛苦的方式,当你有很多分支时,如何处理返回的 future,以及我们将深入到 async/await 这个令人激动的世界。

quqq6za.jpg!mobile

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK