diff options
author | Carl Lerche <me@carllerche.com> | 2020-01-31 21:18:11 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-31 21:18:11 -0800 |
commit | ab24a655adc1eb0d0c6951d5df2b815c671ac7d2 (patch) | |
tree | f32a689920d4d779fc2c30d87609e279847a291a /tokio/src/util | |
parent | c3d56b85c318c3cdc164558c722b9440d443dcea (diff) |
stream: provide `StreamMap` utility (#2185)
`StreamMap` is similar to `StreamExt::merge` in that it combines source
streams into a single merged stream that yields values in the order that
they arrive from the source streams. However, `StreamMap` has a lot more
flexibility in usage patterns.
`StreamMap` can:
- Merge an arbitrary number of streams.
- Track which source stream the value was received from.
- Handle inserting and removing streams from the set of managed streams
at any point during iteration.
All source streams held by `StreamMap` are indexed using a key. This key
is included with the value when a source stream yields a value. The key
is also used to remove the stream from the `StreamMap` before the stream
has completed streaming.
Because the `StreamMap` API moves streams during runtime, both streams
and keys must be `Unpin`. In order to insert a `!Unpin` stream into a
`StreamMap`, use `pin!` to pin the stream to the stack or `Box::pin` to
pin the stream in the heap.
Diffstat (limited to 'tokio/src/util')
-rw-r--r-- | tokio/src/util/mod.rs | 8 | ||||
-rw-r--r-- | tokio/src/util/rand.rs | 13 |
2 files changed, 10 insertions, 11 deletions
diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index fa904c3d..cd5b151d 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -3,7 +3,7 @@ cfg_io_driver! { pub(crate) mod slab; } -#[cfg(any(feature = "rt-threaded", feature = "macros"))] +#[cfg(any(feature = "rt-threaded", feature = "macros", feature = "stream"))] mod rand; cfg_rt_threaded! { @@ -16,6 +16,6 @@ cfg_rt_threaded! { pub(crate) use try_lock::TryLock; } -cfg_macros! { - pub use rand::thread_rng_n; -} +#[cfg(any(feature = "macros", feature = "stream"))] +#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] +pub use rand::thread_rng_n; diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index 101f5bb6..4b72b4b1 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -51,15 +51,14 @@ impl FastRand { } } -// Used by the select macro -cfg_macros! { +// Used by the select macro and `StreamMap` +#[cfg(any(feature = "macros", feature = "stream"))] +#[doc(hidden)] +#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] +pub fn thread_rng_n(n: u32) -> u32 { thread_local! { static THREAD_RNG: FastRand = FastRand::new(crate::loom::rand::seed()); } - // Used by macros - #[doc(hidden)] - pub fn thread_rng_n(n: u32) -> u32 { - THREAD_RNG.with(|rng| rng.fastrand_n(n)) - } + THREAD_RNG.with(|rng| rng.fastrand_n(n)) } |