summaryrefslogtreecommitdiffstats
path: root/tokio/src/signal
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-10-25 12:50:15 -0700
committerGitHub <noreply@github.com>2019-10-25 12:50:15 -0700
commit227533d456fe32e48ffcd3796f1e6c8f9318b230 (patch)
tree498029aaf42dd64eeb8ef0e7d7f29802b45d4e95 /tokio/src/signal
parent03a9378297c73c2e56a6d6b55db22b92427b850a (diff)
net: move into tokio crate (#1683)
A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The `net` implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags.
Diffstat (limited to 'tokio/src/signal')
-rw-r--r--tokio/src/signal/ctrl_c.rs46
-rw-r--r--tokio/src/signal/mod.rs93
-rw-r--r--tokio/src/signal/registry.rs310
-rw-r--r--tokio/src/signal/unix.rs426
-rw-r--r--tokio/src/signal/windows.rs217
5 files changed, 1092 insertions, 0 deletions
diff --git a/tokio/src/signal/ctrl_c.rs b/tokio/src/signal/ctrl_c.rs
new file mode 100644
index 00000000..f9dd4679
--- /dev/null
+++ b/tokio/src/signal/ctrl_c.rs
@@ -0,0 +1,46 @@
+#[cfg(unix)]
+use super::unix::{self as os_impl, Signal as Inner};
+#[cfg(windows)]
+use super::windows::{self as os_impl, Event as Inner};
+
+use futures_core::stream::Stream;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Represents a stream which receives "ctrl-c" notifications sent to the process.
+///
+/// In general signals are handled very differently across Unix and Windows, but
+/// this is somewhat cross platform in terms of how it can be handled. A ctrl-c
+/// event to a console process can be represented as a stream for both Windows
+/// and Unix.
+///
+/// Note that there are a number of caveats listening for signals, and you may
+/// wish to read up on the documentation in the `unix` or `windows` module to
+/// take a peek.
+///
+/// Notably, a notification to this process notifies *all* streams listening to
+/// this event. Moreover, the notifications **are coalesced** if they aren't processed
+/// quickly enough. This means that if two notifications are received back-to-back,
+/// then the stream may only receive one item about the two notifications.
+#[must_use = "streams do nothing unless polled"]
+#[derive(Debug)]
+pub struct CtrlC {
+ inner: Inner,
+}
+
+/// Creates a new stream which receives "ctrl-c" notifications sent to the
+/// process.
+///
+/// This function binds to the default reactor.
+pub fn ctrl_c() -> io::Result<CtrlC> {
+ os_impl::ctrl_c().map(|inner| CtrlC { inner })
+}
+
+impl Stream for CtrlC {
+ type Item = ();
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ Pin::new(&mut self.inner).poll_next(cx)
+ }
+}
diff --git a/tokio/src/signal/mod.rs b/tokio/src/signal/mod.rs
new file mode 100644
index 00000000..f695b07c
--- /dev/null
+++ b/tokio/src/signal/mod.rs
@@ -0,0 +1,93 @@
+//! Asynchronous signal handling for Tokio
+//!
+//! The primary type exported from this crate, `unix::Signal`, allows
+//! listening for arbitrary signals on Unix platforms, receiving them
+//! in an asynchronous fashion.
+//!
+//! Note that signal handling is in general a very tricky topic and should be
+//! used with great care. This crate attempts to implement 'best practice' for
+//! signal handling, but it should be evaluated for your own applications' needs
+//! to see if it's suitable.
+//!
+//! The are some fundamental limitations of this crate documented on the
+//! `Signal` structure as well.
+//!
+//! # Examples
+//!
+//! Print out all ctrl-C notifications received
+//!
+//! ```rust,no_run
+//! use tokio::signal;
+//!
+//! use futures_util::future;
+//! use futures_util::stream::StreamExt;
+//!
+//! #[tokio::main]
+//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Create an infinite stream of "Ctrl+C" notifications. Each item received
+//! // on this stream may represent multiple ctrl-c signals.
+//! let ctrl_c = signal::ctrl_c()?;
+//!
+//! // Process each ctrl-c as it comes in
+//! let prog = ctrl_c.for_each(|_| {
+//! println!("ctrl-c received!");
+//! future::ready(())
+//! });
+//!
+//! prog.await;
+//!
+//! Ok(())
+//! }
+//! ```
+//!
+//! Wait for SIGHUP on Unix
+//!
+//! ```rust,no_run
+//! # #[cfg(unix)] {
+//!
+//! use tokio::signal::{self, unix::{signal, SignalKind}};
+//!
+//! use futures_util::future;
+//! use futures_util::stream::StreamExt;
+//!
+//! #[tokio::main]
+//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Create an infinite stream of "Ctrl+C" notifications. Each item received
+//! // on this stream may represent multiple ctrl-c signals.
+//! let ctrl_c = signal::ctrl_c()?;
+//!
+//! // Process each ctrl-c as it comes in
+//! let prog = ctrl_c.for_each(|_| {
+//! println!("ctrl-c received!");
+//! future::ready(())
+//! });
+//!
+//! prog.await;
+//!
+//! // Like the previous example, this is an infinite stream of signals
+//! // being received, and signals may be coalesced while pending.
+//! let stream = signal(SignalKind::hangup())?;
+//!
+//! // Convert out stream into a future and block the program
+//! let (signal, _stream) = stream.into_future().await;
+//! println!("got signal {:?}", signal);
+//! Ok(())
+//! }
+//! # }
+//! ```
+
+mod ctrl_c;
+mod registry;
+
+mod os {
+ #[cfg(unix)]
+ pub(crate) use super::unix::{OsExtraData, OsStorage};
+
+ #[cfg(windows)]
+ pub(crate) use super::windows::{OsExtraData, OsStorage};
+}
+
+pub mod unix;
+pub mod windows;
+
+pub use self::ctrl_c::{ctrl_c, CtrlC};
diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs
new file mode 100644
index 00000000..1b813966
--- /dev/null
+++ b/tokio/src/signal/registry.rs
@@ -0,0 +1,310 @@
+use crate::signal::os::{OsExtraData, OsStorage};
+
+use tokio_sync::mpsc::Sender;
+
+use lazy_static::lazy_static;
+use std::ops;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Mutex;
+
+pub(crate) type EventId = usize;
+
+/// State for a specific event, whether a notification is pending delivery,
+/// and what listeners are registered.
+#[derive(Default, Debug)]
+pub(crate) struct EventInfo {
+ pending: AtomicBool,
+ recipients: Mutex<Vec<Sender<()>>>,
+}
+
+/// An interface for retrieving the `EventInfo` for a particular eventId.
+pub(crate) trait Storage {
+ /// Get the `EventInfo` for `id` if it exists.
+ fn event_info(&self, id: EventId) -> Option<&EventInfo>;
+
+ /// Invoke `f` once for each defined `EventInfo` in this storage.
+ fn for_each<'a, F>(&'a self, f: F)
+ where
+ F: FnMut(&'a EventInfo);
+}
+
+impl Storage for Vec<EventInfo> {
+ fn event_info(&self, id: EventId) -> Option<&EventInfo> {
+ self.get(id)
+ }
+
+ fn for_each<'a, F>(&'a self, f: F)
+ where
+ F: FnMut(&'a EventInfo),
+ {
+ self.iter().for_each(f)
+ }
+}
+
+/// An interface for initializing a type. Useful for situations where we cannot
+/// inject a configured instance in the constructor of another type.
+pub(crate) trait Init {
+ fn init() -> Self;
+}
+
+/// Manages and distributes event notifications to any registered listeners.
+///
+/// Generic over the underlying storage to allow for domain specific
+/// optimizations (e.g. eventIds may or may not be contiguous).
+#[derive(Debug)]
+pub(crate) struct Registry<S> {
+ storage: S,
+}
+
+impl<S> Registry<S> {
+ fn new(storage: S) -> Self {
+ Self { storage }
+ }
+}
+
+impl<S: Storage> Registry<S> {
+ /// Register a new listener for `event_id`.
+ fn register_listener(&self, event_id: EventId, listener: Sender<()>) {
+ self.storage
+ .event_info(event_id)
+ .unwrap_or_else(|| panic!("invalid event_id: {}", event_id))
+ .recipients
+ .lock()
+ .unwrap()
+ .push(listener);
+ }
+
+ /// Mark `event_id` as having been delivered, without broadcasting it to
+ /// any listeners.
+ fn record_event(&self, event_id: EventId) {
+ if let Some(event_info) = self.storage.event_info(event_id) {
+ event_info.pending.store(true, Ordering::SeqCst)
+ }
+ }
+
+ /// Broadcast all previously recorded events to their respective listeners.
+ ///
+ /// Returns true if an event was delivered to at least one listener.
+ fn broadcast(&self) -> bool {
+ let mut did_notify = false;
+ self.storage.for_each(|event_info| {
+ // Any signal of this kind arrived since we checked last?
+ if !event_info.pending.swap(false, Ordering::SeqCst) {
+ return;
+ }
+
+ let mut recipients = event_info.recipients.lock().unwrap();
+
+ // Notify all waiters on this signal that the signal has been
+ // received. If we can't push a message into the queue then we don't
+ // worry about it as everything is coalesced anyway. If the channel
+ // has gone away then we can remove that slot.
+ for i in (0..recipients.len()).rev() {
+ match recipients[i].try_send(()) {
+ Ok(()) => did_notify = true,
+ Err(ref e) if e.is_closed() => {
+ recipients.swap_remove(i);
+ }
+
+ // Channel is full, ignore the error since the
+ // receiver has already been woken up
+ Err(e) => {
+ // Sanity check in case this error type ever gets
+ // additional variants we have not considered.
+ debug_assert!(e.is_full());
+ }
+ }
+ }
+ });
+
+ did_notify
+ }
+}
+
+pub(crate) struct Globals {
+ extra: OsExtraData,
+ registry: Registry<OsStorage>,
+}
+
+impl ops::Deref for Globals {
+ type Target = OsExtraData;
+
+ fn deref(&self) -> &Self::Target {
+ &self.extra
+ }
+}
+
+impl Globals {
+ /// Register a new listener for `event_id`.
+ pub(crate) fn register_listener(&self, event_id: EventId, listener: Sender<()>) {
+ self.registry.register_listener(event_id, listener);
+ }
+
+ /// Mark `event_id` as having been delivered, without broadcasting it to
+ /// any listeners.
+ pub(crate) fn record_event(&self, event_id: EventId) {
+ self.registry.record_event(event_id);
+ }
+
+ /// Broadcast all previously recorded events to their respective listeners.
+ ///
+ /// Returns true if an event was delivered to at least one listener.
+ pub(crate) fn broadcast(&self) -> bool {
+ self.registry.broadcast()
+ }
+
+ #[cfg(unix)]
+ pub(crate) fn storage(&self) -> &OsStorage {
+ &self.registry.storage
+ }
+}
+
+pub(crate) fn globals() -> Pin<&'static Globals>
+where
+ OsExtraData: 'static + Send + Sync + Init,
+ OsStorage: 'static + Send + Sync + Init,
+{
+ lazy_static! {
+ static ref GLOBALS: Pin<Box<Globals>> = Box::pin(Globals {
+ extra: OsExtraData::init(),
+ registry: Registry::new(OsStorage::init()),
+ });
+ }
+
+ GLOBALS.as_ref()
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::runtime::current_thread::Runtime;
+ use crate::sync::{mpsc, oneshot};
+ use futures::{future, StreamExt};
+
+ #[test]
+ fn smoke() {
+ let mut rt = Runtime::new().unwrap();
+ rt.block_on(async move {
+ let registry = Registry::new(vec![
+ EventInfo::default(),
+ EventInfo::default(),
+ EventInfo::default(),
+ ]);
+
+ let (first_tx, first_rx) = mpsc::channel(3);
+ let (second_tx, second_rx) = mpsc::channel(3);
+ let (third_tx, third_rx) = mpsc::channel(3);
+
+ registry.register_listener(0, first_tx);
+ registry.register_listener(1, second_tx);
+ registry.register_listener(2, third_tx);
+
+ let (fire, wait) = oneshot::channel();
+
+ crate::spawn(async {
+ wait.await.expect("wait failed");
+
+ // Record some events which should get coalesced
+ registry.record_event(0);
+ registry.record_event(0);
+ registry.record_event(1);
+ registry.record_event(1);
+ registry.broadcast();
+
+ // Send subsequent signal
+ registry.record_event(0);
+ registry.broadcast();
+
+ drop(registry);
+ });
+
+ let _ = fire.send(());
+ let all = future::join3(
+ first_rx.collect::<Vec<_>>(),
+ second_rx.collect::<Vec<_>>(),
+ third_rx.collect::<Vec<_>>(),
+ );
+
+ let (first_results, second_results, third_results) = all.await;
+ assert_eq!(2, first_results.len());
+ assert_eq!(1, second_results.len());
+ assert_eq!(0, third_results.len());
+ });
+ }
+
+ #[test]
+ #[should_panic = "invalid event_id: 1"]
+ fn register_panics_on_invalid_input() {
+ let registry = Registry::new(vec![EventInfo::default()]);
+
+ let (tx, _) = mpsc::channel(1);
+ registry.register_listener(1, tx);
+ }
+
+ #[test]
+ fn record_invalid_event_does_nothing() {
+ let registry = Registry::new(vec![EventInfo::default()]);
+ registry.record_event(42);
+ }
+
+ #[test]
+ fn broadcast_cleans_up_disconnected_listeners() {
+ let mut rt = Runtime::new().unwrap();
+
+ rt.block_on(async {
+ let registry = Registry::new(vec![EventInfo::default()]);
+
+ let (first_tx, first_rx) = mpsc::channel(1);
+ let (second_tx, second_rx) = mpsc::channel(1);
+ let (third_tx, third_rx) = mpsc::channel(1);
+
+ registry.register_listener(0, first_tx);
+ registry.register_listener(0, second_tx);
+ registry.register_listener(0, third_tx);
+
+ drop(first_rx);
+ drop(second_rx);
+
+ let (fire, wait) = oneshot::channel();
+
+ crate::spawn(async {
+ wait.await.expect("wait failed");
+
+ registry.record_event(0);
+ registry.broadcast();
+
+ assert_eq!(1, registry.storage[0].recipients.lock().unwrap().len());
+ drop(registry);
+ });
+
+ let _ = fire.send(());
+ let results: Vec<()> = third_rx.collect().await;
+
+ assert_eq!(1, results.len());
+ });
+ }
+
+ #[test]
+ fn broadcast_returns_if_at_least_one_event_fired() {
+ let registry = Registry::new(vec![EventInfo::default()]);
+
+ registry.record_event(0);
+ assert_eq!(false, registry.broadcast());
+
+ let (first_tx, first_rx) = mpsc::channel(1);
+ let (second_tx, second_rx) = mpsc::channel(1);
+
+ registry.register_listener(0, first_tx);
+ registry.register_listener(0, second_tx);
+
+ registry.record_event(0);
+ assert_eq!(true, registry.broadcast());
+
+ drop(first_rx);
+ registry.record_event(0);
+ assert_eq!(false, registry.broadcast());
+
+ drop(second_rx);
+ }
+}
diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs
new file mode 100644
index 00000000..e9f63af8
--- /dev/null
+++ b/tokio/src/signal/unix.rs
@@ -0,0 +1,426 @@
+//! Unix-specific types for signal handling.
+//!
+//! This module is only defined on Unix platforms and contains the primary
+//! `Signal` type for receiving notifications of signals.
+
+#![cfg(unix)]
+
+use super::registry::{globals, EventId, EventInfo, Globals, Init, Storage};
+use crate::net::util::PollEvented;
+
+use tokio_io::AsyncRead;
+use tokio_sync::mpsc::{channel, Receiver};
+
+use futures_core::stream::Stream;
+use libc::c_int;
+use mio_uds::UnixStream;
+use std::future::Future;
+use std::io::{self, Error, ErrorKind, Write};
+use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Once;
+use std::task::{Context, Poll};
+
+pub(crate) type OsStorage = Vec<SignalInfo>;
+
+// Number of different unix signals
+// (FreeBSD has 33)
+const SIGNUM: usize = 33;
+
+impl Init for OsStorage {
+ fn init() -> Self {
+ (0..SIGNUM).map(|_| SignalInfo::default()).collect()
+ }
+}
+
+impl Storage for OsStorage {
+ fn event_info(&self, id: EventId) -> Option<&EventInfo> {
+ self.get(id).map(|si| &si.event_info)
+ }
+
+ fn for_each<'a, F>(&'a self, f: F)
+ where
+ F: FnMut(&'a EventInfo),
+ {
+ self.iter().map(|si| &si.event_info).for_each(f)
+ }
+}
+
+#[derive(Debug)]
+pub(crate) struct OsExtraData {
+ sender: UnixStream,
+ receiver: UnixStream,
+}
+
+impl Init for OsExtraData {
+ fn init() -> Self {
+ let (receiver, sender) = UnixStream::pair().expect("failed to create UnixStream");
+
+ Self { sender, receiver }
+ }
+}
+
+/// Represents the specific kind of signal to listen for.
+#[derive(Debug, Clone, Copy)]
+pub struct SignalKind(c_int);
+
+impl SignalKind {
+ /// Allows for listening to any valid OS signal.
+ ///
+ /// For example, this can be used for listening for platform-specific
+ /// signals.
+ /// ```rust,no_run
+ /// # use tokio::signal::unix::SignalKind;
+ /// # let signum = -1;
+ /// // let signum = libc::OS_SPECIFIC_SIGNAL;
+ /// let kind = SignalKind::from_raw(signum);
+ /// ```
+ pub fn from_raw(signum: c_int) -> Self {
+ Self(signum)
+ }
+
+ /// Represents the SIGALRM signal.
+ ///
+ /// On Unix systems this signal is sent when a real-time timer has expired.
+ /// By default, the process is terminated by this signal.
+ pub fn alarm() -> Self {
+ Self(libc::SIGALRM)
+ }
+
+ /// Represents the SIGCHLD signal.
+ ///
+ /// On Unix systems this signal is sent when the status of a child process
+ /// has changed. By default, this signal is ignored.
+ pub fn child() -> Self {
+ Self(libc::SIGCHLD)
+ }
+
+ /// Represents the SIGHUP signal.
+ ///
+ /// On Unix systems this signal is sent when the terminal is disconnected.
+ /// By default, the process is terminated by this signal.
+ pub fn hangup() -> Self {
+ Self(libc::SIGHUP)
+ }
+
+ /// Represents the SIGINFO signal.
+ ///
+ /// On Unix systems this signal is sent to request a status update from the
+ /// process. By default, this signal is ignored.
+ #[cfg(any(
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "macos",
+ target_os = "netbsd",
+ target_os = "openbsd"
+ ))]
+ pub fn info() -> Self {
+ Self(libc::SIGINFO)
+ }
+
+ /// Represents the SIGINT signal.
+ ///
+ /// On Unix systems this signal is sent to interrupt a program.
+ /// By default, the process is terminated by this signal.
+ pub fn interrupt() -> Self {
+ Self(libc::SIGINT)
+ }
+
+ /// Represents the SIGIO signal.
+ ///
+ /// On Unix systems this signal is sent when I/O operations are possible
+ /// on some file descriptor. By default, this signal is ignored.
+ pub fn io() -> Self {
+ Self(libc::SIGIO)
+ }
+
+ /// Represents the SIGPIPE signal.
+ ///
+ /// On Unix systems this signal is sent when the process attempts to write
+ /// to a pipe which has no reader. By default, the process is terminated by
+ /// this signal.
+ pub fn pipe() -> Self {
+ Self(libc::SIGPIPE)
+ }
+
+ /// Represents the SIGQUIT signal.
+ ///
+ /// On Unix systems this signal is sent to issue a shutdown of the
+ /// process, after which the OS will dump the process core.
+ /// By default, the process is terminated by this signal.
+ pub fn quit() -> Self {
+ Self(libc::SIGQUIT)
+ }
+
+ /// Represents the SIGTERM signal.
+ ///
+ /// On Unix systems this signal is sent to issue a shutdown of the
+ /// process. By default, the process is terminated by this signal.
+ pub fn terminate() -> Self {
+ Self(libc::SIGTERM)
+ }
+
+ /// Represents the SIGUSR1 signal.
+ ///
+ /// On Unix systems this is a user defined signal.
+ /// By default, the process is terminated by this signal.
+ pub fn user_defined1() -> Self {
+ Self(libc::SIGUSR1)
+ }
+
+ /// Represents the SIGUSR2 signal.
+ ///
+ /// On Unix systems this is a user defined signal.
+ /// By default, the process is terminated by this signal.
+ pub fn user_defined2() -> Self {
+ Self(libc::SIGUSR2)
+ }
+
+ /// Represents the SIGWINCH signal.
+ ///
+ /// On Unix systems this signal is sent when the terminal window is resized.
+ /// By default, this signal is ignored.
+ pub fn window_change() -> Self {
+ Self(libc::SIGWINCH)
+ }
+}
+
+pub(crate) struct SignalInfo {
+ event_info: EventInfo,
+ init: Once,
+ initialized: AtomicBool,
+}
+
+impl Default for SignalInfo {
+ fn default() -> SignalInfo {
+ SignalInfo {
+ event_info: Default::default(),
+ init: Once::new(),
+ initialized: AtomicBool::new(false),
+ }
+ }
+}
+
+/// Our global signal handler for all signals registered by this module.
+///
+/// The purpose of this signal handler is to primarily:
+///
+/// 1. Flag that our specific signal was received (e.g. store an atomic flag)
+/// 2. Wake up driver tasks by writing a byte to a pipe
+///
+/// Those two operations shoudl both be async-signal safe.
+fn action(globals: Pin<&'static Globals>, signal: c_int) {
+ globals.record_event(signal as EventId);
+
+ // Send a wakeup, ignore any errors (anything reasonably possible is
+ // full pipe and then it will wake up anyway).
+ let mut sender = &globals.sender;
+ drop(sender.write(&[1]));
+}
+
+/// Enable this module to receive signal notifications for the `signal`
+/// provided.
+///
+/// This will register the signal handler if it hasn't already been registered,
+/// returning any error along the way if that fails.
+fn signal_enable(signal: c_int) -> io::Result<()> {
+ if signal < 0 || signal_hook_registry::FORBIDDEN.contains(&signal) {
+ return Err(Error::new(
+ ErrorKind::Other,
+ format!("Refusing to register signal {}", signal),
+ ));
+ }
+
+ let globals = globals();
+ let siginfo = match globals.storage().get(signal as EventId) {
+ Some(slot) => slot,
+ None => return Err(io::Error::new(io::ErrorKind::Other, "signal too large")),
+ };
+ let mut registered = Ok(());
+ siginfo.init.call_once(|| {
+ registered = unsafe {
+ signal_hook_registry::register(signal, move || action(globals, signal)).map(|_| ())
+ };
+ if registered.is_ok() {
+ siginfo.initialized.store(true, Ordering::Relaxed);
+ }
+ });
+ registered?;
+ // If the call_once failed, it won't be retried on the next attempt to register the signal. In
+ // such case it is not run, registered is still `Ok(())`, initialized is still false.
+ if siginfo.initialized.load(Ordering::Relaxed) {
+ Ok(())
+ } else {
+ Err(Error::new(
+ ErrorKind::Other,
+ "Failed to register signal handler",
+ ))
+ }
+}
+
+#[derive(Debug)]
+struct Driver {
+ wakeup: PollEvented<UnixStream>,
+}
+
+impl Future for Driver {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ // Drain the data from the pipe and maintain interest in getting more
+ self.drain(cx);
+ // Broadcast any signals which were received
+ globals().broadcast();
+
+ Poll::Pending
+ }
+}
+
+impl Driver {
+ fn new() -> io::Result<Driver> {
+ // NB: We give each driver a "fresh" reciever file descriptor to avoid
+ // the issues described in alexcrichton/tokio-process#42.
+ //
+ // In the past we would reuse the actual receiver file descriptor and
+ // swallow any errors around double registration of the same descriptor.
+ // I'm not sure if the second (failed) registration simply doesn't end up
+ // receiving wake up notifications, or there could be some race condition
+ // when consuming readiness events, but having distinct descriptors for
+ // distinct PollEvented instances appears to mitigate this.
+ //
+ // Unfortunately we cannot just use a single global PollEvented instance
+ // either, since we can't compare Handles or assume they will always
+ // point to the exact same reactor.
+ let stream = globals().receiver.try_clone()?;
+ let wakeup = PollEvented::new(stream)?;
+
+ Ok(Driver { wakeup })
+ }
+
+ /// Drain all data in the global receiver, ensuring we'll get woken up when
+ /// there is a write on the other end.
+ ///
+ /// We do *NOT* use the existence of any read bytes as evidence a sigal was
+ /// received since the `pending` flags would have already been set if that
+ /// was the case. See #38 for more info.
+ fn drain(mut self: Pin<&mut Self>, cx: &mut Context<'_>) {
+ loop {
+ match Pin::new(&mut self.wakeup).poll_read(cx, &mut [0; 128]) {
+ Poll::Ready(Ok(0)) => panic!("EOF on self-pipe"),
+ Poll::Ready(Ok(_)) => {}
+ Poll::Ready(Err(e)) => panic!("Bad read on self-pipe: {}", e),
+ Poll::Pending => break,
+ }
+ }
+ }
+}
+
+/// An implementation of `Stream` for receiving a particular type of signal.
+///
+/// This structure implements the `Stream` trait and represents notifications
+/// of the current process receiving a particular signal. The signal being
+/// listened for is passed to `Signal::new`, and the same signal number is then
+/// yielded as each element for the stream.
+///
+/// In general signal handling on Unix is a pretty tricky topic, and this
+/// structure is no exception! There are some important limitations to keep in
+/// mind when using `Signal` streams:
+///
+/// * Signals handling in Unix already necessitates coalescing signals
+/// together sometimes. This `Signal` stream is also no exception here in
+/// that it will also coalesce signals. That is, even if the signal handler
+/// for this process runs multiple times, the `Signal` stream may only return
+/// one signal notification. Specifically, before `poll` is called, all
+/// signal notifications are coalesced into one item returned from `poll`.
+/// Once `poll` has been called, however, a further signal is guaranteed to
+/// be yielded as an item.
+///
+/// Put another way, any element pulled off the returned stream corresponds to
+/// *at least one* signal, but possibly more.
+///
+/// * Signal handling in general is relatively inefficient. Although some
+/// improvements are possible in this crate, it's recommended to not plan on
+/// having millions of signal channels open.
+///
+/// * Currently the "driver task" to process incoming signals never exits. This
+/// driver task runs in the background of the event loop provided, and
+/// in general you shouldn't need to worry about it.
+///
+/// If you've got any questions about this feel free to open an issue on the
+/// repo, though, as I'd love to chat about this! In other words, I'd love to
+/// alleviate some of these limitations if possible!
+#[must_use = "streams do nothing unless polled"]
+#[derive(Debug)]
+pub struct Signal {
+ driver: Driver,
+ rx: Receiver<()>,
+}
+
+/// Creates a new stream which will receive notifications when the current
+/// process receives the signal `signal`.
+///
+/// This function will create a new stream which binds to the default reactor.
+/// The `Signal` stream is an infinite stream which will receive
+/// notifications whenever a signal is received. More documentation can be
+/// found on `Signal` itself, but to reiterate:
+///
+/// * Signals may be coalesced beyond what the kernel already does.
+/// * Once a signal handler is registered with the process the underlying
+/// libc signal handler is never unregistered.
+///
+/// A `Signal` stream can be created for a particular signal number
+/// multiple times. When a signal is received then all the associated
+/// channels will receive the signal notification.
+///
+/// # Errors
+///
+/// * If the lower-level C functions fail for some reason.
+/// * If the previous initialization of this specific signal failed.
+/// * If the signal is one of
+/// [`signal_hook::FORBIDDEN`](https://docs.rs/signal-hook/*/signal_hook/fn.register.html#panics)
+pub fn signal(kind: SignalKind) -> io::Result<Signal> {
+ let signal = kind.0;
+
+ // Turn the signal delivery on once we are ready for it
+ signal_enable(signal)?;
+
+ // Ensure there's a driver for our associated event loop processing
+ // signals.
+ let driver = Driver::new()?;
+
+ // One wakeup in a queue is enough, no need for us to buffer up any
+ // more.
+ let (tx, rx) = channel(1);
+ globals().register_listener(signal as EventId, tx);
+
+ Ok(Signal { driver, rx })
+}
+
+pub(crate) fn ctrl_c() -> io::Result<Signal> {
+ signal(SignalKind::interrupt())
+}
+
+impl Stream for Signal {
+ type Item = ();
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let _ = Pin::new(&mut self.driver).poll(cx);
+
+ self.rx.poll_recv(cx)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn signal_enable_error_on_invalid_input() {
+ signal_enable(-1).unwrap_err();
+ }
+
+ #[test]
+ fn signal_enable_error_on_forbidden_input() {
+ signal_enable(signal_hook_registry::FORBIDDEN[0]).unwrap_err();
+ }
+}
diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs
new file mode 100644
index 00000000..79a4aaab
--- /dev/null
+++ b/tokio/src/signal/windows.rs
@@ -0,0 +1,217 @@
+//! Windows-specific types for signal handling.
+//!
+//! This module is only defined on Windows and contains the primary `Event` type
+//! for receiving notifications of events. These events are listened for via the
+//! `SetConsoleCtrlHandler` function which receives events of the type
+//! `CTRL_C_EVENT` and `CTRL_BREAK_EVENT`
+
+#![cfg(windows)]
+
+use super::registry::{globals, EventId, EventInfo, Init, Storage};
+
+use tokio_sync::mpsc::{channel, Receiver};
+
+use futures_core::stream::Stream;
+use std::convert::TryFrom;
+use std::io;
+use std::pin::Pin;
+use std::sync::Once;
+use std::task::{Context, Poll};
+use winapi::shared::minwindef::*;
+use winapi::um::consoleapi::SetConsoleCtrlHandler;
+use winapi::um::wincon::*;
+
+#[derive(Debug)]
+pub(crate) struct OsStorage {
+ ctrl_c: EventInfo,
+ ctrl_break: EventInfo,
+}
+
+impl Init for OsStorage {
+ fn init() -> Self {
+ Self {
+ ctrl_c: EventInfo::default(),
+ ctrl_break: EventInfo::default(),
+ }
+ }
+}
+
+impl Storage for OsStorage {
+ fn event_info(&self, id: EventId) -> Option<&EventInfo> {
+ match DWORD::try_from(id) {
+ Ok(CTRL_C_EVENT) => Some(&self.ctrl_c),
+ Ok(CTRL_BREAK_EVENT) => Some(&self.ctrl_break),
+ _ => None,
+ }
+ }
+
+ fn for_each<'a, F>(&'a self, mut f: F)
+ where
+ F: FnMut(&'a EventInfo),
+ {
+ f(&self.ctrl_c);
+ f(&self.ctrl_break);
+ }
+}
+
+#[derive(Debug)]
+pub(crate) struct OsExtraData {}
+
+impl Init for OsExtraData {
+ fn init() -> Self {
+ Self {}
+ }
+}
+
+/// Stream of events discovered via `SetConsoleCtrlHandler`.
+///
+/// This structure can be used to listen for events of the type `CTRL_C_EVENT`
+/// and `CTRL_BREAK_EVENT`. The `Stream` trait is implemented for this struct
+/// and will resolve for each notification received by the process. Note that
+/// there are few limitations with this as well:
+///
+/// * A notification to this process notifies *all* `Event` streams for that
+/// event type.
+/// * Notifications to an `Event` stream **are coalesced** if they aren't
+/// processed quickly enough. This means that if two notifications are
+/// received back-to-back, then the stream may only receive one item about the
+/// two notifications.
+// FIXME: refactor and combine with unix::Signal
+#[must_use = "streams do nothing unless polled"]
+#[derive(Debug)]
+pub(crate) struct Event {
+ rx: Receiver<()>,
+}
+
+impl Event {
+ fn new(signum: DWORD) -> io::Result<Self> {
+ global_init()?;
+
+ let (tx, rx) = channel(1);
+ globals().register_listener(signum as EventId, tx);
+
+ Ok(Event { rx })
+ }
+}
+
+pub(crate) fn ctrl_c() -> io::Result<Event> {
+ Event::new(CTRL_C_EVENT)
+}
+
+impl Stream for Event {
+ type Item = ();
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.rx.poll_recv(cx)
+ }
+}
+
+fn global_init() -> io::Result<()> {
+ static INIT: Once = Once::new();
+
+ let mut init = None;
+ INIT.call_once(|| unsafe {
+ let rc = SetConsoleCtrlHandler(Some(handler), TRUE);
+ let ret = if rc == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(())
+ };
+
+ init = Some(ret);
+ });
+
+ init.unwrap_or_else(|| Ok(()))
+}
+
+unsafe extern "system" fn handler(ty: DWORD) -> BOOL {
+ let globals = globals();
+ globals.record_event(ty as EventId);
+
+ // According to https://docs.microsoft.com/en-us/windows/console/handlerroutine
+ // the handler routine is always invoked in a new thread, thus we don't
+ // have the same restrictions as in Unix signal handlers, meaning we can
+ // go ahead and perform the broadcast here.
+ if globals.broadcast() {
+ TRUE
+ } else {
+ // No one is listening for this notification any more