summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIvan Petkov <ivanppetkov@gmail.com>2020-10-06 10:30:16 -0700
committerGitHub <noreply@github.com>2020-10-06 17:30:16 +0000
commit4cf45c038b9691f24fac22df13594c2223b185f6 (patch)
tree2e9a92280ce9860daf2a8b3d57ae11441264097b
parent9730317e94cd5bfca237376549405a6feb815223 (diff)
process: add ProcessDriver to handle orphan reaping (#2907)
-rw-r--r--tokio/src/macros/cfg.rs17
-rw-r--r--tokio/src/process/mod.rs5
-rw-r--r--tokio/src/process/unix/driver.rs154
-rw-r--r--tokio/src/process/unix/mod.rs30
-rw-r--r--tokio/src/process/unix/orphan.rs78
-rw-r--r--tokio/src/process/unix/reap.rs57
-rw-r--r--tokio/src/runtime/driver.rs45
-rw-r--r--tokio/src/signal/unix.rs37
-rw-r--r--tokio/src/signal/unix/driver.rs2
9 files changed, 327 insertions, 98 deletions
diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs
index f245b09e..328f3230 100644
--- a/tokio/src/macros/cfg.rs
+++ b/tokio/src/macros/cfg.rs
@@ -242,6 +242,23 @@ macro_rules! cfg_process {
}
}
+macro_rules! cfg_process_driver {
+ ($($item:item)*) => {
+ #[cfg(unix)]
+ #[cfg(not(loom))]
+ cfg_process! { $($item)* }
+ }
+}
+
+macro_rules! cfg_not_process_driver {
+ ($($item:item)*) => {
+ $(
+ #[cfg(not(all(unix, not(loom), feature = "process")))]
+ $item
+ )*
+ }
+}
+
macro_rules! cfg_signal {
($($item:item)*) => {
$(
diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs
index 7da9783b..ff84428e 100644
--- a/tokio/src/process/mod.rs
+++ b/tokio/src/process/mod.rs
@@ -113,6 +113,11 @@
#[cfg(unix)]
mod imp;
+#[cfg(unix)]
+pub(crate) mod unix {
+ pub(crate) use super::imp::*;
+}
+
#[path = "windows.rs"]
#[cfg(windows)]
mod imp;
diff --git a/tokio/src/process/unix/driver.rs b/tokio/src/process/unix/driver.rs
new file mode 100644
index 00000000..2eea0043
--- /dev/null
+++ b/tokio/src/process/unix/driver.rs
@@ -0,0 +1,154 @@
+//! Process driver
+
+use crate::park::Park;
+use crate::process::unix::orphan::ReapOrphanQueue;
+use crate::process::unix::GlobalOrphanQueue;
+use crate::signal::unix::driver::Driver as SignalDriver;
+use crate::signal::unix::{signal_with_handle, InternalStream, Signal, SignalKind};
+use crate::sync::mpsc::error::TryRecvError;
+
+use std::io;
+use std::time::Duration;
+
+/// Responsible for cleaning up orphaned child processes on Unix platforms.
+#[derive(Debug)]
+pub(crate) struct Driver {
+ park: SignalDriver,
+ inner: CoreDriver<Signal, GlobalOrphanQueue>,
+}
+
+#[derive(Debug)]
+struct CoreDriver<S, Q> {
+ sigchild: S,
+ orphan_queue: Q,
+}
+
+// ===== impl CoreDriver =====
+
+impl<S, Q> CoreDriver<S, Q>
+where
+ S: InternalStream,
+ Q: ReapOrphanQueue,
+{
+ fn got_signal(&mut self) -> bool {
+ match self.sigchild.try_recv() {
+ Ok(()) => true,
+ Err(TryRecvError::Empty) => false,
+ Err(TryRecvError::Closed) => panic!("signal was deregistered"),
+ }
+ }
+
+ fn process(&mut self) {
+ if self.got_signal() {
+ // Drain all notifications which may have been buffered
+ // so we can try to reap all orphans in one batch
+ while self.got_signal() {}
+
+ self.orphan_queue.reap_orphans();
+ }
+ }
+}
+
+// ===== impl Driver =====
+
+impl Driver {
+ /// Creates a new signal `Driver` instance that delegates wakeups to `park`.
+ pub(crate) fn new(park: SignalDriver) -> io::Result<Self> {
+ let sigchild = signal_with_handle(SignalKind::child(), park.handle())?;
+ let inner = CoreDriver {
+ sigchild,
+ orphan_queue: GlobalOrphanQueue,
+ };
+
+ Ok(Self { park, inner })
+ }
+}
+
+// ===== impl Park for Driver =====
+
+impl Park for Driver {
+ type Unpark = <SignalDriver as Park>::Unpark;
+ type Error = io::Error;
+
+ fn unpark(&self) -> Self::Unpark {
+ self.park.unpark()
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.park.park()?;
+ self.inner.process();
+ Ok(())
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.park.park_timeout(duration)?;
+ self.inner.process();
+ Ok(())
+ }
+
+ fn shutdown(&mut self) {
+ self.park.shutdown()
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use crate::process::unix::orphan::test::MockQueue;
+ use crate::sync::mpsc::error::TryRecvError;
+ use std::task::{Context, Poll};
+
+ struct MockStream {
+ total_try_recv: usize,
+ values: Vec<Option<()>>,
+ }
+
+ impl MockStream {
+ fn new(values: Vec<Option<()>>) -> Self {
+ Self {
+ total_try_recv: 0,
+ values,
+ }
+ }
+ }
+
+ impl InternalStream for MockStream {
+ fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
+ unimplemented!();
+ }
+
+ fn try_recv(&mut self) -> Result<(), TryRecvError> {
+ self.total_try_recv += 1;
+ match self.values.remove(0) {
+ Some(()) => Ok(()),
+ None => Err(TryRecvError::Empty),
+ }
+ }
+ }
+
+ #[test]
+ fn no_reap_if_no_signal() {
+ let mut driver = CoreDriver {
+ sigchild: MockStream::new(vec![None]),
+ orphan_queue: MockQueue::<()>::new(),
+ };
+
+ driver.process();
+
+ assert_eq!(1, driver.sigchild.total_try_recv);
+ assert_eq!(0, driver.orphan_queue.total_reaps.get());
+ }
+
+ #[test]
+ fn coalesce_signals_before_reaping() {
+ let mut driver = CoreDriver {
+ sigchild: MockStream::new(vec![Some(()), Some(()), None]),
+ orphan_queue: MockQueue::<()>::new(),
+ };
+
+ driver.process();
+
+ assert_eq!(3, driver.sigchild.total_try_recv);
+ assert_eq!(1, driver.orphan_queue.total_reaps.get());
+ }
+}
diff --git a/tokio/src/process/unix/mod.rs b/tokio/src/process/unix/mod.rs
index 46f1d790..db9d592c 100644
--- a/tokio/src/process/unix/mod.rs
+++ b/tokio/src/process/unix/mod.rs
@@ -21,8 +21,10 @@
//! processes in general aren't scalable (e.g. millions) so it shouldn't be that
//! bad in theory...
-mod orphan;
-use orphan::{OrphanQueue, OrphanQueueImpl, Wait};
+pub(crate) mod driver;
+
+pub(crate) mod orphan;
+use orphan::{OrphanQueue, OrphanQueueImpl, ReapOrphanQueue, Wait};
mod reap;
use reap::Reaper;
@@ -39,11 +41,11 @@ use std::future::Future;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
-use std::process::ExitStatus;
+use std::process::{Child as StdChild, ExitStatus};
use std::task::Context;
use std::task::Poll;
-impl Wait for std::process::Child {
+impl Wait for StdChild {
fn id(&self) -> u32 {
self.id()
}
@@ -53,17 +55,17 @@ impl Wait for std::process::Child {
}
}
-impl Kill for std::process::Child {
+impl Kill for StdChild {
fn kill(&mut self) -> io::Result<()> {
self.kill()
}
}
lazy_static::lazy_static! {
- static ref ORPHAN_QUEUE: OrphanQueueImpl<std::process::Child> = OrphanQueueImpl::new();
+ static ref ORPHAN_QUEUE: OrphanQueueImpl<StdChild> = OrphanQueueImpl::new();
}
-struct GlobalOrphanQueue;
+pub(crate) struct GlobalOrphanQueue;
impl fmt::Debug for GlobalOrphanQueue {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -71,19 +73,21 @@ impl fmt::Debug for GlobalOrphanQueue {
}
}
-impl OrphanQueue<std::process::Child> for GlobalOrphanQueue {
- fn push_orphan(&self, orphan: std::process::Child) {
- ORPHAN_QUEUE.push_orphan(orphan)
- }
-
+impl ReapOrphanQueue for GlobalOrphanQueue {
fn reap_orphans(&self) {
ORPHAN_QUEUE.reap_orphans()
}
}
+impl OrphanQueue<StdChild> for GlobalOrphanQueue {
+ fn push_orphan(&self, orphan: StdChild) {
+ ORPHAN_QUEUE.push_orphan(orphan)
+ }
+}
+
#[must_use = "futures do nothing unless polled"]
pub(crate) struct Child {
- inner: Reaper<std::process::Child, GlobalOrphanQueue, Signal>,
+ inner: Reaper<StdChild, GlobalOrphanQueue, Signal>,
}
impl fmt::Debug for Child {
diff --git a/tokio/src/process/unix/orphan.rs b/tokio/src/process/unix/orphan.rs
index 6c449a90..8a1e1278 100644
--- a/tokio/src/process/unix/orphan.rs
+++ b/tokio/src/process/unix/orphan.rs
@@ -20,23 +20,29 @@ impl<T: Wait> Wait for &mut T {
}
}
-/// An interface for queueing up an orphaned process so that it can be reaped.
-pub(crate) trait OrphanQueue<T> {
- /// Adds an orphan to the queue.
- fn push_orphan(&self, orphan: T);
+/// An interface for reaping a set of orphaned processes.
+pub(crate) trait ReapOrphanQueue {
/// Attempts to reap every process in the queue, ignoring any errors and
/// enqueueing any orphans which have not yet exited.
fn reap_orphans(&self);
}
+impl<T: ReapOrphanQueue> ReapOrphanQueue for &T {
+ fn reap_orphans(&self) {
+ (**self).reap_orphans()
+ }
+}
+
+/// An interface for queueing up an orphaned process so that it can be reaped.
+pub(crate) trait OrphanQueue<T>: ReapOrphanQueue {
+ /// Adds an orphan to the queue.
+ fn push_orphan(&self, orphan: T);
+}
+
impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O {
fn push_orphan(&self, orphan: T) {
(**self).push_orphan(orphan);
}
-
- fn reap_orphans(&self) {
- (**self).reap_orphans()
- }
}
/// An implementation of `OrphanQueue`.
@@ -62,42 +68,62 @@ impl<T: Wait> OrphanQueue<T> for OrphanQueueImpl<T> {
fn push_orphan(&self, orphan: T) {
self.queue.lock().unwrap().push(orphan)
}
+}
+impl<T: Wait> ReapOrphanQueue for OrphanQueueImpl<T> {
fn reap_orphans(&self) {
let mut queue = self.queue.lock().unwrap();
let queue = &mut *queue;
- let mut i = 0;
- while i < queue.len() {
+ for i in (0..queue.len()).rev() {
match queue[i].try_wait() {
- Ok(Some(_)) => {}
- Err(_) => {
- // TODO: bubble up error some how. Is this an internal bug?
- // Shoudl we panic? Is it OK for this to be silently
- // dropped?
- }
- // Still not done yet
- Ok(None) => {
- i += 1;
- continue;
+ Ok(None) => {}
+ Ok(Some(_)) | Err(_) => {
+ // The stdlib handles interruption errors (EINTR) when polling a child process.
+ // All other errors represent invalid inputs or pids that have already been
+ // reaped, so we can drop the orphan in case an error is raised.
+ queue.swap_remove(i);
}
}
-
- queue.remove(i);
}
}
}
#[cfg(all(test, not(loom)))]
-mod test {
- use super::Wait;
- use super::{OrphanQueue, OrphanQueueImpl};
- use std::cell::Cell;
+pub(crate) mod test {
+ use super::*;
+ use std::cell::{Cell, RefCell};
use std::io;
use std::os::unix::process::ExitStatusExt;
use std::process::ExitStatus;
use std::rc::Rc;
+ pub(crate) struct MockQueue<W> {
+ pub(crate) all_enqueued: RefCell<Vec<W>>,
+ pub(crate) total_reaps: Cell<usize>,
+ }
+
+ impl<W> MockQueue<W> {
+ pub(crate) fn new() -> Self {
+ Self {
+ all_enqueued: RefCell::new(Vec::new()),
+ total_reaps: Cell::new(0),
+ }
+ }
+ }
+
+ impl<W> OrphanQueue<W> for MockQueue<W> {
+ fn push_orphan(&self, orphan: W) {
+ self.all_enqueued.borrow_mut().push(orphan);
+ }
+ }
+
+ impl<W> ReapOrphanQueue for MockQueue<W> {
+ fn reap_orphans(&self) {
+ self.total_reaps.set(self.total_reaps.get() + 1);
+ }
+ }
+
struct MockWait {
total_waits: Rc<Cell<usize>>,
num_wait_until_status: usize,
diff --git a/tokio/src/process/unix/reap.rs b/tokio/src/process/unix/reap.rs
index c51a20b9..de483c44 100644
--- a/tokio/src/process/unix/reap.rs
+++ b/tokio/src/process/unix/reap.rs
@@ -1,6 +1,6 @@
use crate::process::imp::orphan::{OrphanQueue, Wait};
use crate::process::kill::Kill;
-use crate::signal::unix::Signal;
+use crate::signal::unix::InternalStream;
use std::future::Future;
use std::io;
@@ -23,17 +23,6 @@ where
signal: S,
}
-// Work around removal of `futures_core` dependency
-pub(crate) trait Stream: Unpin {
- fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>;
-}
-
-impl Stream for Signal {
- fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
- Signal::poll_recv(self, cx)
- }
-}
-
impl<W, Q, S> Deref for Reaper<W, Q, S>
where
W: Wait + Unpin,
@@ -72,7 +61,7 @@ impl<W, Q, S> Future for Reaper<W, Q, S>
where
W: Wait + Unpin,
Q: OrphanQueue<W> + Unpin,
- S: Stream,
+ S: InternalStream,
{
type Output = io::Result<ExitStatus>;
@@ -80,10 +69,8 @@ where
loop {
// If the child hasn't exited yet, then it's our responsibility to
// ensure the current task gets notified when it might be able to
- // make progress.
- //
- // As described in `spawn` above, we just indicate that we can
- // next make progress once a SIGCHLD is received.
+ // make progress. We can use the delivery of a SIGCHLD signal as a
+ // sign that we can potentially make progress.
//
// However, we will register for a notification on the next signal
// BEFORE we poll the child. Otherwise it is possible that the child
@@ -99,7 +86,6 @@ where
// should not cause significant issues with parent futures.
let registered_interest = self.signal.poll_recv(cx).is_pending();
- self.orphan_queue.reap_orphans();
if let Some(status) = self.inner_mut().try_wait()? {
return Poll::Ready(Ok(status));
}
@@ -147,8 +133,9 @@ where
mod test {
use super::*;
+ use crate::process::unix::orphan::test::MockQueue;
+ use crate::sync::mpsc::error::TryRecvError;
use futures::future::FutureExt;
- use std::cell::{Cell, RefCell};
use std::os::unix::process::ExitStatusExt;
use std::process::ExitStatus;
use std::task::Context;
@@ -211,7 +198,7 @@ mod test {
}
}
- impl Stream for MockStream {
+ impl InternalStream for MockStream {
fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
self.total_polls += 1;
match self.values.remove(0) {
@@ -219,29 +206,9 @@ mod test {
None => Poll::Pending,
}
}
- }
- struct MockQueue<W> {
- all_enqueued: RefCell<Vec<W>>,
- total_reaps: Cell<usize>,
- }
-
- impl<W> MockQueue<W> {
- fn new() -> Self {
- Self {
- all_enqueued: RefCell::new(Vec::new()),
- total_reaps: Cell::new(0),
- }
- }
- }
-
- impl<W: Wait> OrphanQueue<W> for MockQueue<W> {
- fn push_orphan(&self, orphan: W) {
- self.all_enqueued.borrow_mut().push(orphan);
- }
-
- fn reap_orphans(&self) {
- self.total_reaps.set(self.total_reaps.get() + 1);
+ fn try_recv(&mut self) -> Result<(), TryRecvError> {
+ unimplemented!();
}
}
@@ -262,7 +229,7 @@ mod test {
assert!(grim.poll_unpin(&mut context).is_pending());
assert_eq!(1, grim.signal.total_polls);
assert_eq!(1, grim.total_waits);
- assert_eq!(1, grim.orphan_queue.total_reaps.get());
+ assert_eq!(0, grim.orphan_queue.total_reaps.get());
assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
// Not yet exited, couldn't register interest the first time
@@ -270,7 +237,7 @@ mod test {
assert!(grim.poll_unpin(&mut context).is_pending());
assert_eq!(3, grim.signal.total_polls);
assert_eq!(3, grim.total_waits);
- assert_eq!(3, grim.orphan_queue.total_reaps.get());
+ assert_eq!(0, grim.orphan_queue.total_reaps.get());
assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
// Exited
@@ -283,7 +250,7 @@ mod test {
}
assert_eq!(4, grim.signal.total_polls);
assert_eq!(4, grim.total_waits);
- assert_eq!(4, grim.orphan_queue.total_reaps.get());
+ assert_eq!(0, grim.orphan_queue.total_reaps.get());
assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
}
diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs
index 148e0b16..af8e17a3 100644
--- a/tokio/src/runtime/driver.rs
+++ b/tokio/src/runtime/driver.rs
@@ -38,6 +38,7 @@ cfg_not_io_driver! {
}
// ===== signal driver =====
+
macro_rules! cfg_signal_internal_and_unix {
($($item:item)*) => {
#[cfg(unix)]
@@ -73,10 +74,37 @@ cfg_not_signal_internal! {
}
}
+// ===== process driver =====
+
+cfg_process_driver! {
+ type ProcessDriver = crate::park::Either<crate::process::unix::driver::Driver, SignalDriver>;
+
+ fn create_process_driver(signal_driver: SignalDriver) -> io::Result<ProcessDriver> {
+ use crate::park::Either;
+
+ // Enable the signal driver if IO is also enabled
+ match signal_driver {
+ Either::A(signal_driver) => {
+ let driver = crate::process::unix::driver::Driver::new(signal_driver)?;
+ Ok(Either::A(driver))
+ }
+ Either::B(_) => Ok(Either::B(signal_driver)),
+ }
+ }
+}
+
+cfg_not_process_driver! {
+ type ProcessDriver = SignalDriver;
+
+ fn create_process_driver(signal_driver: SignalDriver) -> io::Result<ProcessDriver> {
+ Ok(signal_driver)
+ }
+}
+
// ===== time driver =====
cfg_time! {
- type TimeDriver = crate::park::Either<crate::time::driver::Driver<SignalDriver>, SignalDriver>;
+ type TimeDriver = crate::park::Either<crate::time::driver::Driver<ProcessDriver>, ProcessDriver>;
pub(crate) type Clock = crate::time::Clock;
pub(crate) type TimeHandle = Option<crate::time::driver::Handle>;
@@ -87,24 +115,24 @@ cfg_time! {
fn create_time_driver(
enable: bool,
- signal_driver: SignalDriver,
+ process_driver: ProcessDriver,
clock: Clock,
) -> (TimeDriver, TimeHandle) {
use crate::park::Either;
if enable {
- let driver = crate::time::driver::Driver::new(signal_driver, clock);
+ let driver = crate::time::driver::Driver::new(process_driver, clock);
let handle = driver.handle();
(Either::A(driver), Some(handle))
} else {
- (Either::B(signal_driver), None)
+ (Either::B(process_driver), None)
}
}
}
cfg_not_time! {
- type TimeDriver = SignalDriver;
+ type TimeDriver = ProcessDriver;
pub(crate) type Clock = ();
pub(crate) type TimeHandle = ();
@@ -115,10 +143,10 @@ cfg_not_time! {
fn create_time_driver(
_enable: bool,
- signal_driver: SignalDriver,
+ process_driver: ProcessDriver,
_clock: Clock,
) -> (TimeDriver, TimeHandle) {
- (signal_driver, ())
+ (process_driver, ())
}
}
@@ -147,8 +175,9 @@ impl Driver {
let (io_driver, io_handle) = create_io_driver(cfg.enable_io)?;
let (signal_driver, signal_handle) = create_signal_driver(io_driver)?;
+ let process_driver = create_process_driver(signal_driver)?;
let (time_driver, time_handle) =
- create_time_driver(cfg.enable_time, signal_driver, clock.clone());
+ create_time_driver(cfg.enable_time, process_driver, clock.clone());
Ok((
Self { inner: time_driver },
diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs
index 77f300bb..aaaa75ed 100644
--- a/tokio/src/signal/unix.rs
+++ b/tokio/src/signal/unix.rs
@@ -6,6 +6,7 @@
#![cfg(unix)]
use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage};
+use crate::sync::mpsc::error::TryRecvError;
use crate::sync::mpsc::{channel, Receiver};
use libc::c_int;
@@ -17,6 +18,7 @@ use std::sync::Once;
use std::task::{Context, Poll};
pub(crate) mod driver;
+use self::driver::Handle;
pub(crate) type OsStorage = Vec<SignalInfo>;
@@ -220,7 +222,7 @@ fn action(globals: Pin<&'static Globals>, signal: c_int) {
///
/// This will register the signal handler if it hasn't already been registered,
/// returning any error along the way if that fails.
-fn signal_enable(signal: c_int) -> io::Result<()> {
+fn signal_enable(signal: c_int, handle: Handle) -> io::Result<()> {
if signal < 0 || signal_hook_registry::FORBIDDEN.contains(&signal) {
return Err(Error::new(
ErrorKind::Other,
@@ -229,7 +231,7 @@ fn signal_enable(signal: c_int) -> io::Result<()> {
}
// Check that we have a signal driver running
- driver::Handle::current().check_inner()?;
+ handle.check_inner()?;
let globals = globals();
let siginfo = match globals.storage().get(signal as EventId) {
@@ -349,10 +351,14 @@ pub struct Signal {
/// * If the signal is one of
/// [`signal_hook::FORBIDDEN`](fn@signal_hook_registry::register#panics)
pub fn signal(kind: SignalKind) -> io::Result<Signal> {
+ signal_with_handle(kind, Handle::current())
+}
+
+pub(crate) fn signal_with_handle(kind: SignalKind, handle: Handle) -> io::Result<Signal> {
let signal = kind.0;
// Turn the signal delivery on once we are ready for it
- signal_enable(signal)?;
+ signal_enable(signal, handle)?;
// One wakeup in a queue is enough, no need for us to buffer up any
// more.
@@ -394,6 +400,11 @@ impl Signal {
pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
self.rx.poll_recv(cx)
}
+
+ /// Try to receive a signal notification without blocking or registering a waker.
+ pub(crate) fn try_recv(&mut self) -> Result<(), TryRecvError> {
+ self.rx.try_recv()
+ }
}
cfg_stream! {
@@ -406,6 +417,22 @@ cfg_stream! {
}
}
+// Work around for abstracting streams internally
+pub(crate) trait InternalStream: Unpin {
+ fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>;
+ fn try_recv(&mut self) -> Result<(), TryRecvError>;
+}
+
+impl InternalStream for Signal {
+ fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.poll_recv(cx)
+ }
+
+ fn try_recv(&mut self) -> Result<(), TryRecvError> {
+ self.try_recv()
+ }
+}
+
pub(crate) fn ctrl_c() -> io::Result<Signal> {
signal(SignalKind::interrupt())
}
@@ -416,11 +443,11 @@ mod tests {
#[test]
fn signal_enable_error_on_invalid_input() {
- signal_enable(-1).unwrap_err();
+ signal_enable(-1, Handle::default()).unwrap_err();
}
#[test]
fn signal_enable_error_on_forbidden_input() {
- signal_enable(signal_hook_registry::FORBIDDEN[0]).unwrap_err();
+ signal_enable(signal_hook_registry::FORBIDDEN[0], Handle::default()).unwrap_err();
}
}
diff --git a/tokio/src/signal/unix/driver.rs b/tokio/src/signal/unix/driver.rs
index ae60c22f..d0615312 100644
--- a/tokio/src/signal/unix/driver.rs
+++ b/tokio/src/signal/unix/driver.rs
@@ -30,7 +30,7 @@ pub(crate) struct Driver {
inner: Arc<Inner>,
}
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, Default)]
pub(crate) struct Handle {
inner: Weak<Inner>,
}