author    Ivan Petkov <ivanppetkov@gmail.com>  2020-09-22 15:40:44 -0700
committer GitHub <noreply@github.com>          2020-09-22 15:40:44 -0700
commit    7ae5b7bd4f93612f91ab504ffb63aa8241c1d7bb (patch)
tree      990e22703eb7103b030f5e8bf117557684a9f205 /tokio/src/signal
parent    e09b90ea32385337170ce17eb55ab372f9388af5 (diff)
signal: move driver to runtime thread (#2835)
Refactors the signal infrastructure to move the driver to the runtime thread. This follows the model put forth by the I/O driver and time driver.
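The layering works through tokio's internal `Park` trait: each driver wraps the one below it, delegates the actual blocking, and runs its own event processing on every wakeup. A minimal sketch of that shape, using a simplified stand-in for the real `Park` trait (the trait here is trimmed down and the driver bodies are illustrative, not tokio's actual internals):

    use std::io;
    use std::time::Duration;

    /// Simplified stand-in for tokio's internal `Park` trait.
    trait Park {
        fn park(&mut self) -> io::Result<()>;
        fn park_timeout(&mut self, duration: Duration) -> io::Result<()>;
    }

    /// The layer below, standing in for the I/O driver.
    struct IoDriver;

    impl Park for IoDriver {
        fn park(&mut self) -> io::Result<()> {
            // The real driver blocks on the OS selector here.
            Ok(())
        }
        fn park_timeout(&mut self, _duration: Duration) -> io::Result<()> {
            Ok(())
        }
    }

    /// A driver layered on top: delegate blocking to the inner parker,
    /// then run our own event processing on every wakeup.
    struct SignalDriver<P: Park> {
        park: P,
    }

    impl<P: Park> SignalDriver<P> {
        fn process(&self) {
            // The real driver drains its wakeup pipe and broadcasts
            // any received signals here.
        }
    }

    impl<P: Park> Park for SignalDriver<P> {
        fn park(&mut self) -> io::Result<()> {
            self.park.park()?;
            self.process();
            Ok(())
        }
        fn park_timeout(&mut self, duration: Duration) -> io::Result<()> {
            self.park.park_timeout(duration)?;
            self.process();
            Ok(())
        }
    }

    fn main() -> io::Result<()> {
        let mut driver = SignalDriver { park: IoDriver };
        driver.park_timeout(Duration::from_millis(0))
    }

This is the same model the diff below implements: `Driver` in `driver.rs` wraps the I/O driver and calls `process` after each park, so signal dispatch rides along with the runtime thread's normal wakeups.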
Diffstat (limited to 'tokio/src/signal')
-rw-r--r--  tokio/src/signal/unix.rs          81
-rw-r--r--  tokio/src/signal/unix/driver.rs  153
2 files changed, 161 insertions(+), 73 deletions(-)
diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs
index bc48bdfa..45a091d7 100644
--- a/tokio/src/signal/unix.rs
+++ b/tokio/src/signal/unix.rs
@@ -5,7 +5,6 @@
#![cfg(unix)]
-use crate::io::{AsyncRead, PollEvented, ReadBuf};
use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage};
use crate::sync::mpsc::{channel, Receiver};
@@ -17,6 +16,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Once;
use std::task::{Context, Poll};
+pub(crate) mod driver;
+
pub(crate) type OsStorage = Vec<SignalInfo>;
// Number of different unix signals
@@ -202,9 +203,9 @@ impl Default for SignalInfo {
/// The purpose of this signal handler is primarily to:
///
/// 1. Flag that our specific signal was received (e.g. store an atomic flag)
-/// 2. Wake up driver tasks by writing a byte to a pipe
+/// 2. Wake up the driver by writing a byte to a pipe
///
-/// Those two operations shoudl both be async-signal safe.
+/// Those two operations should both be async-signal safe.
fn action(globals: Pin<&'static Globals>, signal: c_int) {
globals.record_event(signal as EventId);
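A sketch of what the handler body above is limited to, under those two async-signal-safe steps (the names here are illustrative; tokio's real handler records events through its registry globals and is installed as an actual OS signal handler):

    use std::io::Write;
    use std::os::unix::net::UnixStream;
    use std::sync::atomic::{AtomicBool, Ordering};

    static SIGHUP_PENDING: AtomicBool = AtomicBool::new(false);

    // Both operations are async-signal safe: an atomic store and a
    // single write() on a pre-opened pipe. Shown as a plain function
    // for clarity rather than installed via sigaction.
    fn handler(mut sender: &UnixStream) {
        // 1. Flag that our specific signal was received.
        SIGHUP_PENDING.store(true, Ordering::SeqCst);

        // 2. Wake up the driver by writing a byte; the byte's value is
        //    irrelevant, only the readiness event it produces matters.
        let _ = sender.write(&[1]);
    }

    fn main() -> std::io::Result<()> {
        let (sender, _receiver) = UnixStream::pair()?;
        handler(&sender);
        assert!(SIGHUP_PENDING.load(Ordering::SeqCst));
        Ok(())
    }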
@@ -227,6 +228,9 @@ fn signal_enable(signal: c_int) -> io::Result<()> {
));
}
+ // Check that we have a signal driver running
+ driver::Handle::current().check_inner()?;
+
let globals = globals();
let siginfo = match globals.storage().get(signal as EventId) {
Some(slot) => slot,
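The `check_inner` call above gives `signal_enable` a fast failure path when no signal driver is alive. A minimal model of the pattern behind it, mirroring the `Handle`/`Inner` pair added in `driver.rs` below: the driver owns an `Arc`, handles hold only a `Weak`, and liveness is a strong-count check:

    use std::io;
    use std::sync::{Arc, Weak};

    struct Handle {
        // A `Weak` observes the driver's state without keeping it alive.
        inner: Weak<()>,
    }

    impl Handle {
        fn check_inner(&self) -> io::Result<()> {
            if self.inner.strong_count() > 0 {
                Ok(())
            } else {
                Err(io::Error::new(io::ErrorKind::Other, "signal driver gone"))
            }
        }
    }

    fn main() {
        let driver_state = Arc::new(()); // held by the running driver
        let handle = Handle {
            inner: Arc::downgrade(&driver_state),
        };
        assert!(handle.check_inner().is_ok());

        drop(driver_state); // the driver shuts down
        assert!(handle.check_inner().is_err());
    }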
@@ -254,69 +258,6 @@ fn signal_enable(signal: c_int) -> io::Result<()> {
}
}
-#[derive(Debug)]
-struct Driver {
- wakeup: PollEvented<UnixStream>,
-}
-
-impl Driver {
- fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
- // Drain the data from the pipe and maintain interest in getting more
- self.drain(cx);
- // Broadcast any signals which were received
- globals().broadcast();
-
- Poll::Pending
- }
-}
-
-impl Driver {
- fn new() -> io::Result<Driver> {
- // NB: We give each driver a "fresh" reciever file descriptor to avoid
- // the issues described in alexcrichton/tokio-process#42.
- //
- // In the past we would reuse the actual receiver file descriptor and
- // swallow any errors around double registration of the same descriptor.
- // I'm not sure if the second (failed) registration simply doesn't end up
- // receiving wake up notifications, or there could be some race condition
- // when consuming readiness events, but having distinct descriptors for
- // distinct PollEvented instances appears to mitigate this.
- //
- // Unfortunately we cannot just use a single global PollEvented instance
- // either, since we can't compare Handles or assume they will always
- // point to the exact same reactor.
- let stream = globals().receiver.try_clone()?;
- let wakeup = PollEvented::new(stream)?;
-
- Ok(Driver { wakeup })
- }
-
- /// Drain all data in the global receiver, ensuring we'll get woken up when
- /// there is a write on the other end.
- ///
- /// We do *NOT* use the existence of any read bytes as evidence a signal was
- /// received since the `pending` flags would have already been set if that
- /// was the case. See
- /// [#38](https://github.com/alexcrichton/tokio-signal/issues/38) for more
- /// info.
- fn drain(&mut self, cx: &mut Context<'_>) {
- let mut buf = [0; 128];
- let mut buf = ReadBuf::new(&mut buf);
- loop {
- match Pin::new(&mut self.wakeup).poll_read(cx, &mut buf) {
- Poll::Ready(Ok(())) => {
- if buf.filled().is_empty() {
- panic!("EOF on self-pipe")
- }
- buf.clear();
- }
- Poll::Ready(Err(e)) => panic!("Bad read on self-pipe: {}", e),
- Poll::Pending => break,
- }
- }
- }
-}
-
/// A stream of events for receiving a particular type of OS signal.
///
/// In general signal handling on Unix is a pretty tricky topic, and this
@@ -382,7 +323,6 @@ impl Driver {
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Signal {
- driver: Driver,
rx: Receiver<()>,
}
@@ -414,16 +354,12 @@ pub fn signal(kind: SignalKind) -> io::Result<Signal> {
// Turn the signal delivery on once we are ready for it
signal_enable(signal)?;
- // Ensure there's a driver for our associated event loop processing
- // signals.
- let driver = Driver::new()?;
-
// One wakeup in a queue is enough, no need for us to buffer up any
// more.
let (tx, rx) = channel(1);
globals().register_listener(signal as EventId, tx);
- Ok(Signal { driver, rx })
+ Ok(Signal { rx })
}
impl Signal {
@@ -484,7 +420,6 @@ impl Signal {
/// }
/// ```
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
- let _ = self.driver.poll(cx);
self.rx.poll_recv(cx)
}
}
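None of this changes the public surface: `Signal` is now just a channel receiver, with wakeups driven by the runtime-owned driver instead of a per-`Signal` `Driver` poll. A usage sketch with the existing public API (this must run inside a Tokio runtime, since the driver now lives on the runtime thread):

    use tokio::signal::unix::{signal, SignalKind};

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        // `signal` must be called from a runtime context: looking up the
        // driver handle panics outside one, and enabling the signal
        // fails if the driver has already shut down.
        let mut hangups = signal(SignalKind::hangup())?;

        // Each wakeup is delivered through the registered mpsc channel.
        hangups.recv().await;
        println!("got SIGHUP");
        Ok(())
    }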
diff --git a/tokio/src/signal/unix/driver.rs b/tokio/src/signal/unix/driver.rs
new file mode 100644
index 00000000..0458893c
--- /dev/null
+++ b/tokio/src/signal/unix/driver.rs
@@ -0,0 +1,153 @@
+//! Signal driver
+
+use crate::io::driver::Driver as IoDriver;
+use crate::io::Registration;
+use crate::park::Park;
+use crate::runtime::context;
+use crate::signal::registry::globals;
+use mio_uds::UnixStream;
+use std::io::{self, Read};
+use std::sync::{Arc, Weak};
+use std::time::Duration;
+
+/// Responsible for registering wakeups when an OS signal is received, and
+/// subsequently dispatching notifications to any signal listeners as appropriate.
+///
+/// Note: this driver relies on having an enabled I/O driver in order to listen
+/// for pipe write wakeups.
+#[derive(Debug)]
+pub(crate) struct Driver {
+ /// Thread parker. The `Driver` park implementation delegates to this.
+ park: IoDriver,
+
+ /// A pipe for receiving wake events from the signal handler
+ receiver: UnixStream,
+
+ /// The actual registration for `receiver` when active.
+ /// Lazily bound at the first signal registration.
+ registration: Registration,
+
+ /// Shared state
+ inner: Arc<Inner>,
+}
+
+#[derive(Clone, Debug)]
+pub(crate) struct Handle {
+ inner: Weak<Inner>,
+}
+
+#[derive(Debug)]
+pub(super) struct Inner(());
+
+// ===== impl Driver =====
+
+impl Driver {
+ /// Creates a new signal `Driver` instance that delegates wakeups to `park`.
+ pub(crate) fn new(park: IoDriver) -> io::Result<Self> {
+ // NB: We give each driver a "fresh" reciever file descriptor to avoid
+ // the issues described in alexcrichton/tokio-process#42.
+ //
+ // In the past we would reuse the actual receiver file descriptor and
+ // swallow any errors around double registration of the same descriptor.
+ // I'm not sure if the second (failed) registration simply doesn't end up
+ // receiving wake up notifications, or there could be some race condition
+ // when consuming readiness events, but having distinct descriptors for
+ // distinct PollEvented instances appears to mitigate this.
+ //
+ // Unfortunately we cannot just use a single global PollEvented instance
+ // either, since we can't compare Handles or assume they will always
+ // point to the exact same reactor.
+ let receiver = globals().receiver.try_clone()?;
+ let registration =
+ Registration::new_with_ready_and_handle(&receiver, mio::Ready::all(), park.handle())?;
+
+ Ok(Self {
+ park,
+ receiver,
+ registration,
+ inner: Arc::new(Inner(())),
+ })
+ }
+
+ /// Returns a handle to this event loop which can be sent across threads
+ /// and can be used as a proxy to the event loop itself.
+ pub(crate) fn handle(&self) -> Handle {
+ Handle {
+ inner: Arc::downgrade(&self.inner),
+ }
+ }
+
+ fn process(&self) {
+ // Check if the pipe is ready to read and therefore has "woken" us up
+ match self.registration.take_read_ready() {
+ Ok(Some(ready)) => assert!(ready.is_readable()),
+ Ok(None) => return, // No wake has arrived, bail
+ Err(e) => panic!("reactor gone: {}", e),
+ }
+
+ // Drain the pipe completely so we can receive a new readiness event
+ // if another signal has come in.
+ let mut buf = [0; 128];
+ loop {
+ match (&self.receiver).read(&mut buf) {
+ Ok(0) => panic!("EOF on self-pipe"),
+ Ok(_) => continue, // Keep reading
+ Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
+ Err(e) => panic!("Bad read on self-pipe: {}", e),
+ }
+ }
+
+ // Broadcast any signals which were received
+ globals().broadcast();
+ }
+}
+
+// ===== impl Park for Driver =====
+
+impl Park for Driver {
+ type Unpark = <IoDriver as Park>::Unpark;
+ type Error = io::Error;
+
+ fn unpark(&self) -> Self::Unpark {
+ self.park.unpark()
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.park.park()?;
+ self.process();
+ Ok(())
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.park.park_timeout(duration)?;
+ self.process();
+ Ok(())
+ }
+
+ fn shutdown(&mut self) {
+ self.park.shutdown()
+ }
+}
+
+// ===== impl Handle =====
+
+impl Handle {
+ /// Returns a handle to the current driver
+ ///
+ /// # Panics
+ ///
+ /// This function panics if there is no current signal driver set.
+ pub(super) fn current() -> Self {
+ context::signal_handle().expect(
+ "there is no signal driver running, must be called from the context of Tokio runtime",
+ )
+ }
+
+ pub(super) fn check_inner(&self) -> io::Result<()> {
+ if self.inner.strong_count() > 0 {
+ Ok(())
+ } else {
+ Err(io::Error::new(io::ErrorKind::Other, "signal driver gone"))
+ }
+ }
+}
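For reference, the `process` drain loop above is the classic self-pipe pattern: read until `WouldBlock` so the next handler write produces a fresh readiness event, and treat the bytes as meaningless wakeups (the pending flags, not the pipe contents, record which signals fired). A standalone sketch using `std::os::unix::net::UnixStream`, with the pair standing in for the process-global sender/receiver:

    use std::io::{self, Read, Write};
    use std::os::unix::net::UnixStream;

    fn main() -> io::Result<()> {
        let (mut sender, mut receiver) = UnixStream::pair()?;
        receiver.set_nonblocking(true)?;

        // Handler side: one byte is enough to wake the driver.
        sender.write_all(&[1])?;

        // Driver side: drain completely so another signal produces a
        // new readiness event rather than being coalesced away.
        let mut buf = [0u8; 128];
        loop {
            match receiver.read(&mut buf) {
                Ok(0) => panic!("EOF on self-pipe"),
                Ok(_) => continue, // keep reading
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
                Err(e) => panic!("Bad read on self-pipe: {}", e),
            }
        }
        Ok(())
    }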