From 91bb0f73f55bc4e5ebed506acdbfc6591c714289 Mon Sep 17 00:00:00 2001 From: Ivan Petkov Date: Mon, 1 Apr 2019 12:46:23 -0700 Subject: signal: refactor Windows registrations to be lazy (#1001) - Use `Handle::default` over `Handle::current` for consistent semantics - Make all `windows::Event` constructors lazily invoke `global_init` so they can be safely constructed off-task - Don't assume the reactor is alive and event registration will be done when calling `global_init` Add windows regression tests. Unfortunately, Windows doesn't have a reliable way of programmatically sending CTRL_C or CTRL_BREAK events to a progress, so the tests can only exercise our internal machinery by invoking the handler that we register with the OS Fixes #999 --- tokio-signal/CHANGELOG.md | 8 ++ tokio-signal/Cargo.toml | 2 +- tokio-signal/examples/ctrl-c.rs | 7 +- tokio-signal/src/windows.rs | 157 +++++++++++++++++++++++++++------------- 4 files changed, 121 insertions(+), 53 deletions(-) (limited to 'tokio-signal') diff --git a/tokio-signal/CHANGELOG.md b/tokio-signal/CHANGELOG.md index 56920565..5014fb4d 100644 --- a/tokio-signal/CHANGELOG.md +++ b/tokio-signal/CHANGELOG.md @@ -1,3 +1,11 @@ +# 0.2.9 + +### Fixed +- `windows::Event` performs internal registrations lazily, so now it can be +constructed outside of a running task +- remove usage of deprecated `Handle::current` in default `windows::Event` +constructors + # 0.2.8 (March 22, 2019) ### Fixed diff --git a/tokio-signal/Cargo.toml b/tokio-signal/Cargo.toml index e6ec22a3..275aefd4 100644 --- a/tokio-signal/Cargo.toml +++ b/tokio-signal/Cargo.toml @@ -38,4 +38,4 @@ tokio = "0.1.8" [target.'cfg(windows)'.dependencies.winapi] version = "0.3" -features = ["minwindef", "wincon"] +features = ["consoleapi", "minwindef", "wincon"] diff --git a/tokio-signal/examples/ctrl-c.rs b/tokio-signal/examples/ctrl-c.rs index 45ad82b9..6d6adfd7 100644 --- a/tokio-signal/examples/ctrl-c.rs +++ b/tokio-signal/examples/ctrl-c.rs @@ -53,7 +53,12 @@ fn main() -> Result<(), Box> { // Up until now, we haven't really DONE anything, just prepared // now it's time to actually schedule, and thus execute, the stream // on our event loop - tokio::runtime::current_thread::block_on_all(future)?; + // FIXME(1000): windows uses a global driver task which doesn't terminate + // on its own, so if we use block_on_all our application will never exit + //tokio::runtime::current_thread::block_on_all(future)?; + tokio::runtime::current_thread::Runtime::new() + .expect("failed to start runtime on current thread") + .block_on(future)?; println!("Stream ended, quiting the program."); Ok(()) diff --git a/tokio-signal/src/windows.rs b/tokio-signal/src/windows.rs index c30f8736..8e1cd49e 100644 --- a/tokio-signal/src/windows.rs +++ b/tokio-signal/src/windows.rs @@ -16,21 +16,18 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Once, ONCE_INIT}; use self::winapi::shared::minwindef::*; +use self::winapi::um::consoleapi::SetConsoleCtrlHandler; use self::winapi::um::wincon::*; use futures::future; use futures::stream::Fuse; use futures::sync::mpsc; use futures::sync::oneshot; -use futures::{Async, Future, IntoFuture, Poll, Stream}; +use futures::{Async, Future, Poll, Stream}; use mio::Ready; use tokio_reactor::{Handle, PollEvented}; use IoFuture; -extern "system" { - fn SetConsoleCtrlHandler(HandlerRoutine: usize, Add: BOOL) -> BOOL; -} - static INIT: Once = ONCE_INIT; static mut GLOBAL_STATE: *mut GlobalState = 0 as *mut _; @@ -85,7 +82,7 @@ impl Event { /// This function will register a handler via `SetConsoleCtrlHandler` and /// deliver notifications to the returned stream. pub fn ctrl_c() -> IoFuture { - Event::ctrl_c_handle(&Handle::current()) + Event::ctrl_c_handle(&Handle::default()) } /// Creates a new stream listening for the `CTRL_C_EVENT` events. @@ -101,7 +98,7 @@ impl Event { /// This function will register a handler via `SetConsoleCtrlHandler` and /// deliver notifications to the returned stream. pub fn ctrl_break() -> IoFuture { - Event::ctrl_break_handle(&Handle::current()) + Event::ctrl_break_handle(&Handle::default()) } /// Creates a new stream listening for the `CTRL_BREAK_EVENT` events. @@ -113,11 +110,17 @@ impl Event { } fn new(signum: DWORD, handle: &Handle) -> IoFuture { - let mut init = None; - INIT.call_once(|| { - init = Some(global_init(handle)); - }); - let new_signal = future::lazy(move || { + let handle = handle.clone(); + let new_signal = future::poll_fn(move || { + let mut init = None; + INIT.call_once(|| { + init = Some(global_init(&handle)); + }); + + if let Some(Err(e)) = init { + return Err(e); + } + let (tx, rx) = oneshot::channel(); let msg = Message::NewEvent(signum, tx); let res = unsafe { (*GLOBAL_STATE).tx.clone().unbounded_send(msg) }; @@ -125,12 +128,10 @@ impl Event { "failed to request a new signal stream, did the \ first event loop go away?", ); - rx.then(|r| r.unwrap()) + Ok(Async::Ready(rx.then(|r| r.unwrap()))) }); - match init { - Some(init) => Box::new(init.into_future().and_then(|()| new_signal)), - None => Box::new(new_signal), - } + + Box::new(new_signal.flatten()) } } @@ -145,11 +146,7 @@ impl Stream for Event { self.reg.clear_read_ready(Ready::readable())?; self.reg .get_ref() - .inner - .borrow() - .as_ref() - .unwrap() - .1 + .readiness .set_readiness(mio::Ready::empty()) .expect("failed to set readiness"); Ok(Async::Ready(Some(()))) @@ -157,12 +154,12 @@ impl Stream for Event { } fn global_init(handle: &Handle) -> io::Result<()> { + let reg = MyRegistration::new(); + let ready = reg.readiness.clone(); + let (tx, rx) = mpsc::unbounded(); - let reg = MyRegistration { - inner: RefCell::new(None), - }; let reg = try!(PollEvented::new_with_handle(reg, handle)); - let ready = reg.get_ref().inner.borrow().as_ref().unwrap().1.clone(); + unsafe { let state = Box::new(GlobalState { ready: ready, @@ -176,7 +173,7 @@ fn global_init(handle: &Handle) -> io::Result<()> { }); GLOBAL_STATE = Box::into_raw(state); - let rc = SetConsoleCtrlHandler(handler as usize, TRUE); + let rc = SetConsoleCtrlHandler(Some(handler), TRUE); if rc == 0 { Box::from_raw(GLOBAL_STATE); GLOBAL_STATE = 0 as *mut _; @@ -238,9 +235,9 @@ impl DriverTask { // Acquire the (registration, set_readiness) pair by... assuming // we're on the event loop (true because of the spawn above). - let reg = MyRegistration { - inner: RefCell::new(None), - }; + let reg = MyRegistration::new(); + let ready = reg.readiness.clone(); + let reg = match PollEvented::new_with_handle(reg, &self.handle) { Ok(reg) => reg, Err(e) => { @@ -252,7 +249,6 @@ impl DriverTask { // Create the `Event` to pass back and then also keep a handle to // the `SetReadiness` for ourselves internally. let (tx, rx) = oneshot::channel(); - let ready = reg.get_ref().inner.borrow_mut().as_mut().unwrap().1.clone(); drop(complete.send(Ok(Event { reg: reg, _finished: tx, @@ -268,13 +264,9 @@ impl DriverTask { self.reg.clear_read_ready(Ready::readable())?; self.reg .get_ref() - .inner - .borrow() - .as_ref() - .unwrap() - .1 + .readiness .set_readiness(mio::Ready::empty()) - .unwrap(); + .expect("failed to set readiness"); if unsafe { (*GLOBAL_STATE).ctrl_c.ready.swap(false, Ordering::SeqCst) } { for task in self.ctrl_c.tasks.iter() { @@ -305,15 +297,27 @@ unsafe extern "system" fn handler(ty: DWORD) -> BOOL { FALSE } else { drop((*GLOBAL_STATE).ready.set_readiness(mio::Ready::readable())); - // TODO: this will report that we handled a CTRL_BREAK_EVENT when in - // fact we may not have any streams actually created for that + // TODO(1000): this will report that we handled a CTRL_BREAK_EVENT when + // in fact we may not have any streams actually created for that // event. TRUE } } struct MyRegistration { - inner: RefCell>, + registration: mio::Registration, + readiness: mio::SetReadiness, +} + +impl MyRegistration { + fn new() -> Self { + let (registration, readiness) = mio::Registration::new2(); + + Self { + registration, + readiness, + } + } } impl mio::Evented for MyRegistration { @@ -324,23 +328,74 @@ impl mio::Evented for MyRegistration { events: mio::Ready, opts: mio::PollOpt, ) -> io::Result<()> { - let reg = mio::Registration::new2(); - reg.0.register(poll, token, events, opts)?; - *self.inner.borrow_mut() = Some(reg); - Ok(()) + self.registration.register(poll, token, events, opts) } fn reregister( &self, - _poll: &mio::Poll, - _token: mio::Token, - _events: mio::Ready, - _opts: mio::PollOpt, + poll: &mio::Poll, + token: mio::Token, + events: mio::Ready, + opts: mio::PollOpt, ) -> io::Result<()> { - Ok(()) + self.registration.reregister(poll, token, events, opts) } - fn deregister(&self, _poll: &mio::Poll) -> io::Result<()> { - Ok(()) + fn deregister(&self, poll: &mio::Poll) -> io::Result<()> { + mio::Evented::deregister(&self.registration, poll) + } +} + +#[cfg(test)] +mod tests { + extern crate tokio; + + use self::tokio::runtime::current_thread; + use self::tokio::timer::Timeout; + use super::*; + use std::time::Duration; + + fn with_timeout(future: F) -> impl Future { + Timeout::new(future, Duration::from_secs(1)).map_err(|e| { + if e.is_timer() { + panic!("failed to register timer"); + } else if e.is_elapsed() { + panic!("timed out") + } else { + e.into_inner().expect("missing inner error") + } + }) + } + + #[test] + fn ctrl_c_and_ctrl_break() { + // FIXME(1000): combining into one test due to a restriction where the + // first event loop cannot go away + let mut rt = current_thread::Runtime::new().unwrap(); + let event_ctrl_c = rt + .block_on(with_timeout(Event::ctrl_c())) + .expect("failed to run future"); + + // Windows doesn't have a good programmatic way of sending events + // like sending signals on Unix, so we'll stub out the actual OS + // integration and test that our handling works. + unsafe { + super::handler(CTRL_C_EVENT); + } + + rt.block_on(with_timeout(event_ctrl_c.into_future())) + .ok() + .expect("failed to run event"); + + let event_ctrl_break = rt + .block_on(with_timeout(Event::ctrl_break())) + .expect("failed to run future"); + unsafe { + super::handler(CTRL_BREAK_EVENT); + } + + rt.block_on(with_timeout(event_ctrl_break.into_future())) + .ok() + .expect("failed to run event"); } } -- cgit v1.2.3