summaryrefslogtreecommitdiffstats
path: root/tokio/src/process
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-15 22:11:13 -0800
committerGitHub <noreply@github.com>2019-11-15 22:11:13 -0800
commit8a7e57786a5dca139f5b4261685e22991ded0859 (patch)
treeb69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/src/process
parent930679587ae42e4df3113159ccf33fb5923dd73a (diff)
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its _public_ dependencies on crates that are either a) do not provide a stable (1.0+) release with longevity guarantees or b) match the `tokio` release cadence. Of course, implementing `std` traits fits the requirements. The on exception, for now, is the `Stream` trait found in `futures_core`. It is expected that this trait will not change much and be moved into `std. Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain a dependency on this trait given how foundational it is. Since the `Stream` implementation is optional, types that are logically streams provide `async fn next_*` functions to obtain the next value. Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`. Additionally, some misc cleanup is also done: - `tokio::io::io` -> `tokio::io::util`. - `delay` -> `delay_until`. - `Timeout::new` -> `timeout(...)`. - `signal::ctrl_c()` returns a future instead of a stream. - `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait). - `time::Throttle` is removed (due to lack of `Stream` trait). - Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/src/process')
-rw-r--r--tokio/src/process/mod.rs32
-rw-r--r--tokio/src/process/unix/mod.rs30
-rw-r--r--tokio/src/process/unix/reap.rs38
-rw-r--r--tokio/src/process/windows.rs8
4 files changed, 58 insertions, 50 deletions
diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs
index e777da52..3013a65a 100644
--- a/tokio/src/process/mod.rs
+++ b/tokio/src/process/mod.rs
@@ -58,7 +58,6 @@
//! use tokio::io::{BufReader, AsyncBufReadExt};
//! use tokio::process::Command;
//!
-//! use futures_util::stream::StreamExt;
//! use std::process::Stdio;
//!
//! #[tokio::main]
@@ -89,8 +88,8 @@
//! println!("child status was: {}", status);
//! });
//!
-//! while let Some(line) = reader.next().await {
-//! println!("Line: {}", line?);
+//! while let Some(line) = reader.next_line().await? {
+//! println!("Line: {}", line);
//! }
//!
//! Ok(())
@@ -120,8 +119,6 @@ mod kill;
use crate::io::{AsyncRead, AsyncReadExt, AsyncWrite};
use crate::process::kill::Kill;
-use futures_core::TryFuture;
-use futures_util::future::try_join3;
use std::ffi::OsStr;
use std::future::Future;
use std::io;
@@ -681,11 +678,14 @@ impl<T: Kill> Drop for ChildDropGuard<T> {
}
}
-impl<T: TryFuture + Kill + Unpin> Future for ChildDropGuard<T> {
- type Output = Result<T::Ok, T::Error>;
+impl<T, E, F> Future for ChildDropGuard<F>
+where
+ F: Future<Output = Result<T, E>> + Kill + Unpin,
+{
+ type Output = Result<T, E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let ret = Pin::new(&mut self.inner).try_poll(cx);
+ let ret = Pin::new(&mut self.inner).poll(cx);
if let Poll::Ready(Ok(_)) = ret {
// Avoid the overhead of trying to kill a reaped process
@@ -766,6 +766,8 @@ impl Child {
/// new pipes between parent and child. Use `stdout(Stdio::piped())` or
/// `stderr(Stdio::piped())`, respectively, when creating a `Command`.
pub async fn wait_with_output(mut self) -> io::Result<Output> {
+ use crate::future::try_join3;
+
async fn read_to_end<A: AsyncRead + Unpin>(io: Option<A>) -> io::Result<Vec<u8>> {
let mut vec = Vec::new();
if let Some(mut io) = io {
@@ -940,16 +942,14 @@ mod sys {
#[cfg(all(test, not(loom)))]
mod test {
+ use super::kill::Kill;
+ use super::ChildDropGuard;
+
+ use futures::future::FutureExt;
use std::future::Future;
use std::io;
use std::pin::Pin;
- use std::task::Context;
- use std::task::Poll;
-
- use futures_util::future::FutureExt;
-
- use super::kill::Kill;
- use super::ChildDropGuard;
+ use std::task::{Context, Poll};
struct Mock {
num_kills: usize,
@@ -1021,7 +1021,7 @@ mod test {
let mut mock_reaped = Mock::with_result(Poll::Ready(Ok(())));
let mut mock_err = Mock::with_result(Poll::Ready(Err(())));
- let waker = futures_util::task::noop_waker();
+ let waker = futures::task::noop_waker();
let mut context = Context::from_waker(&waker);
{
let mut guard = ChildDropGuard::new(&mut mock_pending);
diff --git a/tokio/src/process/unix/mod.rs b/tokio/src/process/unix/mod.rs
index dfb69f21..72f6f0bf 100644
--- a/tokio/src/process/unix/mod.rs
+++ b/tokio/src/process/unix/mod.rs
@@ -22,14 +22,16 @@
//! bad in theory...
mod orphan;
+use orphan::{OrphanQueue, OrphanQueueImpl, Wait};
+
mod reap;
+use reap::Reaper;
-use self::orphan::{OrphanQueue, OrphanQueueImpl, Wait};
-use self::reap::Reaper;
-use super::SpawnedChild;
use crate::net::util::PollEvented;
use crate::process::kill::Kill;
+use crate::process::SpawnedChild;
use crate::signal::unix::{signal, Signal, SignalKind};
+
use mio::event::Evented;
use mio::unix::{EventedFd, UnixReady};
use mio::{Poll as MioPoll, PollOpt, Ready, Token};
@@ -38,11 +40,11 @@ use std::future::Future;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
-use std::process::{self, ExitStatus};
+use std::process::ExitStatus;
use std::task::Context;
use std::task::Poll;
-impl Wait for process::Child {
+impl Wait for std::process::Child {
fn id(&self) -> u32 {
self.id()
}
@@ -52,14 +54,14 @@ impl Wait for process::Child {
}
}
-impl Kill for process::Child {
+impl Kill for std::process::Child {
fn kill(&mut self) -> io::Result<()> {
self.kill()
}
}
lazy_static::lazy_static! {
- static ref ORPHAN_QUEUE: OrphanQueueImpl<process::Child> = OrphanQueueImpl::new();
+ static ref ORPHAN_QUEUE: OrphanQueueImpl<std::process::Child> = OrphanQueueImpl::new();
}
struct GlobalOrphanQueue;
@@ -70,8 +72,8 @@ impl fmt::Debug for GlobalOrphanQueue {
}
}
-impl OrphanQueue<process::Child> for GlobalOrphanQueue {
- fn push_orphan(&self, orphan: process::Child) {
+impl OrphanQueue<std::process::Child> for GlobalOrphanQueue {
+ fn push_orphan(&self, orphan: std::process::Child) {
ORPHAN_QUEUE.push_orphan(orphan)
}
@@ -82,7 +84,7 @@ impl OrphanQueue<process::Child> for GlobalOrphanQueue {
#[must_use = "futures do nothing unless polled"]
pub(crate) struct Child {
- inner: Reaper<process::Child, GlobalOrphanQueue, Signal>,
+ inner: Reaper<std::process::Child, GlobalOrphanQueue, Signal>,
}
impl fmt::Debug for Child {
@@ -93,7 +95,7 @@ impl fmt::Debug for Child {
}
}
-pub(crate) fn spawn_child(cmd: &mut process::Command) -> io::Result<SpawnedChild> {
+pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result<SpawnedChild> {
let mut child = cmd.spawn()?;
let stdin = stdio(child.stdin.take())?;
let stdout = stdio(child.stdout.take())?;
@@ -196,9 +198,9 @@ where
}
}
-pub(crate) type ChildStdin = PollEvented<Fd<process::ChildStdin>>;
-pub(crate) type ChildStdout = PollEvented<Fd<process::ChildStdout>>;
-pub(crate) type ChildStderr = PollEvented<Fd<process::ChildStderr>>;
+pub(crate) type ChildStdin = PollEvented<Fd<std::process::ChildStdin>>;
+pub(crate) type ChildStdout = PollEvented<Fd<std::process::ChildStdout>>;
+pub(crate) type ChildStderr = PollEvented<Fd<std::process::ChildStderr>>;
fn stdio<T>(option: Option<T>) -> io::Result<Option<PollEvented<Fd<T>>>>
where
diff --git a/tokio/src/process/unix/reap.rs b/tokio/src/process/unix/reap.rs
index 631025d4..8963805a 100644
--- a/tokio/src/process/unix/reap.rs
+++ b/tokio/src/process/unix/reap.rs
@@ -1,6 +1,7 @@
-use super::orphan::{OrphanQueue, Wait};
+use crate::process::imp::orphan::{OrphanQueue, Wait};
use crate::process::kill::Kill;
-use futures_core::stream::Stream;
+use crate::signal::unix::Signal;
+
use std::future::Future;
use std::io;
use std::ops::Deref;
@@ -22,6 +23,17 @@ where
signal: S,
}
+// Work around removal of `futures_core` dependency
+pub(crate) trait Stream: Unpin {
+ fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>;
+}
+
+impl Stream for Signal {
+ fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ Signal::poll_recv(self, cx)
+ }
+}
+
impl<W, Q, S> Deref for Reaper<W, Q, S>
where
W: Wait + Unpin,
@@ -60,7 +72,7 @@ impl<W, Q, S> Future for Reaper<W, Q, S>
where
W: Wait + Unpin,
Q: OrphanQueue<W> + Unpin,
- S: Stream + Unpin,
+ S: Stream,
{
type Output = io::Result<ExitStatus>;
@@ -85,7 +97,7 @@ where
// this future's task will be notified/woken up again. Since the
// futures model allows for spurious wake ups this extra wakeup
// should not cause significant issues with parent futures.
- let registered_interest = Pin::new(&mut self.signal).poll_next(cx).is_pending();
+ let registered_interest = self.signal.poll_recv(cx).is_pending();
self.orphan_queue.reap_orphans();
if let Some(status) = self.inner_mut().try_wait()? {
@@ -134,11 +146,10 @@ where
#[cfg(all(test, not(loom)))]
mod test {
use super::*;
- use futures_core::stream::Stream;
- use futures_util::future::FutureExt;
+
+ use futures::future::FutureExt;
use std::cell::{Cell, RefCell};
use std::os::unix::process::ExitStatusExt;
- use std::pin::Pin;
use std::process::ExitStatus;
use std::task::Context;
use std::task::Poll;
@@ -201,13 +212,10 @@ mod test {
}
impl Stream for MockStream {
- type Item = io::Result<()>;
-
- fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let inner = Pin::get_mut(self);
- inner.total_polls += 1;
- match inner.values.remove(0) {
- Some(()) => Poll::Ready(Some(Ok(()))),
+ fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.total_polls += 1;
+ match self.values.remove(0) {
+ Some(()) => Poll::Ready(Some(())),
None => Poll::Pending,
}
}
@@ -247,7 +255,7 @@ mod test {
MockStream::new(vec![None, Some(()), None, None, None]),
);
- let waker = futures_util::task::noop_waker();
+ let waker = futures::task::noop_waker();
let mut context = Context::from_waker(&waker);
// Not yet exited, interest registered
diff --git a/tokio/src/process/windows.rs b/tokio/src/process/windows.rs
index 013e9bb7..d25807d6 100644
--- a/tokio/src/process/windows.rs
+++ b/tokio/src/process/windows.rs
@@ -20,8 +20,6 @@ use crate::process::kill::Kill;
use crate::process::SpawnedChild;
use crate::sync::oneshot;
-use futures_util::future::Fuse;
-use futures_util::future::FutureExt;
use mio_named_pipes::NamedPipe;
use std::fmt;
use std::future::Future;
@@ -59,7 +57,7 @@ impl fmt::Debug for Child {
}
struct Waiting {
- rx: Fuse<oneshot::Receiver<()>>,
+ rx: oneshot::Receiver<()>,
wait_object: HANDLE,
tx: *mut Option<oneshot::Sender<()>>,
}
@@ -103,7 +101,7 @@ impl Future for Child {
let inner = Pin::get_mut(self);
loop {
if let Some(ref mut w) = inner.waiting {
- match w.rx.poll_unpin(cx) {
+ match Pin::new(&mut w.rx).poll(cx) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(_)) => panic!("should not be canceled"),
Poll::Pending => return Poll::Pending,
@@ -134,7 +132,7 @@ impl Future for Child {
return Poll::Ready(Err(err));
}
inner.waiting = Some(Waiting {
- rx: rx.fuse(),
+ rx,
wait_object,
tx: ptr,
});