path: root/tokio/src
diff options
authorCarl Lerche <>2020-01-31 21:18:11 -0800
committerGitHub <>2020-01-31 21:18:11 -0800
commitab24a655adc1eb0d0c6951d5df2b815c671ac7d2 (patch)
treef32a689920d4d779fc2c30d87609e279847a291a /tokio/src
parentc3d56b85c318c3cdc164558c722b9440d443dcea (diff)
stream: provide `StreamMap` utility (#2185)
`StreamMap` is similar to `StreamExt::merge` in that it combines source streams into a single merged stream that yields values in the order that they arrive from the source streams. However, `StreamMap` has a lot more flexibility in usage patterns. `StreamMap` can: - Merge an arbitrary number of streams. - Track which source stream the value was received from. - Handle inserting and removing streams from the set of managed streams at any point during iteration. All source streams held by `StreamMap` are indexed using a key. This key is included with the value when a source stream yields a value. The key is also used to remove the stream from the `StreamMap` before the stream has completed streaming. Because the `StreamMap` API moves streams during runtime, both streams and keys must be `Unpin`. In order to insert a `!Unpin` stream into a `StreamMap`, use `pin!` to pin the stream to the stack or `Box::pin` to pin the stream in the heap.
Diffstat (limited to 'tokio/src')
4 files changed, 516 insertions, 11 deletions
diff --git a/tokio/src/stream/ b/tokio/src/stream/
index 82771eee..3cc7e68f 100644
--- a/tokio/src/stream/
+++ b/tokio/src/stream/
@@ -50,6 +50,9 @@ pub use once::{once, Once};
mod pending;
pub use pending::{pending, Pending};
+mod stream_map;
+pub use stream_map::StreamMap;
mod try_next;
use try_next::TryNext;
diff --git a/tokio/src/stream/ b/tokio/src/stream/
new file mode 100644
index 00000000..2f60ea4d
--- /dev/null
+++ b/tokio/src/stream/
@@ -0,0 +1,503 @@
+use crate::stream::Stream;
+use std::borrow::Borrow;
+use std::hash::Hash;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+/// Combine many streams into one, indexing each source stream with a unique
+/// key.
+/// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source
+/// streams into a single merged stream that yields values in the order that
+/// they arrive from the source streams. However, `StreamMap` has a lot more
+/// flexibility in usage patterns.
+/// `StreamMap` can:
+/// * Merge an arbitrary number of streams.
+/// * Track which source stream the value was received from.
+/// * Handle inserting and removing streams from the set of managed streams at
+/// any point during iteration.
+/// All source streams held by `StreamMap` are indexed using a key. This key is
+/// included with the value when a source stream yields a value. The key is also
+/// used to remove the stream from the `StreamMap` before the stream has
+/// completed streaming.
+/// # `Unpin`
+/// Because the `StreamMap` API moves streams during runtime, both streams and
+/// keys must be `Unpin`. In order to insert a `!Unpin` stream into a
+/// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to
+/// pin the stream in the heap.
+/// # Implementation
+/// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this
+/// internal implementation detail will persist in future versions, but it is
+/// important to know the runtime implications. In general, `StreamMap` works
+/// best with a "smallish" number of streams as all entries are scanned on
+/// insert, remove, and polling. In cases where a large number of streams need
+/// to be merged, it may be advisable to use tasks sending values on a shared
+/// [`mpsc`] channel.
+/// [`StreamExt::merge`]: crate::stream::StreamExt::merge
+/// [`mpsc`]: crate::sync::mpsc
+/// [`pin!`]: macro@pin
+/// [`Box::pin`]: std::boxed::Box::pin
+/// # Examples
+/// Merging two streams, then remove them after receiving the first value
+/// ```
+/// use tokio::stream::{StreamExt, StreamMap};
+/// use tokio::sync::mpsc;
+/// #[tokio::main]
+/// async fn main() {
+/// let (mut tx1, rx1) = mpsc::channel(10);
+/// let (mut tx2, rx2) = mpsc::channel(10);
+/// tokio::spawn(async move {
+/// tx1.send(1).await.unwrap();
+/// // This value will never be received. The send may or may not return
+/// // `Err` depending on if the remote end closed first or not.
+/// let _ = tx1.send(2).await;
+/// });
+/// tokio::spawn(async move {
+/// tx2.send(3).await.unwrap();
+/// let _ = tx2.send(4).await;
+/// });
+/// let mut map = StreamMap::new();
+/// // Insert both streams
+/// map.insert("one", rx1);
+/// map.insert("two", rx2);
+/// // Read twice
+/// for _ in 0..2 {
+/// let (key, val) =;
+/// if key == "one" {
+/// assert_eq!(val, 1);
+/// } else {
+/// assert_eq!(val, 3);
+/// }
+/// // Remove the stream to prevent reading the next value
+/// map.remove(key);
+/// }
+/// }
+/// ```
+/// This example models a read-only client to a chat system with channels. The
+/// client sends commands to join and leave channels. `StreamMap` is used to
+/// manage active channel subscriptions.
+/// For simplicity, messages are displayed with `println!`, but they could be
+/// sent to the client over a socket.
+/// ```no_run
+/// use tokio::stream::{Stream, StreamExt, StreamMap};
+/// enum Command {
+/// Join(String),
+/// Leave(String),
+/// }
+/// fn commands() -> impl Stream<Item = Command> {
+/// // Streams in user commands by parsing `stdin`.
+/// # tokio::stream::pending()
+/// }
+/// // Join a channel, returns a stream of messages received on the channel.
+/// fn join(channel: &str) -> impl Stream<Item = String> + Unpin {
+/// // left as an exercise to the reader
+/// # tokio::stream::pending()
+/// }
+/// #[tokio::main]
+/// async fn main() {
+/// let mut channels = StreamMap::new();
+/// // Input commands (join / leave channels).
+/// let cmds = commands();
+/// tokio::pin!(cmds);
+/// loop {
+/// tokio::select! {
+/// Some(cmd) = => {
+/// match cmd {
+/// Command::Join(chan) => {
+/// // Join the channel and add it to the `channels`
+/// // stream map
+/// let msgs = join(&chan);
+/// channels.insert(chan, msgs);
+/// }
+/// Command::Leave(chan) => {
+/// channels.remove(&chan);
+/// }
+/// }
+/// }
+/// Some((chan, msg)) = => {
+/// // Received a message, display it on stdout with the channel
+/// // it originated from.
+/// println!("{}: {}", chan, msg);
+/// }
+/// // Both the `commands` stream and the `channels` stream are
+/// // complete. There is no more work to do, so leave the loop.
+/// else => break,
+/// }
+/// }
+/// }
+/// ```
+#[derive(Debug, Default)]
+pub struct StreamMap<K, V> {
+ /// Streams stored in the map
+ entries: Vec<(K, V)>,
+impl<K, V> StreamMap<K, V> {
+ /// Creates an empty `StreamMap`.
+ ///
+ /// The stream map is initially created with a capacity of `0`, so it will
+ /// not allocate until it is first inserted into.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::stream::{StreamMap, Pending};
+ ///
+ /// let map: StreamMap<&str, Pending<()>> = StreamMap::new();
+ /// ```
+ pub fn new() -> StreamMap<K, V> {
+ StreamMap { entries: vec![] }
+ }
+ /// Creates an empty `StreamMap` with the specified capacity.
+ ///
+ /// The stream map will be able to hold at least `capacity` elements without
+ /// reallocating. If `capacity` is 0, the stream map will not allocate.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::stream::{StreamMap, Pending};
+ ///
+ /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10);
+ /// ```
+ pub fn with_capacity(capacity: usize) -> StreamMap<K, V> {
+ StreamMap {
+ entries: Vec::with_capacity(capacity),
+ }
+ }
+ /// Returns an iterator visiting all keys in arbitrary order.
+ ///
+ /// The iterator element type is &'a K.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ ///
+ /// map.insert("a", pending::<i32>());
+ /// map.insert("b", pending());
+ /// map.insert("c", pending());
+ ///
+ /// for key in map.keys() {
+ /// println!("{}", key);
+ /// }
+ /// ```
+ pub fn keys(&self) -> impl Iterator<Item = &K> {
+ self.entries.iter().map(|(k, _)| k)
+ }
+ /// An iterator visiting all values in arbitrary order.
+ ///
+ /// The iterator element type is &'a V.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ ///
+ /// map.insert("a", pending::<i32>());
+ /// map.insert("b", pending());
+ /// map.insert("c", pending());
+ ///
+ /// for stream in map.values() {
+ /// println!("{:?}", stream);
+ /// }
+ /// ```
+ pub fn values(&self) -> impl Iterator<Item = &V> {
+ self.entries.iter().map(|(_, v)| v)
+ }
+ /// An iterator visiting all values mutably in arbitrary order.
+ ///
+ /// The iterator element type is &'a mut V.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ ///
+ /// map.insert("a", pending::<i32>());
+ /// map.insert("b", pending());
+ /// map.insert("c", pending());
+ ///
+ /// for stream in map.values_mut() {
+ /// println!("{:?}", stream);
+ /// }
+ /// ```
+ pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> {
+ self.entries.iter_mut().map(|(_, v)| v)
+ }
+ /// Returns the number of streams the map can hold without reallocating.
+ ///
+ /// This number is a lower bound; the `StreamMap` might be able to hold
+ /// more, but is guaranteed to be able to hold at least this many.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::stream::{StreamMap, Pending};
+ ///
+ /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100);
+ /// assert!(map.capacity() >= 100);
+ /// ```
+ pub fn capacity(&self) -> usize {
+ self.entries.capacity()
+ }
+ /// Returns the number of streams in the map.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::stream::{StreamMap, pending};
+ ///
+ /// let mut a = StreamMap::new();
+ /// assert_eq!(a.len(), 0);
+ /// a.insert(1, pending::<i32>());
+ /// assert_eq!(a.len(), 1);
+ /// ```
+ pub fn len(&self) -> usize {
+ self.entries.len()
+ }
+ /// Returns `true` if the map contains no elements.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::collections::HashMap;
+ ///
+ /// let mut a = HashMap::new();
+ /// assert!(a.is_empty());
+ /// a.insert(1, "a");
+ /// assert!(!a.is_empty());
+ /// ```
+ pub fn is_empty(&self) -> bool {
+ self.entries.is_empty()
+ }
+ /// Clears the map, removing all key-stream pairs. Keeps the allocated
+ /// memory for reuse.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::stream::{StreamMap, pending};
+ ///
+ /// let mut a = StreamMap::new();
+ /// a.insert(1, pending::<i32>());
+ /// a.clear();
+ /// assert!(a.is_empty());
+ /// ```
+ pub fn clear(&mut self) {
+ self.entries.clear();
+ }
+ /// Insert a key-stream pair into the map.
+ ///
+ /// If the map did not have this key present, `None` is returned.
+ ///
+ /// If the map did have this key present, the new `stream` replaces the old
+ /// one and the old stream is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ ///
+ /// assert!(map.insert(37, pending::<i32>()).is_none());
+ /// assert!(!map.is_empty());
+ ///
+ /// map.insert(37, pending());
+ /// assert!(map.insert(37, pending()).is_some());
+ /// ```
+ pub fn insert(&mut self, k: K, stream: V) -> Option<V>
+ where
+ K: Hash + Eq,
+ {
+ let ret = self.remove(&k);
+ self.entries.push((k, stream));
+ ret
+ }
+ /// Removes a key from the map, returning the stream at the key if the key was previously in the map.
+ ///
+ /// The key may be any borrowed form of the map's key type, but `Hash` and
+ /// `Eq` on the borrowed form must match those for the key type.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ /// map.insert(1, pending::<i32>());
+ /// assert!(map.remove(&1).is_some());
+ /// assert!(map.remove(&1).is_none());
+ /// ```
+ pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V>
+ where
+ K: Borrow<Q>,
+ Q: Hash + Eq,
+ {
+ for i in 0..self.entries.len() {
+ if self.entries[i].0.borrow() == k {
+ return Some(self.entries.swap_remove(i).1);
+ }
+ }
+ None
+ }
+ /// Returns `true` if the map contains a stream for the specified key.
+ ///
+ /// The key may be any borrowed form of the map's key type, but `Hash` and
+ /// `Eq` on the borrowed form must match those for the key type.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ /// map.insert(1, pending::<i32>());
+ /// assert_eq!(map.contains_key(&1), true);
+ /// assert_eq!(map.contains_key(&2), false);
+ /// ```
+ pub fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool
+ where
+ K: Borrow<Q>,
+ Q: Hash + Eq,
+ {
+ for i in 0..self.entries.len() {
+ if self.entries[i].0.borrow() == k {
+ return true;
+ }
+ }
+ false
+ }
+impl<K, V> StreamMap<K, V>
+ K: Unpin,
+ V: Stream + Unpin,
+ /// Polls the next value, includes the vec entry index
+ fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> {
+ use Poll::*;
+ let start = crate::util::thread_rng_n(self.entries.len() as u32) as usize;
+ let mut idx = start;
+ for _ in 0..self.entries.len() {
+ let (_, stream) = &mut self.entries[idx];
+ match Pin::new(stream).poll_next(cx) {
+ Ready(Some(val)) => return Ready(Some((idx, val))),
+ Ready(None) => {
+ // Remove the entry
+ self.entries.swap_remove(idx);
+ // Check if this was the last entry, if so the cursor needs
+ // to wrap
+ if idx == self.entries.len() {
+ idx = 0;
+ } else if idx < start && start <= self.entries.len() {
+ // The stream being swapped into the current index has
+ // already been polled, so skip it.
+ idx = idx.wrapping_add(1) % self.entries.len();
+ }
+ }
+ Pending => {
+ idx = idx.wrapping_add(1) % self.entries.len();
+ }
+ }
+ }
+ // If the map is empty, then the stream is complete.
+ if self.entries.is_empty() {
+ Ready(None)
+ } else {
+ Pending
+ }
+ }
+impl<K, V> Stream for StreamMap<K, V>
+ K: Clone + Unpin,
+ V: Stream + Unpin,
+ type Item = (K, V::Item);
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) {
+ let key = self.entries[idx].0.clone();
+ Poll::Ready(Some((key, val)))
+ } else {
+ Poll::Ready(None)
+ }
+ }
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let mut ret = (0, Some(0));
+ for (_, stream) in &self.entries {
+ let hint = stream.size_hint();
+ ret.0 += hint.0;
+ match (ret.1, hint.1) {
+ (Some(a), Some(b)) => ret.1 = Some(a + b),
+ (Some(_), None) => ret.1 = None,
+ _ => {}
+ }
+ }
+ ret
+ }
diff --git a/tokio/src/util/ b/tokio/src/util/
index fa904c3d..cd5b151d 100644
--- a/tokio/src/util/
+++ b/tokio/src/util/
@@ -3,7 +3,7 @@ cfg_io_driver! {
pub(crate) mod slab;
-#[cfg(any(feature = "rt-threaded", feature = "macros"))]
+#[cfg(any(feature = "rt-threaded", feature = "macros", feature = "stream"))]
mod rand;
cfg_rt_threaded! {
@@ -16,6 +16,6 @@ cfg_rt_threaded! {
pub(crate) use try_lock::TryLock;
-cfg_macros! {
- pub use rand::thread_rng_n;
+#[cfg(any(feature = "macros", feature = "stream"))]
+#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))]
+pub use rand::thread_rng_n;
diff --git a/tokio/src/util/ b/tokio/src/util/
index 101f5bb6..4b72b4b1 100644
--- a/tokio/src/util/
+++ b/tokio/src/util/
@@ -51,15 +51,14 @@ impl FastRand {
-// Used by the select macro
-cfg_macros! {
+// Used by the select macro and `StreamMap`
+#[cfg(any(feature = "macros", feature = "stream"))]
+#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))]
+pub fn thread_rng_n(n: u32) -> u32 {
thread_local! {
static THREAD_RNG: FastRand = FastRand::new(crate::loom::rand::seed());
- // Used by macros
- #[doc(hidden)]
- pub fn thread_rng_n(n: u32) -> u32 {
- THREAD_RNG.with(|rng| rng.fastrand_n(n))
- }
+ THREAD_RNG.with(|rng| rng.fastrand_n(n))