1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
//! Stream utilities for Tokio.
//!
//! `Stream`s are an asynchoronous version of standard library's Iterator.
//!
//! This module provides helpers to work with them.
mod iter;
pub use iter::{iter, Iter};
mod map;
use map::Map;
mod next;
use next::Next;
pub use futures_core::Stream;
/// An extension trait for `Stream`s that provides a variety of convenient
/// combinator functions.
pub trait StreamExt: Stream {
/// Creates a future that resolves to the next item in the stream.
///
/// Equivalent to:
///
/// ```ignore
/// async fn next(&mut self) -> Option<Self::Item>;
/// ```
///
/// Note that because `next` doesn't take ownership over the stream,
/// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
/// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
/// be done by boxing the stream using [`Box::pin`] or
/// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
/// crate.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, StreamExt};
///
/// let mut stream = stream::iter(1..=3);
///
/// assert_eq!(stream.next().await, Some(1));
/// assert_eq!(stream.next().await, Some(2));
/// assert_eq!(stream.next().await, Some(3));
/// assert_eq!(stream.next().await, None);
/// # }
/// ```
fn next(&mut self) -> Next<'_, Self>
where
Self: Unpin,
{
Next::new(self)
}
/// Maps this stream's items to a different type, returning a new stream of
/// the resulting type.
///
/// The provided closure is executed over all elements of this stream as
/// they are made available. It is executed inline with calls to
/// [`poll_next`](Stream::poll_next).
///
/// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the existing `map` methods in the
/// standard library.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, StreamExt};
///
/// let stream = stream::iter(1..=3);
/// let mut stream = stream.map(|x| x + 3);
///
/// assert_eq!(stream.next().await, Some(4));
/// assert_eq!(stream.next().await, Some(5));
/// assert_eq!(stream.next().await, Some(6));
/// # }
/// ```
fn map<T, F>(self, f: F) -> Map<Self, F>
where
F: FnMut(Self::Item) -> T,
Self: Sized,
{
Map::new(self, f)
}
}
impl<T: ?Sized> StreamExt for T where T: Stream {}
|