diff options
Diffstat (limited to 'tokio/src/signal/registry.rs')
-rw-r--r-- | tokio/src/signal/registry.rs | 31 |
1 files changed, 18 insertions, 13 deletions
diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs index d608539c..0e017965 100644 --- a/tokio/src/signal/registry.rs +++ b/tokio/src/signal/registry.rs @@ -87,6 +87,8 @@ impl<S: Storage> Registry<S> { /// /// Returns true if an event was delivered to at least one listener. fn broadcast(&self) -> bool { + use crate::sync::mpsc::error::TrySendError; + let mut did_notify = false; self.storage.for_each(|event_info| { // Any signal of this kind arrived since we checked last? @@ -103,17 +105,13 @@ impl<S: Storage> Registry<S> { for i in (0..recipients.len()).rev() { match recipients[i].try_send(()) { Ok(()) => did_notify = true, - Err(ref e) if e.is_closed() => { + Err(TrySendError::Closed(..)) => { recipients.swap_remove(i); } // Channel is full, ignore the error since the // receiver has already been woken up - Err(e) => { - // Sanity check in case this error type ever gets - // additional variants we have not considered. - debug_assert!(e.is_full()); - } + Err(_) => {} } } }); @@ -180,7 +178,8 @@ mod tests { use super::*; use crate::runtime::{self, Runtime}; use crate::sync::{mpsc, oneshot}; - use futures::{future, StreamExt}; + + use futures::future; #[test] fn smoke() { @@ -220,11 +219,7 @@ mod tests { }); let _ = fire.send(()); - let all = future::join3( - first_rx.collect::<Vec<_>>(), - second_rx.collect::<Vec<_>>(), - third_rx.collect::<Vec<_>>(), - ); + let all = future::join3(collect(first_rx), collect(second_rx), collect(third_rx)); let (first_results, second_results, third_results) = all.await; assert_eq!(2, first_results.len()); @@ -279,7 +274,7 @@ mod tests { }); let _ = fire.send(()); - let results: Vec<()> = third_rx.collect().await; + let results = collect(third_rx).await; assert_eq!(1, results.len()); }); @@ -311,4 +306,14 @@ mod tests { fn rt() -> Runtime { runtime::Builder::new().current_thread().build().unwrap() } + + async fn collect(mut rx: crate::sync::mpsc::Receiver<()>) -> Vec<()> { + let mut ret = vec![]; + + while let Some(v) = rx.recv().await { + ret.push(v); + } + + ret + } } |