summaryrefslogtreecommitdiffstats
path: root/tokio/src/process/unix/reap.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-15 22:11:13 -0800
committerGitHub <noreply@github.com>2019-11-15 22:11:13 -0800
commit8a7e57786a5dca139f5b4261685e22991ded0859 (patch)
treeb69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/src/process/unix/reap.rs
parent930679587ae42e4df3113159ccf33fb5923dd73a (diff)
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its _public_ dependencies on crates that are either a) do not provide a stable (1.0+) release with longevity guarantees or b) match the `tokio` release cadence. Of course, implementing `std` traits fits the requirements. The on exception, for now, is the `Stream` trait found in `futures_core`. It is expected that this trait will not change much and be moved into `std. Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain a dependency on this trait given how foundational it is. Since the `Stream` implementation is optional, types that are logically streams provide `async fn next_*` functions to obtain the next value. Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`. Additionally, some misc cleanup is also done: - `tokio::io::io` -> `tokio::io::util`. - `delay` -> `delay_until`. - `Timeout::new` -> `timeout(...)`. - `signal::ctrl_c()` returns a future instead of a stream. - `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait). - `time::Throttle` is removed (due to lack of `Stream` trait). - Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/src/process/unix/reap.rs')
-rw-r--r--tokio/src/process/unix/reap.rs38
1 files changed, 23 insertions, 15 deletions
diff --git a/tokio/src/process/unix/reap.rs b/tokio/src/process/unix/reap.rs
index 631025d4..8963805a 100644
--- a/tokio/src/process/unix/reap.rs
+++ b/tokio/src/process/unix/reap.rs
@@ -1,6 +1,7 @@
-use super::orphan::{OrphanQueue, Wait};
+use crate::process::imp::orphan::{OrphanQueue, Wait};
use crate::process::kill::Kill;
-use futures_core::stream::Stream;
+use crate::signal::unix::Signal;
+
use std::future::Future;
use std::io;
use std::ops::Deref;
@@ -22,6 +23,17 @@ where
signal: S,
}
+// Work around removal of `futures_core` dependency
+pub(crate) trait Stream: Unpin {
+ fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>;
+}
+
+impl Stream for Signal {
+ fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ Signal::poll_recv(self, cx)
+ }
+}
+
impl<W, Q, S> Deref for Reaper<W, Q, S>
where
W: Wait + Unpin,
@@ -60,7 +72,7 @@ impl<W, Q, S> Future for Reaper<W, Q, S>
where
W: Wait + Unpin,
Q: OrphanQueue<W> + Unpin,
- S: Stream + Unpin,
+ S: Stream,
{
type Output = io::Result<ExitStatus>;
@@ -85,7 +97,7 @@ where
// this future's task will be notified/woken up again. Since the
// futures model allows for spurious wake ups this extra wakeup
// should not cause significant issues with parent futures.
- let registered_interest = Pin::new(&mut self.signal).poll_next(cx).is_pending();
+ let registered_interest = self.signal.poll_recv(cx).is_pending();
self.orphan_queue.reap_orphans();
if let Some(status) = self.inner_mut().try_wait()? {
@@ -134,11 +146,10 @@ where
#[cfg(all(test, not(loom)))]
mod test {
use super::*;
- use futures_core::stream::Stream;
- use futures_util::future::FutureExt;
+
+ use futures::future::FutureExt;
use std::cell::{Cell, RefCell};
use std::os::unix::process::ExitStatusExt;
- use std::pin::Pin;
use std::process::ExitStatus;
use std::task::Context;
use std::task::Poll;
@@ -201,13 +212,10 @@ mod test {
}
impl Stream for MockStream {
- type Item = io::Result<()>;
-
- fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let inner = Pin::get_mut(self);
- inner.total_polls += 1;
- match inner.values.remove(0) {
- Some(()) => Poll::Ready(Some(Ok(()))),
+ fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.total_polls += 1;
+ match self.values.remove(0) {
+ Some(()) => Poll::Ready(Some(())),
None => Poll::Pending,
}
}
@@ -247,7 +255,7 @@ mod test {
MockStream::new(vec![None, Some(()), None, None, None]),
);
- let waker = futures_util::task::noop_waker();
+ let waker = futures::task::noop_waker();
let mut context = Context::from_waker(&waker);
// Not yet exited, interest registered