From 4cf45c038b9691f24fac22df13594c2223b185f6 Mon Sep 17 00:00:00 2001 From: Ivan Petkov Date: Tue, 6 Oct 2020 10:30:16 -0700 Subject: process: add ProcessDriver to handle orphan reaping (#2907) --- tokio/src/signal/unix.rs | 37 ++++++++++++++++++++++++++++++++----- tokio/src/signal/unix/driver.rs | 2 +- 2 files changed, 33 insertions(+), 6 deletions(-) (limited to 'tokio/src/signal') diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index 77f300bb..aaaa75ed 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -6,6 +6,7 @@ #![cfg(unix)] use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage}; +use crate::sync::mpsc::error::TryRecvError; use crate::sync::mpsc::{channel, Receiver}; use libc::c_int; @@ -17,6 +18,7 @@ use std::sync::Once; use std::task::{Context, Poll}; pub(crate) mod driver; +use self::driver::Handle; pub(crate) type OsStorage = Vec; @@ -220,7 +222,7 @@ fn action(globals: Pin<&'static Globals>, signal: c_int) { /// /// This will register the signal handler if it hasn't already been registered, /// returning any error along the way if that fails. -fn signal_enable(signal: c_int) -> io::Result<()> { +fn signal_enable(signal: c_int, handle: Handle) -> io::Result<()> { if signal < 0 || signal_hook_registry::FORBIDDEN.contains(&signal) { return Err(Error::new( ErrorKind::Other, @@ -229,7 +231,7 @@ fn signal_enable(signal: c_int) -> io::Result<()> { } // Check that we have a signal driver running - driver::Handle::current().check_inner()?; + handle.check_inner()?; let globals = globals(); let siginfo = match globals.storage().get(signal as EventId) { @@ -349,10 +351,14 @@ pub struct Signal { /// * If the signal is one of /// [`signal_hook::FORBIDDEN`](fn@signal_hook_registry::register#panics) pub fn signal(kind: SignalKind) -> io::Result { + signal_with_handle(kind, Handle::current()) +} + +pub(crate) fn signal_with_handle(kind: SignalKind, handle: Handle) -> io::Result { let signal = kind.0; // Turn the signal delivery on once we are ready for it - signal_enable(signal)?; + signal_enable(signal, handle)?; // One wakeup in a queue is enough, no need for us to buffer up any // more. @@ -394,6 +400,11 @@ impl Signal { pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { self.rx.poll_recv(cx) } + + /// Try to receive a signal notification without blocking or registering a waker. + pub(crate) fn try_recv(&mut self) -> Result<(), TryRecvError> { + self.rx.try_recv() + } } cfg_stream! { @@ -406,6 +417,22 @@ cfg_stream! { } } +// Work around for abstracting streams internally +pub(crate) trait InternalStream: Unpin { + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll>; + fn try_recv(&mut self) -> Result<(), TryRecvError>; +} + +impl InternalStream for Signal { + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + self.poll_recv(cx) + } + + fn try_recv(&mut self) -> Result<(), TryRecvError> { + self.try_recv() + } +} + pub(crate) fn ctrl_c() -> io::Result { signal(SignalKind::interrupt()) } @@ -416,11 +443,11 @@ mod tests { #[test] fn signal_enable_error_on_invalid_input() { - signal_enable(-1).unwrap_err(); + signal_enable(-1, Handle::default()).unwrap_err(); } #[test] fn signal_enable_error_on_forbidden_input() { - signal_enable(signal_hook_registry::FORBIDDEN[0]).unwrap_err(); + signal_enable(signal_hook_registry::FORBIDDEN[0], Handle::default()).unwrap_err(); } } diff --git a/tokio/src/signal/unix/driver.rs b/tokio/src/signal/unix/driver.rs index ae60c22f..d0615312 100644 --- a/tokio/src/signal/unix/driver.rs +++ b/tokio/src/signal/unix/driver.rs @@ -30,7 +30,7 @@ pub(crate) struct Driver { inner: Arc, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub(crate) struct Handle { inner: Weak, } -- cgit v1.2.3