summaryrefslogtreecommitdiffstats
path: root/tokio/src/signal
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-15 22:11:13 -0800
committerGitHub <noreply@github.com>2019-11-15 22:11:13 -0800
commit8a7e57786a5dca139f5b4261685e22991ded0859 (patch)
treeb69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/src/signal
parent930679587ae42e4df3113159ccf33fb5923dd73a (diff)
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its _public_ dependencies on crates that are either a) do not provide a stable (1.0+) release with longevity guarantees or b) match the `tokio` release cadence. Of course, implementing `std` traits fits the requirements. The on exception, for now, is the `Stream` trait found in `futures_core`. It is expected that this trait will not change much and be moved into `std. Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain a dependency on this trait given how foundational it is. Since the `Stream` implementation is optional, types that are logically streams provide `async fn next_*` functions to obtain the next value. Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`. Additionally, some misc cleanup is also done: - `tokio::io::io` -> `tokio::io::util`. - `delay` -> `delay_until`. - `Timeout::new` -> `timeout(...)`. - `signal::ctrl_c()` returns a future instead of a stream. - `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait). - `time::Throttle` is removed (due to lack of `Stream` trait). - Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/src/signal')
-rw-r--r--tokio/src/signal/ctrl_c.rs61
-rw-r--r--tokio/src/signal/mod.rs56
-rw-r--r--tokio/src/signal/registry.rs31
-rw-r--r--tokio/src/signal/unix.rs32
-rw-r--r--tokio/src/signal/windows.rs63
5 files changed, 107 insertions, 136 deletions
diff --git a/tokio/src/signal/ctrl_c.rs b/tokio/src/signal/ctrl_c.rs
index f9dd4679..35ef2393 100644
--- a/tokio/src/signal/ctrl_c.rs
+++ b/tokio/src/signal/ctrl_c.rs
@@ -1,46 +1,35 @@
#[cfg(unix)]
-use super::unix::{self as os_impl, Signal as Inner};
+use super::unix::{self as os_impl};
#[cfg(windows)]
-use super::windows::{self as os_impl, Event as Inner};
+use super::windows::{self as os_impl};
-use futures_core::stream::Stream;
use std::io;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-/// Represents a stream which receives "ctrl-c" notifications sent to the process.
+/// Completes when a "ctrl-c" notification is sent to the process.
///
-/// In general signals are handled very differently across Unix and Windows, but
-/// this is somewhat cross platform in terms of how it can be handled. A ctrl-c
-/// event to a console process can be represented as a stream for both Windows
-/// and Unix.
+/// While signals are handled very differently between Unix and Windows, both
+/// platforms support receiving a signal on "ctrl-c". This function provides a
+/// portable API for receiving this notification.
///
-/// Note that there are a number of caveats listening for signals, and you may
-/// wish to read up on the documentation in the `unix` or `windows` module to
-/// take a peek.
+/// Once the returned future is polled, a listener a listener is registered. The
+/// future will complete on the first received `ctrl-c` **after** the initial
+/// call to either `Future::poll` or `.await`.
///
-/// Notably, a notification to this process notifies *all* streams listening to
-/// this event. Moreover, the notifications **are coalesced** if they aren't processed
-/// quickly enough. This means that if two notifications are received back-to-back,
-/// then the stream may only receive one item about the two notifications.
-#[must_use = "streams do nothing unless polled"]
-#[derive(Debug)]
-pub struct CtrlC {
- inner: Inner,
-}
-
-/// Creates a new stream which receives "ctrl-c" notifications sent to the
-/// process.
+/// # Examples
///
-/// This function binds to the default reactor.
-pub fn ctrl_c() -> io::Result<CtrlC> {
- os_impl::ctrl_c().map(|inner| CtrlC { inner })
-}
-
-impl Stream for CtrlC {
- type Item = ();
-
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- Pin::new(&mut self.inner).poll_next(cx)
- }
+/// ```rust,no_run
+/// use tokio::signal;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// println!("waiting for ctrl-c");
+///
+/// signal::ctrl_c().await.expect("failed to listen for event");
+///
+/// println!("received ctrl-c event");
+/// }
+/// ```
+pub async fn ctrl_c() -> io::Result<()> {
+ os_impl::ctrl_c()?.recv().await;
+ Ok(())
}
diff --git a/tokio/src/signal/mod.rs b/tokio/src/signal/mod.rs
index f695b07c..6b36bc4e 100644
--- a/tokio/src/signal/mod.rs
+++ b/tokio/src/signal/mod.rs
@@ -14,28 +14,15 @@
//!
//! # Examples
//!
-//! Print out all ctrl-C notifications received
+//! Print on "ctrl-c" notification.
//!
//! ```rust,no_run
//! use tokio::signal;
//!
-//! use futures_util::future;
-//! use futures_util::stream::StreamExt;
-//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
-//! // Create an infinite stream of "Ctrl+C" notifications. Each item received
-//! // on this stream may represent multiple ctrl-c signals.
-//! let ctrl_c = signal::ctrl_c()?;
-//!
-//! // Process each ctrl-c as it comes in
-//! let prog = ctrl_c.for_each(|_| {
-//! println!("ctrl-c received!");
-//! future::ready(())
-//! });
-//!
-//! prog.await;
-//!
+//! signal::ctrl_c().await?;
+//! println!("ctrl-c received!");
//! Ok(())
//! }
//! ```
@@ -45,38 +32,25 @@
//! ```rust,no_run
//! # #[cfg(unix)] {
//!
-//! use tokio::signal::{self, unix::{signal, SignalKind}};
-//!
-//! use futures_util::future;
-//! use futures_util::stream::StreamExt;
+//! use tokio::signal::unix::{signal, SignalKind};
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
-//! // Create an infinite stream of "Ctrl+C" notifications. Each item received
-//! // on this stream may represent multiple ctrl-c signals.
-//! let ctrl_c = signal::ctrl_c()?;
-//!
-//! // Process each ctrl-c as it comes in
-//! let prog = ctrl_c.for_each(|_| {
-//! println!("ctrl-c received!");
-//! future::ready(())
-//! });
-//!
-//! prog.await;
-//!
-//! // Like the previous example, this is an infinite stream of signals
-//! // being received, and signals may be coalesced while pending.
-//! let stream = signal(SignalKind::hangup())?;
-//!
-//! // Convert out stream into a future and block the program
-//! let (signal, _stream) = stream.into_future().await;
-//! println!("got signal {:?}", signal);
-//! Ok(())
+//! // An infinite stream of hangup signals.
+//! let mut stream = signal(SignalKind::hangup())?;
+//!
+//! // Print whenever a HUP signal is received
+//! loop {
+//! stream.recv().await;
+//! println!("got signal HUP");
+//! }
//! }
//! # }
//! ```
mod ctrl_c;
+pub use ctrl_c::ctrl_c;
+
mod registry;
mod os {
@@ -89,5 +63,3 @@ mod os {
pub mod unix;
pub mod windows;
-
-pub use self::ctrl_c::{ctrl_c, CtrlC};
diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs
index d608539c..0e017965 100644
--- a/tokio/src/signal/registry.rs
+++ b/tokio/src/signal/registry.rs
@@ -87,6 +87,8 @@ impl<S: Storage> Registry<S> {
///
/// Returns true if an event was delivered to at least one listener.
fn broadcast(&self) -> bool {
+ use crate::sync::mpsc::error::TrySendError;
+
let mut did_notify = false;
self.storage.for_each(|event_info| {
// Any signal of this kind arrived since we checked last?
@@ -103,17 +105,13 @@ impl<S: Storage> Registry<S> {
for i in (0..recipients.len()).rev() {
match recipients[i].try_send(()) {
Ok(()) => did_notify = true,
- Err(ref e) if e.is_closed() => {
+ Err(TrySendError::Closed(..)) => {
recipients.swap_remove(i);
}
// Channel is full, ignore the error since the
// receiver has already been woken up
- Err(e) => {
- // Sanity check in case this error type ever gets
- // additional variants we have not considered.
- debug_assert!(e.is_full());
- }
+ Err(_) => {}
}
}
});
@@ -180,7 +178,8 @@ mod tests {
use super::*;
use crate::runtime::{self, Runtime};
use crate::sync::{mpsc, oneshot};
- use futures::{future, StreamExt};
+
+ use futures::future;
#[test]
fn smoke() {
@@ -220,11 +219,7 @@ mod tests {
});
let _ = fire.send(());
- let all = future::join3(
- first_rx.collect::<Vec<_>>(),
- second_rx.collect::<Vec<_>>(),
- third_rx.collect::<Vec<_>>(),
- );
+ let all = future::join3(collect(first_rx), collect(second_rx), collect(third_rx));
let (first_results, second_results, third_results) = all.await;
assert_eq!(2, first_results.len());
@@ -279,7 +274,7 @@ mod tests {
});
let _ = fire.send(());
- let results: Vec<()> = third_rx.collect().await;
+ let results = collect(third_rx).await;
assert_eq!(1, results.len());
});
@@ -311,4 +306,14 @@ mod tests {
fn rt() -> Runtime {
runtime::Builder::new().current_thread().build().unwrap()
}
+
+ async fn collect(mut rx: crate::sync::mpsc::Receiver<()>) -> Vec<()> {
+ let mut ret = vec![];
+
+ while let Some(v) = rx.recv().await {
+ ret.push(v);
+ }
+
+ ret
+ }
}
diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs
index 87871503..8551e85c 100644
--- a/tokio/src/signal/unix.rs
+++ b/tokio/src/signal/unix.rs
@@ -10,10 +10,8 @@ use crate::net::util::PollEvented;
use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage};
use crate::sync::mpsc::{channel, Receiver};
-use futures_core::stream::Stream;
use libc::c_int;
use mio_uds::UnixStream;
-use std::future::Future;
use std::io::{self, Error, ErrorKind, Write};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -262,10 +260,8 @@ struct Driver {
wakeup: PollEvented<UnixStream>,
}
-impl Future for Driver {
- type Output = ();
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+impl Driver {
+ fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// Drain the data from the pipe and maintain interest in getting more
self.drain(cx);
// Broadcast any signals which were received
@@ -302,7 +298,7 @@ impl Driver {
/// We do *NOT* use the existence of any read bytes as evidence a sigal was
/// received since the `pending` flags would have already been set if that
/// was the case. See #38 for more info.
- fn drain(mut self: Pin<&mut Self>, cx: &mut Context<'_>) {
+ fn drain(&mut self, cx: &mut Context<'_>) {
loop {
match Pin::new(&mut self.wakeup).poll_read(cx, &mut [0; 128]) {
Poll::Ready(Ok(0)) => panic!("EOF on self-pipe"),
@@ -395,20 +391,24 @@ pub fn signal(kind: SignalKind) -> io::Result<Signal> {
Ok(Signal { driver, rx })
}
-pub(crate) fn ctrl_c() -> io::Result<Signal> {
- signal(SignalKind::interrupt())
-}
-
-impl Stream for Signal {
- type Item = ();
-
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let _ = Pin::new(&mut self.driver).poll(cx);
+impl Signal {
+ #[doc(hidden)] // TODO: Dox
+ pub async fn recv(&mut self) -> Option<()> {
+ use crate::future::poll_fn;
+ poll_fn(|cx| self.poll_recv(cx)).await
+ }
+ #[doc(hidden)] // TODO: document
+ pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ let _ = self.driver.poll(cx);
self.rx.poll_recv(cx)
}
}
+pub(crate) fn ctrl_c() -> io::Result<Signal> {
+ signal(SignalKind::interrupt())
+}
+
#[cfg(all(test, not(loom)))]
mod tests {
use super::*;
diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs
index 96e585ba..6758566a 100644
--- a/tokio/src/signal/windows.rs
+++ b/tokio/src/signal/windows.rs
@@ -10,10 +10,8 @@
use crate::signal::registry::{globals, EventId, EventInfo, Init, Storage};
use crate::sync::mpsc::{channel, Receiver};
-use futures_core::stream::Stream;
use std::convert::TryFrom;
use std::io;
-use std::pin::Pin;
use std::sync::Once;
use std::task::{Context, Poll};
use winapi::shared::minwindef::*;
@@ -82,6 +80,10 @@ pub(crate) struct Event {
rx: Receiver<()>,
}
+pub(crate) fn ctrl_c() -> io::Result<Event> {
+ Event::new(CTRL_C_EVENT)
+}
+
impl Event {
fn new(signum: DWORD) -> io::Result<Self> {
global_init()?;
@@ -91,17 +93,10 @@ impl Event {
Ok(Event { rx })
}
-}
-
-pub(crate) fn ctrl_c() -> io::Result<Event> {
- Event::new(CTRL_C_EVENT)
-}
-
-impl Stream for Event {
- type Item = ();
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- self.rx.poll_recv(cx)
+ pub(crate) async fn recv(&mut self) -> Option<()> {
+ use crate::future::poll_fn;
+ poll_fn(|cx| self.rx.poll_recv(cx)).await
}
}
@@ -109,6 +104,7 @@ fn global_init() -> io::Result<()> {
static INIT: Once = Once::new();
let mut init = None;
+
INIT.call_once(|| unsafe {
let rc = SetConsoleCtrlHandler(Some(handler), TRUE);
let ret = if rc == 0 {
@@ -153,6 +149,22 @@ pub struct CtrlBreak {
inner: Event,
}
+impl CtrlBreak {
+ #[doc(hidden)] // TODO: document
+ pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.inner.rx.poll_recv(cx)
+ }
+}
+
+#[cfg(feature = "stream")]
+impl futures_core::Stream for CtrlBreak {
+ type Item = ();
+
+ fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.poll_recv(cx)
+ }
+}
+
/// Creates a new stream which receives "ctrl-break" notifications sent to the
/// process.
///
@@ -161,29 +173,22 @@ pub fn ctrl_break() -> io::Result<CtrlBreak> {
Event::new(CTRL_BREAK_EVENT).map(|inner| CtrlBreak { inner })
}
-impl Stream for CtrlBreak {
- type Item = ();
-
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- Pin::new(&mut self.inner)
- .poll_next(cx)
- .map(|item| item.map(|_| ()))
- }
-}
-
#[cfg(all(test, not(loom)))]
mod tests {
use super::*;
use crate::runtime::Runtime;
+ use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task};
- use futures_util::stream::StreamExt;
+ use futures::stream::StreamExt;
#[test]
fn ctrl_c() {
- let mut rt = rt();
+ let rt = rt();
- rt.block_on(async {
- let ctrl_c = crate::signal::ctrl_c().expect("failed to create CtrlC");
+ rt.enter(|| {
+ let mut ctrl_c = task::spawn(crate::signal::ctrl_c());
+
+ assert_pending!(ctrl_c.poll());
// Windows doesn't have a good programmatic way of sending events
// like sending signals on Unix, so we'll stub out the actual OS
@@ -192,7 +197,7 @@ mod tests {
super::handler(CTRL_C_EVENT);
}
- let _ = ctrl_c.into_future().await;
+ assert_ready_ok!(ctrl_c.poll());
});
}
@@ -201,7 +206,7 @@ mod tests {
let mut rt = rt();
rt.block_on(async {
- let ctrl_break = super::ctrl_break().expect("failed to create CtrlC");
+ let mut ctrl_break = assert_ok!(super::ctrl_break());
// Windows doesn't have a good programmatic way of sending events
// like sending signals on Unix, so we'll stub out the actual OS
@@ -210,7 +215,7 @@ mod tests {
super::handler(CTRL_BREAK_EVENT);
}
- let _ = ctrl_break.into_future().await;
+ ctrl_break.next().await.unwrap();
});
}