diff options
Diffstat (limited to 'tokio/src/process/unix/reap.rs')
-rw-r--r-- | tokio/src/process/unix/reap.rs | 38 |
1 files changed, 23 insertions, 15 deletions
diff --git a/tokio/src/process/unix/reap.rs b/tokio/src/process/unix/reap.rs index 631025d4..8963805a 100644 --- a/tokio/src/process/unix/reap.rs +++ b/tokio/src/process/unix/reap.rs @@ -1,6 +1,7 @@ -use super::orphan::{OrphanQueue, Wait}; +use crate::process::imp::orphan::{OrphanQueue, Wait}; use crate::process::kill::Kill; -use futures_core::stream::Stream; +use crate::signal::unix::Signal; + use std::future::Future; use std::io; use std::ops::Deref; @@ -22,6 +23,17 @@ where signal: S, } +// Work around removal of `futures_core` dependency +pub(crate) trait Stream: Unpin { + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>; +} + +impl Stream for Signal { + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + Signal::poll_recv(self, cx) + } +} + impl<W, Q, S> Deref for Reaper<W, Q, S> where W: Wait + Unpin, @@ -60,7 +72,7 @@ impl<W, Q, S> Future for Reaper<W, Q, S> where W: Wait + Unpin, Q: OrphanQueue<W> + Unpin, - S: Stream + Unpin, + S: Stream, { type Output = io::Result<ExitStatus>; @@ -85,7 +97,7 @@ where // this future's task will be notified/woken up again. Since the // futures model allows for spurious wake ups this extra wakeup // should not cause significant issues with parent futures. - let registered_interest = Pin::new(&mut self.signal).poll_next(cx).is_pending(); + let registered_interest = self.signal.poll_recv(cx).is_pending(); self.orphan_queue.reap_orphans(); if let Some(status) = self.inner_mut().try_wait()? { @@ -134,11 +146,10 @@ where #[cfg(all(test, not(loom)))] mod test { use super::*; - use futures_core::stream::Stream; - use futures_util::future::FutureExt; + + use futures::future::FutureExt; use std::cell::{Cell, RefCell}; use std::os::unix::process::ExitStatusExt; - use std::pin::Pin; use std::process::ExitStatus; use std::task::Context; use std::task::Poll; @@ -201,13 +212,10 @@ mod test { } impl Stream for MockStream { - type Item = io::Result<()>; - - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let inner = Pin::get_mut(self); - inner.total_polls += 1; - match inner.values.remove(0) { - Some(()) => Poll::Ready(Some(Ok(()))), + fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> { + self.total_polls += 1; + match self.values.remove(0) { + Some(()) => Poll::Ready(Some(())), None => Poll::Pending, } } @@ -247,7 +255,7 @@ mod test { MockStream::new(vec![None, Some(()), None, None, None]), ); - let waker = futures_util::task::noop_waker(); + let waker = futures::task::noop_waker(); let mut context = Context::from_waker(&waker); // Not yet exited, interest registered |