From 8a7e57786a5dca139f5b4261685e22991ded0859 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 15 Nov 2019 22:11:13 -0800 Subject: Limit `futures` dependency to `Stream` via feature flag (#1774) In an effort to reach API stability, the `tokio` crate is shedding its _public_ dependencies on crates that are either a) do not provide a stable (1.0+) release with longevity guarantees or b) match the `tokio` release cadence. Of course, implementing `std` traits fits the requirements. The on exception, for now, is the `Stream` trait found in `futures_core`. It is expected that this trait will not change much and be moved into `std. Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain a dependency on this trait given how foundational it is. Since the `Stream` implementation is optional, types that are logically streams provide `async fn next_*` functions to obtain the next value. Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`. Additionally, some misc cleanup is also done: - `tokio::io::io` -> `tokio::io::util`. - `delay` -> `delay_until`. - `Timeout::new` -> `timeout(...)`. - `signal::ctrl_c()` returns a future instead of a stream. - `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait). - `time::Throttle` is removed (due to lack of `Stream` trait). - Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns). --- tokio/Cargo.toml | 10 +- tokio/src/fs/blocking.rs | 1 - tokio/src/fs/file.rs | 3 +- tokio/src/fs/read_dir.rs | 50 ++++--- tokio/src/future.rs | 69 ---------- tokio/src/future/maybe_done.rs | 76 +++++++++++ tokio/src/future/mod.rs | 15 ++ tokio/src/future/pending.rs | 44 ++++++ tokio/src/future/poll_fn.rs | 38 ++++++ tokio/src/future/ready.rs | 27 ++++ tokio/src/future/try_join.rs | 115 ++++++++++++++++ tokio/src/io/async_read.rs | 1 - tokio/src/io/async_write.rs | 1 - tokio/src/io/io/async_buf_read_ext.rs | 106 -------------- tokio/src/io/io/async_read_ext.rs | 92 ------------- tokio/src/io/io/async_write_ext.rs | 42 ------ tokio/src/io/io/buf_reader.rs | 195 -------------------------- tokio/src/io/io/buf_stream.rs | 144 -------------------- tokio/src/io/io/buf_writer.rs | 197 --------------------------- tokio/src/io/io/chain.rs | 146 -------------------- tokio/src/io/io/copy.rs | 134 ------------------ tokio/src/io/io/empty.rs | 80 ----------- tokio/src/io/io/flush.rs | 45 ------ tokio/src/io/io/lines.rs | 70 ---------- tokio/src/io/io/mod.rs | 51 ------- tokio/src/io/io/read.rs | 53 ------- tokio/src/io/io/read_exact.rs | 75 ---------- tokio/src/io/io/read_line.rs | 81 ----------- tokio/src/io/io/read_to_end.rs | 109 --------------- tokio/src/io/io/read_to_string.rs | 82 ----------- tokio/src/io/io/read_until.rs | 85 ------------ tokio/src/io/io/repeat.rs | 67 --------- tokio/src/io/io/shutdown.rs | 45 ------ tokio/src/io/io/sink.rs | 78 ----------- tokio/src/io/io/split.rs | 67 --------- tokio/src/io/io/take.rs | 131 ------------------ tokio/src/io/io/write.rs | 46 ------- tokio/src/io/io/write_all.rs | 56 -------- tokio/src/io/mod.rs | 15 +- tokio/src/io/split.rs | 1 - tokio/src/io/util/async_buf_read_ext.rs | 122 +++++++++++++++++ tokio/src/io/util/async_read_ext.rs | 92 +++++++++++++ tokio/src/io/util/async_write_ext.rs | 42 ++++++ tokio/src/io/util/buf_reader.rs | 194 ++++++++++++++++++++++++++ tokio/src/io/util/buf_stream.rs | 144 ++++++++++++++++++++ tokio/src/io/util/buf_writer.rs | 196 ++++++++++++++++++++++++++ tokio/src/io/util/chain.rs | 145 ++++++++++++++++++++ tokio/src/io/util/copy.rs | 133 ++++++++++++++++++ tokio/src/io/util/empty.rs | 80 +++++++++++ tokio/src/io/util/flush.rs | 45 ++++++ tokio/src/io/util/lines.rs | 113 +++++++++++++++ tokio/src/io/util/mod.rs | 71 ++++++++++ tokio/src/io/util/read.rs | 53 +++++++ tokio/src/io/util/read_exact.rs | 74 ++++++++++ tokio/src/io/util/read_line.rs | 80 +++++++++++ tokio/src/io/util/read_to_end.rs | 108 +++++++++++++++ tokio/src/io/util/read_to_string.rs | 81 +++++++++++ tokio/src/io/util/read_until.rs | 84 ++++++++++++ tokio/src/io/util/repeat.rs | 67 +++++++++ tokio/src/io/util/shutdown.rs | 45 ++++++ tokio/src/io/util/sink.rs | 78 +++++++++++ tokio/src/io/util/split.rs | 111 +++++++++++++++ tokio/src/io/util/take.rs | 130 ++++++++++++++++++ tokio/src/io/util/write.rs | 46 +++++++ tokio/src/io/util/write_all.rs | 55 ++++++++ tokio/src/lib.rs | 13 +- tokio/src/net/addr.rs | 9 +- tokio/src/net/tcp/incoming.rs | 25 ++-- tokio/src/net/tcp/listener.rs | 11 +- tokio/src/net/tcp/mod.rs | 8 +- tokio/src/net/tcp/stream.rs | 3 +- tokio/src/net/udp/socket.rs | 3 +- tokio/src/net/udp/split.rs | 4 +- tokio/src/net/unix/datagram.rs | 3 +- tokio/src/net/unix/incoming.rs | 26 ++-- tokio/src/net/unix/listener.rs | 9 +- tokio/src/net/unix/stream.rs | 3 +- tokio/src/net/util/poll_evented.rs | 3 +- tokio/src/prelude.rs | 15 -- tokio/src/process/mod.rs | 32 ++--- tokio/src/process/unix/mod.rs | 30 ++-- tokio/src/process/unix/reap.rs | 38 ++++-- tokio/src/process/windows.rs | 8 +- tokio/src/runtime/mod.rs | 8 ++ tokio/src/runtime/thread_pool/tests/pool.rs | 2 +- tokio/src/runtime/thread_pool/tests/queue.rs | 2 +- tokio/src/signal/ctrl_c.rs | 61 ++++----- tokio/src/signal/mod.rs | 56 ++------ tokio/src/signal/registry.rs | 31 +++-- tokio/src/signal/unix.rs | 32 ++--- tokio/src/signal/windows.rs | 63 +++++---- tokio/src/stream.rs | 78 ----------- tokio/src/sync/barrier.rs | 3 +- tokio/src/sync/mpsc/bounded.rs | 144 +++----------------- tokio/src/sync/mpsc/chan.rs | 98 +++++++++---- tokio/src/sync/mpsc/error.rs | 86 ++++++++++++ tokio/src/sync/mpsc/mod.rs | 7 +- tokio/src/sync/mpsc/unbounded.rs | 102 ++------------ tokio/src/sync/mutex.rs | 2 +- tokio/src/sync/oneshot.rs | 3 +- tokio/src/sync/tests/loom_atomic_waker.rs | 2 +- tokio/src/sync/tests/loom_mpsc.rs | 2 +- tokio/src/sync/tests/loom_oneshot.rs | 2 +- tokio/src/sync/tests/loom_semaphore.rs | 3 +- tokio/src/sync/watch.rs | 110 ++++----------- tokio/src/task/tests/loom.rs | 6 +- tokio/src/task/tests/task.rs | 2 +- tokio/src/time/clock.rs | 4 +- tokio/src/time/delay.rs | 54 ++++---- tokio/src/time/delay_queue.rs | 23 +--- tokio/src/time/driver/registration.rs | 8 -- tokio/src/time/interval.rs | 150 +++++++++++--------- tokio/src/time/mod.rs | 32 ++--- tokio/src/time/tests/test_delay.rs | 52 +++---- tokio/src/time/tests/test_queue.rs | 2 +- tokio/src/time/throttle.rs | 17 +-- tokio/src/time/timeout.rs | 195 ++++++++++---------------- tokio/tests/fs_dir.rs | 51 +++++-- tokio/tests/fs_file_mocked.rs | 11 ++ tokio/tests/io_lines.rs | 18 ++- tokio/tests/net_driver.rs | 2 +- tokio/tests/process_issue_42.rs | 4 +- tokio/tests/rt_common.rs | 6 +- tokio/tests/signal_ctrl_c.rs | 7 +- tokio/tests/signal_drop_recv.rs | 5 +- tokio/tests/signal_drop_rt.rs | 5 +- tokio/tests/signal_drop_signal.rs | 5 +- tokio/tests/signal_multi_rt.rs | 5 +- tokio/tests/signal_notify_both.rs | 11 +- tokio/tests/signal_twice.rs | 6 +- tokio/tests/signal_usr1.rs | 5 +- tokio/tests/sync_errors.rs | 5 +- tokio/tests/sync_mpsc.rs | 113 ++++++--------- tokio/tests/sync_watch.rs | 157 +++++++++------------ tokio/tests/time_interval.rs | 46 +++++-- tokio/tests/time_rt.rs | 11 +- tokio/tests/time_throttle.rs | 68 --------- tokio/tests/time_timeout.rs | 71 +--------- 138 files changed, 3701 insertions(+), 3728 deletions(-) delete mode 100644 tokio/src/future.rs create mode 100644 tokio/src/future/maybe_done.rs create mode 100644 tokio/src/future/mod.rs create mode 100644 tokio/src/future/pending.rs create mode 100644 tokio/src/future/poll_fn.rs create mode 100644 tokio/src/future/ready.rs create mode 100644 tokio/src/future/try_join.rs delete mode 100644 tokio/src/io/io/async_buf_read_ext.rs delete mode 100644 tokio/src/io/io/async_read_ext.rs delete mode 100644 tokio/src/io/io/async_write_ext.rs delete mode 100644 tokio/src/io/io/buf_reader.rs delete mode 100644 tokio/src/io/io/buf_stream.rs delete mode 100644 tokio/src/io/io/buf_writer.rs delete mode 100644 tokio/src/io/io/chain.rs delete mode 100644 tokio/src/io/io/copy.rs delete mode 100644 tokio/src/io/io/empty.rs delete mode 100644 tokio/src/io/io/flush.rs delete mode 100644 tokio/src/io/io/lines.rs delete mode 100644 tokio/src/io/io/mod.rs delete mode 100644 tokio/src/io/io/read.rs delete mode 100644 tokio/src/io/io/read_exact.rs delete mode 100644 tokio/src/io/io/read_line.rs delete mode 100644 tokio/src/io/io/read_to_end.rs delete mode 100644 tokio/src/io/io/read_to_string.rs delete mode 100644 tokio/src/io/io/read_until.rs delete mode 100644 tokio/src/io/io/repeat.rs delete mode 100644 tokio/src/io/io/shutdown.rs delete mode 100644 tokio/src/io/io/sink.rs delete mode 100644 tokio/src/io/io/split.rs delete mode 100644 tokio/src/io/io/take.rs delete mode 100644 tokio/src/io/io/write.rs delete mode 100644 tokio/src/io/io/write_all.rs create mode 100644 tokio/src/io/util/async_buf_read_ext.rs create mode 100644 tokio/src/io/util/async_read_ext.rs create mode 100644 tokio/src/io/util/async_write_ext.rs create mode 100644 tokio/src/io/util/buf_reader.rs create mode 100644 tokio/src/io/util/buf_stream.rs create mode 100644 tokio/src/io/util/buf_writer.rs create mode 100644 tokio/src/io/util/chain.rs create mode 100644 tokio/src/io/util/copy.rs create mode 100644 tokio/src/io/util/empty.rs create mode 100644 tokio/src/io/util/flush.rs create mode 100644 tokio/src/io/util/lines.rs create mode 100644 tokio/src/io/util/mod.rs create mode 100644 tokio/src/io/util/read.rs create mode 100644 tokio/src/io/util/read_exact.rs create mode 100644 tokio/src/io/util/read_line.rs create mode 100644 tokio/src/io/util/read_to_end.rs create mode 100644 tokio/src/io/util/read_to_string.rs create mode 100644 tokio/src/io/util/read_until.rs create mode 100644 tokio/src/io/util/repeat.rs create mode 100644 tokio/src/io/util/shutdown.rs create mode 100644 tokio/src/io/util/sink.rs create mode 100644 tokio/src/io/util/split.rs create mode 100644 tokio/src/io/util/take.rs create mode 100644 tokio/src/io/util/write.rs create mode 100644 tokio/src/io/util/write_all.rs delete mode 100644 tokio/src/stream.rs create mode 100644 tokio/src/sync/mpsc/error.rs delete mode 100644 tokio/tests/time_throttle.rs (limited to 'tokio') diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 56ac0dc9..f1a34c02 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -32,6 +32,7 @@ default = [ "process", "rt-full", "signal", + "stream", "sync", "time", ] @@ -40,7 +41,7 @@ blocking = ["rt-core"] dns = ["blocking"] fs = ["blocking"] io-driver = ["mio", "lazy_static", "sync"] # TODO: get rid of sync -io-util = ["pin-project", "memchr"] +io-util = ["pin-project", "pin-project-lite", "memchr"] macros = ["tokio-macros"] net = ["dns", "tcp", "udp", "uds"] process = [ @@ -55,6 +56,7 @@ process = [ ] # Includes basic task execution capabilities rt-core = [] +# TODO: rename this -> `rt-threaded` rt-full = [ "macros", "num_cpus", @@ -72,6 +74,7 @@ signal = [ "winapi/consoleapi", "winapi/minwindef", ] +stream = ["futures-core"] sync = ["fnv"] test-util = [] tcp = ["io-driver"] @@ -84,18 +87,17 @@ uds = ["io-driver", "mio-uds", "libc"] tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" } bytes = "0.4" -futures-core = "0.3.0" -futures-sink = "0.3.0" -futures-util = { version = "0.3.0", features = ["sink", "channel"] } iovec = "0.1" # Everything else is optional... fnv = { version = "1.0.6", optional = true } +futures-core = { version = "0.3.0", optional = true } lazy_static = { version = "1.0.2", optional = true } memchr = { version = "2.2", optional = true } mio = { version = "0.6.14", optional = true } num_cpus = { version = "1.8.0", optional = true } pin-project = { version = "0.4", optional = true } +pin-project-lite = { version = "0.1", optional = true } # Backs `DelayQueue` slab = { version = "0.4.1", optional = true } diff --git a/tokio/src/fs/blocking.rs b/tokio/src/fs/blocking.rs index 695358a3..64398cbb 100644 --- a/tokio/src/fs/blocking.rs +++ b/tokio/src/fs/blocking.rs @@ -1,7 +1,6 @@ use crate::fs::sys; use crate::io::{AsyncRead, AsyncWrite}; -use futures_core::ready; use std::cmp; use std::future::Future; use std::io; diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 3f18831e..0ff45025 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -7,7 +7,6 @@ use crate::fs::blocking::Buf; use crate::fs::{asyncify, sys}; use crate::io::{AsyncRead, AsyncWrite}; -use futures_core::ready; use std::fmt; use std::fs::{Metadata, Permissions}; use std::future::Future; @@ -430,7 +429,7 @@ impl File { } async fn complete_inflight(&mut self) { - use futures_util::future::poll_fn; + use crate::future::poll_fn; if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await { self.last_write_err = Some(e.kind()); diff --git a/tokio/src/fs/read_dir.rs b/tokio/src/fs/read_dir.rs index 9492a2f4..219c7b35 100644 --- a/tokio/src/fs/read_dir.rs +++ b/tokio/src/fs/read_dir.rs @@ -1,7 +1,5 @@ use crate::fs::{asyncify, sys}; -use futures_core::ready; -use futures_core::stream::Stream; use std::ffi::OsString; use std::fs::{FileType, Metadata}; use std::future::Future; @@ -50,10 +48,15 @@ enum State { Pending(sys::Blocking<(Option>, std::fs::ReadDir)>), } -impl Stream for ReadDir { - type Item = io::Result; +impl ReadDir { + /// Returns the next entry in the directory stream. + pub async fn next_entry(&mut self) -> io::Result> { + use crate::future::poll_fn; + poll_fn(|cx| self.poll_next_entry(cx)).await + } - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + #[doc(hidden)] + pub fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll>> { loop { match self.0 { State::Idle(ref mut std) => { @@ -68,7 +71,11 @@ impl Stream for ReadDir { let (ret, std) = ready!(Pin::new(rx).poll(cx))?; self.0 = State::Idle(Some(std)); - let ret = ret.map(|res| res.map(|std| DirEntry(Arc::new(std)))); + let ret = match ret { + Some(Ok(std)) => Ok(Some(DirEntry(Arc::new(std)))), + Some(Err(e)) => Err(e), + None => Ok(None), + }; return Poll::Ready(ret); } @@ -77,6 +84,19 @@ impl Stream for ReadDir { } } +#[cfg(feature = "stream")] +impl futures_core::Stream for ReadDir { + type Item = io::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(match ready!(self.poll_next_entry(cx)) { + Ok(Some(entry)) => Some(Ok(entry)), + Ok(None) => None, + Err(err) => Some(Err(err)), + }) + } +} + /// Entries returned by the [`ReadDir`] stream. /// /// [`ReadDir`]: struct.ReadDir.html @@ -100,13 +120,11 @@ impl DirEntry { /// /// ```no_run /// use tokio::fs; - /// use tokio::prelude::*; /// /// # async fn dox() -> std::io::Result<()> { /// let mut entries = fs::read_dir(".").await?; /// - /// while let Some(res) = entries.next().await { - /// let entry = res?; + /// while let Some(entry) = entries.next_entry().await? { /// println!("{:?}", entry.path()); /// } /// # Ok(()) @@ -133,13 +151,11 @@ impl DirEntry { /// /// ``` /// use tokio::fs; - /// use tokio::prelude::*; /// /// # async fn dox() -> std::io::Result<()> { /// let mut entries = fs::read_dir(".").await?; /// - /// while let Some(res) = entries.next().await { - /// let entry = res?; + /// while let Some(entry) = entries.next_entry().await? { /// println!("{:?}", entry.file_name()); /// } /// # Ok(()) @@ -164,14 +180,11 @@ impl DirEntry { /// /// ``` /// use tokio::fs; - /// use tokio::prelude::*; /// /// # async fn dox() -> std::io::Result<()> { /// let mut entries = fs::read_dir(".").await?; /// - /// while let Some(res) = entries.next().await { - /// let entry = res?; - /// + /// while let Some(entry) = entries.next_entry().await? { /// if let Ok(metadata) = entry.metadata().await { /// // Now let's show our entry's permissions! /// println!("{:?}: {:?}", entry.path(), metadata.permissions()); @@ -202,14 +215,11 @@ impl DirEntry { /// /// ``` /// use tokio::fs; - /// use tokio::prelude::*; /// /// # async fn dox() -> std::io::Result<()> { /// let mut entries = fs::read_dir(".").await?; /// - /// while let Some(res) = entries.next().await { - /// let entry = res?; - /// + /// while let Some(entry) = entries.next_entry().await? { /// if let Ok(file_type) = entry.file_type().await { /// // Now let's show our entry's file type! /// println!("{:?}: {:?}", entry.path(), file_type); diff --git a/tokio/src/future.rs b/tokio/src/future.rs deleted file mode 100644 index f6b7e4a7..00000000 --- a/tokio/src/future.rs +++ /dev/null @@ -1,69 +0,0 @@ -//! Asynchronous values. - -#[cfg(feature = "time")] -use crate::time::Timeout; - -#[cfg(feature = "time")] -use std::time::Duration; - -#[doc(inline)] -pub use futures_util::future::{err, ok, pending, poll_fn, ready}; -#[doc(inline)] -pub use std::future::Future; - -/// An extension trait for `Future` that provides a variety of convenient -/// combinator functions. -/// -/// Currently, there only is a [`timeout`] function, but this will increase -/// over time. -/// -/// Users are not expected to implement this trait. All types that implement -/// `Future` already implement `FutureExt`. -/// -/// This trait can be imported directly or via the Tokio prelude: `use -/// tokio::prelude::*`. -/// -/// [`timeout`]: #method.timeout -pub trait FutureExt: Future { - /// Creates a new future which allows `self` until `timeout`. - /// - /// This combinator creates a new future which wraps the receiving future - /// with a timeout. The returned future is allowed to execute until it - /// completes or `timeout` has elapsed, whichever happens first. - /// - /// If the future completes before `timeout` then the future will resolve - /// with that item. Otherwise the future will resolve to an error. - /// - /// The future is guaranteed to be polled at least once, even if `timeout` - /// is set to zero. - /// - /// # Examples - /// - /// ``` - /// use tokio::prelude::*; - /// use std::time::Duration; - /// - /// async fn long_future() { - /// // do work here - /// } - /// - /// # async fn dox() { - /// let res = long_future() - /// .timeout(Duration::from_secs(1)) - /// .await; - /// - /// if res.is_err() { - /// println!("operation timed out"); - /// } - /// # } - /// ``` - #[cfg(feature = "time")] - fn timeout(self, timeout: Duration) -> Timeout - where - Self: Sized, - { - Timeout::new(self, timeout) - } -} - -impl FutureExt for T where T: Future {} diff --git a/tokio/src/future/maybe_done.rs b/tokio/src/future/maybe_done.rs new file mode 100644 index 00000000..5011544c --- /dev/null +++ b/tokio/src/future/maybe_done.rs @@ -0,0 +1,76 @@ +//! Definition of the MaybeDone combinator + +use std::future::Future; +use std::mem; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A future that may have completed. +#[derive(Debug)] +pub(crate) enum MaybeDone { + /// A not-yet-completed future + Future(Fut), + /// The output of the completed future + Done(Fut::Output), + /// The empty variant after the result of a [`MaybeDone`] has been + /// taken using the [`take_output`](MaybeDone::take_output) method. + Gone, +} + +// Safe because we never generate `Pin<&mut Fut::Output>` +impl Unpin for MaybeDone {} + +/// Wraps a future into a `MaybeDone` +pub(crate) fn maybe_done(future: Fut) -> MaybeDone { + MaybeDone::Future(future) +} + +impl MaybeDone { + /// Returns an [`Option`] containing a mutable reference to the output of the future. + /// The output of this method will be [`Some`] if and only if the inner + /// future has been completed and [`take_output`](MaybeDone::take_output) + /// has not yet been called. + pub(crate) fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Output> { + unsafe { + let this = self.get_unchecked_mut(); + match this { + MaybeDone::Done(res) => Some(res), + _ => None, + } + } + } + + /// Attempt to take the output of a `MaybeDone` without driving it + /// towards completion. + #[inline] + pub(crate) fn take_output(self: Pin<&mut Self>) -> Option { + unsafe { + let this = self.get_unchecked_mut(); + match this { + MaybeDone::Done(_) => {} + MaybeDone::Future(_) | MaybeDone::Gone => return None, + }; + if let MaybeDone::Done(output) = mem::replace(this, MaybeDone::Gone) { + Some(output) + } else { + unreachable!() + } + } + } +} + +impl Future for MaybeDone { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let res = unsafe { + match self.as_mut().get_unchecked_mut() { + MaybeDone::Future(a) => ready!(Pin::new_unchecked(a).poll(cx)), + MaybeDone::Done(_) => return Poll::Ready(()), + MaybeDone::Gone => panic!("MaybeDone polled after value taken"), + } + }; + self.set(MaybeDone::Done(res)); + Poll::Ready(()) + } +} diff --git a/tokio/src/future/mod.rs b/tokio/src/future/mod.rs new file mode 100644 index 00000000..9a155bf7 --- /dev/null +++ b/tokio/src/future/mod.rs @@ -0,0 +1,15 @@ +#![allow(unused_imports, dead_code)] + +//! Asynchronous values. + +mod maybe_done; +pub(crate) use maybe_done::{maybe_done, MaybeDone}; + +mod poll_fn; +pub(crate) use poll_fn::poll_fn; + +mod ready; +pub(crate) use ready::{ok, Ready}; + +mod try_join; +pub(crate) use try_join::try_join3; diff --git a/tokio/src/future/pending.rs b/tokio/src/future/pending.rs new file mode 100644 index 00000000..c844ebc3 --- /dev/null +++ b/tokio/src/future/pending.rs @@ -0,0 +1,44 @@ +use std::future::Future; +use std::marker; +use sdt::pin::Pin; +use std::task::{Context, Poll}; + +/// Future for the [`pending()`] function. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +struct Pending { + _data: marker::PhantomData, +} + +/// Creates a future which never resolves, representing a computation that never +/// finishes. +/// +/// The returned future will forever return [`Poll::Pending`]. +/// +/// # Examples +/// +/// ```no_run +/// use tokio::future; +/// +/// #[tokio::main] +/// async fn main { +/// future::pending().await; +/// unreachable!(); +/// } +/// ``` +pub async fn pending() -> ! { + Pending { + _data: marker::PhantomData, + }.await +} + +impl Future for Pending { + type Output = !; + + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { + Poll::Pending + } +} + +impl Unpin for Pending { +} diff --git a/tokio/src/future/poll_fn.rs b/tokio/src/future/poll_fn.rs new file mode 100644 index 00000000..ce2a5524 --- /dev/null +++ b/tokio/src/future/poll_fn.rs @@ -0,0 +1,38 @@ +//! Definition of the `PollFn` adapter combinator + +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Future for the [`poll_fn`] function. +pub(crate) struct PollFn { + f: F, +} + +impl Unpin for PollFn {} + +/// Creates a new future wrapping around a function returning [`Poll`]. +pub(crate) fn poll_fn(f: F) -> PollFn +where + F: FnMut(&mut Context<'_>) -> Poll, +{ + PollFn { f } +} + +impl fmt::Debug for PollFn { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PollFn").finish() + } +} + +impl Future for PollFn +where + F: FnMut(&mut Context<'_>) -> Poll, +{ + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + (&mut self.f)(cx) + } +} diff --git a/tokio/src/future/ready.rs b/tokio/src/future/ready.rs new file mode 100644 index 00000000..ba5d4804 --- /dev/null +++ b/tokio/src/future/ready.rs @@ -0,0 +1,27 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Future for the [`ready`](ready()) function. +/// +/// `pub` in order to use the future as an associated type in a sealed trait. +#[derive(Debug)] +// Used as an associated type in a "sealed" trait. +#[allow(unreachable_pub)] +pub struct Ready(Option); + +impl Unpin for Ready {} + +impl Future for Ready { + type Output = T; + + #[inline] + fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + Poll::Ready(self.0.take().unwrap()) + } +} + +/// Create a future that is immediately ready with a success value. +pub(crate) fn ok(t: T) -> Ready> { + Ready(Some(Ok(t))) +} diff --git a/tokio/src/future/try_join.rs b/tokio/src/future/try_join.rs new file mode 100644 index 00000000..478c69dc --- /dev/null +++ b/tokio/src/future/try_join.rs @@ -0,0 +1,115 @@ +use crate::future::{maybe_done, MaybeDone}; + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pub(crate) fn try_join3( + future1: F1, + future2: F2, + future3: F3, +) -> TryJoin3 +where + F1: Future>, + F2: Future>, + F3: Future>, +{ + TryJoin3 { + future1: maybe_done(future1), + future2: maybe_done(future2), + future3: maybe_done(future3), + } +} + +pub(crate) struct TryJoin3 +where + F1: Future, + F2: Future, + F3: Future, +{ + future1: MaybeDone, + future2: MaybeDone, + future3: MaybeDone, +} + +impl Future for TryJoin3 +where + F1: Future>, + F2: Future>, + F3: Future>, +{ + type Output = Result<(T1, T2, T3), E>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut all_done = true; + + // Safety: the fn takes `Pin`, we don't move any data out of `self`. + unsafe { + let me = self.get_unchecked_mut(); + + if Pin::new_unchecked(&mut me.future1).poll(cx).is_pending() { + all_done = false; + } else if Pin::new_unchecked(&mut me.future1) + .output_mut() + .unwrap() + .is_err() + { + return Poll::Ready(Err(Pin::new_unchecked(&mut me.future1) + .take_output() + .unwrap() + .err() + .unwrap())); + } + + if Pin::new_unchecked(&mut me.future2).poll(cx).is_pending() { + all_done = false; + } else if Pin::new_unchecked(&mut me.future2) + .output_mut() + .unwrap() + .is_err() + { + return Poll::Ready(Err(Pin::new_unchecked(&mut me.future2) + .take_output() + .unwrap() + .err() + .unwrap())); + } + + if Pin::new_unchecked(&mut me.future3).poll(cx).is_pending() { + all_done = false; + } else if Pin::new_unchecked(&mut me.future3) + .output_mut() + .unwrap() + .is_err() + { + return Poll::Ready(Err(Pin::new_unchecked(&mut me.future3) + .take_output() + .unwrap() + .err() + .unwrap())); + } + + if all_done { + Poll::Ready(Ok(( + Pin::new_unchecked(&mut me.future1) + .take_output() + .unwrap() + .ok() + .unwrap(), + Pin::new_unchecked(&mut me.future2) + .take_output() + .unwrap() + .ok() + .unwrap(), + Pin::new_unchecked(&mut me.future3) + .take_output() + .unwrap() + .ok() + .unwrap(), + ))) + } else { + Poll::Pending + } + } + } +} diff --git a/tokio/src/io/async_read.rs b/tokio/src/io/async_read.rs index 8f6e0b98..974cf346 100644 --- a/tokio/src/io/async_read.rs +++ b/tokio/src/io/async_read.rs @@ -1,5 +1,4 @@ use bytes::BufMut; -use futures_core::ready; use std::io; use std::ops::DerefMut; use std::pin::Pin; diff --git a/tokio/src/io/async_write.rs b/tokio/src/io/async_write.rs index 0489fb7c..2d5e4578 100644 --- a/tokio/src/io/async_write.rs +++ b/tokio/src/io/async_write.rs @@ -1,5 +1,4 @@ use bytes::Buf; -use futures_core::ready; use std::io; use std::ops::DerefMut; use std::pin::Pin; diff --git a/tokio/src/io/io/async_buf_read_ext.rs b/tokio/src/io/io/async_buf_read_ext.rs deleted file mode 100644 index b60f55d5..00000000 --- a/tokio/src/io/io/async_buf_read_ext.rs +++ /dev/null @@ -1,106 +0,0 @@ -use crate::io::io::lines::{lines, Lines}; -use crate::io::io::read_line::{read_line, ReadLine}; -use crate::io::io::read_until::{read_until, ReadUntil}; -use crate::io::io::split::{split, Split}; -use crate::io::AsyncBufRead; - -/// An extension trait which adds utility methods to `AsyncBufRead` types. -pub trait AsyncBufReadExt: AsyncBufRead { - /// Creates a future which will read all the bytes associated with this I/O - /// object into `buf` until the delimiter `byte` or EOF is reached. - /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until). - /// - /// This function will read bytes from the underlying stream until the - /// delimiter or EOF is found. Once found, all bytes up to, and including, - /// the delimiter (if found) will be appended to `buf`. - /// - /// The returned future will resolve to the number of bytes read once the read - /// operation is completed. - /// - /// In the case of an error the buffer and the object will be discarded, with - /// the error yielded. - fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec) -> ReadUntil<'a, Self> - where - Self: Unpin, - { - read_until(self, byte, buf) - } - - /// Creates a future which will read all the bytes associated with this I/O - /// object into `buf` until a newline (the 0xA byte) or EOF is reached, - /// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line). - /// - /// This function will read bytes from the underlying stream until the - /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes - /// up to, and including, the delimiter (if found) will be appended to - /// `buf`. - /// - /// The returned future will resolve to the number of bytes read once the read - /// operation is completed. - /// - /// In the case of an error the buffer and the object will be discarded, with - /// the error yielded. - /// - /// # Errors - /// - /// This function has the same error semantics as [`read_until`] and will - /// also return an error if the read bytes are not valid UTF-8. If an I/O - /// error is encountered then `buf` may contain some bytes already read in - /// the event that all data read so far was valid UTF-8. - /// - /// [`read_until`]: AsyncBufReadExt::read_until - fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self> - where - Self: Unpin, - { - read_line(self, buf) - } - - /// Returns a stream of the contents of this reader split on the byte - /// `byte`. - /// - /// This method is the async equivalent to - /// [`BufRead::split`](std::io::BufRead::split). - /// - /// The stream returned from this function will yield instances of - /// [`io::Result`]`<`[`Vec`]`>`. Each vector returned will *not* have - /// the delimiter byte at the end. - /// - /// [`io::Result`]: std::io::Result - /// [`Vec`]: std::vec::Vec - /// - /// # Errors - /// - /// Each item of the stream has the same error semantics as - /// [`AsyncBufReadExt::read_until`](AsyncBufReadExt::read_until). - fn split(self, byte: u8) -> Split - where - Self: Sized, - { - split(self, byte) - } - - /// Returns a stream over the lines of this reader. - /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines). - /// - /// The stream returned from this function will yield instances of - /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline - /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end. - /// - /// [`io::Result`]: std::io::Result - /// [`String`]: String - /// - /// # Errors - /// - /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`]. - /// - /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line - fn lines(self) -> Lines - where - Self: Sized, - { - lines(self) - } -} - -impl AsyncBufReadExt for R {} diff --git a/tokio/src/io/io/async_read_ext.rs b/tokio/src/io/io/async_read_ext.rs deleted file mode 100644 index c100a71f..00000000 --- a/tokio/src/io/io/async_read_ext.rs +++ /dev/null @@ -1,92 +0,0 @@ -use crate::io::io::chain::{chain, Chain}; -use crate::io::io::copy::{copy, Copy}; -use crate::io::io::read::{read, Read}; -use crate::io::io::read_exact::{read_exact, ReadExact}; -use crate::io::io::read_to_end::{read_to_end, ReadToEnd}; -use crate::io::io::read_to_string::{read_to_string, ReadToString}; -use crate::io::io::take::{take, Take}; -use crate::io::{AsyncRead, AsyncWrite}; - -/// An extension trait which adds utility methods to `AsyncRead` types. -pub trait AsyncReadExt: AsyncRead { - /// Creates an adaptor which will chain this stream with another. - /// - /// The returned `AsyncRead` instance will first read all bytes from this object - /// until EOF is encountered. Afterwards the output is equivalent to the - /// output of `next`. - fn chain(self, next: R) -> Chain - where - Self: Sized, - R: AsyncRead, - { - chain(self, next) - } - - /// Copy all data from `self` into the provided `AsyncWrite`. - /// - /// The returned future will copy all the bytes read from `reader` into the - /// `writer` specified. This future will only complete once the `reader` - /// has hit EOF and all bytes have been written to and flushed from the - /// `writer` provided. - /// - /// On success the number of bytes is returned and the `reader` and `writer` - /// are consumed. On error the error is returned and the I/O objects are - /// consumed as well. - fn copy<'a, W>(&'a mut self, dst: &'a mut W) -> Copy<'a, Self, W> - where - Self: Unpin, - W: AsyncWrite + Unpin + ?Sized, - { - copy(self, dst) - } - - /// Read data into the provided buffer. - /// - /// The returned future will resolve to the number of bytes read once the - /// read operation is completed. - fn read<'a>(&'a mut self, dst: &'a mut [u8]) -> Read<'a, Self> - where - Self: Unpin, - { - read(self, dst) - } - - /// Read exactly the amount of data needed to fill the provided buffer. - fn read_exact<'a>(&'a mut self, dst: &'a mut [u8]) -> ReadExact<'a, Self> - where - Self: Unpin, - { - read_exact(self, dst) - } - - /// Read all bytes until EOF in this source, placing them into `dst`. - /// - /// On success the total number of bytes read is returned. - fn read_to_end<'a>(&'a mut self, dst: &'a mut Vec) -> ReadToEnd<'a, Self> - where - Self: Unpin, - { - read_to_end(self, dst) - } - - /// Read all bytes until EOF in this source, placing them into `dst`. - /// - /// On success the total number of bytes read is returned. - fn read_to_string<'a>(&'a mut self, dst: &'a mut String) -> ReadToString<'a, Self> - where - Self: Unpin, - { - read_to_string(self, dst) - } - - /// Creates an AsyncRead adapter which will read at most `limit` bytes - /// from the underlying reader. - fn take(self, limit: u64) -> Take - where - Self: Sized, - { - take(self, limit) - } -} - -impl AsyncReadExt for R {} diff --git a/tokio/src/io/io/async_write_ext.rs b/tokio/src/io/io/async_write_ext.rs deleted file mode 100644 index ea6650d1..00000000 --- a/tokio/src/io/io/async_write_ext.rs +++ /dev/null @@ -1,42 +0,0 @@ -use crate::io::io::flush::{flush, Flush}; -use crate::io::io::shutdown::{shutdown, Shutdown}; -use crate::io::io::write::{write, Write}; -use crate::io::io::write_all::{write_all, WriteAll}; -use crate::io::AsyncWrite; - -/// An extension trait which adds utility methods to `AsyncWrite` types. -pub trait AsyncWriteExt: AsyncWrite { - /// Write a buffer into this writter, returning how many bytes were written. - fn write<'a>(&'a mut self, src: &'a [u8]) -> Write<'a, Self> - where - Self: Unpin, - { - write(self, src) - } - - /// Attempt to write an entire buffer into this writter. - fn write_all<'a>(&'a mut self, src: &'a [u8]) -> WriteAll<'a, Self> - where - Self: Unpin, - { - write_all(self, src) - } - - /// Flush the contents of this writer. - fn flush(&mut self) -> Flush<'_, Self> - where - Self: Unpin, - { - flush(self) - } - - /// Shutdown this writer. - fn shutdown(&mut self) -> Shutdown<'_, Self> - where - Self: Unpin, - { - shutdown(self) - } -} - -impl AsyncWriteExt for W {} diff --git a/tokio/src/io/io/buf_reader.rs b/tokio/src/io/io/buf_reader.rs deleted file mode 100644 index c9698e16..00000000 --- a/tokio/src/io/io/buf_reader.rs +++ /dev/null @@ -1,195 +0,0 @@ -use crate::io::io::DEFAULT_BUF_SIZE; -use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite}; - -use futures_core::ready; -use pin_project::{pin_project, project}; -use std::io::{self, Read}; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::{cmp, fmt}; - -/// The `BufReader` struct adds buffering to any reader. -/// -/// It can be excessively inefficient to work directly with a [`AsyncRead`] -/// instance. A `BufReader` performs large, infrequent reads on the underlying -/// [`AsyncRead`] and maintains an in-memory buffer of the results. -/// -/// `BufReader` can improve the speed of programs that make *small* and -/// *repeated* read calls to the same file or network socket. It does not -/// help when reading very large amounts at once, or reading just one or a few -/// times. It also provides no advantage when reading from a source that is -/// already in memory, like a `Vec`. -/// -/// When the `BufReader` is dropped, the contents of its buffer will be -/// discarded. Creating multiple instances of a `BufReader` on the same -/// stream can cause data loss. -// TODO: Examples -#[pin_project] -pub struct BufReader { - #[pin] - pub(super) inner: R, - pub(super) buf: Box<[u8]>, - pub(super) pos: usize, - pub(super) cap: usize, -} - -impl BufReader { - /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB, - /// but may change in the future. - pub fn new(inner: R) -> Self { - Self::with_capacity(DEFAULT_BUF_SIZE, inner) - } - - /// Creates a new `BufReader` with the specified buffer capacity. - pub fn with_capacity(capacity: usize, inner: R) -> Self { - unsafe { - let mut buffer = Vec::with_capacity(capacity); - buffer.set_len(capacity); - inner.prepare_uninitialized_buffer(&mut buffer); - Self { - inner, - buf: buffer.into_boxed_slice(), - pos: 0, - cap: 0, - } - } - } - - /// Gets a reference to the underlying reader. - /// - /// It is inadvisable to directly read from the underlying reader. - pub fn get_ref(&self) -> &R { - &self.inner - } - - /// Gets a mutable reference to the underlying reader. - /// - /// It is inadvisable to directly read from the underlying reader. - pub fn get_mut(&mut self) -> &mut R { - &mut self.inner - } - - /// Gets a pinned mutable reference to the underlying reader. - /// - /// It is inadvisable to directly read from the underlying reader. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { - self.project().inner - } - - /// Consumes this `BufWriter`, returning the underlying reader. - /// - /// Note that any leftover data in the internal buffer is lost. - pub fn into_inner(self) -> R { - self.inner - } - - /// Returns a reference to the internally buffered data. - /// - /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty. - pub fn buffer(&self) -> &[u8] { - &self.buf[self.pos..self.cap] - } - - /// Invalidates all data in the internal buffer. - #[inline] - fn discard_buffer(self: Pin<&mut Self>) { - let me = self.project(); - *me.pos = 0; - *me.cap = 0; - } -} - -impl AsyncRead for BufReader { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - // If we don't have any buffered data and we're doing a massive read - // (larger than our internal buffer), bypass our internal buffer - // entirely. - if self.pos == self.cap && buf.len() >= self.buf.len() { - let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf)); - self.discard_buffer(); - return Poll::Ready(res); - } - let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; - let nread = rem.read(buf)?; - self.consume(nread); - Poll::Ready(Ok(nread)) - } - - // we can't skip unconditionally because of the large buffer case in read. - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.inner.prepare_uninitialized_buffer(buf) - } -} - -impl AsyncBufRead for BufReader { - #[project] - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - #[project] - let BufReader { - inner, - buf, - cap, - pos, - } = self.project(); - - // If we've reached the end of our internal buffer then we need to fetch - // some more data from the underlying reader. - // Branch using `>=` instead of the more correct `==` - // to tell the compiler that the pos..cap slice is always valid. - if *pos >= *cap { - debug_assert!(*pos == *cap); - *cap = ready!(inner.poll_read(cx, buf))?; - *pos = 0; - } - Poll::Ready(Ok(&buf[*pos..*cap])) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - let me = self.project(); - *me.pos = cmp::min(*me.pos + amt, *me.cap); - } -} - -impl AsyncWrite for BufReader { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.get_pin_mut().poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_pin_mut().poll_flush(cx) - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_pin_mut().poll_shutdown(cx) - } -} - -impl fmt::Debug for BufReader { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BufReader") - .field("reader", &self.inner) - .field( - "buffer", - &format_args!("{}/{}", self.cap - self.pos, self.buf.len()), - ) - .finish() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_unpin() { - crate::is_unpin::>(); - } -} diff --git a/tokio/src/io/io/buf_stream.rs b/tokio/src/io/io/buf_stream.rs deleted file mode 100644 index 51b6bbe5..00000000 --- a/tokio/src/io/io/buf_stream.rs +++ /dev/null @@ -1,144 +0,0 @@ -use crate::io::io::{BufReader, BufWriter}; -use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite}; - -use pin_project::pin_project; -use std::io::{self}; -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - -/// Wraps a type that is [`AsyncWrite`] and [`AsyncRead`], and buffers its input and output. -/// -/// It can be excessively inefficient to work directly with something that implements [`AsyncWrite`] -/// and [`AsyncRead`]. For example, every `write`, however small, has to traverse the syscall -/// interface, and similarly, every read has to do the same. The [`BufWriter`] and [`BufReader`] -/// types aid with these problems respectively, but do so in only one direction. `BufStream` wraps -/// one in the other so that both directions are buffered. See their documentation for details. -#[pin_project] -#[derive(Debug)] -pub struct BufStream(#[pin] BufReader>); - -impl BufStream { - /// Wrap a type in both [`BufWriter`] and [`BufReader`]. - /// - /// See the documentation for those types and [`BufStream`] for details. - pub fn new(stream: RW) -> BufStream { - BufStream(BufReader::new(BufWriter::new(stream))) - } - - /// Gets a reference to the underlying I/O object. - /// - /// It is inadvisable to directly read from the underlying I/O object. - pub fn get_ref(&self) -> &RW { - self.0.get_ref().get_ref() - } - - /// Gets a mutable reference to the underlying I/O object. - /// - /// It is inadvisable to directly read from the underlying I/O object. - pub fn get_mut(&mut self) -> &mut RW { - self.0.get_mut().get_mut() - } - - /// Gets a pinned mutable reference to the underlying I/O object. - /// - /// It is inadvisable to directly read from the underlying I/O object. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW> { - self.project().0.get_pin_mut().get_pin_mut() - } - - /// Consumes this `BufStream`, returning the underlying I/O object. - /// - /// Note that any leftover data in the internal buffer is lost. - pub fn into_inner(self) -> RW { - self.0.into_inner().into_inner() - } -} - -impl From>> for BufStream { - fn from(b: BufReader>) -> Self { - BufStream(b) - } -} - -impl From>> for BufStream { - fn from(b: BufWriter>) -> Self { - // we need to "invert" the reader and writer - let BufWriter { - inner: - BufReader { - inner, - buf: rbuf, - pos, - cap, - }, - buf: wbuf, - written, - } = b; - - BufStream(BufReader { - inner: BufWriter { - inner, - buf: wbuf, - written, - }, - buf: rbuf, - pos, - cap, - }) - } -} - -impl AsyncWrite for BufStream { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.project().0.poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().0.poll_flush(cx) - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().0.poll_shutdown(cx) - } -} - -impl AsyncRead for BufStream { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - self.project().0.poll_read(cx, buf) - } - - // we can't skip unconditionally because of the large buffer case in read. - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.0.prepare_uninitialized_buffer(buf) - } -} - -impl AsyncBufRead for BufStream { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().0.poll_fill_buf(cx) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - self.project().0.consume(amt) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_unpin() { - crate::is_unpin::>(); - } -} diff --git a/tokio/src/io/io/buf_writer.rs b/tokio/src/io/io/buf_writer.rs deleted file mode 100644 index 7415c878..00000000 --- a/tokio/src/io/io/buf_writer.rs +++ /dev/null @@ -1,197 +0,0 @@ -use crate::io::io::DEFAULT_BUF_SIZE; -use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite}; - -use futures_core::ready; -use pin_project::{pin_project, project}; -use std::fmt; -use std::io::{self, Write}; -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// Wraps a writer and buffers its output. -/// -/// It can be excessively inefficient to work directly with something that -/// implements [`AsyncWrite`]. A `BufWriter` keeps an in-memory buffer of data and -/// writes it to an underlying writer in large, infrequent batches. -/// -/// `BufWriter` can improve the speed of programs that make *small* and -/// *repeated* write calls to the same file or network socket. It does not -/// help when writing very large amounts at once, or writing just one or a few -/// times. It also provides no advantage when writing to a destination that is -/// in memory, like a `Vec`. -/// -/// When the `BufWriter` is dropped, the contents of its buffer will be -/// discarded. Creating multiple instances of a `BufWriter` on the same -/// stream can cause data loss. If you need to write out the contents of its -/// buffer, you must manually call flush before the writer is dropped. -/// -/// [`AsyncWrite`]: AsyncWrite -/// [`flush`]: super::AsyncWriteExt::flush -/// -// TODO: Examples -#[pin_project] -pub struct BufWriter { - #[pin] - pub(super) inner: W, - pub(super) buf: Vec, - pub(super) written: usize, -} - -impl BufWriter { - /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, - /// but may change in the future. - pub fn new(inner: W) -> Self { - Self::with_capacity(DEFAULT_BUF_SIZE, inner) - } - - /// Creates a new `BufWriter` with the specified buffer capacity. - pub fn with_capacity(cap: usize, inner: W) -> Self { - Self { - inner, - buf: Vec::with_capacity(cap), - written: 0, - } - } - - #[project] - fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - #[project] - let BufWriter { - mut inner, - buf, - written, - } = self.project(); - - let len = buf.len(); - let mut ret = Ok(()); - while *written < len { - match ready!(inner.as_mut().poll_write(cx, &buf[*written..])) { - Ok(0) => { - ret = Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write the buffered data", - )); - break; - } - Ok(n) => *written += n, - Err(e) => { - ret = Err(e); - break; - } - } - } - if *written > 0 { - buf.drain(..*written); - } - *written = 0; - Poll::Ready(ret) - } - - /// Gets a reference to the underlying writer. - pub fn get_ref(&self) -> &W { - &self.inner - } - - /// Gets a mutable reference to the underlying writer. - /// - /// It is inadvisable to directly write to the underlying writer. - pub fn get_mut(&mut self) -> &mut W { - &mut self.inner - } - - /// Gets a pinned mutable reference to the underlying writer. - /// - /// It is inadvisable to directly write to the underlying writer. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { - self.project().inner - } - - /// Consumes this `BufWriter`, returning the underlying writer. - /// - /// Note that any leftover data in the internal buffer is lost. - pub fn into_inner(self) -> W { - self.inner - } - - /// Returns a reference to the internally buffered data. - pub fn buffer(&self) -> &[u8] { - &self.buf - } -} - -impl AsyncWrite for BufWriter { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - if self.buf.len() + buf.len() > self.buf.capacity() { - ready!(self.as_mut().flush_buf(cx))?; - } - - let me = self.project(); - if buf.len() >= me.buf.capacity() { - me.inner.poll_write(cx, buf) - } else { - Poll::Ready(me.buf.write(buf)) - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().flush_buf(cx))?; - self.get_pin_mut().poll_flush(cx) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().flush_buf(cx))?; - self.get_pin_mut().poll_shutdown(cx) - } -} - -impl AsyncRead for BufWriter { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - self.get_pin_mut().poll_read(cx, buf) - } - - // we can't skip unconditionally because of the large buffer case in read. - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.get_ref().prepare_uninitialized_buffer(buf) - } -} - -impl AsyncBufRead for BufWriter { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_pin_mut().poll_fill_buf(cx) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - self.get_pin_mut().consume(amt) - } -} - -impl fmt::Debug for BufWriter { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BufWriter") - .field("writer", &self.inner) - .field( - "buffer", - &format_args!("{}/{}", self.buf.len(), self.buf.capacity()), - ) - .field("written", &self.written) - .finish() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_unpin() { - crate::is_unpin::>(); - } -} diff --git a/tokio/src/io/io/chain.rs b/tokio/src/io/io/chain.rs deleted file mode 100644 index 3f997ff5..00000000 --- a/tokio/src/io/io/chain.rs +++ /dev/null @@ -1,146 +0,0 @@ -use crate::io::{AsyncBufRead, AsyncRead}; - -use futures_core::ready; -use pin_project::{pin_project, project}; -use std::fmt; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// Stream for the [`chain`](super::AsyncReadExt::chain) method. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct Chain { - #[pin] - first: T, - #[pin] - second: U, - done_first: bool, -} - -pub(super) fn chain(first: T, second: U) -> Chain -where - T: AsyncRead, - U: AsyncRead, -{ - Chain { - first, - second, - done_first: false, - } -} - -impl Chain -where - T: AsyncRead, - U: AsyncRead, -{ - /// Gets references to the underlying readers in this `Chain`. - pub fn get_ref(&self) -> (&T, &U) { - (&self.first, &self.second) - } - - /// Gets mutable references to the underlying readers in this `Chain`. - /// - /// Care should be taken to avoid modifying the internal I/O state of the - /// underlying readers as doing so may corrupt the internal state of this - /// `Chain`. - pub fn get_mut(&mut self) -> (&mut T, &mut U) { - (&mut self.first, &mut self.second) - } - - /// Gets pinned mutable references to the underlying readers in this `Chain`. - /// - /// Care should be taken to avoid modifying the internal I/O state of the - /// underlying readers as doing so may corrupt the internal state of this - /// `Chain`. - pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut T>, Pin<&mut U>) { - let me = self.project(); - (me.first, me.second) - } - - /// Consumes the `Chain`, returning the wrapped readers. - pub fn into_inner(self) -> (T, U) { - (self.first, self.second) - } -} - -impl fmt::Debug for Chain -where - T: fmt::Debug, - U: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Chain") - .field("t", &self.first) - .field("u", &self.second) - .finish() - } -} - -impl AsyncRead for Chain -where - T: AsyncRead, - U: AsyncRead, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - let me = self.project(); - - if !*me.done_first { - match ready!(me.first.poll_read(cx, buf)?) { - 0 if !buf.is_empty() => *me.done_first = true, - n => return Poll::Ready(Ok(n)), - } - } - me.second.poll_read(cx, buf) - } -} - -impl AsyncBufRead for Chain -where - T: AsyncBufRead, - U: AsyncBufRead, -{ - #[project] - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - #[project] - let Chain { - first, - second, - done_first, - } = self.project(); - - if !*done_first { - match ready!(first.poll_fill_buf(cx)?) { - buf if buf.is_empty() => { - *done_first = true; - } - buf => return Poll::Ready(Ok(buf)), - } - } - second.poll_fill_buf(cx) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - let me = self.project(); - if !*me.done_first { - me.first.consume(amt) - } else { - me.second.consume(amt) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_unpin() { - crate::is_unpin::>(); - } -} diff --git a/tokio/src/io/io/copy.rs b/tokio/src/io/io/copy.rs deleted file mode 100644 index 88c87630..00000000 --- a/tokio/src/io/io/copy.rs +++ /dev/null @@ -1,134 +0,0 @@ -use crate::io::{AsyncRead, AsyncWrite}; - -use futures_core::ready; -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// A future that asynchronously copies the entire contents of a reader into a -/// writer. -/// -/// This struct is generally created by calling [`copy`][copy]. Please -/// see the documentation of `copy()` for more details. -/// -/// [copy]: fn.copy.html -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Copy<'a, R: ?Sized, W: ?Sized> { - reader: &'a mut R, - read_done: bool, - writer: &'a mut W, - pos: usize, - cap: usize, - amt: u64, - buf: Box<[u8]>, -} - -/// Asynchronously copies the entire contents of a reader into a writer. -/// -/// This function returns a future that will continuously read data from -/// `reader` and then write it into `writer` in a streaming fashion until -/// `reader` returns EOF. -/// -/// On success, the total number of bytes that were copied from -/// `reader` to `writer` is returned. -/// -/// This is an asynchronous version of [`std::io::copy`][std]. -/// -/// # Errors -/// -/// The returned future will finish with an error will return an error -/// immediately if any call to `poll_read` or `poll_write` returns an error. -/// -/// # Examples -/// -/// ``` -/// use tokio::io; -/// -/// # async fn dox() -> std::io::Result<()> { -/// let mut reader: &[u8] = b"hello"; -/// let mut writer: Vec = vec![]; -/// -/// io::copy(&mut reader, &mut writer).await?; -/// -/// assert_eq!(&b"hello"[..], &writer[..]); -/// # Ok(()) -/// # } -/// ``` -/// -/// [std]: https://doc.rust-lang.org/std/io/fn.copy.html -pub fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> Copy<'a, R, W> -where - R: AsyncRead + Unpin + ?Sized, - W: AsyncWrite + Unpin + ?Sized, -{ - Copy { - reade