diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-15 22:11:13 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-15 22:11:13 -0800 |
commit | 8a7e57786a5dca139f5b4261685e22991ded0859 (patch) | |
tree | b69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/src/signal/registry.rs | |
parent | 930679587ae42e4df3113159ccf33fb5923dd73a (diff) |
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its
_public_ dependencies on crates that are either a) do not provide a
stable (1.0+) release with longevity guarantees or b) match the `tokio`
release cadence. Of course, implementing `std` traits fits the
requirements.
The on exception, for now, is the `Stream` trait found in `futures_core`.
It is expected that this trait will not change much and be moved into `std.
Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain
a dependency on this trait given how foundational it is.
Since the `Stream` implementation is optional, types that are logically
streams provide `async fn next_*` functions to obtain the next value.
Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`.
Additionally, some misc cleanup is also done:
- `tokio::io::io` -> `tokio::io::util`.
- `delay` -> `delay_until`.
- `Timeout::new` -> `timeout(...)`.
- `signal::ctrl_c()` returns a future instead of a stream.
- `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait).
- `time::Throttle` is removed (due to lack of `Stream` trait).
- Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/src/signal/registry.rs')
-rw-r--r-- | tokio/src/signal/registry.rs | 31 |
1 files changed, 18 insertions, 13 deletions
diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs index d608539c..0e017965 100644 --- a/tokio/src/signal/registry.rs +++ b/tokio/src/signal/registry.rs @@ -87,6 +87,8 @@ impl<S: Storage> Registry<S> { /// /// Returns true if an event was delivered to at least one listener. fn broadcast(&self) -> bool { + use crate::sync::mpsc::error::TrySendError; + let mut did_notify = false; self.storage.for_each(|event_info| { // Any signal of this kind arrived since we checked last? @@ -103,17 +105,13 @@ impl<S: Storage> Registry<S> { for i in (0..recipients.len()).rev() { match recipients[i].try_send(()) { Ok(()) => did_notify = true, - Err(ref e) if e.is_closed() => { + Err(TrySendError::Closed(..)) => { recipients.swap_remove(i); } // Channel is full, ignore the error since the // receiver has already been woken up - Err(e) => { - // Sanity check in case this error type ever gets - // additional variants we have not considered. - debug_assert!(e.is_full()); - } + Err(_) => {} } } }); @@ -180,7 +178,8 @@ mod tests { use super::*; use crate::runtime::{self, Runtime}; use crate::sync::{mpsc, oneshot}; - use futures::{future, StreamExt}; + + use futures::future; #[test] fn smoke() { @@ -220,11 +219,7 @@ mod tests { }); let _ = fire.send(()); - let all = future::join3( - first_rx.collect::<Vec<_>>(), - second_rx.collect::<Vec<_>>(), - third_rx.collect::<Vec<_>>(), - ); + let all = future::join3(collect(first_rx), collect(second_rx), collect(third_rx)); let (first_results, second_results, third_results) = all.await; assert_eq!(2, first_results.len()); @@ -279,7 +274,7 @@ mod tests { }); let _ = fire.send(()); - let results: Vec<()> = third_rx.collect().await; + let results = collect(third_rx).await; assert_eq!(1, results.len()); }); @@ -311,4 +306,14 @@ mod tests { fn rt() -> Runtime { runtime::Builder::new().current_thread().build().unwrap() } + + async fn collect(mut rx: crate::sync::mpsc::Receiver<()>) -> Vec<()> { + let mut ret = vec![]; + + while let Some(v) = rx.recv().await { + ret.push(v); + } + + ret + } } |