summaryrefslogtreecommitdiffstats
path: root/tokio/src/signal/unix/driver.rs
blob: 323bb9dff129703feecbb75c2b05c62f52c83cad (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
#![cfg_attr(not(feature = "rt"), allow(dead_code))]

//! Signal driver

use crate::io::driver::Driver as IoDriver;
use crate::io::{Interest, PollEvented};
use crate::park::Park;
use crate::signal::registry::globals;

use mio::net::UnixStream;
use std::io::{self, Read};
use std::ptr;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::Duration;

/// Responsible for registering wakeups when an OS signal is received, and
/// subsequently dispatching notifications to any signal listeners as appropriate.
///
/// The driver wraps an I/O driver: parking is delegated to it, and after every
/// park the signal wake pipe is checked and, if readable, drained before the
/// received signals are broadcast.
///
/// Note: this driver relies on having an enabled IO driver in order to listen to
/// pipe write wakeups.
#[derive(Debug)]
pub(crate) struct Driver {
    /// Thread parker. The `Driver` park implementation delegates to this.
    park: IoDriver,

    /// A pipe for receiving wake events from the signal handler.
    ///
    /// This is a duplicate of the global receiver descriptor, registered with
    /// the I/O driver handle obtained from `park`.
    receiver: PollEvented<UnixStream>,

    /// Shared state. `Handle`s hold `Weak` references to this allocation, so
    /// its strong count tells them whether the driver still exists.
    inner: Arc<Inner>,
}

/// Cloneable handle to a signal `Driver`.
///
/// The handle does not keep the driver alive: it only holds a `Weak` reference
/// to the driver's shared state. A `Default` handle holds an empty `Weak` and
/// therefore never refers to a live driver.
#[derive(Clone, Debug, Default)]
pub(crate) struct Handle {
    /// Weak reference to the shared state owned by the `Driver`.
    inner: Weak<Inner>,
}

/// Shared state between a `Driver` and its `Handle`s.
///
/// It carries no data; within this file it is only used through its
/// `Arc`/`Weak` reference counts to detect whether the driver is still alive.
#[derive(Debug)]
pub(super) struct Inner(());

// ===== impl Driver =====

impl Driver {
    /// Creates a new signal `Driver` instance that delegates wakeups to `park`.
    ///
    /// The global signal receiver descriptor is duplicated and the duplicate is
    /// registered with the I/O driver handle obtained from `park`.
    ///
    /// # Errors
    ///
    /// Returns an error if duplicating the receiver descriptor fails or if the
    /// duplicate cannot be registered with the I/O driver.
    pub(crate) fn new(park: IoDriver) -> io::Result<Self> {
        use std::mem::ManuallyDrop;
        use std::os::unix::io::{AsRawFd, FromRawFd};

        // NB: We give each driver a "fresh" receiver file descriptor to avoid
        // the issues described in alexcrichton/tokio-process#42.
        //
        // In the past we would reuse the actual receiver file descriptor and
        // swallow any errors around double registration of the same descriptor.
        // I'm not sure if the second (failed) registration simply doesn't end
        // up receiving wake up notifications, or there could be some race
        // condition when consuming readiness events, but having distinct
        // descriptors for distinct PollEvented instances appears to mitigate
        // this.
        //
        // Unfortunately we cannot just use a single global PollEvented instance
        // either, since we can't compare Handles or assume they will always
        // point to the exact same reactor.
        //
        // Mio 0.7 removed `try_clone()` as an API due to unexpected behavior
        // with registering dups with the same reactor. In this case, duping is
        // safe as each dup is registered with separate reactors **and** we
        // only expect at least one dup to receive the notification.

        let receiver_fd = globals().receiver.as_raw_fd();

        // The temporary std stream below only exists so that `try_clone` can
        // be called. It is wrapped in `ManuallyDrop` because we don't actually
        // own this descriptor and must never close it.
        //
        // SAFETY: `receiver_fd` was just obtained from the live
        // `globals().receiver`, and the `ManuallyDrop` wrapper ensures the
        // borrowed descriptor is not closed when `original` goes out of scope.
        let original =
            ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) });
        let receiver = UnixStream::from_std(original.try_clone()?);
        let receiver = PollEvented::new_with_interest_and_handle(
            receiver,
            Interest::READABLE | Interest::WRITABLE,
            park.handle(),
        )?;

        Ok(Self {
            park,
            receiver,
            inner: Arc::new(Inner(())),
        })
    }

    /// Returns a handle to this event loop which can be sent across threads
    /// and can be used as a proxy to the event loop itself.
    ///
    /// The handle only holds a weak reference to the driver's shared state.
    pub(crate) fn handle(&self) -> Handle {
        Handle {
            inner: Arc::downgrade(&self.inner),
        }
    }

    /// Drains the wake pipe and broadcasts received signals, if the pipe has
    /// become readable. Returns immediately when no wake has arrived.
    ///
    /// # Panics
    ///
    /// Panics if polling for read readiness reports an error, if the pipe
    /// reaches EOF, or if reading fails with an error other than `WouldBlock`
    /// or `Interrupted`.
    fn process(&self) {
        // Check if the pipe is ready to read and therefore has "woken" us up
        //
        // To do so, we will `poll_read_ready` with a noop waker, since we don't
        // need to actually be notified when read ready...
        //
        // SAFETY: every function in `NOOP_WAKER_VTABLE` ignores its data
        // pointer, so the null pointer is never dereferenced.
        let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) };
        let mut cx = Context::from_waker(&waker);

        let ev = match self.receiver.registration().poll_read_ready(&mut cx) {
            Poll::Ready(Ok(ev)) => ev,
            Poll::Ready(Err(e)) => panic!("reactor gone: {}", e),
            Poll::Pending => return, // No wake has arrived, bail
        };

        // Drain the pipe completely so we can receive a new readiness event
        // if another signal has come in.
        let mut buf = [0; 128];
        loop {
            match (&*self.receiver).read(&mut buf) {
                Ok(0) => panic!("EOF on self-pipe"),
                Ok(_) => continue, // Keep reading
                Err(e) => match e.kind() {
                    // The pipe is empty: draining is complete.
                    io::ErrorKind::WouldBlock => break,
                    // An interrupted read is non-fatal and must simply be
                    // retried rather than bringing the whole runtime down.
                    io::ErrorKind::Interrupted => continue,
                    _ => panic!("Bad read on self-pipe: {}", e),
                },
            }
        }

        // Only clear readiness once the pipe has been fully drained, so that a
        // later write produces a fresh readiness event.
        self.receiver.registration().clear_readiness(ev);

        // Broadcast any signals which were received
        globals().broadcast();
    }
}

