summaryrefslogtreecommitdiffstats
path: root/tokio/src/signal/windows.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/signal/windows.rs')
-rw-r--r--tokio/src/signal/windows.rs63
1 files changed, 34 insertions, 29 deletions
diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs
index 96e585ba..6758566a 100644
--- a/tokio/src/signal/windows.rs
+++ b/tokio/src/signal/windows.rs
@@ -10,10 +10,8 @@
use crate::signal::registry::{globals, EventId, EventInfo, Init, Storage};
use crate::sync::mpsc::{channel, Receiver};
-use futures_core::stream::Stream;
use std::convert::TryFrom;
use std::io;
-use std::pin::Pin;
use std::sync::Once;
use std::task::{Context, Poll};
use winapi::shared::minwindef::*;
@@ -82,6 +80,10 @@ pub(crate) struct Event {
rx: Receiver<()>,
}
+pub(crate) fn ctrl_c() -> io::Result<Event> {
+ Event::new(CTRL_C_EVENT)
+}
+
impl Event {
fn new(signum: DWORD) -> io::Result<Self> {
global_init()?;
@@ -91,17 +93,10 @@ impl Event {
Ok(Event { rx })
}
-}
-
-pub(crate) fn ctrl_c() -> io::Result<Event> {
- Event::new(CTRL_C_EVENT)
-}
-
-impl Stream for Event {
- type Item = ();
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- self.rx.poll_recv(cx)
+ pub(crate) async fn recv(&mut self) -> Option<()> {
+ use crate::future::poll_fn;
+ poll_fn(|cx| self.rx.poll_recv(cx)).await
}
}
@@ -109,6 +104,7 @@ fn global_init() -> io::Result<()> {
static INIT: Once = Once::new();
let mut init = None;
+
INIT.call_once(|| unsafe {
let rc = SetConsoleCtrlHandler(Some(handler), TRUE);
let ret = if rc == 0 {
@@ -153,6 +149,22 @@ pub struct CtrlBreak {
inner: Event,
}
+impl CtrlBreak {
+ #[doc(hidden)] // TODO: document
+ pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.inner.rx.poll_recv(cx)
+ }
+}
+
+#[cfg(feature = "stream")]
+impl futures_core::Stream for CtrlBreak {
+ type Item = ();
+
+ fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.poll_recv(cx)
+ }
+}
+
/// Creates a new stream which receives "ctrl-break" notifications sent to the
/// process.
///
@@ -161,29 +173,22 @@ pub fn ctrl_break() -> io::Result<CtrlBreak> {
Event::new(CTRL_BREAK_EVENT).map(|inner| CtrlBreak { inner })
}
-impl Stream for CtrlBreak {
- type Item = ();
-
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- Pin::new(&mut self.inner)
- .poll_next(cx)
- .map(|item| item.map(|_| ()))
- }
-}
-
#[cfg(all(test, not(loom)))]
mod tests {
use super::*;
use crate::runtime::Runtime;
+ use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task};
- use futures_util::stream::StreamExt;
+ use futures::stream::StreamExt;
#[test]
fn ctrl_c() {
- let mut rt = rt();
+ let rt = rt();
- rt.block_on(async {
- let ctrl_c = crate::signal::ctrl_c().expect("failed to create CtrlC");
+ rt.enter(|| {
+ let mut ctrl_c = task::spawn(crate::signal::ctrl_c());
+
+ assert_pending!(ctrl_c.poll());
// Windows doesn't have a good programmatic way of sending events
// like sending signals on Unix, so we'll stub out the actual OS
@@ -192,7 +197,7 @@ mod tests {
super::handler(CTRL_C_EVENT);
}
- let _ = ctrl_c.into_future().await;
+ assert_ready_ok!(ctrl_c.poll());
});
}
@@ -201,7 +206,7 @@ mod tests {
let mut rt = rt();
rt.block_on(async {
- let ctrl_break = super::ctrl_break().expect("failed to create CtrlC");
+ let mut ctrl_break = assert_ok!(super::ctrl_break());
// Windows doesn't have a good programmatic way of sending events
// like sending signals on Unix, so we'll stub out the actual OS
@@ -210,7 +215,7 @@ mod tests {
super::handler(CTRL_BREAK_EVENT);
}
- let _ = ctrl_break.into_future().await;
+ ctrl_break.next().await.unwrap();
});
}