diff options
author | Carl Lerche <me@carllerche.com> | 2019-10-25 12:50:15 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-25 12:50:15 -0700 |
commit | 227533d456fe32e48ffcd3796f1e6c8f9318b230 (patch) | |
tree | 498029aaf42dd64eeb8ef0e7d7f29802b45d4e95 /tokio/src/signal | |
parent | 03a9378297c73c2e56a6d6b55db22b92427b850a (diff) |
net: move into tokio crate (#1683)
A step towards collapsing Tokio sub crates into a single `tokio`
crate (#1318).
The `net` implementation is now provided by the main `tokio` crate.
Functionality can be opted out of by using the various net related
feature flags.
Diffstat (limited to 'tokio/src/signal')
-rw-r--r-- | tokio/src/signal/ctrl_c.rs | 46 | ||||
-rw-r--r-- | tokio/src/signal/mod.rs | 93 | ||||
-rw-r--r-- | tokio/src/signal/registry.rs | 310 | ||||
-rw-r--r-- | tokio/src/signal/unix.rs | 426 | ||||
-rw-r--r-- | tokio/src/signal/windows.rs | 217 |
5 files changed, 1092 insertions, 0 deletions
diff --git a/tokio/src/signal/ctrl_c.rs b/tokio/src/signal/ctrl_c.rs new file mode 100644 index 00000000..f9dd4679 --- /dev/null +++ b/tokio/src/signal/ctrl_c.rs @@ -0,0 +1,46 @@ +#[cfg(unix)] +use super::unix::{self as os_impl, Signal as Inner}; +#[cfg(windows)] +use super::windows::{self as os_impl, Event as Inner}; + +use futures_core::stream::Stream; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Represents a stream which receives "ctrl-c" notifications sent to the process. +/// +/// In general signals are handled very differently across Unix and Windows, but +/// this is somewhat cross platform in terms of how it can be handled. A ctrl-c +/// event to a console process can be represented as a stream for both Windows +/// and Unix. +/// +/// Note that there are a number of caveats listening for signals, and you may +/// wish to read up on the documentation in the `unix` or `windows` module to +/// take a peek. +/// +/// Notably, a notification to this process notifies *all* streams listening to +/// this event. Moreover, the notifications **are coalesced** if they aren't processed +/// quickly enough. This means that if two notifications are received back-to-back, +/// then the stream may only receive one item about the two notifications. +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +pub struct CtrlC { + inner: Inner, +} + +/// Creates a new stream which receives "ctrl-c" notifications sent to the +/// process. +/// +/// This function binds to the default reactor. +pub fn ctrl_c() -> io::Result<CtrlC> { + os_impl::ctrl_c().map(|inner| CtrlC { inner }) +} + +impl Stream for CtrlC { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + Pin::new(&mut self.inner).poll_next(cx) + } +} diff --git a/tokio/src/signal/mod.rs b/tokio/src/signal/mod.rs new file mode 100644 index 00000000..f695b07c --- /dev/null +++ b/tokio/src/signal/mod.rs @@ -0,0 +1,93 @@ +//! Asynchronous signal handling for Tokio +//! +//! The primary type exported from this crate, `unix::Signal`, allows +//! listening for arbitrary signals on Unix platforms, receiving them +//! in an asynchronous fashion. +//! +//! Note that signal handling is in general a very tricky topic and should be +//! used with great care. This crate attempts to implement 'best practice' for +//! signal handling, but it should be evaluated for your own applications' needs +//! to see if it's suitable. +//! +//! The are some fundamental limitations of this crate documented on the +//! `Signal` structure as well. +//! +//! # Examples +//! +//! Print out all ctrl-C notifications received +//! +//! ```rust,no_run +//! use tokio::signal; +//! +//! use futures_util::future; +//! use futures_util::stream::StreamExt; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! // Create an infinite stream of "Ctrl+C" notifications. Each item received +//! // on this stream may represent multiple ctrl-c signals. +//! let ctrl_c = signal::ctrl_c()?; +//! +//! // Process each ctrl-c as it comes in +//! let prog = ctrl_c.for_each(|_| { +//! println!("ctrl-c received!"); +//! future::ready(()) +//! }); +//! +//! prog.await; +//! +//! Ok(()) +//! } +//! ``` +//! +//! Wait for SIGHUP on Unix +//! +//! ```rust,no_run +//! # #[cfg(unix)] { +//! +//! use tokio::signal::{self, unix::{signal, SignalKind}}; +//! +//! use futures_util::future; +//! use futures_util::stream::StreamExt; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! // Create an infinite stream of "Ctrl+C" notifications. Each item received +//! // on this stream may represent multiple ctrl-c signals. +//! let ctrl_c = signal::ctrl_c()?; +//! +//! // Process each ctrl-c as it comes in +//! let prog = ctrl_c.for_each(|_| { +//! println!("ctrl-c received!"); +//! future::ready(()) +//! }); +//! +//! prog.await; +//! +//! // Like the previous example, this is an infinite stream of signals +//! // being received, and signals may be coalesced while pending. +//! let stream = signal(SignalKind::hangup())?; +//! +//! // Convert out stream into a future and block the program +//! let (signal, _stream) = stream.into_future().await; +//! println!("got signal {:?}", signal); +//! Ok(()) +//! } +//! # } +//! ``` + +mod ctrl_c; +mod registry; + +mod os { + #[cfg(unix)] + pub(crate) use super::unix::{OsExtraData, OsStorage}; + + #[cfg(windows)] + pub(crate) use super::windows::{OsExtraData, OsStorage}; +} + +pub mod unix; +pub mod windows; + +pub use self::ctrl_c::{ctrl_c, CtrlC}; diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs new file mode 100644 index 00000000..1b813966 --- /dev/null +++ b/tokio/src/signal/registry.rs @@ -0,0 +1,310 @@ +use crate::signal::os::{OsExtraData, OsStorage}; + +use tokio_sync::mpsc::Sender; + +use lazy_static::lazy_static; +use std::ops; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Mutex; + +pub(crate) type EventId = usize; + +/// State for a specific event, whether a notification is pending delivery, +/// and what listeners are registered. +#[derive(Default, Debug)] +pub(crate) struct EventInfo { + pending: AtomicBool, + recipients: Mutex<Vec<Sender<()>>>, +} + +/// An interface for retrieving the `EventInfo` for a particular eventId. +pub(crate) trait Storage { + /// Get the `EventInfo` for `id` if it exists. + fn event_info(&self, id: EventId) -> Option<&EventInfo>; + + /// Invoke `f` once for each defined `EventInfo` in this storage. + fn for_each<'a, F>(&'a self, f: F) + where + F: FnMut(&'a EventInfo); +} + +impl Storage for Vec<EventInfo> { + fn event_info(&self, id: EventId) -> Option<&EventInfo> { + self.get(id) + } + + fn for_each<'a, F>(&'a self, f: F) + where + F: FnMut(&'a EventInfo), + { + self.iter().for_each(f) + } +} + +/// An interface for initializing a type. Useful for situations where we cannot +/// inject a configured instance in the constructor of another type. +pub(crate) trait Init { + fn init() -> Self; +} + +/// Manages and distributes event notifications to any registered listeners. +/// +/// Generic over the underlying storage to allow for domain specific +/// optimizations (e.g. eventIds may or may not be contiguous). +#[derive(Debug)] +pub(crate) struct Registry<S> { + storage: S, +} + +impl<S> Registry<S> { + fn new(storage: S) -> Self { + Self { storage } + } +} + +impl<S: Storage> Registry<S> { + /// Register a new listener for `event_id`. + fn register_listener(&self, event_id: EventId, listener: Sender<()>) { + self.storage + .event_info(event_id) + .unwrap_or_else(|| panic!("invalid event_id: {}", event_id)) + .recipients + .lock() + .unwrap() + .push(listener); + } + + /// Mark `event_id` as having been delivered, without broadcasting it to + /// any listeners. + fn record_event(&self, event_id: EventId) { + if let Some(event_info) = self.storage.event_info(event_id) { + event_info.pending.store(true, Ordering::SeqCst) + } + } + + /// Broadcast all previously recorded events to their respective listeners. + /// + /// Returns true if an event was delivered to at least one listener. + fn broadcast(&self) -> bool { + let mut did_notify = false; + self.storage.for_each(|event_info| { + // Any signal of this kind arrived since we checked last? + if !event_info.pending.swap(false, Ordering::SeqCst) { + return; + } + + let mut recipients = event_info.recipients.lock().unwrap(); + + // Notify all waiters on this signal that the signal has been + // received. If we can't push a message into the queue then we don't + // worry about it as everything is coalesced anyway. If the channel + // has gone away then we can remove that slot. + for i in (0..recipients.len()).rev() { + match recipients[i].try_send(()) { + Ok(()) => did_notify = true, + Err(ref e) if e.is_closed() => { + recipients.swap_remove(i); + } + + // Channel is full, ignore the error since the + // receiver has already been woken up + Err(e) => { + // Sanity check in case this error type ever gets + // additional variants we have not considered. + debug_assert!(e.is_full()); + } + } + } + }); + + did_notify + } +} + +pub(crate) struct Globals { + extra: OsExtraData, + registry: Registry<OsStorage>, +} + +impl ops::Deref for Globals { + type Target = OsExtraData; + + fn deref(&self) -> &Self::Target { + &self.extra + } +} + +impl Globals { + /// Register a new listener for `event_id`. + pub(crate) fn register_listener(&self, event_id: EventId, listener: Sender<()>) { + self.registry.register_listener(event_id, listener); + } + + /// Mark `event_id` as having been delivered, without broadcasting it to + /// any listeners. + pub(crate) fn record_event(&self, event_id: EventId) { + self.registry.record_event(event_id); + } + + /// Broadcast all previously recorded events to their respective listeners. + /// + /// Returns true if an event was delivered to at least one listener. + pub(crate) fn broadcast(&self) -> bool { + self.registry.broadcast() + } + + #[cfg(unix)] + pub(crate) fn storage(&self) -> &OsStorage { + &self.registry.storage + } +} + +pub(crate) fn globals() -> Pin<&'static Globals> +where + OsExtraData: 'static + Send + Sync + Init, + OsStorage: 'static + Send + Sync + Init, +{ + lazy_static! { + static ref GLOBALS: Pin<Box<Globals>> = Box::pin(Globals { + extra: OsExtraData::init(), + registry: Registry::new(OsStorage::init()), + }); + } + + GLOBALS.as_ref() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::runtime::current_thread::Runtime; + use crate::sync::{mpsc, oneshot}; + use futures::{future, StreamExt}; + + #[test] + fn smoke() { + let mut rt = Runtime::new().unwrap(); + rt.block_on(async move { + let registry = Registry::new(vec![ + EventInfo::default(), + EventInfo::default(), + EventInfo::default(), + ]); + + let (first_tx, first_rx) = mpsc::channel(3); + let (second_tx, second_rx) = mpsc::channel(3); + let (third_tx, third_rx) = mpsc::channel(3); + + registry.register_listener(0, first_tx); + registry.register_listener(1, second_tx); + registry.register_listener(2, third_tx); + + let (fire, wait) = oneshot::channel(); + + crate::spawn(async { + wait.await.expect("wait failed"); + + // Record some events which should get coalesced + registry.record_event(0); + registry.record_event(0); + registry.record_event(1); + registry.record_event(1); + registry.broadcast(); + + // Send subsequent signal + registry.record_event(0); + registry.broadcast(); + + drop(registry); + }); + + let _ = fire.send(()); + let all = future::join3( + first_rx.collect::<Vec<_>>(), + second_rx.collect::<Vec<_>>(), + third_rx.collect::<Vec<_>>(), + ); + + let (first_results, second_results, third_results) = all.await; + assert_eq!(2, first_results.len()); + assert_eq!(1, second_results.len()); + assert_eq!(0, third_results.len()); + }); + } + + #[test] + #[should_panic = "invalid event_id: 1"] + fn register_panics_on_invalid_input() { + let registry = Registry::new(vec![EventInfo::default()]); + + let (tx, _) = mpsc::channel(1); + registry.register_listener(1, tx); + } + + #[test] + fn record_invalid_event_does_nothing() { + let registry = Registry::new(vec![EventInfo::default()]); + registry.record_event(42); + } + + #[test] + fn broadcast_cleans_up_disconnected_listeners() { + let mut rt = Runtime::new().unwrap(); + + rt.block_on(async { + let registry = Registry::new(vec![EventInfo::default()]); + + let (first_tx, first_rx) = mpsc::channel(1); + let (second_tx, second_rx) = mpsc::channel(1); + let (third_tx, third_rx) = mpsc::channel(1); + + registry.register_listener(0, first_tx); + registry.register_listener(0, second_tx); + registry.register_listener(0, third_tx); + + drop(first_rx); + drop(second_rx); + + let (fire, wait) = oneshot::channel(); + + crate::spawn(async { + wait.await.expect("wait failed"); + + registry.record_event(0); + registry.broadcast(); + + assert_eq!(1, registry.storage[0].recipients.lock().unwrap().len()); + drop(registry); + }); + + let _ = fire.send(()); + let results: Vec<()> = third_rx.collect().await; + + assert_eq!(1, results.len()); + }); + } + + #[test] + fn broadcast_returns_if_at_least_one_event_fired() { + let registry = Registry::new(vec![EventInfo::default()]); + + registry.record_event(0); + assert_eq!(false, registry.broadcast()); + + let (first_tx, first_rx) = mpsc::channel(1); + let (second_tx, second_rx) = mpsc::channel(1); + + registry.register_listener(0, first_tx); + registry.register_listener(0, second_tx); + + registry.record_event(0); + assert_eq!(true, registry.broadcast()); + + drop(first_rx); + registry.record_event(0); + assert_eq!(false, registry.broadcast()); + + drop(second_rx); + } +} diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs new file mode 100644 index 00000000..e9f63af8 --- /dev/null +++ b/tokio/src/signal/unix.rs @@ -0,0 +1,426 @@ +//! Unix-specific types for signal handling. +//! +//! This module is only defined on Unix platforms and contains the primary +//! `Signal` type for receiving notifications of signals. + +#![cfg(unix)] + +use super::registry::{globals, EventId, EventInfo, Globals, Init, Storage}; +use crate::net::util::PollEvented; + +use tokio_io::AsyncRead; +use tokio_sync::mpsc::{channel, Receiver}; + +use futures_core::stream::Stream; +use libc::c_int; +use mio_uds::UnixStream; +use std::future::Future; +use std::io::{self, Error, ErrorKind, Write}; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Once; +use std::task::{Context, Poll}; + +pub(crate) type OsStorage = Vec<SignalInfo>; + +// Number of different unix signals +// (FreeBSD has 33) +const SIGNUM: usize = 33; + +impl Init for OsStorage { + fn init() -> Self { + (0..SIGNUM).map(|_| SignalInfo::default()).collect() + } +} + +impl Storage for OsStorage { + fn event_info(&self, id: EventId) -> Option<&EventInfo> { + self.get(id).map(|si| &si.event_info) + } + + fn for_each<'a, F>(&'a self, f: F) + where + F: FnMut(&'a EventInfo), + { + self.iter().map(|si| &si.event_info).for_each(f) + } +} + +#[derive(Debug)] +pub(crate) struct OsExtraData { + sender: UnixStream, + receiver: UnixStream, +} + +impl Init for OsExtraData { + fn init() -> Self { + let (receiver, sender) = UnixStream::pair().expect("failed to create UnixStream"); + + Self { sender, receiver } + } +} + +/// Represents the specific kind of signal to listen for. +#[derive(Debug, Clone, Copy)] +pub struct SignalKind(c_int); + +impl SignalKind { + /// Allows for listening to any valid OS signal. + /// + /// For example, this can be used for listening for platform-specific + /// signals. + /// ```rust,no_run + /// # use tokio::signal::unix::SignalKind; + /// # let signum = -1; + /// // let signum = libc::OS_SPECIFIC_SIGNAL; + /// let kind = SignalKind::from_raw(signum); + /// ``` + pub fn from_raw(signum: c_int) -> Self { + Self(signum) + } + + /// Represents the SIGALRM signal. + /// + /// On Unix systems this signal is sent when a real-time timer has expired. + /// By default, the process is terminated by this signal. + pub fn alarm() -> Self { + Self(libc::SIGALRM) + } + + /// Represents the SIGCHLD signal. + /// + /// On Unix systems this signal is sent when the status of a child process + /// has changed. By default, this signal is ignored. + pub fn child() -> Self { + Self(libc::SIGCHLD) + } + + /// Represents the SIGHUP signal. + /// + /// On Unix systems this signal is sent when the terminal is disconnected. + /// By default, the process is terminated by this signal. + pub fn hangup() -> Self { + Self(libc::SIGHUP) + } + + /// Represents the SIGINFO signal. + /// + /// On Unix systems this signal is sent to request a status update from the + /// process. By default, this signal is ignored. + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + ))] + pub fn info() -> Self { + Self(libc::SIGINFO) + } + + /// Represents the SIGINT signal. + /// + /// On Unix systems this signal is sent to interrupt a program. + /// By default, the process is terminated by this signal. + pub fn interrupt() -> Self { + Self(libc::SIGINT) + } + + /// Represents the SIGIO signal. + /// + /// On Unix systems this signal is sent when I/O operations are possible + /// on some file descriptor. By default, this signal is ignored. + pub fn io() -> Self { + Self(libc::SIGIO) + } + + /// Represents the SIGPIPE signal. + /// + /// On Unix systems this signal is sent when the process attempts to write + /// to a pipe which has no reader. By default, the process is terminated by + /// this signal. + pub fn pipe() -> Self { + Self(libc::SIGPIPE) + } + + /// Represents the SIGQUIT signal. + /// + /// On Unix systems this signal is sent to issue a shutdown of the + /// process, after which the OS will dump the process core. + /// By default, the process is terminated by this signal. + pub fn quit() -> Self { + Self(libc::SIGQUIT) + } + + /// Represents the SIGTERM signal. + /// + /// On Unix systems this signal is sent to issue a shutdown of the + /// process. By default, the process is terminated by this signal. + pub fn terminate() -> Self { + Self(libc::SIGTERM) + } + + /// Represents the SIGUSR1 signal. + /// + /// On Unix systems this is a user defined signal. + /// By default, the process is terminated by this signal. + pub fn user_defined1() -> Self { + Self(libc::SIGUSR1) + } + + /// Represents the SIGUSR2 signal. + /// + /// On Unix systems this is a user defined signal. + /// By default, the process is terminated by this signal. + pub fn user_defined2() -> Self { + Self(libc::SIGUSR2) + } + + /// Represents the SIGWINCH signal. + /// + /// On Unix systems this signal is sent when the terminal window is resized. + /// By default, this signal is ignored. + pub fn window_change() -> Self { + Self(libc::SIGWINCH) + } +} + +pub(crate) struct SignalInfo { + event_info: EventInfo, + init: Once, + initialized: AtomicBool, +} + +impl Default for SignalInfo { + fn default() -> SignalInfo { + SignalInfo { + event_info: Default::default(), + init: Once::new(), + initialized: AtomicBool::new(false), + } + } +} + +/// Our global signal handler for all signals registered by this module. +/// +/// The purpose of this signal handler is to primarily: +/// +/// 1. Flag that our specific signal was received (e.g. store an atomic flag) +/// 2. Wake up driver tasks by writing a byte to a pipe +/// +/// Those two operations shoudl both be async-signal safe. +fn action(globals: Pin<&'static Globals>, signal: c_int) { + globals.record_event(signal as EventId); + + // Send a wakeup, ignore any errors (anything reasonably possible is + // full pipe and then it will wake up anyway). + let mut sender = &globals.sender; + drop(sender.write(&[1])); +} + +/// Enable this module to receive signal notifications for the `signal` +/// provided. +/// +/// This will register the signal handler if it hasn't already been registered, +/// returning any error along the way if that fails. +fn signal_enable(signal: c_int) -> io::Result<()> { + if signal < 0 || signal_hook_registry::FORBIDDEN.contains(&signal) { + return Err(Error::new( + ErrorKind::Other, + format!("Refusing to register signal {}", signal), + )); + } + + let globals = globals(); + let siginfo = match globals.storage().get(signal as EventId) { + Some(slot) => slot, + None => return Err(io::Error::new(io::ErrorKind::Other, "signal too large")), + }; + let mut registered = Ok(()); + siginfo.init.call_once(|| { + registered = unsafe { + signal_hook_registry::register(signal, move || action(globals, signal)).map(|_| ()) + }; + if registered.is_ok() { + siginfo.initialized.store(true, Ordering::Relaxed); + } + }); + registered?; + // If the call_once failed, it won't be retried on the next attempt to register the signal. In + // such case it is not run, registered is still `Ok(())`, initialized is still false. + if siginfo.initialized.load(Ordering::Relaxed) { + Ok(()) + } else { + Err(Error::new( + ErrorKind::Other, + "Failed to register signal handler", + )) + } +} + +#[derive(Debug)] +struct Driver { + wakeup: PollEvented<UnixStream>, +} + +impl Future for Driver { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + // Drain the data from the pipe and maintain interest in getting more + self.drain(cx); + // Broadcast any signals which were received + globals().broadcast(); + + Poll::Pending + } +} + +impl Driver { + fn new() -> io::Result<Driver> { + // NB: We give each driver a "fresh" reciever file descriptor to avoid + // the issues described in alexcrichton/tokio-process#42. + // + // In the past we would reuse the actual receiver file descriptor and + // swallow any errors around double registration of the same descriptor. + // I'm not sure if the second (failed) registration simply doesn't end up + // receiving wake up notifications, or there could be some race condition + // when consuming readiness events, but having distinct descriptors for + // distinct PollEvented instances appears to mitigate this. + // + // Unfortunately we cannot just use a single global PollEvented instance + // either, since we can't compare Handles or assume they will always + // point to the exact same reactor. + let stream = globals().receiver.try_clone()?; + let wakeup = PollEvented::new(stream)?; + + Ok(Driver { wakeup }) + } + + /// Drain all data in the global receiver, ensuring we'll get woken up when + /// there is a write on the other end. + /// + /// We do *NOT* use the existence of any read bytes as evidence a sigal was + /// received since the `pending` flags would have already been set if that + /// was the case. See #38 for more info. + fn drain(mut self: Pin<&mut Self>, cx: &mut Context<'_>) { + loop { + match Pin::new(&mut self.wakeup).poll_read(cx, &mut [0; 128]) { + Poll::Ready(Ok(0)) => panic!("EOF on self-pipe"), + Poll::Ready(Ok(_)) => {} + Poll::Ready(Err(e)) => panic!("Bad read on self-pipe: {}", e), + Poll::Pending => break, + } + } + } +} + +/// An implementation of `Stream` for receiving a particular type of signal. +/// +/// This structure implements the `Stream` trait and represents notifications +/// of the current process receiving a particular signal. The signal being +/// listened for is passed to `Signal::new`, and the same signal number is then +/// yielded as each element for the stream. +/// +/// In general signal handling on Unix is a pretty tricky topic, and this +/// structure is no exception! There are some important limitations to keep in +/// mind when using `Signal` streams: +/// +/// * Signals handling in Unix already necessitates coalescing signals +/// together sometimes. This `Signal` stream is also no exception here in +/// that it will also coalesce signals. That is, even if the signal handler +/// for this process runs multiple times, the `Signal` stream may only return +/// one signal notification. Specifically, before `poll` is called, all +/// signal notifications are coalesced into one item returned from `poll`. +/// Once `poll` has been called, however, a further signal is guaranteed to +/// be yielded as an item. +/// +/// Put another way, any element pulled off the returned stream corresponds to +/// *at least one* signal, but possibly more. +/// +/// * Signal handling in general is relatively inefficient. Although some +/// improvements are possible in this crate, it's recommended to not plan on +/// having millions of signal channels open. +/// +/// * Currently the "driver task" to process incoming signals never exits. This +/// driver task runs in the background of the event loop provided, and +/// in general you shouldn't need to worry about it. +/// +/// If you've got any questions about this feel free to open an issue on the +/// repo, though, as I'd love to chat about this! In other words, I'd love to +/// alleviate some of these limitations if possible! +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +pub struct Signal { + driver: Driver, + rx: Receiver<()>, +} + +/// Creates a new stream which will receive notifications when the current +/// process receives the signal `signal`. +/// +/// This function will create a new stream which binds to the default reactor. +/// The `Signal` stream is an infinite stream which will receive +/// notifications whenever a signal is received. More documentation can be +/// found on `Signal` itself, but to reiterate: +/// +/// * Signals may be coalesced beyond what the kernel already does. +/// * Once a signal handler is registered with the process the underlying +/// libc signal handler is never unregistered. +/// +/// A `Signal` stream can be created for a particular signal number +/// multiple times. When a signal is received then all the associated +/// channels will receive the signal notification. +/// +/// # Errors +/// +/// * If the lower-level C functions fail for some reason. +/// * If the previous initialization of this specific signal failed. +/// * If the signal is one of +/// [`signal_hook::FORBIDDEN`](https://docs.rs/signal-hook/*/signal_hook/fn.register.html#panics) +pub fn signal(kind: SignalKind) -> io::Result<Signal> { + let signal = kind.0; + + // Turn the signal delivery on once we are ready for it + signal_enable(signal)?; + + // Ensure there's a driver for our associated event loop processing + // signals. + let driver = Driver::new()?; + + // One wakeup in a queue is enough, no need for us to buffer up any + // more. + let (tx, rx) = channel(1); + globals().register_listener(signal as EventId, tx); + + Ok(Signal { driver, rx }) +} + +pub(crate) fn ctrl_c() -> io::Result<Signal> { + signal(SignalKind::interrupt()) +} + +impl Stream for Signal { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let _ = Pin::new(&mut self.driver).poll(cx); + + self.rx.poll_recv(cx) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn signal_enable_error_on_invalid_input() { + signal_enable(-1).unwrap_err(); + } + + #[test] + fn signal_enable_error_on_forbidden_input() { + signal_enable(signal_hook_registry::FORBIDDEN[0]).unwrap_err(); + } +} diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs new file mode 100644 index 00000000..79a4aaab --- /dev/null +++ b/tokio/src/signal/windows.rs @@ -0,0 +1,217 @@ +//! Windows-specific types for signal handling. +//! +//! This module is only defined on Windows and contains the primary `Event` type +//! for receiving notifications of events. These events are listened for via the +//! `SetConsoleCtrlHandler` function which receives events of the type +//! `CTRL_C_EVENT` and `CTRL_BREAK_EVENT` + +#![cfg(windows)] + +use super::registry::{globals, EventId, EventInfo, Init, Storage}; + +use tokio_sync::mpsc::{channel, Receiver}; + +use futures_core::stream::Stream; +use std::convert::TryFrom; +use std::io; +use std::pin::Pin; +use std::sync::Once; +use std::task::{Context, Poll}; +use winapi::shared::minwindef::*; +use winapi::um::consoleapi::SetConsoleCtrlHandler; +use winapi::um::wincon::*; + +#[derive(Debug)] +pub(crate) struct OsStorage { + ctrl_c: EventInfo, + ctrl_break: EventInfo, +} + +impl Init for OsStorage { + fn init() -> Self { + Self { + ctrl_c: EventInfo::default(), + ctrl_break: EventInfo::default(), + } + } +} + +impl Storage for OsStorage { + fn event_info(&self, id: EventId) -> Option<&EventInfo> { + match DWORD::try_from(id) { + Ok(CTRL_C_EVENT) => Some(&self.ctrl_c), + Ok(CTRL_BREAK_EVENT) => Some(&self.ctrl_break), + _ => None, + } + } + + fn for_each<'a, F>(&'a self, mut f: F) + where + F: FnMut(&'a EventInfo), + { + f(&self.ctrl_c); + f(&self.ctrl_break); + } +} + +#[derive(Debug)] +pub(crate) struct OsExtraData {} + +impl Init for OsExtraData { + fn init() -> Self { + Self {} + } +} + +/// Stream of events discovered via `SetConsoleCtrlHandler`. +/// +/// This structure can be used to listen for events of the type `CTRL_C_EVENT` +/// and `CTRL_BREAK_EVENT`. The `Stream` trait is implemented for this struct +/// and will resolve for each notification received by the process. Note that +/// there are few limitations with this as well: +/// +/// * A notification to this process notifies *all* `Event` streams for that +/// event type. +/// * Notifications to an `Event` stream **are coalesced** if they aren't +/// processed quickly enough. This means that if two notifications are +/// received back-to-back, then the stream may only receive one item about the +/// two notifications. +// FIXME: refactor and combine with unix::Signal +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +pub(crate) struct Event { + rx: Receiver<()>, +} + +impl Event { + fn new(signum: DWORD) -> io::Result<Self> { + global_init()?; + + let (tx, rx) = channel(1); + globals().register_listener(signum as EventId, tx); + + Ok(Event { rx }) + } +} + +pub(crate) fn ctrl_c() -> io::Result<Event> { + Event::new(CTRL_C_EVENT) +} + +impl Stream for Event { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.rx.poll_recv(cx) + } +} + +fn global_init() -> io::Result<()> { + static INIT: Once = Once::new(); + + let mut init = None; + INIT.call_once(|| unsafe { + let rc = SetConsoleCtrlHandler(Some(handler), TRUE); + let ret = if rc == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + }; + + init = Some(ret); + }); + + init.unwrap_or_else(|| Ok(())) +} + +unsafe extern "system" fn handler(ty: DWORD) -> BOOL { + let globals = globals(); + globals.record_event(ty as EventId); + + // According to https://docs.microsoft.com/en-us/windows/console/handlerroutine + // the handler routine is always invoked in a new thread, thus we don't + // have the same restrictions as in Unix signal handlers, meaning we can + // go ahead and perform the broadcast here. + if globals.broadcast() { + TRUE + } else { + // No one is listening for this notification any more |