summaryrefslogtreecommitdiffstats
path: root/tokio/src/process
diff options
context:
space:
mode:
authorIvan Petkov <ivanppetkov@gmail.com>2020-10-06 10:30:16 -0700
committerGitHub <noreply@github.com>2020-10-06 17:30:16 +0000
commit4cf45c038b9691f24fac22df13594c2223b185f6 (patch)
tree2e9a92280ce9860daf2a8b3d57ae11441264097b /tokio/src/process
parent9730317e94cd5bfca237376549405a6feb815223 (diff)
process: add ProcessDriver to handle orphan reaping (#2907)
Diffstat (limited to 'tokio/src/process')
-rw-r--r--tokio/src/process/mod.rs5
-rw-r--r--tokio/src/process/unix/driver.rs154
-rw-r--r--tokio/src/process/unix/mod.rs30
-rw-r--r--tokio/src/process/unix/orphan.rs78
-rw-r--r--tokio/src/process/unix/reap.rs57
5 files changed, 240 insertions, 84 deletions
diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs
index 7da9783b..ff84428e 100644
--- a/tokio/src/process/mod.rs
+++ b/tokio/src/process/mod.rs
@@ -113,6 +113,11 @@
#[cfg(unix)]
mod imp;
+#[cfg(unix)]
+pub(crate) mod unix {
+ pub(crate) use super::imp::*;
+}
+
#[path = "windows.rs"]
#[cfg(windows)]
mod imp;
diff --git a/tokio/src/process/unix/driver.rs b/tokio/src/process/unix/driver.rs
new file mode 100644
index 00000000..2eea0043
--- /dev/null
+++ b/tokio/src/process/unix/driver.rs
@@ -0,0 +1,154 @@
+//! Process driver
+
+use crate::park::Park;
+use crate::process::unix::orphan::ReapOrphanQueue;
+use crate::process::unix::GlobalOrphanQueue;
+use crate::signal::unix::driver::Driver as SignalDriver;
+use crate::signal::unix::{signal_with_handle, InternalStream, Signal, SignalKind};
+use crate::sync::mpsc::error::TryRecvError;
+
+use std::io;
+use std::time::Duration;
+
+/// Responsible for cleaning up orphaned child processes on Unix platforms.
+#[derive(Debug)]
+pub(crate) struct Driver {
+ park: SignalDriver,
+ inner: CoreDriver<Signal, GlobalOrphanQueue>,
+}
+
+#[derive(Debug)]
+struct CoreDriver<S, Q> {
+ sigchild: S,
+ orphan_queue: Q,
+}
+
+// ===== impl CoreDriver =====
+
+impl<S, Q> CoreDriver<S, Q>
+where
+ S: InternalStream,
+ Q: ReapOrphanQueue,
+{
+ fn got_signal(&mut self) -> bool {
+ match self.sigchild.try_recv() {
+ Ok(()) => true,
+ Err(TryRecvError::Empty) => false,
+ Err(TryRecvError::Closed) => panic!("signal was deregistered"),
+ }
+ }
+
+ fn process(&mut self) {
+ if self.got_signal() {
+ // Drain all notifications which may have been buffered
+ // so we can try to reap all orphans in one batch
+ while self.got_signal() {}
+
+ self.orphan_queue.reap_orphans();
+ }
+ }
+}
+
+// ===== impl Driver =====
+
+impl Driver {
+ /// Creates a new signal `Driver` instance that delegates wakeups to `park`.
+ pub(crate) fn new(park: SignalDriver) -> io::Result<Self> {
+ let sigchild = signal_with_handle(SignalKind::child(), park.handle())?;
+ let inner = CoreDriver {
+ sigchild,
+ orphan_queue: GlobalOrphanQueue,
+ };
+
+ Ok(Self { park, inner })
+ }
+}
+
+// ===== impl Park for Driver =====
+
+impl Park for Driver {
+ type Unpark = <SignalDriver as Park>::Unpark;
+ type Error = io::Error;
+
+ fn unpark(&self) -> Self::Unpark {
+ self.park.unpark()
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.park.park()?;
+ self.inner.process();
+ Ok(())
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.park.park_timeout(duration)?;
+ self.inner.process();
+ Ok(())
+ }
+
+ fn shutdown(&mut self) {
+ self.park.shutdown()
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use crate::process::unix::orphan::test::MockQueue;
+ use crate::sync::mpsc::error::TryRecvError;
+ use std::task::{Context, Poll};
+
+ struct MockStream {
+ total_try_recv: usize,
+ values: Vec<Option<()>>,
+ }
+
+ impl MockStream {
+ fn new(values: Vec<Option<()>>) -> Self {
+ Self {
+ total_try_recv: 0,
+ values,
+ }
+ }
+ }
+
+ impl InternalStream for MockStream {
+ fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
+ unimplemented!();
+ }
+
+ fn try_recv(&mut self) -> Result<(), TryRecvError> {
+ self.total_try_recv += 1;
+ match self.values.remove(0) {
+ Some(()) => Ok(()),
+ None => Err(TryRecvError::Empty),
+ }
+ }
+ }
+
+ #[test]
+ fn no_reap_if_no_signal() {
+ let mut driver = CoreDriver {
+ sigchild: MockStream::new(vec![None]),
+ orphan_queue: MockQueue::<()>::new(),
+ };
+
+ driver.process();
+
+ assert_eq!(1, driver.sigchild.total_try_recv);
+ assert_eq!(0, driver.orphan_queue.total_reaps.get());
+ }
+
+ #[test]
+ fn coalesce_signals_before_reaping() {
+ let mut driver = CoreDriver {
+ sigchild: MockStream::new(vec![Some(()), Some(()), None]),
+ orphan_queue: MockQueue::<()>::new(),
+ };
+
+ driver.process();
+
+ assert_eq!(3, driver.sigchild.total_try_recv);
+ assert_eq!(1, driver.orphan_queue.total_reaps.get());
+ }
+}
diff --git a/tokio/src/process/unix/mod.rs b/tokio/src/process/unix/mod.rs
index 46f1d790..db9d592c 100644
--- a/tokio/src/process/unix/mod.rs
+++ b/tokio/src/process/unix/mod.rs
@@ -21,8 +21,10 @@
//! processes in general aren't scalable (e.g. millions) so it shouldn't be that
//! bad in theory...
-mod orphan;
-use orphan::{OrphanQueue, OrphanQueueImpl, Wait};
+pub(crate) mod driver;
+
+pub(crate) mod orphan;
+use orphan::{OrphanQueue, OrphanQueueImpl, ReapOrphanQueue, Wait};
mod reap;
use reap::Reaper;
@@ -39,11 +41,11 @@ use std::future::Future;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
-use std::process::ExitStatus;
+use std::process::{Child as StdChild, ExitStatus};
use std::task::Context;
use std::task::Poll;
-impl Wait for std::process::Child {
+impl Wait for StdChild {
fn id(&self) -> u32 {
self.id()
}
@@ -53,17 +55,17 @@ impl Wait for std::process::Child {
}
}
-impl Kill for std::process::Child {
+impl Kill for StdChild {
fn kill(&mut self) -> io::Result<()> {
self.kill()
}
}
lazy_static::lazy_static! {
- static ref ORPHAN_QUEUE: OrphanQueueImpl<std::process::Child> = OrphanQueueImpl::new();
+ static ref ORPHAN_QUEUE: OrphanQueueImpl<StdChild> = OrphanQueueImpl::new();
}
-struct GlobalOrphanQueue;
+pub(crate) struct GlobalOrphanQueue;
impl fmt::Debug for GlobalOrphanQueue {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -71,19 +73,21 @@ impl fmt::Debug for GlobalOrphanQueue {
}
}
-impl OrphanQueue<std::process::Child> for GlobalOrphanQueue {
- fn push_orphan(&self, orphan: std::process::Child) {
- ORPHAN_QUEUE.push_orphan(orphan)
- }
-
+impl ReapOrphanQueue for GlobalOrphanQueue {
fn reap_orphans(&self) {
ORPHAN_QUEUE.reap_orphans()
}
}
+impl OrphanQueue<StdChild> for GlobalOrphanQueue {
+ fn push_orphan(&self, orphan: StdChild) {
+ ORPHAN_QUEUE.push_orphan(orphan)
+ }
+}
+
#[must_use = "futures do nothing unless polled"]
pub(crate) struct Child {
- inner: Reaper<std::process::Child, GlobalOrphanQueue, Signal>,
+ inner: Reaper<StdChild, GlobalOrphanQueue, Signal>,
}
impl fmt::Debug for Child {
diff --git a/tokio/src/process/unix/orphan.rs b/tokio/src/process/unix/orphan.rs
index 6c449a90..8a1e1278 100644
--- a/tokio/src/process/unix/orphan.rs
+++ b/tokio/src/process/unix/orphan.rs
@@ -20,23 +20,29 @@ impl<T: Wait> Wait for &mut T {
}
}
-/// An interface for queueing up an orphaned process so that it can be reaped.
-pub(crate) trait OrphanQueue<T> {
- /// Adds an orphan to the queue.
- fn push_orphan(&self, orphan: T);
+/// An interface for reaping a set of orphaned processes.
+pub(crate) trait ReapOrphanQueue {
/// Attempts to reap every process in the queue, ignoring any errors and
/// enqueueing any orphans which have not yet exited.
fn reap_orphans(&self);
}
+impl<T: ReapOrphanQueue> ReapOrphanQueue for &T {
+ fn reap_orphans(&self) {
+ (**self).reap_orphans()
+ }
+}
+
+/// An interface for queueing up an orphaned process so that it can be reaped.
+pub(crate) trait OrphanQueue<T>: ReapOrphanQueue {
+ /// Adds an orphan to the queue.
+ fn push_orphan(&self, orphan: T);
+}
+
impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O {
fn push_orphan(&self, orphan: T) {
(**self).push_orphan(orphan);
}
-
- fn reap_orphans(&self) {
- (**self).reap_orphans()
- }
}
/// An implementation of `OrphanQueue`.
@@ -62,42 +68,62 @@ impl<T: Wait> OrphanQueue<T> for OrphanQueueImpl<T> {
fn push_orphan(&self, orphan: T) {
self.queue.lock().unwrap().push(orphan)
}
+}
+impl<T: Wait> ReapOrphanQueue for OrphanQueueImpl<T> {
fn reap_orphans(&self) {
let mut queue = self.queue.lock().unwrap();
let queue = &mut *queue;
- let mut i = 0;
- while i < queue.len() {
+ for i in (0..queue.len()).rev() {
match queue[i].try_wait() {
- Ok(Some(_)) => {}
- Err(_) => {
- // TODO: bubble up error some how. Is this an internal bug?
- // Shoudl we panic? Is it OK for this to be silently
- // dropped?
- }
- // Still not done yet
- Ok(None) => {
- i += 1;
- continue;
+ Ok(None) => {}
+ Ok(Some(_)) | Err(_) => {
+ // The stdlib handles interruption errors (EINTR) when polling a child process.
+ // All other errors represent invalid inputs or pids that have already been
+ // reaped, so we can drop the orphan in case an error is raised.
+ queue.swap_remove(i);
}
}
-
- queue.remove(i);
}
}
}
#[cfg(all(test, not(loom)))]
-mod test {
- use super::Wait;
- use super::{OrphanQueue, OrphanQueueImpl};
- use std::cell::Cell;
+pub(crate) mod test {
+ use super::*;
+ use std::cell::{Cell, RefCell};
use std::io;
use std::os::unix::process::ExitStatusExt;
use std::process::ExitStatus;
use std::rc::Rc;
+ pub(crate) struct MockQueue<W> {
+ pub(crate) all_enqueued: RefCell<Vec<W>>,
+ pub(crate) total_reaps: Cell<usize>,
+ }
+
+ impl<W> MockQueue<W> {
+ pub(crate) fn new() -> Self {
+ Self {
+ all_enqueued: RefCell::new(Vec::new()),
+ total_reaps: Cell::new(0),
+ }
+ }
+ }
+
+ impl<W> OrphanQueue<W> for MockQueue<W> {
+ fn push_orphan(&self, orphan: W) {
+ self.all_enqueued.borrow_mut().push(orphan);
+ }
+ }
+
+ impl<W> ReapOrphanQueue for MockQueue<W> {
+ fn reap_orphans(&self) {
+ self.total_reaps.set(self.total_reaps.get() + 1);
+ }
+ }
+
struct MockWait {
total_waits: Rc<Cell<usize>>,
num_wait_until_status: usize,
diff --git a/tokio/src/process/unix/reap.rs b/tokio/src/process/unix/reap.rs
index c51a20b9..de483c44 100644
--- a/tokio/src/process/unix/reap.rs
+++ b/tokio/src/process/unix/reap.rs
@@ -1,6 +1,6 @@
use crate::process::imp::orphan::{OrphanQueue, Wait};
use crate::process::kill::Kill;
-use crate::signal::unix::Signal;
+use crate::signal::unix::InternalStream;
use std::future::Future;
use std::io;
@@ -23,17 +23,6 @@ where
signal: S,
}
-// Work around removal of `futures_core` dependency
-pub(crate) trait Stream: Unpin {
- fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>;
-}
-
-impl Stream for Signal {
- fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
- Signal::poll_recv(self, cx)
- }
-}
-
impl<W, Q, S> Deref for Reaper<W, Q, S>
where
W: Wait + Unpin,
@@ -72,7 +61,7 @@ impl<W, Q, S> Future for Reaper<W, Q, S>
where
W: Wait + Unpin,
Q: OrphanQueue<W> + Unpin,
- S: Stream,
+ S: InternalStream,
{
type Output = io::Result<ExitStatus>;
@@ -80,10 +69,8 @@ where
loop {
// If the child hasn't exited yet, then it's our responsibility to
// ensure the current task gets notified when it might be able to
- // make progress.
- //
- // As described in `spawn` above, we just indicate that we can
- // next make progress once a SIGCHLD is received.
+ // make progress. We can use the delivery of a SIGCHLD signal as a
+ // sign that we can potentially make progress.
//
// However, we will register for a notification on the next signal
// BEFORE we poll the child. Otherwise it is possible that the child
@@ -99,7 +86,6 @@ where
// should not cause significant issues with parent futures.
let registered_interest = self.signal.poll_recv(cx).is_pending();
- self.orphan_queue.reap_orphans();
if let Some(status) = self.inner_mut().try_wait()? {
return Poll::Ready(Ok(status));
}
@@ -147,8 +133,9 @@ where
mod test {
use super::*;
+ use crate::process::unix::orphan::test::MockQueue;
+ use crate::sync::mpsc::error::TryRecvError;
use futures::future::FutureExt;
- use std::cell::{Cell, RefCell};
use std::os::unix::process::ExitStatusExt;
use std::process::ExitStatus;
use std::task::Context;
@@ -211,7 +198,7 @@ mod test {
}
}
- impl Stream for MockStream {
+ impl InternalStream for MockStream {
fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
self.total_polls += 1;
match self.values.remove(0) {
@@ -219,29 +206,9 @@ mod test {
None => Poll::Pending,
}
}
- }
- struct MockQueue<W> {
- all_enqueued: RefCell<Vec<W>>,
- total_reaps: Cell<usize>,
- }
-
- impl<W> MockQueue<W> {
- fn new() -> Self {
- Self {
- all_enqueued: RefCell::new(Vec::new()),
- total_reaps: Cell::new(0),
- }
- }
- }
-
- impl<W: Wait> OrphanQueue<W> for MockQueue<W> {
- fn push_orphan(&self, orphan: W) {
- self.all_enqueued.borrow_mut().push(orphan);
- }
-
- fn reap_orphans(&self) {
- self.total_reaps.set(self.total_reaps.get() + 1);
+ fn try_recv(&mut self) -> Result<(), TryRecvError> {
+ unimplemented!();
}
}
@@ -262,7 +229,7 @@ mod test {
assert!(grim.poll_unpin(&mut context).is_pending());
assert_eq!(1, grim.signal.total_polls);
assert_eq!(1, grim.total_waits);
- assert_eq!(1, grim.orphan_queue.total_reaps.get());
+ assert_eq!(0, grim.orphan_queue.total_reaps.get());
assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
// Not yet exited, couldn't register interest the first time
@@ -270,7 +237,7 @@ mod test {
assert!(grim.poll_unpin(&mut context).is_pending());
assert_eq!(3, grim.signal.total_polls);
assert_eq!(3, grim.total_waits);
- assert_eq!(3, grim.orphan_queue.total_reaps.get());
+ assert_eq!(0, grim.orphan_queue.total_reaps.get());
assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
// Exited
@@ -283,7 +250,7 @@ mod test {
}
assert_eq!(4, grim.signal.total_polls);
assert_eq!(4, grim.total_waits);
- assert_eq!(4, grim.orphan_queue.total_reaps.get());
+ assert_eq!(0, grim.orphan_queue.total_reaps.get());
assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
}