diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-15 22:11:13 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-15 22:11:13 -0800 |
commit | 8a7e57786a5dca139f5b4261685e22991ded0859 (patch) | |
tree | b69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/src/process | |
parent | 930679587ae42e4df3113159ccf33fb5923dd73a (diff) |
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its
_public_ dependencies on crates that are either a) do not provide a
stable (1.0+) release with longevity guarantees or b) match the `tokio`
release cadence. Of course, implementing `std` traits fits the
requirements.
The on exception, for now, is the `Stream` trait found in `futures_core`.
It is expected that this trait will not change much and be moved into `std.
Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain
a dependency on this trait given how foundational it is.
Since the `Stream` implementation is optional, types that are logically
streams provide `async fn next_*` functions to obtain the next value.
Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`.
Additionally, some misc cleanup is also done:
- `tokio::io::io` -> `tokio::io::util`.
- `delay` -> `delay_until`.
- `Timeout::new` -> `timeout(...)`.
- `signal::ctrl_c()` returns a future instead of a stream.
- `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait).
- `time::Throttle` is removed (due to lack of `Stream` trait).
- Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/src/process')
-rw-r--r-- | tokio/src/process/mod.rs | 32 | ||||
-rw-r--r-- | tokio/src/process/unix/mod.rs | 30 | ||||
-rw-r--r-- | tokio/src/process/unix/reap.rs | 38 | ||||
-rw-r--r-- | tokio/src/process/windows.rs | 8 |
4 files changed, 58 insertions, 50 deletions
diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs index e777da52..3013a65a 100644 --- a/tokio/src/process/mod.rs +++ b/tokio/src/process/mod.rs @@ -58,7 +58,6 @@ //! use tokio::io::{BufReader, AsyncBufReadExt}; //! use tokio::process::Command; //! -//! use futures_util::stream::StreamExt; //! use std::process::Stdio; //! //! #[tokio::main] @@ -89,8 +88,8 @@ //! println!("child status was: {}", status); //! }); //! -//! while let Some(line) = reader.next().await { -//! println!("Line: {}", line?); +//! while let Some(line) = reader.next_line().await? { +//! println!("Line: {}", line); //! } //! //! Ok(()) @@ -120,8 +119,6 @@ mod kill; use crate::io::{AsyncRead, AsyncReadExt, AsyncWrite}; use crate::process::kill::Kill; -use futures_core::TryFuture; -use futures_util::future::try_join3; use std::ffi::OsStr; use std::future::Future; use std::io; @@ -681,11 +678,14 @@ impl<T: Kill> Drop for ChildDropGuard<T> { } } -impl<T: TryFuture + Kill + Unpin> Future for ChildDropGuard<T> { - type Output = Result<T::Ok, T::Error>; +impl<T, E, F> Future for ChildDropGuard<F> +where + F: Future<Output = Result<T, E>> + Kill + Unpin, +{ + type Output = Result<T, E>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let ret = Pin::new(&mut self.inner).try_poll(cx); + let ret = Pin::new(&mut self.inner).poll(cx); if let Poll::Ready(Ok(_)) = ret { // Avoid the overhead of trying to kill a reaped process @@ -766,6 +766,8 @@ impl Child { /// new pipes between parent and child. Use `stdout(Stdio::piped())` or /// `stderr(Stdio::piped())`, respectively, when creating a `Command`. pub async fn wait_with_output(mut self) -> io::Result<Output> { + use crate::future::try_join3; + async fn read_to_end<A: AsyncRead + Unpin>(io: Option<A>) -> io::Result<Vec<u8>> { let mut vec = Vec::new(); if let Some(mut io) = io { @@ -940,16 +942,14 @@ mod sys { #[cfg(all(test, not(loom)))] mod test { + use super::kill::Kill; + use super::ChildDropGuard; + + use futures::future::FutureExt; use std::future::Future; use std::io; use std::pin::Pin; - use std::task::Context; - use std::task::Poll; - - use futures_util::future::FutureExt; - - use super::kill::Kill; - use super::ChildDropGuard; + use std::task::{Context, Poll}; struct Mock { num_kills: usize, @@ -1021,7 +1021,7 @@ mod test { let mut mock_reaped = Mock::with_result(Poll::Ready(Ok(()))); let mut mock_err = Mock::with_result(Poll::Ready(Err(()))); - let waker = futures_util::task::noop_waker(); + let waker = futures::task::noop_waker(); let mut context = Context::from_waker(&waker); { let mut guard = ChildDropGuard::new(&mut mock_pending); diff --git a/tokio/src/process/unix/mod.rs b/tokio/src/process/unix/mod.rs index dfb69f21..72f6f0bf 100644 --- a/tokio/src/process/unix/mod.rs +++ b/tokio/src/process/unix/mod.rs @@ -22,14 +22,16 @@ //! bad in theory... mod orphan; +use orphan::{OrphanQueue, OrphanQueueImpl, Wait}; + mod reap; +use reap::Reaper; -use self::orphan::{OrphanQueue, OrphanQueueImpl, Wait}; -use self::reap::Reaper; -use super::SpawnedChild; use crate::net::util::PollEvented; use crate::process::kill::Kill; +use crate::process::SpawnedChild; use crate::signal::unix::{signal, Signal, SignalKind}; + use mio::event::Evented; use mio::unix::{EventedFd, UnixReady}; use mio::{Poll as MioPoll, PollOpt, Ready, Token}; @@ -38,11 +40,11 @@ use std::future::Future; use std::io; use std::os::unix::io::{AsRawFd, RawFd}; use std::pin::Pin; -use std::process::{self, ExitStatus}; +use std::process::ExitStatus; use std::task::Context; use std::task::Poll; -impl Wait for process::Child { +impl Wait for std::process::Child { fn id(&self) -> u32 { self.id() } @@ -52,14 +54,14 @@ impl Wait for process::Child { } } -impl Kill for process::Child { +impl Kill for std::process::Child { fn kill(&mut self) -> io::Result<()> { self.kill() } } lazy_static::lazy_static! { - static ref ORPHAN_QUEUE: OrphanQueueImpl<process::Child> = OrphanQueueImpl::new(); + static ref ORPHAN_QUEUE: OrphanQueueImpl<std::process::Child> = OrphanQueueImpl::new(); } struct GlobalOrphanQueue; @@ -70,8 +72,8 @@ impl fmt::Debug for GlobalOrphanQueue { } } -impl OrphanQueue<process::Child> for GlobalOrphanQueue { - fn push_orphan(&self, orphan: process::Child) { +impl OrphanQueue<std::process::Child> for GlobalOrphanQueue { + fn push_orphan(&self, orphan: std::process::Child) { ORPHAN_QUEUE.push_orphan(orphan) } @@ -82,7 +84,7 @@ impl OrphanQueue<process::Child> for GlobalOrphanQueue { #[must_use = "futures do nothing unless polled"] pub(crate) struct Child { - inner: Reaper<process::Child, GlobalOrphanQueue, Signal>, + inner: Reaper<std::process::Child, GlobalOrphanQueue, Signal>, } impl fmt::Debug for Child { @@ -93,7 +95,7 @@ impl fmt::Debug for Child { } } -pub(crate) fn spawn_child(cmd: &mut process::Command) -> io::Result<SpawnedChild> { +pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result<SpawnedChild> { let mut child = cmd.spawn()?; let stdin = stdio(child.stdin.take())?; let stdout = stdio(child.stdout.take())?; @@ -196,9 +198,9 @@ where } } -pub(crate) type ChildStdin = PollEvented<Fd<process::ChildStdin>>; -pub(crate) type ChildStdout = PollEvented<Fd<process::ChildStdout>>; -pub(crate) type ChildStderr = PollEvented<Fd<process::ChildStderr>>; +pub(crate) type ChildStdin = PollEvented<Fd<std::process::ChildStdin>>; +pub(crate) type ChildStdout = PollEvented<Fd<std::process::ChildStdout>>; +pub(crate) type ChildStderr = PollEvented<Fd<std::process::ChildStderr>>; fn stdio<T>(option: Option<T>) -> io::Result<Option<PollEvented<Fd<T>>>> where diff --git a/tokio/src/process/unix/reap.rs b/tokio/src/process/unix/reap.rs index 631025d4..8963805a 100644 --- a/tokio/src/process/unix/reap.rs +++ b/tokio/src/process/unix/reap.rs @@ -1,6 +1,7 @@ -use super::orphan::{OrphanQueue, Wait}; +use crate::process::imp::orphan::{OrphanQueue, Wait}; use crate::process::kill::Kill; -use futures_core::stream::Stream; +use crate::signal::unix::Signal; + use std::future::Future; use std::io; use std::ops::Deref; @@ -22,6 +23,17 @@ where signal: S, } +// Work around removal of `futures_core` dependency +pub(crate) trait Stream: Unpin { + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>; +} + +impl Stream for Signal { + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + Signal::poll_recv(self, cx) + } +} + impl<W, Q, S> Deref for Reaper<W, Q, S> where W: Wait + Unpin, @@ -60,7 +72,7 @@ impl<W, Q, S> Future for Reaper<W, Q, S> where W: Wait + Unpin, Q: OrphanQueue<W> + Unpin, - S: Stream + Unpin, + S: Stream, { type Output = io::Result<ExitStatus>; @@ -85,7 +97,7 @@ where // this future's task will be notified/woken up again. Since the // futures model allows for spurious wake ups this extra wakeup // should not cause significant issues with parent futures. - let registered_interest = Pin::new(&mut self.signal).poll_next(cx).is_pending(); + let registered_interest = self.signal.poll_recv(cx).is_pending(); self.orphan_queue.reap_orphans(); if let Some(status) = self.inner_mut().try_wait()? { @@ -134,11 +146,10 @@ where #[cfg(all(test, not(loom)))] mod test { use super::*; - use futures_core::stream::Stream; - use futures_util::future::FutureExt; + + use futures::future::FutureExt; use std::cell::{Cell, RefCell}; use std::os::unix::process::ExitStatusExt; - use std::pin::Pin; use std::process::ExitStatus; use std::task::Context; use std::task::Poll; @@ -201,13 +212,10 @@ mod test { } impl Stream for MockStream { - type Item = io::Result<()>; - - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let inner = Pin::get_mut(self); - inner.total_polls += 1; - match inner.values.remove(0) { - Some(()) => Poll::Ready(Some(Ok(()))), + fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> { + self.total_polls += 1; + match self.values.remove(0) { + Some(()) => Poll::Ready(Some(())), None => Poll::Pending, } } @@ -247,7 +255,7 @@ mod test { MockStream::new(vec![None, Some(()), None, None, None]), ); - let waker = futures_util::task::noop_waker(); + let waker = futures::task::noop_waker(); let mut context = Context::from_waker(&waker); // Not yet exited, interest registered diff --git a/tokio/src/process/windows.rs b/tokio/src/process/windows.rs index 013e9bb7..d25807d6 100644 --- a/tokio/src/process/windows.rs +++ b/tokio/src/process/windows.rs @@ -20,8 +20,6 @@ use crate::process::kill::Kill; use crate::process::SpawnedChild; use crate::sync::oneshot; -use futures_util::future::Fuse; -use futures_util::future::FutureExt; use mio_named_pipes::NamedPipe; use std::fmt; use std::future::Future; @@ -59,7 +57,7 @@ impl fmt::Debug for Child { } struct Waiting { - rx: Fuse<oneshot::Receiver<()>>, + rx: oneshot::Receiver<()>, wait_object: HANDLE, tx: *mut Option<oneshot::Sender<()>>, } @@ -103,7 +101,7 @@ impl Future for Child { let inner = Pin::get_mut(self); loop { if let Some(ref mut w) = inner.waiting { - match w.rx.poll_unpin(cx) { + match Pin::new(&mut w.rx).poll(cx) { Poll::Ready(Ok(())) => {} Poll::Ready(Err(_)) => panic!("should not be canceled"), Poll::Pending => return Poll::Pending, @@ -134,7 +132,7 @@ impl Future for Child { return Poll::Ready(Err(err)); } inner.waiting = Some(Waiting { - rx: rx.fuse(), + rx, wait_object, tx: ptr, }); |