summaryrefslogtreecommitdiffstats
path: root/tokio/src/signal/registry.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/signal/registry.rs')
-rw-r--r--tokio/src/signal/registry.rs31
1 files changed, 18 insertions, 13 deletions
diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs
index d608539c..0e017965 100644
--- a/tokio/src/signal/registry.rs
+++ b/tokio/src/signal/registry.rs
@@ -87,6 +87,8 @@ impl<S: Storage> Registry<S> {
///
/// Returns true if an event was delivered to at least one listener.
fn broadcast(&self) -> bool {
+ use crate::sync::mpsc::error::TrySendError;
+
let mut did_notify = false;
self.storage.for_each(|event_info| {
// Any signal of this kind arrived since we checked last?
@@ -103,17 +105,13 @@ impl<S: Storage> Registry<S> {
for i in (0..recipients.len()).rev() {
match recipients[i].try_send(()) {
Ok(()) => did_notify = true,
- Err(ref e) if e.is_closed() => {
+ Err(TrySendError::Closed(..)) => {
recipients.swap_remove(i);
}
// Channel is full, ignore the error since the
// receiver has already been woken up
- Err(e) => {
- // Sanity check in case this error type ever gets
- // additional variants we have not considered.
- debug_assert!(e.is_full());
- }
+ Err(_) => {}
}
}
});
@@ -180,7 +178,8 @@ mod tests {
use super::*;
use crate::runtime::{self, Runtime};
use crate::sync::{mpsc, oneshot};
- use futures::{future, StreamExt};
+
+ use futures::future;
#[test]
fn smoke() {
@@ -220,11 +219,7 @@ mod tests {
});
let _ = fire.send(());
- let all = future::join3(
- first_rx.collect::<Vec<_>>(),
- second_rx.collect::<Vec<_>>(),
- third_rx.collect::<Vec<_>>(),
- );
+ let all = future::join3(collect(first_rx), collect(second_rx), collect(third_rx));
let (first_results, second_results, third_results) = all.await;
assert_eq!(2, first_results.len());
@@ -279,7 +274,7 @@ mod tests {
});
let _ = fire.send(());
- let results: Vec<()> = third_rx.collect().await;
+ let results = collect(third_rx).await;
assert_eq!(1, results.len());
});
@@ -311,4 +306,14 @@ mod tests {
fn rt() -> Runtime {
runtime::Builder::new().current_thread().build().unwrap()
}
+
+ async fn collect(mut rx: crate::sync::mpsc::Receiver<()>) -> Vec<()> {
+ let mut ret = vec![];
+
+ while let Some(v) = rx.recv().await {
+ ret.push(v);
+ }
+
+ ret
+ }
}