diff options
Diffstat (limited to 'tokio/src/signal')
-rw-r--r-- | tokio/src/signal/ctrl_c.rs | 61 | ||||
-rw-r--r-- | tokio/src/signal/mod.rs | 56 | ||||
-rw-r--r-- | tokio/src/signal/registry.rs | 31 | ||||
-rw-r--r-- | tokio/src/signal/unix.rs | 32 | ||||
-rw-r--r-- | tokio/src/signal/windows.rs | 63 |
5 files changed, 107 insertions, 136 deletions
diff --git a/tokio/src/signal/ctrl_c.rs b/tokio/src/signal/ctrl_c.rs index f9dd4679..35ef2393 100644 --- a/tokio/src/signal/ctrl_c.rs +++ b/tokio/src/signal/ctrl_c.rs @@ -1,46 +1,35 @@ #[cfg(unix)] -use super::unix::{self as os_impl, Signal as Inner}; +use super::unix::{self as os_impl}; #[cfg(windows)] -use super::windows::{self as os_impl, Event as Inner}; +use super::windows::{self as os_impl}; -use futures_core::stream::Stream; use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; -/// Represents a stream which receives "ctrl-c" notifications sent to the process. +/// Completes when a "ctrl-c" notification is sent to the process. /// -/// In general signals are handled very differently across Unix and Windows, but -/// this is somewhat cross platform in terms of how it can be handled. A ctrl-c -/// event to a console process can be represented as a stream for both Windows -/// and Unix. +/// While signals are handled very differently between Unix and Windows, both +/// platforms support receiving a signal on "ctrl-c". This function provides a +/// portable API for receiving this notification. /// -/// Note that there are a number of caveats listening for signals, and you may -/// wish to read up on the documentation in the `unix` or `windows` module to -/// take a peek. +/// Once the returned future is polled, a listener a listener is registered. The +/// future will complete on the first received `ctrl-c` **after** the initial +/// call to either `Future::poll` or `.await`. /// -/// Notably, a notification to this process notifies *all* streams listening to -/// this event. Moreover, the notifications **are coalesced** if they aren't processed -/// quickly enough. This means that if two notifications are received back-to-back, -/// then the stream may only receive one item about the two notifications. -#[must_use = "streams do nothing unless polled"] -#[derive(Debug)] -pub struct CtrlC { - inner: Inner, -} - -/// Creates a new stream which receives "ctrl-c" notifications sent to the -/// process. +/// # Examples /// -/// This function binds to the default reactor. -pub fn ctrl_c() -> io::Result<CtrlC> { - os_impl::ctrl_c().map(|inner| CtrlC { inner }) -} - -impl Stream for CtrlC { - type Item = (); - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - Pin::new(&mut self.inner).poll_next(cx) - } +/// ```rust,no_run +/// use tokio::signal; +/// +/// #[tokio::main] +/// async fn main() { +/// println!("waiting for ctrl-c"); +/// +/// signal::ctrl_c().await.expect("failed to listen for event"); +/// +/// println!("received ctrl-c event"); +/// } +/// ``` +pub async fn ctrl_c() -> io::Result<()> { + os_impl::ctrl_c()?.recv().await; + Ok(()) } diff --git a/tokio/src/signal/mod.rs b/tokio/src/signal/mod.rs index f695b07c..6b36bc4e 100644 --- a/tokio/src/signal/mod.rs +++ b/tokio/src/signal/mod.rs @@ -14,28 +14,15 @@ //! //! # Examples //! -//! Print out all ctrl-C notifications received +//! Print on "ctrl-c" notification. //! //! ```rust,no_run //! use tokio::signal; //! -//! use futures_util::future; -//! use futures_util::stream::StreamExt; -//! //! #[tokio::main] //! async fn main() -> Result<(), Box<dyn std::error::Error>> { -//! // Create an infinite stream of "Ctrl+C" notifications. Each item received -//! // on this stream may represent multiple ctrl-c signals. -//! let ctrl_c = signal::ctrl_c()?; -//! -//! // Process each ctrl-c as it comes in -//! let prog = ctrl_c.for_each(|_| { -//! println!("ctrl-c received!"); -//! future::ready(()) -//! }); -//! -//! prog.await; -//! +//! signal::ctrl_c().await?; +//! println!("ctrl-c received!"); //! Ok(()) //! } //! ``` @@ -45,38 +32,25 @@ //! ```rust,no_run //! # #[cfg(unix)] { //! -//! use tokio::signal::{self, unix::{signal, SignalKind}}; -//! -//! use futures_util::future; -//! use futures_util::stream::StreamExt; +//! use tokio::signal::unix::{signal, SignalKind}; //! //! #[tokio::main] //! async fn main() -> Result<(), Box<dyn std::error::Error>> { -//! // Create an infinite stream of "Ctrl+C" notifications. Each item received -//! // on this stream may represent multiple ctrl-c signals. -//! let ctrl_c = signal::ctrl_c()?; -//! -//! // Process each ctrl-c as it comes in -//! let prog = ctrl_c.for_each(|_| { -//! println!("ctrl-c received!"); -//! future::ready(()) -//! }); -//! -//! prog.await; -//! -//! // Like the previous example, this is an infinite stream of signals -//! // being received, and signals may be coalesced while pending. -//! let stream = signal(SignalKind::hangup())?; -//! -//! // Convert out stream into a future and block the program -//! let (signal, _stream) = stream.into_future().await; -//! println!("got signal {:?}", signal); -//! Ok(()) +//! // An infinite stream of hangup signals. +//! let mut stream = signal(SignalKind::hangup())?; +//! +//! // Print whenever a HUP signal is received +//! loop { +//! stream.recv().await; +//! println!("got signal HUP"); +//! } //! } //! # } //! ``` mod ctrl_c; +pub use ctrl_c::ctrl_c; + mod registry; mod os { @@ -89,5 +63,3 @@ mod os { pub mod unix; pub mod windows; - -pub use self::ctrl_c::{ctrl_c, CtrlC}; diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs index d608539c..0e017965 100644 --- a/tokio/src/signal/registry.rs +++ b/tokio/src/signal/registry.rs @@ -87,6 +87,8 @@ impl<S: Storage> Registry<S> { /// /// Returns true if an event was delivered to at least one listener. fn broadcast(&self) -> bool { + use crate::sync::mpsc::error::TrySendError; + let mut did_notify = false; self.storage.for_each(|event_info| { // Any signal of this kind arrived since we checked last? @@ -103,17 +105,13 @@ impl<S: Storage> Registry<S> { for i in (0..recipients.len()).rev() { match recipients[i].try_send(()) { Ok(()) => did_notify = true, - Err(ref e) if e.is_closed() => { + Err(TrySendError::Closed(..)) => { recipients.swap_remove(i); } // Channel is full, ignore the error since the // receiver has already been woken up - Err(e) => { - // Sanity check in case this error type ever gets - // additional variants we have not considered. - debug_assert!(e.is_full()); - } + Err(_) => {} } } }); @@ -180,7 +178,8 @@ mod tests { use super::*; use crate::runtime::{self, Runtime}; use crate::sync::{mpsc, oneshot}; - use futures::{future, StreamExt}; + + use futures::future; #[test] fn smoke() { @@ -220,11 +219,7 @@ mod tests { }); let _ = fire.send(()); - let all = future::join3( - first_rx.collect::<Vec<_>>(), - second_rx.collect::<Vec<_>>(), - third_rx.collect::<Vec<_>>(), - ); + let all = future::join3(collect(first_rx), collect(second_rx), collect(third_rx)); let (first_results, second_results, third_results) = all.await; assert_eq!(2, first_results.len()); @@ -279,7 +274,7 @@ mod tests { }); let _ = fire.send(()); - let results: Vec<()> = third_rx.collect().await; + let results = collect(third_rx).await; assert_eq!(1, results.len()); }); @@ -311,4 +306,14 @@ mod tests { fn rt() -> Runtime { runtime::Builder::new().current_thread().build().unwrap() } + + async fn collect(mut rx: crate::sync::mpsc::Receiver<()>) -> Vec<()> { + let mut ret = vec![]; + + while let Some(v) = rx.recv().await { + ret.push(v); + } + + ret + } } diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index 87871503..8551e85c 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -10,10 +10,8 @@ use crate::net::util::PollEvented; use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage}; use crate::sync::mpsc::{channel, Receiver}; -use futures_core::stream::Stream; use libc::c_int; use mio_uds::UnixStream; -use std::future::Future; use std::io::{self, Error, ErrorKind, Write}; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; @@ -262,10 +260,8 @@ struct Driver { wakeup: PollEvented<UnixStream>, } -impl Future for Driver { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { +impl Driver { + fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> { // Drain the data from the pipe and maintain interest in getting more self.drain(cx); // Broadcast any signals which were received @@ -302,7 +298,7 @@ impl Driver { /// We do *NOT* use the existence of any read bytes as evidence a sigal was /// received since the `pending` flags would have already been set if that /// was the case. See #38 for more info. - fn drain(mut self: Pin<&mut Self>, cx: &mut Context<'_>) { + fn drain(&mut self, cx: &mut Context<'_>) { loop { match Pin::new(&mut self.wakeup).poll_read(cx, &mut [0; 128]) { Poll::Ready(Ok(0)) => panic!("EOF on self-pipe"), @@ -395,20 +391,24 @@ pub fn signal(kind: SignalKind) -> io::Result<Signal> { Ok(Signal { driver, rx }) } -pub(crate) fn ctrl_c() -> io::Result<Signal> { - signal(SignalKind::interrupt()) -} - -impl Stream for Signal { - type Item = (); - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let _ = Pin::new(&mut self.driver).poll(cx); +impl Signal { + #[doc(hidden)] // TODO: Dox + pub async fn recv(&mut self) -> Option<()> { + use crate::future::poll_fn; + poll_fn(|cx| self.poll_recv(cx)).await + } + #[doc(hidden)] // TODO: document + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + let _ = self.driver.poll(cx); self.rx.poll_recv(cx) } } +pub(crate) fn ctrl_c() -> io::Result<Signal> { + signal(SignalKind::interrupt()) +} + #[cfg(all(test, not(loom)))] mod tests { use super::*; diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs index 96e585ba..6758566a 100644 --- a/tokio/src/signal/windows.rs +++ b/tokio/src/signal/windows.rs @@ -10,10 +10,8 @@ use crate::signal::registry::{globals, EventId, EventInfo, Init, Storage}; use crate::sync::mpsc::{channel, Receiver}; -use futures_core::stream::Stream; use std::convert::TryFrom; use std::io; -use std::pin::Pin; use std::sync::Once; use std::task::{Context, Poll}; use winapi::shared::minwindef::*; @@ -82,6 +80,10 @@ pub(crate) struct Event { rx: Receiver<()>, } +pub(crate) fn ctrl_c() -> io::Result<Event> { + Event::new(CTRL_C_EVENT) +} + impl Event { fn new(signum: DWORD) -> io::Result<Self> { global_init()?; @@ -91,17 +93,10 @@ impl Event { Ok(Event { rx }) } -} - -pub(crate) fn ctrl_c() -> io::Result<Event> { - Event::new(CTRL_C_EVENT) -} - -impl Stream for Event { - type Item = (); - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - self.rx.poll_recv(cx) + pub(crate) async fn recv(&mut self) -> Option<()> { + use crate::future::poll_fn; + poll_fn(|cx| self.rx.poll_recv(cx)).await } } @@ -109,6 +104,7 @@ fn global_init() -> io::Result<()> { static INIT: Once = Once::new(); let mut init = None; + INIT.call_once(|| unsafe { let rc = SetConsoleCtrlHandler(Some(handler), TRUE); let ret = if rc == 0 { @@ -153,6 +149,22 @@ pub struct CtrlBreak { inner: Event, } +impl CtrlBreak { + #[doc(hidden)] // TODO: document + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.inner.rx.poll_recv(cx) + } +} + +#[cfg(feature = "stream")] +impl futures_core::Stream for CtrlBreak { + type Item = (); + + fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.poll_recv(cx) + } +} + /// Creates a new stream which receives "ctrl-break" notifications sent to the /// process. /// @@ -161,29 +173,22 @@ pub fn ctrl_break() -> io::Result<CtrlBreak> { Event::new(CTRL_BREAK_EVENT).map(|inner| CtrlBreak { inner }) } -impl Stream for CtrlBreak { - type Item = (); - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - Pin::new(&mut self.inner) - .poll_next(cx) - .map(|item| item.map(|_| ())) - } -} - #[cfg(all(test, not(loom)))] mod tests { use super::*; use crate::runtime::Runtime; + use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task}; - use futures_util::stream::StreamExt; + use futures::stream::StreamExt; #[test] fn ctrl_c() { - let mut rt = rt(); + let rt = rt(); - rt.block_on(async { - let ctrl_c = crate::signal::ctrl_c().expect("failed to create CtrlC"); + rt.enter(|| { + let mut ctrl_c = task::spawn(crate::signal::ctrl_c()); + + assert_pending!(ctrl_c.poll()); // Windows doesn't have a good programmatic way of sending events // like sending signals on Unix, so we'll stub out the actual OS @@ -192,7 +197,7 @@ mod tests { super::handler(CTRL_C_EVENT); } - let _ = ctrl_c.into_future().await; + assert_ready_ok!(ctrl_c.poll()); }); } @@ -201,7 +206,7 @@ mod tests { let mut rt = rt(); rt.block_on(async { - let ctrl_break = super::ctrl_break().expect("failed to create CtrlC"); + let mut ctrl_break = assert_ok!(super::ctrl_break()); // Windows doesn't have a good programmatic way of sending events // like sending signals on Unix, so we'll stub out the actual OS @@ -210,7 +215,7 @@ mod tests { super::handler(CTRL_BREAK_EVENT); } - let _ = ctrl_break.into_future().await; + ctrl_break.next().await.unwrap(); }); } |