diff options
author | Carl Lerche <me@carllerche.com> | 2019-03-29 12:26:13 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-29 12:26:13 -0700 |
commit | 824b7b675990baab87fa3df347fa4a5491f6809b (patch) | |
tree | a0f3ef0aaa5cf3469e20379239fb4a8e878aecfb | |
parent | cb91dd274af0a7486177c824b9744d28b43849b9 (diff) |
buf: stream and iter helpers (#1011)
-rw-r--r-- | tokio-buf/Cargo.toml | 3 | ||||
-rw-r--r-- | tokio-buf/src/util/iter.rs | 54 | ||||
-rw-r--r-- | tokio-buf/src/util/mod.rs | 4 | ||||
-rw-r--r-- | tokio-buf/src/util/stream.rs | 35 | ||||
-rw-r--r-- | tokio-buf/tests/chain.rs | 1 | ||||
-rw-r--r-- | tokio-buf/tests/iter.rs | 33 | ||||
-rw-r--r-- | tokio-buf/tests/limit.rs | 1 | ||||
-rw-r--r-- | tokio-buf/tests/stream.rs | 49 | ||||
-rw-r--r-- | tokio-buf/tests/string.rs | 1 | ||||
-rw-r--r-- | tokio-buf/tests/support.rs | 1 |
10 files changed, 179 insertions, 3 deletions
diff --git a/tokio-buf/Cargo.toml b/tokio-buf/Cargo.toml index fa0bc240..d41eb487 100644 --- a/tokio-buf/Cargo.toml +++ b/tokio-buf/Cargo.toml @@ -27,3 +27,6 @@ futures = "0.1.23" [features] default = ["util"] util = ["bytes/either", "either"] + +[dev-dependencies] +tokio-mock-task = "0.1.1" diff --git a/tokio-buf/src/util/iter.rs b/tokio-buf/src/util/iter.rs new file mode 100644 index 00000000..0f48546a --- /dev/null +++ b/tokio-buf/src/util/iter.rs @@ -0,0 +1,54 @@ +use bytes::Buf; +use futures::Poll; +use std::error::Error; +use std::fmt; +use BufStream; + +/// Converts an `Iterator` into a `BufStream` which is always ready to yield the +/// next value. +/// +/// Iterators in Rust don't express the ability to block, so this adapter +/// simply always calls `iter.next()` and returns that. +pub fn iter<I>(i: I) -> Iter<I::IntoIter> +where + I: IntoIterator, + I::Item: Buf, +{ + Iter { + iter: i.into_iter(), + } +} + +/// `BufStream` returned by the [`iter`] function. +#[derive(Debug)] +pub struct Iter<I> { + iter: I, +} + +#[derive(Debug)] +pub enum Never {} + +impl<I> BufStream for Iter<I> +where + I: Iterator, + I::Item: Buf, +{ + type Item = I::Item; + type Error = Never; + + fn poll_buf(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + Ok(self.iter.next().into()) + } +} + +impl fmt::Display for Never { + fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result { + unreachable!(); + } +} + +impl Error for Never { + fn description(&self) -> &str { + unreachable!(); + } +} diff --git a/tokio-buf/src/util/mod.rs b/tokio-buf/src/util/mod.rs index c9a0b64e..e080b8a7 100644 --- a/tokio-buf/src/util/mod.rs +++ b/tokio-buf/src/util/mod.rs @@ -3,12 +3,16 @@ mod chain; mod collect; mod from; +mod iter; mod limit; +mod stream; pub use self::chain::Chain; pub use self::collect::Collect; pub use self::from::FromBufStream; +pub use self::iter::iter; pub use self::limit::Limit; +pub use self::stream::stream; pub mod error { //! Error types diff --git a/tokio-buf/src/util/stream.rs b/tokio-buf/src/util/stream.rs new file mode 100644 index 00000000..a735fb48 --- /dev/null +++ b/tokio-buf/src/util/stream.rs @@ -0,0 +1,35 @@ +use bytes::Buf; +use futures::{Poll, Stream}; +use BufStream; + +/// Converts a `Stream` of `Buf` types into a `BufStream`. +/// +/// While `Stream` and `BufSream` are very similar, they are not identical. The +/// `stream` function returns a `BufStream` that is backed by the provided +/// `Stream` type. +pub fn stream<T>(stream: T) -> FromStream<T> +where + T: Stream, + T::Item: Buf, +{ + FromStream { stream } +} + +/// `BufStream` returned by the [`stream`] function. +#[derive(Debug)] +pub struct FromStream<T> { + stream: T, +} + +impl<T> BufStream for FromStream<T> +where + T: Stream, + T::Item: Buf, +{ + type Item = T::Item; + type Error = T::Error; + + fn poll_buf(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + self.stream.poll() + } +} diff --git a/tokio-buf/tests/chain.rs b/tokio-buf/tests/chain.rs index fcac0280..7cc7ea70 100644 --- a/tokio-buf/tests/chain.rs +++ b/tokio-buf/tests/chain.rs @@ -4,7 +4,6 @@ extern crate bytes; extern crate futures; extern crate tokio_buf; -use bytes::Buf; use futures::Async::*; use tokio_buf::{BufStream, BufStreamExt}; diff --git a/tokio-buf/tests/iter.rs b/tokio-buf/tests/iter.rs new file mode 100644 index 00000000..365a396e --- /dev/null +++ b/tokio-buf/tests/iter.rs @@ -0,0 +1,33 @@ +extern crate bytes; +extern crate futures; +extern crate tokio_buf; + +use futures::Async::*; +use std::io::Cursor; +use tokio_buf::{util, BufStream}; + +#[macro_use] +mod support; + +type Buf = Cursor<&'static [u8]>; + +#[test] +fn empty_iter() { + let mut bs = util::iter(Vec::<Buf>::new()); + assert_none!(bs.poll_buf()); +} + +#[test] +fn full_iter() { + let bufs = vec![buf(b"one"), buf(b"two"), buf(b"three")]; + + let mut bs = util::iter(bufs); + assert_buf_eq!(bs.poll_buf(), "one"); + assert_buf_eq!(bs.poll_buf(), "two"); + assert_buf_eq!(bs.poll_buf(), "three"); + assert_none!(bs.poll_buf()); +} + +fn buf(data: &'static [u8]) -> Buf { + Cursor::new(data) +} diff --git a/tokio-buf/tests/limit.rs b/tokio-buf/tests/limit.rs index 058f2e10..1cd90921 100644 --- a/tokio-buf/tests/limit.rs +++ b/tokio-buf/tests/limit.rs @@ -4,7 +4,6 @@ extern crate bytes; extern crate futures; extern crate tokio_buf; -use bytes::Buf; use futures::Async::*; use futures::Future; use tokio_buf::{BufStream, BufStreamExt}; diff --git a/tokio-buf/tests/stream.rs b/tokio-buf/tests/stream.rs new file mode 100644 index 00000000..3491feb4 --- /dev/null +++ b/tokio-buf/tests/stream.rs @@ -0,0 +1,49 @@ +extern crate bytes; +extern crate futures; +extern crate tokio_buf; +extern crate tokio_mock_task; + +use futures::sync::mpsc; +use futures::Async::*; +use std::io::Cursor; +use tokio_buf::{util, BufStream}; +use tokio_mock_task::MockTask; + +#[macro_use] +mod support; + +type Buf = Cursor<&'static [u8]>; + +#[test] +fn empty_stream() { + let (_, rx) = mpsc::unbounded::<Buf>(); + let mut bs = util::stream(rx); + assert_none!(bs.poll_buf()); +} + +#[test] +fn full_stream() { + let (tx, rx) = mpsc::unbounded(); + let mut bs = util::stream(rx); + let mut task = MockTask::new(); + + tx.unbounded_send(buf(b"one")).unwrap(); + + assert_buf_eq!(bs.poll_buf(), "one"); + task.enter(|| assert_not_ready!(bs.poll_buf())); + + tx.unbounded_send(buf(b"two")).unwrap(); + + assert!(task.is_notified()); + assert_buf_eq!(bs.poll_buf(), "two"); + task.enter(|| assert_not_ready!(bs.poll_buf())); + + drop(tx); + + assert!(task.is_notified()); + assert_none!(bs.poll_buf()); +} + +fn buf(data: &'static [u8]) -> Buf { + Cursor::new(data) +} diff --git a/tokio-buf/tests/string.rs b/tokio-buf/tests/string.rs index b54d7d30..a99db209 100644 --- a/tokio-buf/tests/string.rs +++ b/tokio-buf/tests/string.rs @@ -2,7 +2,6 @@ extern crate bytes; extern crate futures; extern crate tokio_buf; -use bytes::Buf; use futures::Async::*; use std::fmt; use tokio_buf::BufStream; diff --git a/tokio-buf/tests/support.rs b/tokio-buf/tests/support.rs index 61fb0949..c8d7abfc 100644 --- a/tokio-buf/tests/support.rs +++ b/tokio-buf/tests/support.rs @@ -14,6 +14,7 @@ use std::io::Cursor; macro_rules! assert_buf_eq { ($actual:expr, $expect:expr) => {{ + use bytes::Buf; match $actual { Ok(Ready(Some(val))) => { assert_eq!(val.remaining(), val.bytes().len()); |