summaryrefslogtreecommitdiffstats
path: root/tokio/src/future
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-15 22:11:13 -0800
committerGitHub <noreply@github.com>2019-11-15 22:11:13 -0800
commit8a7e57786a5dca139f5b4261685e22991ded0859 (patch)
treeb69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/src/future
parent930679587ae42e4df3113159ccf33fb5923dd73a (diff)
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its _public_ dependencies on crates that are either a) do not provide a stable (1.0+) release with longevity guarantees or b) match the `tokio` release cadence. Of course, implementing `std` traits fits the requirements. The on exception, for now, is the `Stream` trait found in `futures_core`. It is expected that this trait will not change much and be moved into `std. Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain a dependency on this trait given how foundational it is. Since the `Stream` implementation is optional, types that are logically streams provide `async fn next_*` functions to obtain the next value. Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`. Additionally, some misc cleanup is also done: - `tokio::io::io` -> `tokio::io::util`. - `delay` -> `delay_until`. - `Timeout::new` -> `timeout(...)`. - `signal::ctrl_c()` returns a future instead of a stream. - `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait). - `time::Throttle` is removed (due to lack of `Stream` trait). - Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/src/future')
-rw-r--r--tokio/src/future/maybe_done.rs76
-rw-r--r--tokio/src/future/mod.rs15
-rw-r--r--tokio/src/future/pending.rs44
-rw-r--r--tokio/src/future/poll_fn.rs38
-rw-r--r--tokio/src/future/ready.rs27
-rw-r--r--tokio/src/future/try_join.rs115
6 files changed, 315 insertions, 0 deletions
diff --git a/tokio/src/future/maybe_done.rs b/tokio/src/future/maybe_done.rs
new file mode 100644
index 00000000..5011544c
--- /dev/null
+++ b/tokio/src/future/maybe_done.rs
@@ -0,0 +1,76 @@
+//! Definition of the MaybeDone combinator
+
+use std::future::Future;
+use std::mem;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// A future that may have completed.
+#[derive(Debug)]
+pub(crate) enum MaybeDone<Fut: Future> {
+ /// A not-yet-completed future
+ Future(Fut),
+ /// The output of the completed future
+ Done(Fut::Output),
+ /// The empty variant after the result of a [`MaybeDone`] has been
+ /// taken using the [`take_output`](MaybeDone::take_output) method.
+ Gone,
+}
+
+// Safe because we never generate `Pin<&mut Fut::Output>`
+impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {}
+
+/// Wraps a future into a `MaybeDone`
+pub(crate) fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> {
+ MaybeDone::Future(future)
+}
+
+impl<Fut: Future> MaybeDone<Fut> {
+ /// Returns an [`Option`] containing a mutable reference to the output of the future.
+ /// The output of this method will be [`Some`] if and only if the inner
+ /// future has been completed and [`take_output`](MaybeDone::take_output)
+ /// has not yet been called.
+ pub(crate) fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Output> {
+ unsafe {
+ let this = self.get_unchecked_mut();
+ match this {
+ MaybeDone::Done(res) => Some(res),
+ _ => None,
+ }
+ }
+ }
+
+ /// Attempt to take the output of a `MaybeDone` without driving it
+ /// towards completion.
+ #[inline]
+ pub(crate) fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> {
+ unsafe {
+ let this = self.get_unchecked_mut();
+ match this {
+ MaybeDone::Done(_) => {}
+ MaybeDone::Future(_) | MaybeDone::Gone => return None,
+ };
+ if let MaybeDone::Done(output) = mem::replace(this, MaybeDone::Gone) {
+ Some(output)
+ } else {
+ unreachable!()
+ }
+ }
+ }
+}
+
+impl<Fut: Future> Future for MaybeDone<Fut> {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let res = unsafe {
+ match self.as_mut().get_unchecked_mut() {
+ MaybeDone::Future(a) => ready!(Pin::new_unchecked(a).poll(cx)),
+ MaybeDone::Done(_) => return Poll::Ready(()),
+ MaybeDone::Gone => panic!("MaybeDone polled after value taken"),
+ }
+ };
+ self.set(MaybeDone::Done(res));
+ Poll::Ready(())
+ }
+}
diff --git a/tokio/src/future/mod.rs b/tokio/src/future/mod.rs
new file mode 100644
index 00000000..9a155bf7
--- /dev/null
+++ b/tokio/src/future/mod.rs
@@ -0,0 +1,15 @@
+#![allow(unused_imports, dead_code)]
+
+//! Asynchronous values.
+
+mod maybe_done;
+pub(crate) use maybe_done::{maybe_done, MaybeDone};
+
+mod poll_fn;
+pub(crate) use poll_fn::poll_fn;
+
+mod ready;
+pub(crate) use ready::{ok, Ready};
+
+mod try_join;
+pub(crate) use try_join::try_join3;
diff --git a/tokio/src/future/pending.rs b/tokio/src/future/pending.rs
new file mode 100644
index 00000000..c844ebc3
--- /dev/null
+++ b/tokio/src/future/pending.rs
@@ -0,0 +1,44 @@
+use std::future::Future;
+use std::marker;
+use sdt::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Future for the [`pending()`] function.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+struct Pending<T> {
+ _data: marker::PhantomData<T>,
+}
+
+/// Creates a future which never resolves, representing a computation that never
+/// finishes.
+///
+/// The returned future will forever return [`Poll::Pending`].
+///
+/// # Examples
+///
+/// ```no_run
+/// use tokio::future;
+///
+/// #[tokio::main]
+/// async fn main {
+/// future::pending().await;
+/// unreachable!();
+/// }
+/// ```
+pub async fn pending() -> ! {
+ Pending {
+ _data: marker::PhantomData,
+ }.await
+}
+
+impl<T> Future for Pending<T> {
+ type Output = !;
+
+ fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<T> {
+ Poll::Pending
+ }
+}
+
+impl<T> Unpin for Pending<T> {
+}
diff --git a/tokio/src/future/poll_fn.rs b/tokio/src/future/poll_fn.rs
new file mode 100644
index 00000000..ce2a5524
--- /dev/null
+++ b/tokio/src/future/poll_fn.rs
@@ -0,0 +1,38 @@
+//! Definition of the `PollFn` adapter combinator
+
+use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Future for the [`poll_fn`] function.
+pub(crate) struct PollFn<F> {
+ f: F,
+}
+
+impl<F> Unpin for PollFn<F> {}
+
+/// Creates a new future wrapping around a function returning [`Poll`].
+pub(crate) fn poll_fn<T, F>(f: F) -> PollFn<F>
+where
+ F: FnMut(&mut Context<'_>) -> Poll<T>,
+{
+ PollFn { f }
+}
+
+impl<F> fmt::Debug for PollFn<F> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("PollFn").finish()
+ }
+}
+
+impl<T, F> Future for PollFn<F>
+where
+ F: FnMut(&mut Context<'_>) -> Poll<T>,
+{
+ type Output = T;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
+ (&mut self.f)(cx)
+ }
+}
diff --git a/tokio/src/future/ready.rs b/tokio/src/future/ready.rs
new file mode 100644
index 00000000..ba5d4804
--- /dev/null
+++ b/tokio/src/future/ready.rs
@@ -0,0 +1,27 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Future for the [`ready`](ready()) function.
+///
+/// `pub` in order to use the future as an associated type in a sealed trait.
+#[derive(Debug)]
+// Used as an associated type in a "sealed" trait.
+#[allow(unreachable_pub)]
+pub struct Ready<T>(Option<T>);
+
+impl<T> Unpin for Ready<T> {}
+
+impl<T> Future for Ready<T> {
+ type Output = T;
+
+ #[inline]
+ fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
+ Poll::Ready(self.0.take().unwrap())
+ }
+}
+
+/// Create a future that is immediately ready with a success value.
+pub(crate) fn ok<T, E>(t: T) -> Ready<Result<T, E>> {
+ Ready(Some(Ok(t)))
+}
diff --git a/tokio/src/future/try_join.rs b/tokio/src/future/try_join.rs
new file mode 100644
index 00000000..478c69dc
--- /dev/null
+++ b/tokio/src/future/try_join.rs
@@ -0,0 +1,115 @@
+use crate::future::{maybe_done, MaybeDone};
+
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pub(crate) fn try_join3<T1, F1, T2, F2, T3, F3, E>(
+ future1: F1,
+ future2: F2,
+ future3: F3,
+) -> TryJoin3<F1, F2, F3>
+where
+ F1: Future<Output = Result<T1, E>>,
+ F2: Future<Output = Result<T2, E>>,
+ F3: Future<Output = Result<T3, E>>,
+{
+ TryJoin3 {
+ future1: maybe_done(future1),
+ future2: maybe_done(future2),
+ future3: maybe_done(future3),
+ }
+}
+
+pub(crate) struct TryJoin3<F1, F2, F3>
+where
+ F1: Future,
+ F2: Future,
+ F3: Future,
+{
+ future1: MaybeDone<F1>,
+ future2: MaybeDone<F2>,
+ future3: MaybeDone<F3>,
+}
+
+impl<T1, F1, T2, F2, T3, F3, E> Future for TryJoin3<F1, F2, F3>
+where
+ F1: Future<Output = Result<T1, E>>,
+ F2: Future<Output = Result<T2, E>>,
+ F3: Future<Output = Result<T3, E>>,
+{
+ type Output = Result<(T1, T2, T3), E>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut all_done = true;
+
+ // Safety: the fn takes `Pin`, we don't move any data out of `self`.
+ unsafe {
+ let me = self.get_unchecked_mut();
+
+ if Pin::new_unchecked(&mut me.future1).poll(cx).is_pending() {
+ all_done = false;
+ } else if Pin::new_unchecked(&mut me.future1)
+ .output_mut()
+ .unwrap()
+ .is_err()
+ {
+ return Poll::Ready(Err(Pin::new_unchecked(&mut me.future1)
+ .take_output()
+ .unwrap()
+ .err()
+ .unwrap()));
+ }
+
+ if Pin::new_unchecked(&mut me.future2).poll(cx).is_pending() {
+ all_done = false;
+ } else if Pin::new_unchecked(&mut me.future2)
+ .output_mut()
+ .unwrap()
+ .is_err()
+ {
+ return Poll::Ready(Err(Pin::new_unchecked(&mut me.future2)
+ .take_output()
+ .unwrap()
+ .err()
+ .unwrap()));
+ }
+
+ if Pin::new_unchecked(&mut me.future3).poll(cx).is_pending() {
+ all_done = false;
+ } else if Pin::new_unchecked(&mut me.future3)
+ .output_mut()
+ .unwrap()
+ .is_err()
+ {
+ return Poll::Ready(Err(Pin::new_unchecked(&mut me.future3)
+ .take_output()
+ .unwrap()
+ .err()
+ .unwrap()));
+ }
+
+ if all_done {
+ Poll::Ready(Ok((
+ Pin::new_unchecked(&mut me.future1)
+ .take_output()
+ .unwrap()
+ .ok()
+ .unwrap(),
+ Pin::new_unchecked(&mut me.future2)
+ .take_output()
+ .unwrap()
+ .ok()
+ .unwrap(),
+ Pin::new_unchecked(&mut me.future3)
+ .take_output()
+ .unwrap()
+ .ok()
+ .unwrap(),
+ )))
+ } else {
+ Poll::Pending
+ }
+ }
+ }
+}