summaryrefslogtreecommitdiffstats
path: root/tokio/src
diff options
context:
space:
mode:
authorIvan Petkov <ivanppetkov@gmail.com>2020-09-22 15:40:44 -0700
committerGitHub <noreply@github.com>2020-09-22 15:40:44 -0700
commit7ae5b7bd4f93612f91ab504ffb63aa8241c1d7bb (patch)
tree990e22703eb7103b030f5e8bf117557684a9f205 /tokio/src
parente09b90ea32385337170ce17eb55ab372f9388af5 (diff)
signal: move driver to runtime thread (#2835)
Refactors the signal infrastructure to move the driver to the runtime thread. This follows the model put forth by the I/O driver and time driver.
Diffstat (limited to 'tokio/src')
-rw-r--r--tokio/src/io/registration.rs13
-rw-r--r--tokio/src/runtime/builder.rs47
-rw-r--r--tokio/src/runtime/context.rs16
-rw-r--r--tokio/src/runtime/driver.rs196
-rw-r--r--tokio/src/runtime/handle.rs11
-rw-r--r--tokio/src/runtime/io.rs63
-rw-r--r--tokio/src/runtime/mod.rs11
-rw-r--r--tokio/src/runtime/park.rs10
-rw-r--r--tokio/src/runtime/shell.rs8
-rw-r--r--tokio/src/runtime/time.rs59
-rw-r--r--tokio/src/signal/unix.rs81
-rw-r--r--tokio/src/signal/unix/driver.rs153
12 files changed, 428 insertions, 240 deletions
diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs
index 82065072..b805d2b9 100644
--- a/tokio/src/io/registration.rs
+++ b/tokio/src/io/registration.rs
@@ -109,7 +109,18 @@ impl Registration {
where
T: Evented,
{
- let handle = Handle::current();
+ Self::new_with_ready_and_handle(io, ready, Handle::current())
+ }
+
+ /// Same as `new_with_ready` but also accepts an explicit handle.
+ pub(crate) fn new_with_ready_and_handle<T>(
+ io: &T,
+ ready: mio::Ready,
+ handle: Handle,
+ ) -> io::Result<Registration>
+ where
+ T: Evented,
+ {
let shared = if let Some(inner) = handle.inner() {
inner.add_source(io, ready)?
} else {
diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs
index 42aed3e9..4072b04e 100644
--- a/tokio/src/runtime/builder.rs
+++ b/tokio/src/runtime/builder.rs
@@ -1,7 +1,7 @@
use crate::loom::sync::Mutex;
use crate::runtime::handle::Handle;
use crate::runtime::shell::Shell;
-use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};
+use crate::runtime::{blocking, driver, io, Callback, Runtime, Spawner};
use std::fmt;
#[cfg(feature = "blocking")]
@@ -359,14 +359,17 @@ impl Builder {
}
}
+ fn get_cfg(&self) -> driver::Cfg {
+ driver::Cfg {
+ enable_io: self.enable_io,
+ enable_time: self.enable_time,
+ }
+ }
+
fn build_shell_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::Kind;
- let clock = time::create_clock();
-
- // Create I/O driver
- let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
- let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
+ let (driver, resources) = driver::Driver::new(self.get_cfg())?;
let spawner = Spawner::Shell;
@@ -377,9 +380,10 @@ impl Builder {
kind: Kind::Shell(Mutex::new(Some(Shell::new(driver)))),
handle: Handle {
spawner,
- io_handle,
- time_handle,
- clock,
+ io_handle: resources.io_handle,
+ time_handle: resources.time_handle,
+ signal_handle: resources.signal_handle,
+ clock: resources.clock,
blocking_spawner,
},
blocking_pool,
@@ -478,12 +482,7 @@ cfg_rt_core! {
fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{BasicScheduler, Kind};
- let clock = time::create_clock();
-
- // Create I/O driver
- let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
-
- let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
+ let (driver, resources) = driver::Driver::new(self.get_cfg())?;
// And now put a single-threaded scheduler on top of the timer. When
// there are no futures ready to do something, it'll let the timer or
@@ -500,9 +499,10 @@ cfg_rt_core! {
kind: Kind::Basic(Mutex::new(Some(scheduler))),
handle: Handle {
spawner,
- io_handle,
- time_handle,
- clock,
+ io_handle: resources.io_handle,
+ time_handle: resources.time_handle,
+ signal_handle: resources.signal_handle,
+ clock: resources.clock,
blocking_spawner,
},
blocking_pool,
@@ -533,10 +533,8 @@ cfg_rt_threaded! {
let core_threads = self.core_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus()));
assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit");
- let clock = time::create_clock();
+ let (driver, resources) = driver::Driver::new(self.get_cfg())?;
- let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
- let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver));
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
@@ -547,9 +545,10 @@ cfg_rt_threaded! {
// Create the runtime handle
let handle = Handle {
spawner,
- io_handle,
- time_handle,
- clock,
+ io_handle: resources.io_handle,
+ time_handle: resources.time_handle,
+ signal_handle: resources.signal_handle,
+ clock: resources.clock,
blocking_spawner,
};
diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs
index c42b3432..7498175f 100644
--- a/tokio/src/runtime/context.rs
+++ b/tokio/src/runtime/context.rs
@@ -14,7 +14,7 @@ cfg_blocking_impl! {
}
cfg_io_driver! {
- pub(crate) fn io_handle() -> crate::runtime::io::Handle {
+ pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.io_handle.clone(),
None => Default::default(),
@@ -22,8 +22,18 @@ cfg_io_driver! {
}
}
+cfg_signal! {
+ #[cfg(unix)]
+ pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle {
+ CONTEXT.with(|ctx| match *ctx.borrow() {
+ Some(ref ctx) => ctx.signal_handle.clone(),
+ None => Default::default(),
+ })
+ }
+}
+
cfg_time! {
- pub(crate) fn time_handle() -> crate::runtime::time::Handle {
+ pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.time_handle.clone(),
None => Default::default(),
@@ -31,7 +41,7 @@ cfg_time! {
}
cfg_test_util! {
- pub(crate) fn clock() -> Option<crate::runtime::time::Clock> {
+ pub(crate) fn clock() -> Option<crate::runtime::driver::Clock> {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => Some(ctx.clock.clone()),
None => None,
diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs
new file mode 100644
index 00000000..2ea50890
--- /dev/null
+++ b/tokio/src/runtime/driver.rs
@@ -0,0 +1,196 @@
+//! Abstracts out the entire chain of runtime sub-drivers into common types.
+use crate::park::{Park, ParkThread};
+use std::io;
+use std::time::Duration;
+
+// ===== io driver =====
+
+cfg_io_driver! {
+ type IoDriver = crate::park::Either<crate::io::driver::Driver, crate::park::ParkThread>;
+ pub(crate) type IoHandle = Option<crate::io::driver::Handle>;
+
+ fn create_io_driver(enable: bool) -> io::Result<(IoDriver, IoHandle)> {
+ use crate::park::Either;
+
+ #[cfg(loom)]
+ assert!(!enable);
+
+ if enable {
+ let driver = crate::io::driver::Driver::new()?;
+ let handle = driver.handle();
+
+ Ok((Either::A(driver), Some(handle)))
+ } else {
+ let driver = ParkThread::new();
+ Ok((Either::B(driver), None))
+ }
+ }
+}
+
+cfg_not_io_driver! {
+ type IoDriver = ParkThread;
+ pub(crate) type IoHandle = ();
+
+ fn create_io_driver(_enable: bool) -> io::Result<(IoDriver, IoHandle)> {
+ let driver = ParkThread::new();
+ Ok((driver, ()))
+ }
+}
+
+// ===== signal driver =====
+
+macro_rules! cfg_unix_and_signal {
+ ($($item:item)*) => {
+ $(
+ #[cfg(all(not(loom), unix, feature = "signal"))]
+ $item
+ )*
+ }
+}
+
+macro_rules! cfg_neither_unix_nor_windows {
+ ($($item:item)*) => {
+ $(
+ #[cfg(any(loom, not(all(unix, feature = "signal"))))]
+ $item
+ )*
+ }
+}
+
+cfg_unix_and_signal! {
+ type SignalDriver = crate::park::Either<crate::signal::unix::driver::Driver, IoDriver>;
+ pub(crate) type SignalHandle = Option<crate::signal::unix::driver::Handle>;
+
+ fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> {
+ use crate::park::Either;
+
+ // Enable the signal driver if IO is also enabled
+ match io_driver {
+ Either::A(io_driver) => {
+ let driver = crate::signal::unix::driver::Driver::new(io_driver)?;
+ let handle = driver.handle();
+ Ok((Either::A(driver), Some(handle)))
+ }
+ Either::B(_) => Ok((Either::B(io_driver), None)),
+ }
+ }
+}
+
+cfg_neither_unix_nor_windows! {
+ type SignalDriver = IoDriver;
+ pub(crate) type SignalHandle = ();
+
+ fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> {
+ Ok((io_driver, ()))
+ }
+}
+
+// ===== time driver =====
+
+cfg_time! {
+ type TimeDriver = crate::park::Either<crate::time::driver::Driver<SignalDriver>, SignalDriver>;
+
+ pub(crate) type Clock = crate::time::Clock;
+ pub(crate) type TimeHandle = Option<crate::time::driver::Handle>;
+
+ fn create_clock() -> Clock {
+ crate::time::Clock::new()
+ }
+
+ fn create_time_driver(
+ enable: bool,
+ signal_driver: SignalDriver,
+ clock: Clock,
+ ) -> (TimeDriver, TimeHandle) {
+ use crate::park::Either;
+
+ if enable {
+ let driver = crate::time::driver::Driver::new(signal_driver, clock);
+ let handle = driver.handle();
+
+ (Either::A(driver), Some(handle))
+ } else {
+ (Either::B(signal_driver), None)
+ }
+ }
+}
+
+cfg_not_time! {
+ type TimeDriver = SignalDriver;
+
+ pub(crate) type Clock = ();
+ pub(crate) type TimeHandle = ();
+
+ fn create_clock() -> Clock {
+ ()
+ }
+
+ fn create_time_driver(
+ _enable: bool,
+ signal_driver: SignalDriver,
+ _clock: Clock,
+ ) -> (TimeDriver, TimeHandle) {
+ (signal_driver, ())
+ }
+}
+
+// ===== runtime driver =====
+
+#[derive(Debug)]
+pub(crate) struct Driver {
+ inner: TimeDriver,
+}
+
+pub(crate) struct Resources {
+ pub(crate) io_handle: IoHandle,
+ pub(crate) signal_handle: SignalHandle,
+ pub(crate) time_handle: TimeHandle,
+ pub(crate) clock: Clock,
+}
+
+pub(crate) struct Cfg {
+ pub(crate) enable_io: bool,
+ pub(crate) enable_time: bool,
+}
+
+impl Driver {
+ pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Resources)> {
+ let clock = create_clock();
+
+ let (io_driver, io_handle) = create_io_driver(cfg.enable_io)?;
+ let (signal_driver, signal_handle) = create_signal_driver(io_driver)?;
+ let (time_driver, time_handle) =
+ create_time_driver(cfg.enable_time, signal_driver, clock.clone());
+
+ Ok((
+ Self { inner: time_driver },
+ Resources {
+ io_handle,
+ signal_handle,
+ time_handle,
+ clock,
+ },
+ ))
+ }
+}
+
+impl Park for Driver {
+ type Unpark = <TimeDriver as Park>::Unpark;
+ type Error = <TimeDriver as Park>::Error;
+
+ fn unpark(&self) -> Self::Unpark {
+ self.inner.unpark()
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.inner.park()
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.inner.park_timeout(duration)
+ }
+
+ fn shutdown(&mut self) {
+ self.inner.shutdown()
+ }
+}
diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs
index 516ad4b3..dfcc5e97 100644
--- a/tokio/src/runtime/handle.rs
+++ b/tokio/src/runtime/handle.rs
@@ -1,4 +1,4 @@
-use crate::runtime::{blocking, context, io, time, Spawner};
+use crate::runtime::{blocking, context, driver, Spawner};
/// Handle to the runtime.
///
@@ -11,13 +11,16 @@ pub(crate) struct Handle {
pub(super) spawner: Spawner,
/// Handles to the I/O drivers
- pub(super) io_handle: io::Handle,
+ pub(super) io_handle: driver::IoHandle,
+
+ /// Handles to the signal drivers
+ pub(super) signal_handle: driver::SignalHandle,
/// Handles to the time drivers
- pub(super) time_handle: time::Handle,
+ pub(super) time_handle: driver::TimeHandle,
/// Source of `Instant::now()`
- pub(super) clock: time::Clock,
+ pub(super) clock: driver::Clock,
/// Blocking pool spawner
pub(super) blocking_spawner: blocking::Spawner,
diff --git a/tokio/src/runtime/io.rs b/tokio/src/runtime/io.rs
deleted file mode 100644
index 6a0953af..00000000
--- a/tokio/src/runtime/io.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-//! Abstracts out the APIs necessary to `Runtime` for integrating the I/O
-//! driver. When the `time` feature flag is **not** enabled. These APIs are
-//! shells. This isolates the complexity of dealing with conditional
-//! compilation.
-
-/// Re-exported for convenience.
-pub(crate) use std::io::Result;
-
-pub(crate) use variant::*;
-
-#[cfg(feature = "io-driver")]
-mod variant {
- use crate::io::driver;
- use crate::park::{Either, ParkThread};
-
- use std::io;
-
- /// The driver value the runtime passes to the `timer` layer.
- ///
- /// When the `io-driver` feature is enabled, this is the "real" I/O driver
- /// backed by Mio. Without the `io-driver` feature, this is a thread parker
- /// backed by a condition variable.
- pub(crate) type Driver = Either<driver::Driver, ParkThread>;
-
- /// The handle the runtime stores for future use.
- ///
- /// When the `io-driver` feature is **not** enabled, this is `()`.
- pub(crate) type Handle = Option<driver::Handle>;
-
- pub(crate) fn create_driver(enable: bool) -> io::Result<(Driver, Handle)> {
- #[cfg(loom)]
- assert!(!enable);
-
- if enable {
- let driver = driver::Driver::new()?;
- let handle = driver.handle();
-
- Ok((Either::A(driver), Some(handle)))
- } else {
- let driver = ParkThread::new();
- Ok((Either::B(driver), None))
- }
- }
-}
-
-#[cfg(not(feature = "io-driver"))]
-mod variant {
- use crate::park::ParkThread;
-
- use std::io;
-
- /// I/O is not enabled, use a condition variable based parker
- pub(crate) type Driver = ParkThread;
-
- /// There is no handle
- pub(crate) type Handle = ();
-
- pub(crate) fn create_driver(_enable: bool) -> io::Result<(Driver, Handle)> {
- let driver = ParkThread::new();
-
- Ok((driver, ()))
- }
-}
diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs
index bec0ecd5..884c2b46 100644
--- a/tokio/src/runtime/mod.rs
+++ b/tokio/src/runtime/mod.rs
@@ -208,13 +208,18 @@ cfg_blocking_impl! {
mod builder;
pub use self::builder::Builder;
+pub(crate) mod driver;
+
pub(crate) mod enter;
use self::enter::enter;
mod handle;
use handle::Handle;
-mod io;
+mod io {
+ /// Re-exported for convenience.
+ pub(crate) use std::io::Result;
+}
cfg_rt_threaded! {
mod park;
@@ -227,8 +232,6 @@ use self::shell::Shell;
mod spawner;
use self::spawner::Spawner;
-mod time;
-
cfg_rt_threaded! {
mod queue;
@@ -293,7 +296,7 @@ enum Kind {
/// Execute all tasks on the current-thread.
#[cfg(feature = "rt-core")]
- Basic(Mutex<Option<BasicScheduler<time::Driver>>>),
+ Basic(Mutex<Option<BasicScheduler<driver::Driver>>>),
/// Execute tasks across multiple threads.
#[cfg(feature = "rt-threaded")]
diff --git a/tokio/src/runtime/park.rs b/tokio/src/runtime/park.rs
index 1dcf65af..c994c935 100644
--- a/tokio/src/runtime/park.rs
+++ b/tokio/src/runtime/park.rs
@@ -6,7 +6,7 @@ use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
use crate::park::{Park, Unpark};
-use crate::runtime::time;
+use crate::runtime::driver::Driver;
use crate::util::TryLock;
use std::sync::atomic::Ordering::SeqCst;
@@ -42,14 +42,14 @@ const NOTIFIED: usize = 3;
/// Shared across multiple Parker handles
struct Shared {
/// Shared driver. Only one thread at a time can use this
- driver: TryLock<time::Driver>,
+ driver: TryLock<Driver>,
/// Unpark handle
- handle: <time::Driver as Park>::Unpark,
+ handle: <Driver as Park>::Unpark,
}
impl Parker {
- pub(crate) fn new(driver: time::Driver) -> Parker {
+ pub(crate) fn new(driver: Driver) -> Parker {
let handle = driver.unpark();
Parker {
@@ -180,7 +180,7 @@ impl Inner {
}
}
- fn park_driver(&self, driver: &mut time::Driver) {
+ fn park_driver(&self, driver: &mut Driver) {
match self
.state
.compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
diff --git a/tokio/src/runtime/shell.rs b/tokio/src/runtime/shell.rs
index a65869d0..3d631239 100644
--- a/tokio/src/runtime/shell.rs
+++ b/tokio/src/runtime/shell.rs
@@ -1,8 +1,8 @@
#![allow(clippy::redundant_clone)]
use crate::park::{Park, Unpark};
+use crate::runtime::driver::Driver;
use crate::runtime::enter;
-use crate::runtime::time;
use crate::util::{waker_ref, Wake};
use std::future::Future;
@@ -12,17 +12,17 @@ use std::task::Poll::Ready;
#[derive(Debug)]
pub(super) struct Shell {
- driver: time::Driver,
+ driver: Driver,
/// TODO: don't store this
unpark: Arc<Handle>,
}
#[derive(Debug)]
-struct Handle(<time::Driver as Park>::Unpark);
+struct Handle(<Driver as Park>::Unpark);
impl Shell {
- pub(super) fn new(driver: time::Driver) -> Shell {
+ pub(super) fn new(driver: Driver) -> Shell {
let unpark = Arc::new(Handle(driver.unpark()));
Shell { driver, unpark }
diff --git a/tokio/src/runtime/time.rs b/tokio/src/runtime/time.rs
deleted file mode 100644
index c623d964..00000000
--- a/tokio/src/runtime/time.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-//! Abstracts out the APIs necessary to `Runtime` for integrating the time
-//! driver. When the `time` feature flag is **not** enabled. These APIs are
-//! shells. This isolates the complexity of dealing with conditional
-//! compilation.
-
-pub(crate) use variant::*;
-
-#[cfg(feature = "time")]
-mod variant {
- use crate::park::Either;
- use crate::runtime::io;
- use crate::time::{self, driver};
-
- pub(crate) type Clock = time::Clock;
- pub(crate) type Driver = Either<driver::Driver<io::Driver>, io::Driver>;
- pub(crate) type Handle = Option<driver::Handle>;
-
- pub(crate) fn create_clock() -> Clock {
- Clock::new()
- }
-
- /// Create a new timer driver / handle pair
- pub(crate) fn create_driver(
- enable: bool,
- io_driver: io::Driver,
- clock: Clock,
- ) -> (Driver, Handle) {
- if enable {
- let driver = driver::Driver::new(io_driver, clock);
- let handle = driver.handle();
-
- (Either::A(driver), Some(handle))
- } else {
- (Either::B(io_driver), None)
- }
- }
-}
-
-#[cfg(not(feature = "time"))]
-mod variant {
- use crate::runtime::io;
-
- pub(crate) type Clock = ();
- pub(crate) type Driver = io::Driver;
- pub(crate) type Handle = ();
-
- pub(crate) fn create_clock() -> Clock {
- ()
- }
-
- /// Create a new timer driver / handle pair
- pub(crate) fn create_driver(
- _enable: bool,
- io_driver: io::Driver,
- _clock: Clock,
- ) -> (Driver, Handle) {
- (io_driver, ())
- }
-}
diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs
index bc48bdfa..45a091d7 100644
--- a/tokio/src/signal/unix.rs
+++ b/tokio/src/signal/unix.rs
@@ -5,7 +5,6 @@
#![cfg(unix)]
-use crate::io::{AsyncRead, PollEvented, ReadBuf};
use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage};
use crate::sync::mpsc::{channel, Receiver};
@@ -17,6 +16,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Once;
use std::task::{Context, Poll};
+pub(crate) mod driver;
+
pub(crate) type OsStorage = Vec<SignalInfo>;
// Number of different unix signals
@@ -202,9 +203,9 @@ impl Default for SignalInfo {
/// The purpose of this signal handler is to primarily:
///
/// 1. Flag that our specific signal was received (e.g. store an atomic flag)
-/// 2. Wake up driver tasks by writing a byte to a pipe
+/// 2. Wake up the driver by writing a byte to a pipe
///
-/// Those two operations shoudl both be async-signal safe.
+/// Those two operations should both be async-signal safe.
fn action(globals: Pin<&'static Globals>, signal: c_int) {
globals.record_event(signal as EventId);
@@ -227,6 +228,9 @@ fn signal_enable(signal: c_int) -> io::Result<()> {
));
}
+ // Check that we have a signal driver running
+ driver::Handle::current().check_inner()?;
+
let globals = globals();
let siginfo = match globals.storage().get(signal as EventId) {
Some(slot) => slot,
@@ -254,69 +258,6 @@ fn signal_enable(signal: c_int) -> io::Result<()> {
}
}
-#[derive(Debug)]
-struct Driver {
- wakeup: PollEvented<UnixStream>,
-}
-
-impl Driver {
- fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
- // Drain the data from the pipe and maintain interest in getting more
- self.drain(cx);
- // Broadcast any signals which were received
- globals().broadcast();
-
- Poll::Pending
- }
-}
-
-impl Driver {
- fn new() -> io::Result<Driver> {
- // NB: We give each driver a "fresh" reciever file descriptor to avoid
- // the issues described in alexcrichton/tokio-process#42.
- //
- // In the past we would reuse the actual receiver file descriptor and
- // swallow any errors around double registration of the same descriptor.
- // I'm not sure if the second (failed) registration simply doesn't end up
- // receiving wake up notifications, or there could be some race condition
- // when consuming readiness events, but having distinct descriptors for
- // distinct PollEvented instances appears to mitigate this.
- //
- // Unfortunately we cannot just use a single global PollEvented instance
- // either, since we can't compare Handles or assume they will always
- // point to the exact same reactor.
- let stream = globals().receiver.try_clone()?;
- let wakeup = PollEvented::new(stream)?;
-
- Ok(Driver { wakeup })
- }
-
- /// Drain all data in the global receiver, ensuring we'll get woken up when
- /// there is a write on the other end.
- ///
- /// We do *NOT* use the existence of any read bytes as evidence a signal was
- /// received since the `pending` flags would have already been set if that
- /// was the case. See
- /// [#38](https://github.com/alexcrichton/tokio-signal/issues/38) for more
- /// info.
- fn drain(&mut self, cx: &mut Context<'_>) {
- let mut buf = [0; 128];
- let mut buf = ReadBuf::new(&mut buf);
- loop {
- match Pin::new(&mut self.wakeup).poll_read(cx, &mut buf) {
- Poll::Ready(Ok(())) => {
- if buf.filled().is_empty() {
- panic!("EOF on self-pipe")
- }
- buf.clear();
- }
- Poll::Ready(Err(e)) => panic!("Bad read on self-pipe: {}", e),
- Poll::Pending => break,
- }
- }
- }
-}
-
/// A stream of events for receiving a particular type of OS signal.
///
/// In general signal handling on Unix is a pretty tricky topic, and this
@@ -382,7 +323,6 @@ impl Driver {
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Signal {
- driver: Driver,
rx: Receiver<()>,
}
@@ -414,16 +354,12 @@ pub fn signal(kind: SignalKind) -> io::Result<Signal> {
// Turn the signal delivery on once we are ready for it
signal_enable(signal)?;
- // Ensure there's a driver for our associated event loop processing
- // signals.
- let driver = Driver::new()?;
-
// One wakeup in a queue is enough, no need for us to buffer up any
// more.
let (tx, rx) = channel(1);
globals().register_listener(signal as EventId, tx);
- Ok(Signal { driver, rx })
+ Ok(Signal { rx })
}
impl Signal {
@@ -484,7 +420,6 @@ impl Signal {
/// }
/// ```
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
- let _ = self.driver.poll(cx);
self.rx.poll_recv(cx)
}
}
diff --git a/tokio/src/signal/unix/driver.rs b/tokio/src/signal/unix/driver.rs
new file mode 100644
index 00000000..0458893c
--- /dev/null
+++ b/tokio/src/signal/unix/driver.rs
@@ -0,0 +1,153 @@
+//! Signal driver
+
+use crate::io::driver::Driver as IoDriver;
+use crate::io::Registration;
+use crate::park::Park;
+use crate::runtime::context;
+use crate::signal::registry::globals;
+use mio_uds::UnixStream;
+use std::io::{self, Read};
+use std::sync::{Arc, Weak};
+use std::time::Duration;
+
+/// Responsible for registering wakeups when an OS signal is received, and
+/// subsequently dispatching notifications to any signal listeners as appropriate.
+///
+/// Note: this driver relies on having an enabled IO driver in order to listen to
+/// pipe write wakeups.
+#[derive(Debug)]
+pub(crate) struct Driver {
+ /// Thread parker. The `Driver` park implementation delegates to this.
+ park: IoDriver,
+
+ /// A pipe for receiving wake events from the signal handler
+ receiver: UnixStream,
+
+ /// The actual registraiton for `receiver` when active.
+ /// Lazily bound at the first signal registration.
+ registration: Registration,
+
+ /// Shared state
+ inner: Arc<Inner>,
+}
+
+#[derive(Clone, Debug)]
+pub(crate) struct Handle {
+ inner: Weak<Inner>,
+}
+
+#[derive(Debug)]
+pub(super) struct Inner(());
+
+// ===== impl Driver =====
+
+impl Driver {
+ /// Creates a new signal `Driver` instance that delegates wakeups to `park`.
+ pub(crate) fn new(park: IoDriver) -> io::Result<Self> {
+ // NB: We give each driver a "fresh" reciever file descriptor to avoid
+ // the issues described in alexcrichton/tokio-process#42.
+ //
+ // In the past we would reuse the actual receiver file descriptor and
+ // swallow any errors around double registration of the same descriptor.
+ // I'm not sure if the second (failed) registration simply doesn't end up
+ // receiving wake up notifications, or there could be some race condition
+ // when consuming readiness events, but having distinct descriptors for
+ // distinct PollEvented instances appears to mitigate this.
+ //
+ // Unfortunately we cannot just use a single global PollEvented instance
+ // either, since we can't compare Handles or assume they will always
+ // point to the exact same reactor.
+ let receiver = globals().receiver.try_clone()?;
+ let registration =
+ Registration::new_with_ready_and_handle(&receiver, mio::Ready::all(), park.handle())?;
+
+ Ok(Self {
+ park,
+ receiver,
+ registration,
+ inner: Arc::new(Inner(())),
+ })
+ }
+
+ /// Returns a handle to this event loop which can be sent across threads
+ /// and can be used as a proxy to the event loop itself.
+ pub(crate) fn handle(&self) -> Handle {
+ Handle {
+ inner: Arc::downgrade(&self.inner),
+ }
+ }
+
+ fn process(&self) {
+ // Check if the pipe is ready to read and therefore has "woken" us up
+ match self.registration.take_read_ready() {
+ Ok(Some(ready)) => assert!(ready.is_readable()),
+ Ok(None) => return, // No wake has arrived, bail
+ Err(e) => panic!("reactor gone: {}", e),
+ }
+
+ // Drain the pipe completely so we can receive a new readiness event
+ // if another signal has come in.
+ let mut buf = [0; 128];
+ loop {
+ match (&self.receiver).read(&mut buf) {
+ Ok(0) => panic!("EOF on self-pipe"),
+ Ok(_) => continue, // Keep reading
+ Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
+ Err(e) => panic!("Bad read on self-pipe: {}", e),
+ }
+ }
+
+ // Broadcast any signals which were received
+ globals().broadcast();
+ }
+}
+
+// ===== impl Park for Driver =====
+
+impl Park for Driver {
+ type Unpark = <IoDriver as Park>::Unpark;
+ type Error = io::Error;
+
+ fn unpark(&self) -> Self::Unpark {
+ self.park.unpark()
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.park.park()?;
+ self.process();
+ Ok(())
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.park.park_timeout(duration)?;
+ self.process();
+ Ok(())
+ }
+
+ fn shutdown(&mut self) {
+ self.park.shutdown()
+ }
+}
+
+// ===== impl Handle =====
+
+impl Handle {
+ /// Returns a handle to the current driver
+ ///
+ /// # Panics
+ ///
+ /// This function panics if there is no current signal driver set.
+ pub(super) fn current() -> Self {
+ context::signal_handle().expect(
+ "there is no signal driver running, must be called from the context of Tokio runtime",
+ )
+ }
+
+ pub(super) fn check_inner(&self) -> io::Result<()> {
+ if self.inner.strong_count() > 0 {
+ Ok(())
+ } else {
+ Err(io::Error::new(io::ErrorKind::Other, "signal driver gone"))
+ }
+ }
+}