diff options
author | Ivan Petkov <ivanppetkov@gmail.com> | 2020-09-22 15:40:44 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-22 15:40:44 -0700 |
commit | 7ae5b7bd4f93612f91ab504ffb63aa8241c1d7bb (patch) | |
tree | 990e22703eb7103b030f5e8bf117557684a9f205 /tokio/src/signal | |
parent | e09b90ea32385337170ce17eb55ab372f9388af5 (diff) |
signal: move driver to runtime thread (#2835)
Refactors the signal infrastructure to move the driver to the runtime
thread. This follows the model put forth by the I/O driver and time
driver.
Diffstat (limited to 'tokio/src/signal')
-rw-r--r-- | tokio/src/signal/unix.rs | 81 | ||||
-rw-r--r-- | tokio/src/signal/unix/driver.rs | 153 |
2 files changed, 161 insertions, 73 deletions
diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index bc48bdfa..45a091d7 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -5,7 +5,6 @@ #![cfg(unix)] -use crate::io::{AsyncRead, PollEvented, ReadBuf}; use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage}; use crate::sync::mpsc::{channel, Receiver}; @@ -17,6 +16,8 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Once; use std::task::{Context, Poll}; +pub(crate) mod driver; + pub(crate) type OsStorage = Vec<SignalInfo>; // Number of different unix signals @@ -202,9 +203,9 @@ impl Default for SignalInfo { /// The purpose of this signal handler is to primarily: /// /// 1. Flag that our specific signal was received (e.g. store an atomic flag) -/// 2. Wake up driver tasks by writing a byte to a pipe +/// 2. Wake up the driver by writing a byte to a pipe /// -/// Those two operations shoudl both be async-signal safe. +/// Those two operations should both be async-signal safe. fn action(globals: Pin<&'static Globals>, signal: c_int) { globals.record_event(signal as EventId); @@ -227,6 +228,9 @@ fn signal_enable(signal: c_int) -> io::Result<()> { )); } + // Check that we have a signal driver running + driver::Handle::current().check_inner()?; + let globals = globals(); let siginfo = match globals.storage().get(signal as EventId) { Some(slot) => slot, @@ -254,69 +258,6 @@ fn signal_enable(signal: c_int) -> io::Result<()> { } } -#[derive(Debug)] -struct Driver { - wakeup: PollEvented<UnixStream>, -} - -impl Driver { - fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> { - // Drain the data from the pipe and maintain interest in getting more - self.drain(cx); - // Broadcast any signals which were received - globals().broadcast(); - - Poll::Pending - } -} - -impl Driver { - fn new() -> io::Result<Driver> { - // NB: We give each driver a "fresh" reciever file descriptor to avoid - // the issues described in alexcrichton/tokio-process#42. - // - // In the past we would reuse the actual receiver file descriptor and - // swallow any errors around double registration of the same descriptor. - // I'm not sure if the second (failed) registration simply doesn't end up - // receiving wake up notifications, or there could be some race condition - // when consuming readiness events, but having distinct descriptors for - // distinct PollEvented instances appears to mitigate this. - // - // Unfortunately we cannot just use a single global PollEvented instance - // either, since we can't compare Handles or assume they will always - // point to the exact same reactor. - let stream = globals().receiver.try_clone()?; - let wakeup = PollEvented::new(stream)?; - - Ok(Driver { wakeup }) - } - - /// Drain all data in the global receiver, ensuring we'll get woken up when - /// there is a write on the other end. - /// - /// We do *NOT* use the existence of any read bytes as evidence a signal was - /// received since the `pending` flags would have already been set if that - /// was the case. See - /// [#38](https://github.com/alexcrichton/tokio-signal/issues/38) for more - /// info. - fn drain(&mut self, cx: &mut Context<'_>) { - let mut buf = [0; 128]; - let mut buf = ReadBuf::new(&mut buf); - loop { - match Pin::new(&mut self.wakeup).poll_read(cx, &mut buf) { - Poll::Ready(Ok(())) => { - if buf.filled().is_empty() { - panic!("EOF on self-pipe") - } - buf.clear(); - } - Poll::Ready(Err(e)) => panic!("Bad read on self-pipe: {}", e), - Poll::Pending => break, - } - } - } -} - /// A stream of events for receiving a particular type of OS signal. /// /// In general signal handling on Unix is a pretty tricky topic, and this @@ -382,7 +323,6 @@ impl Driver { #[must_use = "streams do nothing unless polled"] #[derive(Debug)] pub struct Signal { - driver: Driver, rx: Receiver<()>, } @@ -414,16 +354,12 @@ pub fn signal(kind: SignalKind) -> io::Result<Signal> { // Turn the signal delivery on once we are ready for it signal_enable(signal)?; - // Ensure there's a driver for our associated event loop processing - // signals. - let driver = Driver::new()?; - // One wakeup in a queue is enough, no need for us to buffer up any // more. let (tx, rx) = channel(1); globals().register_listener(signal as EventId, tx); - Ok(Signal { driver, rx }) + Ok(Signal { rx }) } impl Signal { @@ -484,7 +420,6 @@ impl Signal { /// } /// ``` pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { - let _ = self.driver.poll(cx); self.rx.poll_recv(cx) } } diff --git a/tokio/src/signal/unix/driver.rs b/tokio/src/signal/unix/driver.rs new file mode 100644 index 00000000..0458893c --- /dev/null +++ b/tokio/src/signal/unix/driver.rs @@ -0,0 +1,153 @@ +//! Signal driver + +use crate::io::driver::Driver as IoDriver; +use crate::io::Registration; +use crate::park::Park; +use crate::runtime::context; +use crate::signal::registry::globals; +use mio_uds::UnixStream; +use std::io::{self, Read}; +use std::sync::{Arc, Weak}; +use std::time::Duration; + +/// Responsible for registering wakeups when an OS signal is received, and +/// subsequently dispatching notifications to any signal listeners as appropriate. +/// +/// Note: this driver relies on having an enabled IO driver in order to listen to +/// pipe write wakeups. +#[derive(Debug)] +pub(crate) struct Driver { + /// Thread parker. The `Driver` park implementation delegates to this. + park: IoDriver, + + /// A pipe for receiving wake events from the signal handler + receiver: UnixStream, + + /// The actual registraiton for `receiver` when active. + /// Lazily bound at the first signal registration. + registration: Registration, + + /// Shared state + inner: Arc<Inner>, +} + +#[derive(Clone, Debug)] +pub(crate) struct Handle { + inner: Weak<Inner>, +} + +#[derive(Debug)] +pub(super) struct Inner(()); + +// ===== impl Driver ===== + +impl Driver { + /// Creates a new signal `Driver` instance that delegates wakeups to `park`. + pub(crate) fn new(park: IoDriver) -> io::Result<Self> { + // NB: We give each driver a "fresh" reciever file descriptor to avoid + // the issues described in alexcrichton/tokio-process#42. + // + // In the past we would reuse the actual receiver file descriptor and + // swallow any errors around double registration of the same descriptor. + // I'm not sure if the second (failed) registration simply doesn't end up + // receiving wake up notifications, or there could be some race condition + // when consuming readiness events, but having distinct descriptors for + // distinct PollEvented instances appears to mitigate this. + // + // Unfortunately we cannot just use a single global PollEvented instance + // either, since we can't compare Handles or assume they will always + // point to the exact same reactor. + let receiver = globals().receiver.try_clone()?; + let registration = + Registration::new_with_ready_and_handle(&receiver, mio::Ready::all(), park.handle())?; + + Ok(Self { + park, + receiver, + registration, + inner: Arc::new(Inner(())), + }) + } + + /// Returns a handle to this event loop which can be sent across threads + /// and can be used as a proxy to the event loop itself. + pub(crate) fn handle(&self) -> Handle { + Handle { + inner: Arc::downgrade(&self.inner), + } + } + + fn process(&self) { + // Check if the pipe is ready to read and therefore has "woken" us up + match self.registration.take_read_ready() { + Ok(Some(ready)) => assert!(ready.is_readable()), + Ok(None) => return, // No wake has arrived, bail + Err(e) => panic!("reactor gone: {}", e), + } + + // Drain the pipe completely so we can receive a new readiness event + // if another signal has come in. + let mut buf = [0; 128]; + loop { + match (&self.receiver).read(&mut buf) { + Ok(0) => panic!("EOF on self-pipe"), + Ok(_) => continue, // Keep reading + Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, + Err(e) => panic!("Bad read on self-pipe: {}", e), + } + } + + // Broadcast any signals which were received + globals().broadcast(); + } +} + +// ===== impl Park for Driver ===== + +impl Park for Driver { + type Unpark = <IoDriver as Park>::Unpark; + type Error = io::Error; + + fn unpark(&self) -> Self::Unpark { + self.park.unpark() + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.park.park()?; + self.process(); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.park.park_timeout(duration)?; + self.process(); + Ok(()) + } + + fn shutdown(&mut self) { + self.park.shutdown() + } +} + +// ===== impl Handle ===== + +impl Handle { + /// Returns a handle to the current driver + /// + /// # Panics + /// + /// This function panics if there is no current signal driver set. + pub(super) fn current() -> Self { + context::signal_handle().expect( + "there is no signal driver running, must be called from the context of Tokio runtime", + ) + } + + pub(super) fn check_inner(&self) -> io::Result<()> { + if self.inner.strong_count() > 0 { + Ok(()) + } else { + Err(io::Error::new(io::ErrorKind::Other, "signal driver gone")) + } + } +} |