summaryrefslogtreecommitdiffstats
path: root/tokio/src/process/unix/reap.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/process/unix/reap.rs')
-rw-r--r--tokio/src/process/unix/reap.rs38
1 files changed, 23 insertions, 15 deletions
diff --git a/tokio/src/process/unix/reap.rs b/tokio/src/process/unix/reap.rs
index 631025d4..8963805a 100644
--- a/tokio/src/process/unix/reap.rs
+++ b/tokio/src/process/unix/reap.rs
@@ -1,6 +1,7 @@
-use super::orphan::{OrphanQueue, Wait};
+use crate::process::imp::orphan::{OrphanQueue, Wait};
use crate::process::kill::Kill;
-use futures_core::stream::Stream;
+use crate::signal::unix::Signal;
+
use std::future::Future;
use std::io;
use std::ops::Deref;
@@ -22,6 +23,17 @@ where
signal: S,
}
+// Work around removal of `futures_core` dependency
+pub(crate) trait Stream: Unpin {
+ fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>;
+}
+
+impl Stream for Signal {
+ fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ Signal::poll_recv(self, cx)
+ }
+}
+
impl<W, Q, S> Deref for Reaper<W, Q, S>
where
W: Wait + Unpin,
@@ -60,7 +72,7 @@ impl<W, Q, S> Future for Reaper<W, Q, S>
where
W: Wait + Unpin,
Q: OrphanQueue<W> + Unpin,
- S: Stream + Unpin,
+ S: Stream,
{
type Output = io::Result<ExitStatus>;
@@ -85,7 +97,7 @@ where
// this future's task will be notified/woken up again. Since the
// futures model allows for spurious wake ups this extra wakeup
// should not cause significant issues with parent futures.
- let registered_interest = Pin::new(&mut self.signal).poll_next(cx).is_pending();
+ let registered_interest = self.signal.poll_recv(cx).is_pending();
self.orphan_queue.reap_orphans();
if let Some(status) = self.inner_mut().try_wait()? {
@@ -134,11 +146,10 @@ where
#[cfg(all(test, not(loom)))]
mod test {
use super::*;
- use futures_core::stream::Stream;
- use futures_util::future::FutureExt;
+
+ use futures::future::FutureExt;
use std::cell::{Cell, RefCell};
use std::os::unix::process::ExitStatusExt;
- use std::pin::Pin;
use std::process::ExitStatus;
use std::task::Context;
use std::task::Poll;
@@ -201,13 +212,10 @@ mod test {
}
impl Stream for MockStream {
- type Item = io::Result<()>;
-
- fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let inner = Pin::get_mut(self);
- inner.total_polls += 1;
- match inner.values.remove(0) {
- Some(()) => Poll::Ready(Some(Ok(()))),
+ fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.total_polls += 1;
+ match self.values.remove(0) {
+ Some(()) => Poll::Ready(Some(())),
None => Poll::Pending,
}
}
@@ -247,7 +255,7 @@ mod test {
MockStream::new(vec![None, Some(()), None, None, None]),
);
- let waker = futures_util::task::noop_waker();
+ let waker = futures::task::noop_waker();
let mut context = Context::from_waker(&waker);
// Not yet exited, interest registered