summaryrefslogtreecommitdiffstats
path: root/tokio/src/stream
diff options
context:
space:
mode:
authorArtem Vorotnikov <artem@vorotnikov.me>2019-12-21 08:27:14 +0300
committerCarl Lerche <me@carllerche.com>2019-12-20 21:27:14 -0800
commit3dcd76a38feff3188e1a94f39c4f5feaaa5bdf61 (patch)
tree65c575c5afe5af446a5cc3def95ff8f7a6f89938 /tokio/src/stream
parent3b9c7b1715777b6db69d420c1530aa845e6306c3 (diff)
stream: StreamExt::try_next (#2005)
Diffstat (limited to 'tokio/src/stream')
-rw-r--r--tokio/src/stream/mod.rs32
-rw-r--r--tokio/src/stream/try_next.rs30
2 files changed, 62 insertions, 0 deletions
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs
index 329ee8a9..eecc7a7c 100644
--- a/tokio/src/stream/mod.rs
+++ b/tokio/src/stream/mod.rs
@@ -19,6 +19,9 @@ use map::Map;
mod next;
use next::Next;
+mod try_next;
+use try_next::TryNext;
+
pub use futures_core::Stream;
/// An extension trait for `Stream`s that provides a variety of convenient
@@ -61,6 +64,35 @@ pub trait StreamExt: Stream {
Next::new(self)
}
+ /// Creates a future that attempts to resolve the next item in the stream.
+ /// If an error is encountered before the next item, the error is returned instead.
+ ///
+ /// This is similar to the [`next`](StreamExt::next) combinator,
+ /// but returns a [`Result<Option<T>, E>`](Result) rather than
+ /// an [`Option<Result<T, E>>`](Option), making for easy use
+ /// with the [`?`](std::ops::Try) operator.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio::stream::{self, StreamExt};
+ ///
+ /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
+ ///
+ /// assert_eq!(stream.try_next().await, Ok(Some(1)));
+ /// assert_eq!(stream.try_next().await, Ok(Some(2)));
+ /// assert_eq!(stream.try_next().await, Err("nope"));
+ /// # }
+ /// ```
+ fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
+ where
+ Self: Stream<Item = Result<T, E>> + Unpin,
+ {
+ TryNext::new(self)
+ }
+
/// Maps this stream's items to a different type, returning a new stream of
/// the resulting type.
///
diff --git a/tokio/src/stream/try_next.rs b/tokio/src/stream/try_next.rs
new file mode 100644
index 00000000..ade5ecf0
--- /dev/null
+++ b/tokio/src/stream/try_next.rs
@@ -0,0 +1,30 @@
+use crate::stream::{Next, Stream};
+
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Future for the [`try_next`](super::StreamExt::try_next) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct TryNext<'a, St: ?Sized> {
+ inner: Next<'a, St>,
+}
+
+impl<St: ?Sized + Unpin> Unpin for TryNext<'_, St> {}
+
+impl<'a, St: ?Sized + Stream + Unpin> TryNext<'a, St> {
+ pub(super) fn new(stream: &'a mut St) -> Self {
+ Self {
+ inner: Next::new(stream),
+ }
+ }
+}
+
+impl<T, E, St: ?Sized + Stream<Item = Result<T, E>> + Unpin> Future for TryNext<'_, St> {
+ type Output = Result<Option<T>, E>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ Pin::new(&mut self.inner).poll(cx).map(Option::transpose)
+ }
+}