![](/style/images/good.png)
![](/style/images/bad.png)
In nightly Rust, await! may never return (future cancelation)
source link: https://www.tuicool.com/articles/hit/UNN3Qrf
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
I've been using the
proposed await!
and Future
features in nightly Rust
, and overall, I really like the design. But I did run into one surprise: await!
may never return, and has consequences I didn't fully understand. Let's take a look.
We're going to use Rust nightly-2019-02-08
, and
tokio-async-await
. This is highly experimental code, and it will require us to convert back and forth between tokio::Future
and the proposed std::future::Future
.
You can find the full code on GitHub. We'll start by enabling the experimental features we'll need:
#![feature(await_macro, async_await, futures_api)] #[macro_use] extern crate tokio_async_await;
Then we'll import some libraries, and declare two helper functions tokio_fut
and boxed_fut
, that make it easy to convert from std::future::Future
into tokio::Future
and into Box<tokio::Future<..>>
, respectively. You can look that code up on GitHub
.
Next, we define a function delay
, which returns a Future
that waits for the specified number of milliseconds:
fn delay(millis: u64) -> Delay { Delay::new( Instant::now() + Duration::from_millis(millis), ) }
Canceling a Future
Now, we can define two tasks:
/// An asynchronous function that completes quickly. async fn quick_task() -> Result<&'static str> { println!("START quick_task"); await!(delay(10)).context("delay failed")?; println!("END quick_task"); Ok("quick_task result") } /// An asynchronous function that completes very slowly. async fn slow_task() -> Result<&'static str> { println!("START slow_task"); await!(delay(10_000)).context("delay failed")?; println!("END slow_task"); Ok("slow_task result") }
Here, quick_task
waits for 10 milliseconds, and slow_task
waits for 10,000 milliseconds. We can combine them using select_all
:
/// Run our tasks in parallel, waiting for the first to /// complete. async fn wait_for_first_task() -> Result<()> { let all_futures = vec![ boxed_fut(quick_task()), boxed_fut(slow_task()), ]; let (result, _idx, others) = await!( future::select_all(all_futures) ) .map_err(|(err, idx, others)| { println!("A future failed, so throw the rest away"); drop(others); format_err!("task {} failed: {}", idx, err) })?; println!("Result from the first task: {}", result); println!("Dropping unneeded computations"); drop(others); Ok(()) }
The drop
calls above are unnecessary—the compiler would be happy to insert them—but I included them for clarity.
When we run this, we will see:
START quick_task START slow_task END quick_task Result from the first task: quick_task result Dropping unneeded computations
Here, slow_task
called await!
, but await!
never returned! It feels almost like an exception, but what really happened is that quick_task
and slow_task
are coroutines
, and they only run if somebody calls await!
on them. (Rust Future
s have "pull" semantics, not push semantics.) This means you can cancel a future by letting it drop
.
So when we called select_all
and drop
, we took the value of the first future to complete, and dropped the rest.
On the one hand, this is great, because it means that Rust Future
s are always
cancelable. On the other hand, it has surprising consequences: We never print END slow_task
.
Can we clean up resources when a Future
is canceled?
Yes, but we need to use RAII :
/// A structure which prints `msg` on `drop`. struct Cleanup { msg: &'static str, } impl Drop for Cleanup { fn drop(&mut self) { println!("{}", self.msg) } } /// An asynchronous function that cleans up after itself. async fn protected_slow_task() -> Result<&'static str> { println!("START protected_slow_task"); let _cleanup = Cleanup { msg: "CLEANUP protected_slow_task", }; await!(delay(10_000)).context("delay failed")?; println!("END protected_slow_task"); Ok("protected_slow_task result") }
...and add it to all_futures
:
let all_futures = vec![ boxed_fut(quick_task()), boxed_fut(slow_task()), boxed_fut(protected_slow_task()), ];
When run, we'll see:
START quick_task START slow_task START protected_slow_task END quick_task Result from the first task: quick_task result Dropping unneeded computations CLEANUP protected_slow_task
We still don't see END protected_slow_task
, but we do see our CLEANUP
. So that's nice. Or at least I could probably learn to live with it.
What does this mean for send
ing to channels?
In the recent discussion of Rust channel semantics ( here
, here
,here and here
), I've argued that we need to look at how the proposed await!
and std::future::Future
interact with channels.
Basically, imagine we have an asynchronous function that holds the receive
end of a channel. For example, this function
copies a Stream
(which might be the receiving end of a channel) into an AsyncWrite
:
/// Given a `Stream` of data chunks of type `BytesMut`, write /// the entire stream to an `AsyncWrite` implementation. pub(crate) async fn copy_stream_to_writer<S, W>( ctx: Context, mut stream: S, mut wtr: W, ) -> Result<()> where S: Stream<Item = BytesMut, Error = Error> + 'static, W: AsyncWrite + 'static, { loop { match await!(stream.into_future()) { Err((err, _rest_of_stream)) => { error!(ctx.log(), "error reading stream: {}", err); return Err(err); } Ok((None, _rest_of_stream)) => { trace!(ctx.log(), "end of stream"); return Ok(()); } Ok((Some(bytes), rest_of_stream)) => { stream = rest_of_stream; trace!(ctx.log(), "writing {} bytes", bytes.len()); await!(io::write_all(&mut wtr, bytes)).map_err(|e| { error!(ctx.log(), "write error: {}", e); format_err!("error writing data: {}", e) })?; } } } }
But what happens if io::write_all
returns an error? Well, we exit, which will drop stream
. If stream
is actually a channel, this will close the receiving end of the channel. In this case, we could
add code to notify the sending end of the channel (if stream
is actually a channel), and clean everything up. This seems like a lot of extra work for no real gain, but Go programmers apparently live with it every day, and it could be done.
But now let's consider a trickier case. What if copy_stream_to_writer
were wrapped inside a select_all
, and it lost out to a faster future? Or what if somebody wrapped the future in a timeout?
let fut = copy_stream_to_writer(...) .timeout(Duration::from_secs(5)); await!(fut)?;
In this case, the await!(stream.into_future())
expression in copy_stream_to_writer
will never return. And we'll drop stream
without any easy way to notify anybody about that.
So far, I'm cool with all this. Sometimes, the receiving end of a channel just silently disappears, because a timeout
or a select_all
or a drop
caused an await!
to never return. It's not too hard to program in this world—all I need to do is accept that channel.send(...)
can fail without explanation, and then everything works nicely.
But while I was thinking about this, a comment by ben0x539 made me realize there's an uglier side to this.
What does this mean for receiving from channels?
With typical Rust mpsc
channels, if all the senders are dropped, then the receiver will see the channel return an Ok(None)
. If all the senders were dropped normally, this is exactly what we want. If the senders were closed because of a normal error, then we could use something like
SyncStreamWriter
's send_error
to add that error at the end of our stream:
let result: Result<()> = try { let conn = connect(&url)?; let stmt = conn.prepare(&sql)?; stmt.copy_out(&[], &mut wtr)?; }; // Report any errors to our stream. if let Err(err) = result { error!(ctx.log(), "error reading from PostgreSQL: {}", err); if wtr.send_error(err).is_err() { error!(ctx.log(), "cannot report error to foreground thread"); } }
But what if our sender
were inside an async
function that got canceled, and that never returned from an await!
? Well, in this case, the channel will close normally, and the receiver will never realize that the sender failed early.
This bothers me lot
more than send
sometimes returning an error. I'll probably need to go audit a couple of pieces of code very carefully.
When I started thinking about all of this, I was absolutely convinced that the interactions between await!
, mpsc
channels and Future
s were surprisingly good. Sure, I had to accept than any send
might fail, but that was easy enough to deal with.
But now I want to think some more about await!
, and the fact that it might never return. I think this is natural consequence of many
other good things, and that it makes natural sense in terms of co-routines. But how does it interact with resource cleanup and channel shutdown?
Recommend
-
64
Is it possible to handle producer cancelation inside producer builder itself? It could be useful to unsubscribe from callback:private fun changes(key: String) = produce<Unit>(UI, CONFLATED)...
-
16
本文内容来自Writing an OS in Rust博客。 多任务处理 几乎所有操作系统的基本功能都包含多任务处理,即并发执行多个任务的能力(multitasking)。例如,你可能边...
-
0
Meet the Pixel Buds Pro coming this July with Active Noise Cancelation
-
13
Premium
-
6
After First Kill's shock cancelation, the showrunner has hit out at Netflix By Tom Goodwyn published about 9 hours ago T...
-
4
Home ...
-
3
AirPods Max Active Noise Cancelation Confirmed to Be Less Effective After Latest Firmware Update
-
0
Home Chevron icon...
-
4
Kuo: Apple 5G modem failure causes 2024 iPhone SE 4 cancelation Friday, January 6, 2023 2:00 pmFriday, January 6, 2023
-
3
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK