summaryrefslogtreecommitdiffstats
path: root/tokio/src/signal/unix.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/signal/unix.rs')
-rw-r--r--tokio/src/signal/unix.rs81
1 files changed, 8 insertions, 73 deletions
diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs
index bc48bdfa..45a091d7 100644
--- a/tokio/src/signal/unix.rs
+++ b/tokio/src/signal/unix.rs
@@ -5,7 +5,6 @@
#![cfg(unix)]
-use crate::io::{AsyncRead, PollEvented, ReadBuf};
use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage};
use crate::sync::mpsc::{channel, Receiver};
@@ -17,6 +16,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Once;
use std::task::{Context, Poll};
+pub(crate) mod driver;
+
pub(crate) type OsStorage = Vec<SignalInfo>;
// Number of different unix signals
@@ -202,9 +203,9 @@ impl Default for SignalInfo {
/// The purpose of this signal handler is to primarily:
///
/// 1. Flag that our specific signal was received (e.g. store an atomic flag)
-/// 2. Wake up driver tasks by writing a byte to a pipe
+/// 2. Wake up the driver by writing a byte to a pipe
///
-/// Those two operations shoudl both be async-signal safe.
+/// Those two operations should both be async-signal safe.
fn action(globals: Pin<&'static Globals>, signal: c_int) {
globals.record_event(signal as EventId);
@@ -227,6 +228,9 @@ fn signal_enable(signal: c_int) -> io::Result<()> {
));
}
+ // Check that we have a signal driver running
+ driver::Handle::current().check_inner()?;
+
let globals = globals();
let siginfo = match globals.storage().get(signal as EventId) {
Some(slot) => slot,
@@ -254,69 +258,6 @@ fn signal_enable(signal: c_int) -> io::Result<()> {
}
}
-#[derive(Debug)]
-struct Driver {
- wakeup: PollEvented<UnixStream>,
-}
-
-impl Driver {
- fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
- // Drain the data from the pipe and maintain interest in getting more
- self.drain(cx);
- // Broadcast any signals which were received
- globals().broadcast();
-
- Poll::Pending
- }
-}
-
-impl Driver {
- fn new() -> io::Result<Driver> {
- // NB: We give each driver a "fresh" reciever file descriptor to avoid
- // the issues described in alexcrichton/tokio-process#42.
- //
- // In the past we would reuse the actual receiver file descriptor and
- // swallow any errors around double registration of the same descriptor.
- // I'm not sure if the second (failed) registration simply doesn't end up
- // receiving wake up notifications, or there could be some race condition
- // when consuming readiness events, but having distinct descriptors for
- // distinct PollEvented instances appears to mitigate this.
- //
- // Unfortunately we cannot just use a single global PollEvented instance
- // either, since we can't compare Handles or assume they will always
- // point to the exact same reactor.
- let stream = globals().receiver.try_clone()?;
- let wakeup = PollEvented::new(stream)?;
-
- Ok(Driver { wakeup })
- }
-
- /// Drain all data in the global receiver, ensuring we'll get woken up when
- /// there is a write on the other end.
- ///
- /// We do *NOT* use the existence of any read bytes as evidence a signal was
- /// received since the `pending` flags would have already been set if that
- /// was the case. See
- /// [#38](https://github.com/alexcrichton/tokio-signal/issues/38) for more
- /// info.
- fn drain(&mut self, cx: &mut Context<'_>) {
- let mut buf = [0; 128];
- let mut buf = ReadBuf::new(&mut buf);
- loop {
- match Pin::new(&mut self.wakeup).poll_read(cx, &mut buf) {
- Poll::Ready(Ok(())) => {
- if buf.filled().is_empty() {
- panic!("EOF on self-pipe")
- }
- buf.clear();
- }
- Poll::Ready(Err(e)) => panic!("Bad read on self-pipe: {}", e),
- Poll::Pending => break,
- }
- }
- }
-}
-
/// A stream of events for receiving a particular type of OS signal.
///
/// In general signal handling on Unix is a pretty tricky topic, and this
@@ -382,7 +323,6 @@ impl Driver {
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Signal {
- driver: Driver,
rx: Receiver<()>,
}
@@ -414,16 +354,12 @@ pub fn signal(kind: SignalKind) -> io::Result<Signal> {
// Turn the signal delivery on once we are ready for it
signal_enable(signal)?;
- // Ensure there's a driver for our associated event loop processing
- // signals.
- let driver = Driver::new()?;
-
// One wakeup in a queue is enough, no need for us to buffer up any
// more.
let (tx, rx) = channel(1);
globals().register_listener(signal as EventId, tx);
- Ok(Signal { driver, rx })
+ Ok(Signal { rx })
}
impl Signal {
@@ -484,7 +420,6 @@ impl Signal {
/// }
/// ```
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
- let _ = self.driver.poll(cx);
self.rx.poll_recv(cx)
}
}