diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-15 22:11:13 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-15 22:11:13 -0800 |
commit | 8a7e57786a5dca139f5b4261685e22991ded0859 (patch) | |
tree | b69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/src | |
parent | 930679587ae42e4df3113159ccf33fb5923dd73a (diff) |
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its
_public_ dependencies on crates that are either a) do not provide a
stable (1.0+) release with longevity guarantees or b) match the `tokio`
release cadence. Of course, implementing `std` traits fits the
requirements.
The on exception, for now, is the `Stream` trait found in `futures_core`.
It is expected that this trait will not change much and be moved into `std.
Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain
a dependency on this trait given how foundational it is.
Since the `Stream` implementation is optional, types that are logically
streams provide `async fn next_*` functions to obtain the next value.
Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`.
Additionally, some misc cleanup is also done:
- `tokio::io::io` -> `tokio::io::util`.
- `delay` -> `delay_until`.
- `Timeout::new` -> `timeout(...)`.
- `signal::ctrl_c()` returns a future instead of a stream.
- `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait).
- `time::Throttle` is removed (due to lack of `Stream` trait).
- Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/src')
93 files changed, 1370 insertions, 1269 deletions
diff --git a/tokio/src/fs/blocking.rs b/tokio/src/fs/blocking.rs index 695358a3..64398cbb 100644 --- a/tokio/src/fs/blocking.rs +++ b/tokio/src/fs/blocking.rs @@ -1,7 +1,6 @@ use crate::fs::sys; use crate::io::{AsyncRead, AsyncWrite}; -use futures_core::ready; use std::cmp; use std::future::Future; use std::io; diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 3f18831e..0ff45025 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -7,7 +7,6 @@ use crate::fs::blocking::Buf; use crate::fs::{asyncify, sys}; use crate::io::{AsyncRead, AsyncWrite}; -use futures_core::ready; use std::fmt; use std::fs::{Metadata, Permissions}; use std::future::Future; @@ -430,7 +429,7 @@ impl File { } async fn complete_inflight(&mut self) { - use futures_util::future::poll_fn; + use crate::future::poll_fn; if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await { self.last_write_err = Some(e.kind()); diff --git a/tokio/src/fs/read_dir.rs b/tokio/src/fs/read_dir.rs index 9492a2f4..219c7b35 100644 --- a/tokio/src/fs/read_dir.rs +++ b/tokio/src/fs/read_dir.rs @@ -1,7 +1,5 @@ use crate::fs::{asyncify, sys}; -use futures_core::ready; -use futures_core::stream::Stream; use std::ffi::OsString; use std::fs::{FileType, Metadata}; use std::future::Future; @@ -50,10 +48,15 @@ enum State { Pending(sys::Blocking<(Option<io::Result<std::fs::DirEntry>>, std::fs::ReadDir)>), } -impl Stream for ReadDir { - type Item = io::Result<DirEntry>; +impl ReadDir { + /// Returns the next entry in the directory stream. + pub async fn next_entry(&mut self) -> io::Result<Option<DirEntry>> { + use crate::future::poll_fn; + poll_fn(|cx| self.poll_next_entry(cx)).await + } - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + #[doc(hidden)] + pub fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<Option<DirEntry>>> { loop { match self.0 { State::Idle(ref mut std) => { @@ -68,7 +71,11 @@ impl Stream for ReadDir { let (ret, std) = ready!(Pin::new(rx).poll(cx))?; self.0 = State::Idle(Some(std)); - let ret = ret.map(|res| res.map(|std| DirEntry(Arc::new(std)))); + let ret = match ret { + Some(Ok(std)) => Ok(Some(DirEntry(Arc::new(std)))), + Some(Err(e)) => Err(e), + None => Ok(None), + }; return Poll::Ready(ret); } @@ -77,6 +84,19 @@ impl Stream for ReadDir { } } +#[cfg(feature = "stream")] +impl futures_core::Stream for ReadDir { + type Item = io::Result<DirEntry>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + Poll::Ready(match ready!(self.poll_next_entry(cx)) { + Ok(Some(entry)) => Some(Ok(entry)), + Ok(None) => None, + Err(err) => Some(Err(err)), + }) + } +} + /// Entries returned by the [`ReadDir`] stream. /// /// [`ReadDir`]: struct.ReadDir.html @@ -100,13 +120,11 @@ impl DirEntry { /// /// ```no_run /// use tokio::fs; - /// use tokio::prelude::*; /// /// # async fn dox() -> std::io::Result<()> { /// let mut entries = fs::read_dir(".").await?; /// - /// while let Some(res) = entries.next().await { - /// let entry = res?; + /// while let Some(entry) = entries.next_entry().await? { /// println!("{:?}", entry.path()); /// } /// # Ok(()) @@ -133,13 +151,11 @@ impl DirEntry { /// /// ``` /// use tokio::fs; - /// use tokio::prelude::*; /// /// # async fn dox() -> std::io::Result<()> { /// let mut entries = fs::read_dir(".").await?; /// - /// while let Some(res) = entries.next().await { - /// let entry = res?; + /// while let Some(entry) = entries.next_entry().await? { /// println!("{:?}", entry.file_name()); /// } /// # Ok(()) @@ -164,14 +180,11 @@ impl DirEntry { /// /// ``` /// use tokio::fs; - /// use tokio::prelude::*; /// /// # async fn dox() -> std::io::Result<()> { /// let mut entries = fs::read_dir(".").await?; /// - /// while let Some(res) = entries.next().await { - /// let entry = res?; - /// + /// while let Some(entry) = entries.next_entry().await? { /// if let Ok(metadata) = entry.metadata().await { /// // Now let's show our entry's permissions! /// println!("{:?}: {:?}", entry.path(), metadata.permissions()); @@ -202,14 +215,11 @@ impl DirEntry { /// /// ``` /// use tokio::fs; - /// use tokio::prelude::*; /// /// # async fn dox() -> std::io::Result<()> { /// let mut entries = fs::read_dir(".").await?; /// - /// while let Some(res) = entries.next().await { - /// let entry = res?; - /// + /// while let Some(entry) = entries.next_entry().await? { /// if let Ok(file_type) = entry.file_type().await { /// // Now let's show our entry's file type! /// println!("{:?}: {:?}", entry.path(), file_type); diff --git a/tokio/src/future.rs b/tokio/src/future.rs deleted file mode 100644 index f6b7e4a7..00000000 --- a/tokio/src/future.rs +++ /dev/null @@ -1,69 +0,0 @@ -//! Asynchronous values. - -#[cfg(feature = "time")] -use crate::time::Timeout; - -#[cfg(feature = "time")] -use std::time::Duration; - -#[doc(inline)] -pub use futures_util::future::{err, ok, pending, poll_fn, ready}; -#[doc(inline)] -pub use std::future::Future; - -/// An extension trait for `Future` that provides a variety of convenient -/// combinator functions. -/// -/// Currently, there only is a [`timeout`] function, but this will increase -/// over time. -/// -/// Users are not expected to implement this trait. All types that implement -/// `Future` already implement `FutureExt`. -/// -/// This trait can be imported directly or via the Tokio prelude: `use -/// tokio::prelude::*`. -/// -/// [`timeout`]: #method.timeout -pub trait FutureExt: Future { - /// Creates a new future which allows `self` until `timeout`. - /// - /// This combinator creates a new future which wraps the receiving future - /// with a timeout. The returned future is allowed to execute until it - /// completes or `timeout` has elapsed, whichever happens first. - /// - /// If the future completes before `timeout` then the future will resolve - /// with that item. Otherwise the future will resolve to an error. - /// - /// The future is guaranteed to be polled at least once, even if `timeout` - /// is set to zero. - /// - /// # Examples - /// - /// ``` - /// use tokio::prelude::*; - /// use std::time::Duration; - /// - /// async fn long_future() { - /// // do work here - /// } - /// - /// # async fn dox() { - /// let res = long_future() - /// .timeout(Duration::from_secs(1)) - /// .await; - /// - /// if res.is_err() { - /// println!("operation timed out"); - /// } - /// # } - /// ``` - #[cfg(feature = "time")] - fn timeout(self, timeout: Duration) -> Timeout<Self> - where - Self: Sized, - { - Timeout::new(self, timeout) - } -} - -impl<T: ?Sized> FutureExt for T where T: Future {} diff --git a/tokio/src/future/maybe_done.rs b/tokio/src/future/maybe_done.rs new file mode 100644 index 00000000..5011544c --- /dev/null +++ b/tokio/src/future/maybe_done.rs @@ -0,0 +1,76 @@ +//! Definition of the MaybeDone combinator + +use std::future::Future; +use std::mem; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A future that may have completed. +#[derive(Debug)] +pub(crate) enum MaybeDone<Fut: Future> { + /// A not-yet-completed future + Future(Fut), + /// The output of the completed future + Done(Fut::Output), + /// The empty variant after the result of a [`MaybeDone`] has been + /// taken using the [`take_output`](MaybeDone::take_output) method. + Gone, +} + +// Safe because we never generate `Pin<&mut Fut::Output>` +impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {} + +/// Wraps a future into a `MaybeDone` +pub(crate) fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> { + MaybeDone::Future(future) +} + +impl<Fut: Future> MaybeDone<Fut> { + /// Returns an [`Option`] containing a mutable reference to the output of the future. + /// The output of this method will be [`Some`] if and only if the inner + /// future has been completed and [`take_output`](MaybeDone::take_output) + /// has not yet been called. + pub(crate) fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Output> { + unsafe { + let this = self.get_unchecked_mut(); + match this { + MaybeDone::Done(res) => Some(res), + _ => None, + } + } + } + + /// Attempt to take the output of a `MaybeDone` without driving it + /// towards completion. + #[inline] + pub(crate) fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> { + unsafe { + let this = self.get_unchecked_mut(); + match this { + MaybeDone::Done(_) => {} + MaybeDone::Future(_) | MaybeDone::Gone => return None, + }; + if let MaybeDone::Done(output) = mem::replace(this, MaybeDone::Gone) { + Some(output) + } else { + unreachable!() + } + } + } +} + +impl<Fut: Future> Future for MaybeDone<Fut> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let res = unsafe { + match self.as_mut().get_unchecked_mut() { + MaybeDone::Future(a) => ready!(Pin::new_unchecked(a).poll(cx)), + MaybeDone::Done(_) => return Poll::Ready(()), + MaybeDone::Gone => panic!("MaybeDone polled after value taken"), + } + }; + self.set(MaybeDone::Done(res)); + Poll::Ready(()) + } +} diff --git a/tokio/src/future/mod.rs b/tokio/src/future/mod.rs new file mode 100644 index 00000000..9a155bf7 --- /dev/null +++ b/tokio/src/future/mod.rs @@ -0,0 +1,15 @@ +#![allow(unused_imports, dead_code)] + +//! Asynchronous values. + +mod maybe_done; +pub(crate) use maybe_done::{maybe_done, MaybeDone}; + +mod poll_fn; +pub(crate) use poll_fn::poll_fn; + +mod ready; + |