diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-15 22:11:13 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-15 22:11:13 -0800 |
commit | 8a7e57786a5dca139f5b4261685e22991ded0859 (patch) | |
tree | b69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio | |
parent | 930679587ae42e4df3113159ccf33fb5923dd73a (diff) |
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its
_public_ dependencies on crates that are either a) do not provide a
stable (1.0+) release with longevity guarantees or b) match the `tokio`
release cadence. Of course, implementing `std` traits fits the
requirements.
The on exception, for now, is the `Stream` trait found in `futures_core`.
It is expected that this trait will not change much and be moved into `std.
Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain
a dependency on this trait given how foundational it is.
Since the `Stream` implementation is optional, types that are logically
streams provide `async fn next_*` functions to obtain the next value.
Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`.
Additionally, some misc cleanup is also done:
- `tokio::io::io` -> `tokio::io::util`.
- `delay` -> `delay_until`.
- `Timeout::new` -> `timeout(...)`.
- `signal::ctrl_c()` returns a future instead of a stream.
- `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait).
- `time::Throttle` is removed (due to lack of `Stream` trait).
- Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio')
115 files changed, 1617 insertions, 1644 deletions
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 56ac0dc9..f1a34c02 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -32,6 +32,7 @@ default = [ "process", "rt-full", "signal", + "stream", "sync", "time", ] @@ -40,7 +41,7 @@ blocking = ["rt-core"] dns = ["blocking"] fs = ["blocking"] io-driver = ["mio", "lazy_static", "sync"] # TODO: get rid of sync -io-util = ["pin-project", "memchr"] +io-util = ["pin-project", "pin-project-lite", "memchr"] macros = ["tokio-macros"] net = ["dns", "tcp", "udp", "uds"] process = [ @@ -55,6 +56,7 @@ process = [ ] # Includes basic task execution capabilities rt-core = [] +# TODO: rename this -> `rt-threaded` rt-full = [ "macros", "num_cpus", @@ -72,6 +74,7 @@ signal = [ "winapi/consoleapi", "winapi/minwindef", ] +stream = ["futures-core"] sync = ["fnv"] test-util = [] tcp = ["io-driver"] @@ -84,18 +87,17 @@ uds = ["io-driver", "mio-uds", "libc"] tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" } bytes = "0.4" -futures-core = "0.3.0" -futures-sink = "0.3.0" -futures-util = { version = "0.3.0", features = ["sink", "channel"] } iovec = "0.1" # Everything else is optional... fnv = { version = "1.0.6", optional = true } +futures-core = { version = "0.3.0", optional = true } lazy_static = { version = "1.0.2", optional = true } memchr = { version = "2.2", optional = true } mio = { version = "0.6.14", optional = true } num_cpus = { version = "1.8.0", optional = true } pin-project = { version = "0.4", optional = true } +pin-project-lite = { version = "0.1", optional = true } # Backs `DelayQueue` slab = { version = "0.4.1", optional = true } diff --git a/tokio/src/fs/blocking.rs b/tokio/src/fs/blocking.rs index 695358a3..64398cbb 100644 --- a/tokio/src/fs/blocking.rs +++ b/tokio/src/fs/blocking.rs @@ -1,7 +1,6 @@ use crate::fs::sys; use crate::io::{AsyncRead, AsyncWrite}; -use futures_core::ready; use std::cmp; use std::future::Future; use std::io; diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 3f18831e..0ff45025 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -7,7 +7,6 @@ use crate::fs::blocking::Buf; use crate::fs::{asyncify, sys}; use crate::io::{AsyncRead, AsyncWrite}; -use futures_core::ready; use std::fmt; use std::fs::{Metadata, Permissions}; use std::future::Future; @@ -430,7 +429,7 @@ impl File { } async fn complete_inflight(&mut self) { - use futures_util::future::poll_fn; + use crate::future::poll_fn; if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await { self.last_write_err = Some(e.kind()); diff --git a/tokio/src/fs/read_dir.rs b/tokio/src/fs/read_dir.rs index 9492a2f4..219c7b35 100644 --- a/tokio/src/fs/read_dir.rs +++ b/tokio/src/fs/read_dir.rs @@ -1,7 +1,5 @@ use crate::fs::{asyncify, sys}; -use futures_core::ready; -use futures_core::stream::Stream; use std::ffi::OsString; use std::fs::{FileType, Metadata}; use std::future::Future; @@ -50,10 +48,15 @@ enum State { Pending(sys::Blocking<(Option<io::Result<std::fs::DirEntry>>, std::fs::ReadDir)>), } -impl Stream for ReadDir { - type Item = io::Result<DirEntry>; +impl ReadDir { + /// Returns the next entry in the directory stream. + pub async fn next_entry(&mut self) -> io::Result<Option<DirEntry>> { + use crate::future::poll_fn; + poll_fn(|cx| self.poll_next_entry(cx)).await + } - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + #[doc(hidden)] + pub fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<Option<DirEntry>>> { loop { match self.0 { State::Idle(ref mut std) => { @@ -68,7 +71,11 @@ impl Stream for ReadDir { let (ret, std) = ready!(Pin::new(rx).poll(cx))?; self.0 = State::Idle(Some(std)); - let ret = ret.map(|res| res.map(|std| DirEntry(Arc::new(std)))); + let ret = match ret { + Some(Ok(std)) => Ok(Some(DirEntry(Arc::new(std)))), + Some(Err(e)) => Err(e), + None => Ok(None), + }; return Poll::Ready(ret); } @@ -77,6 +84,19 @@ impl Stream for ReadDir { } } +#[cfg(feature = "stream")] +impl futures_core::Stream for ReadDir { + type Item = io::Result<DirEntry>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + Poll::Ready(match ready!(self.poll_next_entry(cx)) { + Ok(Some(entry)) => Some(Ok(entry)), + Ok(None) => None, + Err(err) => Some(Err(err)), + }) + } +} + /// Entries returned by the [`ReadDir`] stream. /// /// [`ReadDir`]: struct.ReadDir.html @@ -100,13 +120,11 @@ impl DirEntry { /// /// ```no_run /// use tokio::fs; - /// use tokio::prelude::*; /// /// # async fn dox() -> std::io::Result<()> { /// let mut entries = fs::read_dir(".").await?; /// - /// while let Some(res) = entries.next().await { - /// let entry = res?; + /// while let Some(entry) = entries.next_entry().await? { /// println!("{:?}", entry.path()); /// } /// # Ok(()) @@ -133,13 +151,11 @@ impl DirEntry { /// |