diff options
author | Carl Lerche <me@carllerche.com> | 2020-01-11 16:33:52 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-11 16:33:52 -0800 |
commit | 7c3f1cb4a3d6076cb5e1aedf2311f62c8a7a2fd7 (patch) | |
tree | 0d0a2eb5d8a60438b14fa191ddc933f1a1fbe15a /tokio/tests/stream_chain.rs | |
parent | 64d23899118dfc8f1d4d7a9b60c015e43260df80 (diff) |
stream: add `StreamExt::chain` (#2093)
Asynchronous equivalent to `Iterator::chain`.
Diffstat (limited to 'tokio/tests/stream_chain.rs')
-rw-r--r-- | tokio/tests/stream_chain.rs | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/tokio/tests/stream_chain.rs b/tokio/tests/stream_chain.rs new file mode 100644 index 00000000..0e14618b --- /dev/null +++ b/tokio/tests/stream_chain.rs @@ -0,0 +1,71 @@ +use tokio::stream::{self, Stream, StreamExt}; +use tokio::sync::mpsc; +use tokio_test::{assert_pending, assert_ready, task}; + +#[tokio::test] +async fn basic_usage() { + let one = stream::iter(vec![1, 2, 3]); + let two = stream::iter(vec![4, 5, 6]); + + let mut stream = one.chain(two); + + assert_eq!(stream.size_hint(), (6, Some(6))); + assert_eq!(stream.next().await, Some(1)); + + assert_eq!(stream.size_hint(), (5, Some(5))); + assert_eq!(stream.next().await, Some(2)); + + assert_eq!(stream.size_hint(), (4, Some(4))); + assert_eq!(stream.next().await, Some(3)); + + assert_eq!(stream.size_hint(), (3, Some(3))); + assert_eq!(stream.next().await, Some(4)); + + assert_eq!(stream.size_hint(), (2, Some(2))); + assert_eq!(stream.next().await, Some(5)); + + assert_eq!(stream.size_hint(), (1, Some(1))); + assert_eq!(stream.next().await, Some(6)); + + assert_eq!(stream.size_hint(), (0, Some(0))); + assert_eq!(stream.next().await, None); + + assert_eq!(stream.size_hint(), (0, Some(0))); + assert_eq!(stream.next().await, None); +} + +#[tokio::test] +async fn pending_first() { + let (tx1, rx1) = mpsc::unbounded_channel(); + let (tx2, rx2) = mpsc::unbounded_channel(); + + let mut stream = task::spawn(rx1.chain(rx2)); + assert_eq!(stream.size_hint(), (0, None)); + + assert_pending!(stream.poll_next()); + + tx2.send(2).unwrap(); + assert!(!stream.is_woken()); + + assert_pending!(stream.poll_next()); + + tx1.send(1).unwrap(); + assert!(stream.is_woken()); + assert_eq!(Some(1), assert_ready!(stream.poll_next())); + + assert_pending!(stream.poll_next()); + + drop(tx1); + + assert_eq!(stream.size_hint(), (0, None)); + + assert!(stream.is_woken()); + assert_eq!(Some(2), assert_ready!(stream.poll_next())); + + assert_eq!(stream.size_hint(), (0, None)); + + drop(tx2); + + assert_eq!(stream.size_hint(), (0, None)); + assert_eq!(None, assert_ready!(stream.poll_next())); +} |