summaryrefslogtreecommitdiffstats
path: root/tokio/src
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-15 22:11:13 -0800
committerGitHub <noreply@github.com>2019-11-15 22:11:13 -0800
commit8a7e57786a5dca139f5b4261685e22991ded0859 (patch)
treeb69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/src
parent930679587ae42e4df3113159ccf33fb5923dd73a (diff)
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its _public_ dependencies on crates that are either a) do not provide a stable (1.0+) release with longevity guarantees or b) match the `tokio` release cadence. Of course, implementing `std` traits fits the requirements. The on exception, for now, is the `Stream` trait found in `futures_core`. It is expected that this trait will not change much and be moved into `std. Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain a dependency on this trait given how foundational it is. Since the `Stream` implementation is optional, types that are logically streams provide `async fn next_*` functions to obtain the next value. Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`. Additionally, some misc cleanup is also done: - `tokio::io::io` -> `tokio::io::util`. - `delay` -> `delay_until`. - `Timeout::new` -> `timeout(...)`. - `signal::ctrl_c()` returns a future instead of a stream. - `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait). - `time::Throttle` is removed (due to lack of `Stream` trait). - Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/src')
-rw-r--r--tokio/src/fs/blocking.rs1
-rw-r--r--tokio/src/fs/file.rs3
-rw-r--r--tokio/src/fs/read_dir.rs50
-rw-r--r--tokio/src/future.rs69
-rw-r--r--tokio/src/future/maybe_done.rs76
-rw-r--r--tokio/src/future/mod.rs15
-rw-r--r--tokio/src/future/pending.rs44
-rw-r--r--tokio/src/future/poll_fn.rs38
-rw-r--r--tokio/src/future/ready.rs27
-rw-r--r--tokio/src/future/try_join.rs115
-rw-r--r--tokio/src/io/async_read.rs1
-rw-r--r--tokio/src/io/async_write.rs1
-rw-r--r--tokio/src/io/io/lines.rs70
-rw-r--r--tokio/src/io/io/split.rs67
-rw-r--r--tokio/src/io/mod.rs15
-rw-r--r--tokio/src/io/split.rs1
-rw-r--r--tokio/src/io/util/async_buf_read_ext.rs (renamed from tokio/src/io/io/async_buf_read_ext.rs)28
-rw-r--r--tokio/src/io/util/async_read_ext.rs (renamed from tokio/src/io/io/async_read_ext.rs)14
-rw-r--r--tokio/src/io/util/async_write_ext.rs (renamed from tokio/src/io/io/async_write_ext.rs)8
-rw-r--r--tokio/src/io/util/buf_reader.rs (renamed from tokio/src/io/io/buf_reader.rs)3
-rw-r--r--tokio/src/io/util/buf_stream.rs (renamed from tokio/src/io/io/buf_stream.rs)2
-rw-r--r--tokio/src/io/util/buf_writer.rs (renamed from tokio/src/io/io/buf_writer.rs)3
-rw-r--r--tokio/src/io/util/chain.rs (renamed from tokio/src/io/io/chain.rs)1
-rw-r--r--tokio/src/io/util/copy.rs (renamed from tokio/src/io/io/copy.rs)1
-rw-r--r--tokio/src/io/util/empty.rs (renamed from tokio/src/io/io/empty.rs)0
-rw-r--r--tokio/src/io/util/flush.rs (renamed from tokio/src/io/io/flush.rs)0
-rw-r--r--tokio/src/io/util/lines.rs113
-rw-r--r--tokio/src/io/util/mod.rs (renamed from tokio/src/io/io/mod.rs)66
-rw-r--r--tokio/src/io/util/read.rs (renamed from tokio/src/io/io/read.rs)0
-rw-r--r--tokio/src/io/util/read_exact.rs (renamed from tokio/src/io/io/read_exact.rs)1
-rw-r--r--tokio/src/io/util/read_line.rs (renamed from tokio/src/io/io/read_line.rs)3
-rw-r--r--tokio/src/io/util/read_to_end.rs (renamed from tokio/src/io/io/read_to_end.rs)1
-rw-r--r--tokio/src/io/util/read_to_string.rs (renamed from tokio/src/io/io/read_to_string.rs)3
-rw-r--r--tokio/src/io/util/read_until.rs (renamed from tokio/src/io/io/read_until.rs)1
-rw-r--r--tokio/src/io/util/repeat.rs (renamed from tokio/src/io/io/repeat.rs)0
-rw-r--r--tokio/src/io/util/shutdown.rs (renamed from tokio/src/io/io/shutdown.rs)0
-rw-r--r--tokio/src/io/util/sink.rs (renamed from tokio/src/io/io/sink.rs)0
-rw-r--r--tokio/src/io/util/split.rs111
-rw-r--r--tokio/src/io/util/take.rs (renamed from tokio/src/io/io/take.rs)1
-rw-r--r--tokio/src/io/util/write.rs (renamed from tokio/src/io/io/write.rs)0
-rw-r--r--tokio/src/io/util/write_all.rs (renamed from tokio/src/io/io/write_all.rs)1
-rw-r--r--tokio/src/lib.rs13
-rw-r--r--tokio/src/net/addr.rs9
-rw-r--r--tokio/src/net/tcp/incoming.rs25
-rw-r--r--tokio/src/net/tcp/listener.rs11
-rw-r--r--tokio/src/net/tcp/mod.rs8
-rw-r--r--tokio/src/net/tcp/stream.rs3
-rw-r--r--tokio/src/net/udp/socket.rs3
-rw-r--r--tokio/src/net/udp/split.rs4
-rw-r--r--tokio/src/net/unix/datagram.rs3
-rw-r--r--tokio/src/net/unix/incoming.rs26
-rw-r--r--tokio/src/net/unix/listener.rs9
-rw-r--r--tokio/src/net/unix/stream.rs3
-rw-r--r--tokio/src/net/util/poll_evented.rs3
-rw-r--r--tokio/src/prelude.rs15
-rw-r--r--tokio/src/process/mod.rs32
-rw-r--r--tokio/src/process/unix/mod.rs30
-rw-r--r--tokio/src/process/unix/reap.rs38
-rw-r--r--tokio/src/process/windows.rs8
-rw-r--r--tokio/src/runtime/mod.rs8
-rw-r--r--tokio/src/runtime/thread_pool/tests/pool.rs2
-rw-r--r--tokio/src/runtime/thread_pool/tests/queue.rs2
-rw-r--r--tokio/src/signal/ctrl_c.rs61
-rw-r--r--tokio/src/signal/mod.rs56
-rw-r--r--tokio/src/signal/registry.rs31
-rw-r--r--tokio/src/signal/unix.rs32
-rw-r--r--tokio/src/signal/windows.rs63
-rw-r--r--tokio/src/stream.rs78
-rw-r--r--tokio/src/sync/barrier.rs3
-rw-r--r--tokio/src/sync/mpsc/bounded.rs144
-rw-r--r--tokio/src/sync/mpsc/chan.rs98
-rw-r--r--tokio/src/sync/mpsc/error.rs86
-rw-r--r--tokio/src/sync/mpsc/mod.rs7
-rw-r--r--tokio/src/sync/mpsc/unbounded.rs102
-rw-r--r--tokio/src/sync/mutex.rs2
-rw-r--r--tokio/src/sync/oneshot.rs3
-rw-r--r--tokio/src/sync/tests/loom_atomic_waker.rs2
-rw-r--r--tokio/src/sync/tests/loom_mpsc.rs2
-rw-r--r--tokio/src/sync/tests/loom_oneshot.rs2
-rw-r--r--tokio/src/sync/tests/loom_semaphore.rs3
-rw-r--r--tokio/src/sync/watch.rs110
-rw-r--r--tokio/src/task/tests/loom.rs6
-rw-r--r--tokio/src/task/tests/task.rs2
-rw-r--r--tokio/src/time/clock.rs4
-rw-r--r--tokio/src/time/delay.rs54
-rw-r--r--tokio/src/time/delay_queue.rs23
-rw-r--r--tokio/src/time/driver/registration.rs8
-rw-r--r--tokio/src/time/interval.rs150
-rw-r--r--tokio/src/time/mod.rs32
-rw-r--r--tokio/src/time/tests/test_delay.rs52
-rw-r--r--tokio/src/time/tests/test_queue.rs2
-rw-r--r--tokio/src/time/throttle.rs17
-rw-r--r--tokio/src/time/timeout.rs195
93 files changed, 1370 insertions, 1269 deletions
diff --git a/tokio/src/fs/blocking.rs b/tokio/src/fs/blocking.rs
index 695358a3..64398cbb 100644
--- a/tokio/src/fs/blocking.rs
+++ b/tokio/src/fs/blocking.rs
@@ -1,7 +1,6 @@
use crate::fs::sys;
use crate::io::{AsyncRead, AsyncWrite};
-use futures_core::ready;
use std::cmp;
use std::future::Future;
use std::io;
diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs
index 3f18831e..0ff45025 100644
--- a/tokio/src/fs/file.rs
+++ b/tokio/src/fs/file.rs
@@ -7,7 +7,6 @@ use crate::fs::blocking::Buf;
use crate::fs::{asyncify, sys};
use crate::io::{AsyncRead, AsyncWrite};
-use futures_core::ready;
use std::fmt;
use std::fs::{Metadata, Permissions};
use std::future::Future;
@@ -430,7 +429,7 @@ impl File {
}
async fn complete_inflight(&mut self) {
- use futures_util::future::poll_fn;
+ use crate::future::poll_fn;
if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await {
self.last_write_err = Some(e.kind());
diff --git a/tokio/src/fs/read_dir.rs b/tokio/src/fs/read_dir.rs
index 9492a2f4..219c7b35 100644
--- a/tokio/src/fs/read_dir.rs
+++ b/tokio/src/fs/read_dir.rs
@@ -1,7 +1,5 @@
use crate::fs::{asyncify, sys};
-use futures_core::ready;
-use futures_core::stream::Stream;
use std::ffi::OsString;
use std::fs::{FileType, Metadata};
use std::future::Future;
@@ -50,10 +48,15 @@ enum State {
Pending(sys::Blocking<(Option<io::Result<std::fs::DirEntry>>, std::fs::ReadDir)>),
}
-impl Stream for ReadDir {
- type Item = io::Result<DirEntry>;
+impl ReadDir {
+ /// Returns the next entry in the directory stream.
+ pub async fn next_entry(&mut self) -> io::Result<Option<DirEntry>> {
+ use crate::future::poll_fn;
+ poll_fn(|cx| self.poll_next_entry(cx)).await
+ }
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ #[doc(hidden)]
+ pub fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<Option<DirEntry>>> {
loop {
match self.0 {
State::Idle(ref mut std) => {
@@ -68,7 +71,11 @@ impl Stream for ReadDir {
let (ret, std) = ready!(Pin::new(rx).poll(cx))?;
self.0 = State::Idle(Some(std));
- let ret = ret.map(|res| res.map(|std| DirEntry(Arc::new(std))));
+ let ret = match ret {
+ Some(Ok(std)) => Ok(Some(DirEntry(Arc::new(std)))),
+ Some(Err(e)) => Err(e),
+ None => Ok(None),
+ };
return Poll::Ready(ret);
}
@@ -77,6 +84,19 @@ impl Stream for ReadDir {
}
}
+#[cfg(feature = "stream")]
+impl futures_core::Stream for ReadDir {
+ type Item = io::Result<DirEntry>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ Poll::Ready(match ready!(self.poll_next_entry(cx)) {
+ Ok(Some(entry)) => Some(Ok(entry)),
+ Ok(None) => None,
+ Err(err) => Some(Err(err)),
+ })
+ }
+}
+
/// Entries returned by the [`ReadDir`] stream.
///
/// [`ReadDir`]: struct.ReadDir.html
@@ -100,13 +120,11 @@ impl DirEntry {
///
/// ```no_run
/// use tokio::fs;
- /// use tokio::prelude::*;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut entries = fs::read_dir(".").await?;
///
- /// while let Some(res) = entries.next().await {
- /// let entry = res?;
+ /// while let Some(entry) = entries.next_entry().await? {
/// println!("{:?}", entry.path());
/// }
/// # Ok(())
@@ -133,13 +151,11 @@ impl DirEntry {
///
/// ```
/// use tokio::fs;
- /// use tokio::prelude::*;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut entries = fs::read_dir(".").await?;
///
- /// while let Some(res) = entries.next().await {
- /// let entry = res?;
+ /// while let Some(entry) = entries.next_entry().await? {
/// println!("{:?}", entry.file_name());
/// }
/// # Ok(())
@@ -164,14 +180,11 @@ impl DirEntry {
///
/// ```
/// use tokio::fs;
- /// use tokio::prelude::*;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut entries = fs::read_dir(".").await?;
///
- /// while let Some(res) = entries.next().await {
- /// let entry = res?;
- ///
+ /// while let Some(entry) = entries.next_entry().await? {
/// if let Ok(metadata) = entry.metadata().await {
/// // Now let's show our entry's permissions!
/// println!("{:?}: {:?}", entry.path(), metadata.permissions());
@@ -202,14 +215,11 @@ impl DirEntry {
///
/// ```
/// use tokio::fs;
- /// use tokio::prelude::*;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut entries = fs::read_dir(".").await?;
///
- /// while let Some(res) = entries.next().await {
- /// let entry = res?;
- ///
+ /// while let Some(entry) = entries.next_entry().await? {
/// if let Ok(file_type) = entry.file_type().await {
/// // Now let's show our entry's file type!
/// println!("{:?}: {:?}", entry.path(), file_type);
diff --git a/tokio/src/future.rs b/tokio/src/future.rs
deleted file mode 100644
index f6b7e4a7..00000000
--- a/tokio/src/future.rs
+++ /dev/null
@@ -1,69 +0,0 @@
-//! Asynchronous values.
-
-#[cfg(feature = "time")]
-use crate::time::Timeout;
-
-#[cfg(feature = "time")]
-use std::time::Duration;
-
-#[doc(inline)]
-pub use futures_util::future::{err, ok, pending, poll_fn, ready};
-#[doc(inline)]
-pub use std::future::Future;
-
-/// An extension trait for `Future` that provides a variety of convenient
-/// combinator functions.
-///
-/// Currently, there only is a [`timeout`] function, but this will increase
-/// over time.
-///
-/// Users are not expected to implement this trait. All types that implement
-/// `Future` already implement `FutureExt`.
-///
-/// This trait can be imported directly or via the Tokio prelude: `use
-/// tokio::prelude::*`.
-///
-/// [`timeout`]: #method.timeout
-pub trait FutureExt: Future {
- /// Creates a new future which allows `self` until `timeout`.
- ///
- /// This combinator creates a new future which wraps the receiving future
- /// with a timeout. The returned future is allowed to execute until it
- /// completes or `timeout` has elapsed, whichever happens first.
- ///
- /// If the future completes before `timeout` then the future will resolve
- /// with that item. Otherwise the future will resolve to an error.
- ///
- /// The future is guaranteed to be polled at least once, even if `timeout`
- /// is set to zero.
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::prelude::*;
- /// use std::time::Duration;
- ///
- /// async fn long_future() {
- /// // do work here
- /// }
- ///
- /// # async fn dox() {
- /// let res = long_future()
- /// .timeout(Duration::from_secs(1))
- /// .await;
- ///
- /// if res.is_err() {
- /// println!("operation timed out");
- /// }
- /// # }
- /// ```
- #[cfg(feature = "time")]
- fn timeout(self, timeout: Duration) -> Timeout<Self>
- where
- Self: Sized,
- {
- Timeout::new(self, timeout)
- }
-}
-
-impl<T: ?Sized> FutureExt for T where T: Future {}
diff --git a/tokio/src/future/maybe_done.rs b/tokio/src/future/maybe_done.rs
new file mode 100644
index 00000000..5011544c
--- /dev/null
+++ b/tokio/src/future/maybe_done.rs
@@ -0,0 +1,76 @@
+//! Definition of the MaybeDone combinator
+
+use std::future::Future;
+use std::mem;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// A future that may have completed.
+#[derive(Debug)]
+pub(crate) enum MaybeDone<Fut: Future> {
+ /// A not-yet-completed future
+ Future(Fut),
+ /// The output of the completed future
+ Done(Fut::Output),
+ /// The empty variant after the result of a [`MaybeDone`] has been
+ /// taken using the [`take_output`](MaybeDone::take_output) method.
+ Gone,
+}
+
+// Safe because we never generate `Pin<&mut Fut::Output>`
+impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {}
+
+/// Wraps a future into a `MaybeDone`
+pub(crate) fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> {
+ MaybeDone::Future(future)
+}
+
+impl<Fut: Future> MaybeDone<Fut> {
+ /// Returns an [`Option`] containing a mutable reference to the output of the future.
+ /// The output of this method will be [`Some`] if and only if the inner
+ /// future has been completed and [`take_output`](MaybeDone::take_output)
+ /// has not yet been called.
+ pub(crate) fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Output> {
+ unsafe {
+ let this = self.get_unchecked_mut();
+ match this {
+ MaybeDone::Done(res) => Some(res),
+ _ => None,
+ }
+ }
+ }
+
+ /// Attempt to take the output of a `MaybeDone` without driving it
+ /// towards completion.
+ #[inline]
+ pub(crate) fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> {
+ unsafe {
+ let this = self.get_unchecked_mut();
+ match this {
+ MaybeDone::Done(_) => {}
+ MaybeDone::Future(_) | MaybeDone::Gone => return None,
+ };
+ if let MaybeDone::Done(output) = mem::replace(this, MaybeDone::Gone) {
+ Some(output)
+ } else {
+ unreachable!()
+ }
+ }
+ }
+}
+
+impl<Fut: Future> Future for MaybeDone<Fut> {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let res = unsafe {
+ match self.as_mut().get_unchecked_mut() {
+ MaybeDone::Future(a) => ready!(Pin::new_unchecked(a).poll(cx)),
+ MaybeDone::Done(_) => return Poll::Ready(()),
+ MaybeDone::Gone => panic!("MaybeDone polled after value taken"),
+ }
+ };
+ self.set(MaybeDone::Done(res));
+ Poll::Ready(())
+ }
+}
diff --git a/tokio/src/future/mod.rs b/tokio/src/future/mod.rs
new file mode 100644
index 00000000..9a155bf7
--- /dev/null
+++ b/tokio/src/future/mod.rs
@@ -0,0 +1,15 @@
+#![allow(unused_imports, dead_code)]
+
+//! Asynchronous values.
+
+mod maybe_done;
+pub(crate) use maybe_done::{maybe_done, MaybeDone};
+
+mod poll_fn;
+pub(crate) use poll_fn::poll_fn;
+
+mod ready;
+