/// Vtable describing a waker whose every operation does nothing.
///
/// Cloning produces another no-op waker; waking and dropping have no effect.
const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
    noop_clone, // clone
    noop,       // wake
    noop,       // wake_by_ref
    noop,       // drop
);

/// `clone` entry of the no-op vtable: ignores the data pointer and returns a
/// fresh no-op `RawWaker` with a null data pointer.
unsafe fn noop_clone(_: *const ()) -> RawWaker {
    RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)
}

/// Shared `wake` / `wake_by_ref` / `drop` entry of the no-op vtable: the data
/// pointer is ignored and nothing happens.
unsafe fn noop(_: *const ()) {}

// ===== impl Park for Driver =====

impl Park for Driver {
    type Unpark = <IoDriver as Park>::Unpark;
    type Error = io::Error;

    /// Hands out the unpark handle of the wrapped I/O driver.
    fn unpark(&self) -> Self::Unpark {
        self.park.unpark()
    }

    /// Parks on the wrapped I/O driver, then processes pending signal wakeups.
    ///
    /// If the inner park fails, the error is returned and no signal processing
    /// takes place.
    fn park(&mut self) -> Result<(), Self::Error> {
        match self.park.park() {
            Ok(()) => {
                self.process();
                Ok(())
            }
            Err(err) => Err(err.into()),
        }
    }

    /// Same as `park`, but the inner park is bounded by `duration`.
    fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
        match self.park.park_timeout(duration) {
            Ok(()) => {
                self.process();
                Ok(())
            }
            Err(err) => Err(err.into()),
        }
    }

    /// Forwards the shutdown request to the wrapped I/O driver.
    fn shutdown(&mut self) {
        self.park.shutdown();
    }
}

// ===== impl Handle =====

impl Handle {
    /// Verifies that the signal driver behind this handle still exists.
    ///
    /// # Errors
    ///
    /// Returns an `io::ErrorKind::Other` error ("signal driver gone") when no
    /// strong reference to the driver's shared state remains. This includes
    /// handles whose `Weak` was never attached to a driver.
    pub(super) fn check_inner(&self) -> io::Result<()> {
        match self.inner.strong_count() {
            0 => Err(io::Error::new(io::ErrorKind::Other, "signal driver gone")),
            _ => Ok(()),
        }
    }
}

cfg_rt! {
    impl Handle {
        /// Fetches the signal driver handle from the current runtime context.
        ///
        /// # Panics
        ///
        /// Panics when the runtime context does not provide a signal handle,
        /// e.g. when called outside of a Tokio runtime.
        pub(super) fn current() -> Self {
            const NO_DRIVER: &str =
                "there is no signal driver running, must be called from the context of Tokio runtime";

            let found = crate::runtime::context::signal_handle();
            found.expect(NO_DRIVER)
        }
    }
}

cfg_not_rt! {
    impl Handle {
        /// Returns a handle to the current driver
        ///
        /// Without the `rt` feature there is no runtime context to fetch a
        /// signal driver from, so this variant never returns.
        ///
        /// # Panics
        ///
        /// This function always panics.
        pub(super) fn current() -> Self {
            // The space before the line-continuation backslash matters: the
            // continuation strips the newline and all leading whitespace of
            // the next line, which would otherwise glue "with" to "`rt`".
            panic!(
                "there is no signal driver running, must be called from the context of Tokio \
                 runtime or with `rt` enabled.",
            )
        }
    }
}