diff options
author | Ivan Petkov <ivanppetkov@gmail.com> | 2020-10-06 10:30:16 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-06 17:30:16 +0000 |
commit | 4cf45c038b9691f24fac22df13594c2223b185f6 (patch) | |
tree | 2e9a92280ce9860daf2a8b3d57ae11441264097b /tokio/src/process | |
parent | 9730317e94cd5bfca237376549405a6feb815223 (diff) |
process: add ProcessDriver to handle orphan reaping (#2907)
Diffstat (limited to 'tokio/src/process')
-rw-r--r-- | tokio/src/process/mod.rs | 5 | ||||
-rw-r--r-- | tokio/src/process/unix/driver.rs | 154 | ||||
-rw-r--r-- | tokio/src/process/unix/mod.rs | 30 | ||||
-rw-r--r-- | tokio/src/process/unix/orphan.rs | 78 | ||||
-rw-r--r-- | tokio/src/process/unix/reap.rs | 57 |
5 files changed, 240 insertions, 84 deletions
diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs index 7da9783b..ff84428e 100644 --- a/tokio/src/process/mod.rs +++ b/tokio/src/process/mod.rs @@ -113,6 +113,11 @@ #[cfg(unix)] mod imp; +#[cfg(unix)] +pub(crate) mod unix { + pub(crate) use super::imp::*; +} + #[path = "windows.rs"] #[cfg(windows)] mod imp; diff --git a/tokio/src/process/unix/driver.rs b/tokio/src/process/unix/driver.rs new file mode 100644 index 00000000..2eea0043 --- /dev/null +++ b/tokio/src/process/unix/driver.rs @@ -0,0 +1,154 @@ +//! Process driver + +use crate::park::Park; +use crate::process::unix::orphan::ReapOrphanQueue; +use crate::process::unix::GlobalOrphanQueue; +use crate::signal::unix::driver::Driver as SignalDriver; +use crate::signal::unix::{signal_with_handle, InternalStream, Signal, SignalKind}; +use crate::sync::mpsc::error::TryRecvError; + +use std::io; +use std::time::Duration; + +/// Responsible for cleaning up orphaned child processes on Unix platforms. +#[derive(Debug)] +pub(crate) struct Driver { + park: SignalDriver, + inner: CoreDriver<Signal, GlobalOrphanQueue>, +} + +#[derive(Debug)] +struct CoreDriver<S, Q> { + sigchild: S, + orphan_queue: Q, +} + +// ===== impl CoreDriver ===== + +impl<S, Q> CoreDriver<S, Q> +where + S: InternalStream, + Q: ReapOrphanQueue, +{ + fn got_signal(&mut self) -> bool { + match self.sigchild.try_recv() { + Ok(()) => true, + Err(TryRecvError::Empty) => false, + Err(TryRecvError::Closed) => panic!("signal was deregistered"), + } + } + + fn process(&mut self) { + if self.got_signal() { + // Drain all notifications which may have been buffered + // so we can try to reap all orphans in one batch + while self.got_signal() {} + + self.orphan_queue.reap_orphans(); + } + } +} + +// ===== impl Driver ===== + +impl Driver { + /// Creates a new signal `Driver` instance that delegates wakeups to `park`. + pub(crate) fn new(park: SignalDriver) -> io::Result<Self> { + let sigchild = signal_with_handle(SignalKind::child(), park.handle())?; + let inner = CoreDriver { + sigchild, + orphan_queue: GlobalOrphanQueue, + }; + + Ok(Self { park, inner }) + } +} + +// ===== impl Park for Driver ===== + +impl Park for Driver { + type Unpark = <SignalDriver as Park>::Unpark; + type Error = io::Error; + + fn unpark(&self) -> Self::Unpark { + self.park.unpark() + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.park.park()?; + self.inner.process(); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.park.park_timeout(duration)?; + self.inner.process(); + Ok(()) + } + + fn shutdown(&mut self) { + self.park.shutdown() + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::process::unix::orphan::test::MockQueue; + use crate::sync::mpsc::error::TryRecvError; + use std::task::{Context, Poll}; + + struct MockStream { + total_try_recv: usize, + values: Vec<Option<()>>, + } + + impl MockStream { + fn new(values: Vec<Option<()>>) -> Self { + Self { + total_try_recv: 0, + values, + } + } + } + + impl InternalStream for MockStream { + fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> { + unimplemented!(); + } + + fn try_recv(&mut self) -> Result<(), TryRecvError> { + self.total_try_recv += 1; + match self.values.remove(0) { + Some(()) => Ok(()), + None => Err(TryRecvError::Empty), + } + } + } + + #[test] + fn no_reap_if_no_signal() { + let mut driver = CoreDriver { + sigchild: MockStream::new(vec![None]), + orphan_queue: MockQueue::<()>::new(), + }; + + driver.process(); + + assert_eq!(1, driver.sigchild.total_try_recv); + assert_eq!(0, driver.orphan_queue.total_reaps.get()); + } + + #[test] + fn coalesce_signals_before_reaping() { + let mut driver = CoreDriver { + sigchild: MockStream::new(vec![Some(()), Some(()), None]), + orphan_queue: MockQueue::<()>::new(), + }; + + driver.process(); + + assert_eq!(3, driver.sigchild.total_try_recv); + assert_eq!(1, driver.orphan_queue.total_reaps.get()); + } +} diff --git a/tokio/src/process/unix/mod.rs b/tokio/src/process/unix/mod.rs index 46f1d790..db9d592c 100644 --- a/tokio/src/process/unix/mod.rs +++ b/tokio/src/process/unix/mod.rs @@ -21,8 +21,10 @@ //! processes in general aren't scalable (e.g. millions) so it shouldn't be that //! bad in theory... -mod orphan; -use orphan::{OrphanQueue, OrphanQueueImpl, Wait}; +pub(crate) mod driver; + +pub(crate) mod orphan; +use orphan::{OrphanQueue, OrphanQueueImpl, ReapOrphanQueue, Wait}; mod reap; use reap::Reaper; @@ -39,11 +41,11 @@ use std::future::Future; use std::io; use std::os::unix::io::{AsRawFd, RawFd}; use std::pin::Pin; -use std::process::ExitStatus; +use std::process::{Child as StdChild, ExitStatus}; use std::task::Context; use std::task::Poll; -impl Wait for std::process::Child { +impl Wait for StdChild { fn id(&self) -> u32 { self.id() } @@ -53,17 +55,17 @@ impl Wait for std::process::Child { } } -impl Kill for std::process::Child { +impl Kill for StdChild { fn kill(&mut self) -> io::Result<()> { self.kill() } } lazy_static::lazy_static! { - static ref ORPHAN_QUEUE: OrphanQueueImpl<std::process::Child> = OrphanQueueImpl::new(); + static ref ORPHAN_QUEUE: OrphanQueueImpl<StdChild> = OrphanQueueImpl::new(); } -struct GlobalOrphanQueue; +pub(crate) struct GlobalOrphanQueue; impl fmt::Debug for GlobalOrphanQueue { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -71,19 +73,21 @@ impl fmt::Debug for GlobalOrphanQueue { } } -impl OrphanQueue<std::process::Child> for GlobalOrphanQueue { - fn push_orphan(&self, orphan: std::process::Child) { - ORPHAN_QUEUE.push_orphan(orphan) - } - +impl ReapOrphanQueue for GlobalOrphanQueue { fn reap_orphans(&self) { ORPHAN_QUEUE.reap_orphans() } } +impl OrphanQueue<StdChild> for GlobalOrphanQueue { + fn push_orphan(&self, orphan: StdChild) { + ORPHAN_QUEUE.push_orphan(orphan) + } +} + #[must_use = "futures do nothing unless polled"] pub(crate) struct Child { - inner: Reaper<std::process::Child, GlobalOrphanQueue, Signal>, + inner: Reaper<StdChild, GlobalOrphanQueue, Signal>, } impl fmt::Debug for Child { diff --git a/tokio/src/process/unix/orphan.rs b/tokio/src/process/unix/orphan.rs index 6c449a90..8a1e1278 100644 --- a/tokio/src/process/unix/orphan.rs +++ b/tokio/src/process/unix/orphan.rs @@ -20,23 +20,29 @@ impl<T: Wait> Wait for &mut T { } } -/// An interface for queueing up an orphaned process so that it can be reaped. -pub(crate) trait OrphanQueue<T> { - /// Adds an orphan to the queue. - fn push_orphan(&self, orphan: T); +/// An interface for reaping a set of orphaned processes. +pub(crate) trait ReapOrphanQueue { /// Attempts to reap every process in the queue, ignoring any errors and /// enqueueing any orphans which have not yet exited. fn reap_orphans(&self); } +impl<T: ReapOrphanQueue> ReapOrphanQueue for &T { + fn reap_orphans(&self) { + (**self).reap_orphans() + } +} + +/// An interface for queueing up an orphaned process so that it can be reaped. +pub(crate) trait OrphanQueue<T>: ReapOrphanQueue { + /// Adds an orphan to the queue. + fn push_orphan(&self, orphan: T); +} + impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O { fn push_orphan(&self, orphan: T) { (**self).push_orphan(orphan); } - - fn reap_orphans(&self) { - (**self).reap_orphans() - } } /// An implementation of `OrphanQueue`. @@ -62,42 +68,62 @@ impl<T: Wait> OrphanQueue<T> for OrphanQueueImpl<T> { fn push_orphan(&self, orphan: T) { self.queue.lock().unwrap().push(orphan) } +} +impl<T: Wait> ReapOrphanQueue for OrphanQueueImpl<T> { fn reap_orphans(&self) { let mut queue = self.queue.lock().unwrap(); let queue = &mut *queue; - let mut i = 0; - while i < queue.len() { + for i in (0..queue.len()).rev() { match queue[i].try_wait() { - Ok(Some(_)) => {} - Err(_) => { - // TODO: bubble up error some how. Is this an internal bug? - // Shoudl we panic? Is it OK for this to be silently - // dropped? - } - // Still not done yet - Ok(None) => { - i += 1; - continue; + Ok(None) => {} + Ok(Some(_)) | Err(_) => { + // The stdlib handles interruption errors (EINTR) when polling a child process. + // All other errors represent invalid inputs or pids that have already been + // reaped, so we can drop the orphan in case an error is raised. + queue.swap_remove(i); } } - - queue.remove(i); } } } #[cfg(all(test, not(loom)))] -mod test { - use super::Wait; - use super::{OrphanQueue, OrphanQueueImpl}; - use std::cell::Cell; +pub(crate) mod test { + use super::*; + use std::cell::{Cell, RefCell}; use std::io; use std::os::unix::process::ExitStatusExt; use std::process::ExitStatus; use std::rc::Rc; + pub(crate) struct MockQueue<W> { + pub(crate) all_enqueued: RefCell<Vec<W>>, + pub(crate) total_reaps: Cell<usize>, + } + + impl<W> MockQueue<W> { + pub(crate) fn new() -> Self { + Self { + all_enqueued: RefCell::new(Vec::new()), + total_reaps: Cell::new(0), + } + } + } + + impl<W> OrphanQueue<W> for MockQueue<W> { + fn push_orphan(&self, orphan: W) { + self.all_enqueued.borrow_mut().push(orphan); + } + } + + impl<W> ReapOrphanQueue for MockQueue<W> { + fn reap_orphans(&self) { + self.total_reaps.set(self.total_reaps.get() + 1); + } + } + struct MockWait { total_waits: Rc<Cell<usize>>, num_wait_until_status: usize, diff --git a/tokio/src/process/unix/reap.rs b/tokio/src/process/unix/reap.rs index c51a20b9..de483c44 100644 --- a/tokio/src/process/unix/reap.rs +++ b/tokio/src/process/unix/reap.rs @@ -1,6 +1,6 @@ use crate::process::imp::orphan::{OrphanQueue, Wait}; use crate::process::kill::Kill; -use crate::signal::unix::Signal; +use crate::signal::unix::InternalStream; use std::future::Future; use std::io; @@ -23,17 +23,6 @@ where signal: S, } -// Work around removal of `futures_core` dependency -pub(crate) trait Stream: Unpin { - fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>; -} - -impl Stream for Signal { - fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { - Signal::poll_recv(self, cx) - } -} - impl<W, Q, S> Deref for Reaper<W, Q, S> where W: Wait + Unpin, @@ -72,7 +61,7 @@ impl<W, Q, S> Future for Reaper<W, Q, S> where W: Wait + Unpin, Q: OrphanQueue<W> + Unpin, - S: Stream, + S: InternalStream, { type Output = io::Result<ExitStatus>; @@ -80,10 +69,8 @@ where loop { // If the child hasn't exited yet, then it's our responsibility to // ensure the current task gets notified when it might be able to - // make progress. - // - // As described in `spawn` above, we just indicate that we can - // next make progress once a SIGCHLD is received. + // make progress. We can use the delivery of a SIGCHLD signal as a + // sign that we can potentially make progress. // // However, we will register for a notification on the next signal // BEFORE we poll the child. Otherwise it is possible that the child @@ -99,7 +86,6 @@ where // should not cause significant issues with parent futures. let registered_interest = self.signal.poll_recv(cx).is_pending(); - self.orphan_queue.reap_orphans(); if let Some(status) = self.inner_mut().try_wait()? { return Poll::Ready(Ok(status)); } @@ -147,8 +133,9 @@ where mod test { use super::*; + use crate::process::unix::orphan::test::MockQueue; + use crate::sync::mpsc::error::TryRecvError; use futures::future::FutureExt; - use std::cell::{Cell, RefCell}; use std::os::unix::process::ExitStatusExt; use std::process::ExitStatus; use std::task::Context; @@ -211,7 +198,7 @@ mod test { } } - impl Stream for MockStream { + impl InternalStream for MockStream { fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> { self.total_polls += 1; match self.values.remove(0) { @@ -219,29 +206,9 @@ mod test { None => Poll::Pending, } } - } - struct MockQueue<W> { - all_enqueued: RefCell<Vec<W>>, - total_reaps: Cell<usize>, - } - - impl<W> MockQueue<W> { - fn new() -> Self { - Self { - all_enqueued: RefCell::new(Vec::new()), - total_reaps: Cell::new(0), - } - } - } - - impl<W: Wait> OrphanQueue<W> for MockQueue<W> { - fn push_orphan(&self, orphan: W) { - self.all_enqueued.borrow_mut().push(orphan); - } - - fn reap_orphans(&self) { - self.total_reaps.set(self.total_reaps.get() + 1); + fn try_recv(&mut self) -> Result<(), TryRecvError> { + unimplemented!(); } } @@ -262,7 +229,7 @@ mod test { assert!(grim.poll_unpin(&mut context).is_pending()); assert_eq!(1, grim.signal.total_polls); assert_eq!(1, grim.total_waits); - assert_eq!(1, grim.orphan_queue.total_reaps.get()); + assert_eq!(0, grim.orphan_queue.total_reaps.get()); assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); // Not yet exited, couldn't register interest the first time @@ -270,7 +237,7 @@ mod test { assert!(grim.poll_unpin(&mut context).is_pending()); assert_eq!(3, grim.signal.total_polls); assert_eq!(3, grim.total_waits); - assert_eq!(3, grim.orphan_queue.total_reaps.get()); + assert_eq!(0, grim.orphan_queue.total_reaps.get()); assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); // Exited @@ -283,7 +250,7 @@ mod test { } assert_eq!(4, grim.signal.total_polls); assert_eq!(4, grim.total_waits); - assert_eq!(4, grim.orphan_queue.total_reaps.get()); + assert_eq!(0, grim.orphan_queue.total_reaps.get()); assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); } |