summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-03-29 12:26:13 -0700
committerGitHub <noreply@github.com>2019-03-29 12:26:13 -0700
commit824b7b675990baab87fa3df347fa4a5491f6809b (patch)
treea0f3ef0aaa5cf3469e20379239fb4a8e878aecfb
parentcb91dd274af0a7486177c824b9744d28b43849b9 (diff)
buf: stream and iter helpers (#1011)
-rw-r--r--tokio-buf/Cargo.toml3
-rw-r--r--tokio-buf/src/util/iter.rs54
-rw-r--r--tokio-buf/src/util/mod.rs4
-rw-r--r--tokio-buf/src/util/stream.rs35
-rw-r--r--tokio-buf/tests/chain.rs1
-rw-r--r--tokio-buf/tests/iter.rs33
-rw-r--r--tokio-buf/tests/limit.rs1
-rw-r--r--tokio-buf/tests/stream.rs49
-rw-r--r--tokio-buf/tests/string.rs1
-rw-r--r--tokio-buf/tests/support.rs1
10 files changed, 179 insertions, 3 deletions
diff --git a/tokio-buf/Cargo.toml b/tokio-buf/Cargo.toml
index fa0bc240..d41eb487 100644
--- a/tokio-buf/Cargo.toml
+++ b/tokio-buf/Cargo.toml
@@ -27,3 +27,6 @@ futures = "0.1.23"
[features]
default = ["util"]
util = ["bytes/either", "either"]
+
+[dev-dependencies]
+tokio-mock-task = "0.1.1"
diff --git a/tokio-buf/src/util/iter.rs b/tokio-buf/src/util/iter.rs
new file mode 100644
index 00000000..0f48546a
--- /dev/null
+++ b/tokio-buf/src/util/iter.rs
@@ -0,0 +1,54 @@
+use bytes::Buf;
+use futures::Poll;
+use std::error::Error;
+use std::fmt;
+use BufStream;
+
+/// Converts an `Iterator` into a `BufStream` which is always ready to yield the
+/// next value.
+///
+/// Iterators in Rust don't express the ability to block, so this adapter
+/// simply always calls `iter.next()` and returns that.
+pub fn iter<I>(i: I) -> Iter<I::IntoIter>
+where
+ I: IntoIterator,
+ I::Item: Buf,
+{
+ Iter {
+ iter: i.into_iter(),
+ }
+}
+
+/// `BufStream` returned by the [`iter`] function.
+#[derive(Debug)]
+pub struct Iter<I> {
+ iter: I,
+}
+
+#[derive(Debug)]
+pub enum Never {}
+
+impl<I> BufStream for Iter<I>
+where
+ I: Iterator,
+ I::Item: Buf,
+{
+ type Item = I::Item;
+ type Error = Never;
+
+ fn poll_buf(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ Ok(self.iter.next().into())
+ }
+}
+
+impl fmt::Display for Never {
+ fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
+ unreachable!();
+ }
+}
+
+impl Error for Never {
+ fn description(&self) -> &str {
+ unreachable!();
+ }
+}
diff --git a/tokio-buf/src/util/mod.rs b/tokio-buf/src/util/mod.rs
index c9a0b64e..e080b8a7 100644
--- a/tokio-buf/src/util/mod.rs
+++ b/tokio-buf/src/util/mod.rs
@@ -3,12 +3,16 @@
mod chain;
mod collect;
mod from;
+mod iter;
mod limit;
+mod stream;
pub use self::chain::Chain;
pub use self::collect::Collect;
pub use self::from::FromBufStream;
+pub use self::iter::iter;
pub use self::limit::Limit;
+pub use self::stream::stream;
pub mod error {
//! Error types
diff --git a/tokio-buf/src/util/stream.rs b/tokio-buf/src/util/stream.rs
new file mode 100644
index 00000000..a735fb48
--- /dev/null
+++ b/tokio-buf/src/util/stream.rs
@@ -0,0 +1,35 @@
+use bytes::Buf;
+use futures::{Poll, Stream};
+use BufStream;
+
+/// Converts a `Stream` of `Buf` types into a `BufStream`.
+///
+/// While `Stream` and `BufSream` are very similar, they are not identical. The
+/// `stream` function returns a `BufStream` that is backed by the provided
+/// `Stream` type.
+pub fn stream<T>(stream: T) -> FromStream<T>
+where
+ T: Stream,
+ T::Item: Buf,
+{
+ FromStream { stream }
+}
+
+/// `BufStream` returned by the [`stream`] function.
+#[derive(Debug)]
+pub struct FromStream<T> {
+ stream: T,
+}
+
+impl<T> BufStream for FromStream<T>
+where
+ T: Stream,
+ T::Item: Buf,
+{
+ type Item = T::Item;
+ type Error = T::Error;
+
+ fn poll_buf(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ self.stream.poll()
+ }
+}
diff --git a/tokio-buf/tests/chain.rs b/tokio-buf/tests/chain.rs
index fcac0280..7cc7ea70 100644
--- a/tokio-buf/tests/chain.rs
+++ b/tokio-buf/tests/chain.rs
@@ -4,7 +4,6 @@ extern crate bytes;
extern crate futures;
extern crate tokio_buf;
-use bytes::Buf;
use futures::Async::*;
use tokio_buf::{BufStream, BufStreamExt};
diff --git a/tokio-buf/tests/iter.rs b/tokio-buf/tests/iter.rs
new file mode 100644
index 00000000..365a396e
--- /dev/null
+++ b/tokio-buf/tests/iter.rs
@@ -0,0 +1,33 @@
+extern crate bytes;
+extern crate futures;
+extern crate tokio_buf;
+
+use futures::Async::*;
+use std::io::Cursor;
+use tokio_buf::{util, BufStream};
+
+#[macro_use]
+mod support;
+
+type Buf = Cursor<&'static [u8]>;
+
+#[test]
+fn empty_iter() {
+ let mut bs = util::iter(Vec::<Buf>::new());
+ assert_none!(bs.poll_buf());
+}
+
+#[test]
+fn full_iter() {
+ let bufs = vec![buf(b"one"), buf(b"two"), buf(b"three")];
+
+ let mut bs = util::iter(bufs);
+ assert_buf_eq!(bs.poll_buf(), "one");
+ assert_buf_eq!(bs.poll_buf(), "two");
+ assert_buf_eq!(bs.poll_buf(), "three");
+ assert_none!(bs.poll_buf());
+}
+
+fn buf(data: &'static [u8]) -> Buf {
+ Cursor::new(data)
+}
diff --git a/tokio-buf/tests/limit.rs b/tokio-buf/tests/limit.rs
index 058f2e10..1cd90921 100644
--- a/tokio-buf/tests/limit.rs
+++ b/tokio-buf/tests/limit.rs
@@ -4,7 +4,6 @@ extern crate bytes;
extern crate futures;
extern crate tokio_buf;
-use bytes::Buf;
use futures::Async::*;
use futures::Future;
use tokio_buf::{BufStream, BufStreamExt};
diff --git a/tokio-buf/tests/stream.rs b/tokio-buf/tests/stream.rs
new file mode 100644
index 00000000..3491feb4
--- /dev/null
+++ b/tokio-buf/tests/stream.rs
@@ -0,0 +1,49 @@
+extern crate bytes;
+extern crate futures;
+extern crate tokio_buf;
+extern crate tokio_mock_task;
+
+use futures::sync::mpsc;
+use futures::Async::*;
+use std::io::Cursor;
+use tokio_buf::{util, BufStream};
+use tokio_mock_task::MockTask;
+
+#[macro_use]
+mod support;
+
+type Buf = Cursor<&'static [u8]>;
+
+#[test]
+fn empty_stream() {
+ let (_, rx) = mpsc::unbounded::<Buf>();
+ let mut bs = util::stream(rx);
+ assert_none!(bs.poll_buf());
+}
+
+#[test]
+fn full_stream() {
+ let (tx, rx) = mpsc::unbounded();
+ let mut bs = util::stream(rx);
+ let mut task = MockTask::new();
+
+ tx.unbounded_send(buf(b"one")).unwrap();
+
+ assert_buf_eq!(bs.poll_buf(), "one");
+ task.enter(|| assert_not_ready!(bs.poll_buf()));
+
+ tx.unbounded_send(buf(b"two")).unwrap();
+
+ assert!(task.is_notified());
+ assert_buf_eq!(bs.poll_buf(), "two");
+ task.enter(|| assert_not_ready!(bs.poll_buf()));
+
+ drop(tx);
+
+ assert!(task.is_notified());
+ assert_none!(bs.poll_buf());
+}
+
+fn buf(data: &'static [u8]) -> Buf {
+ Cursor::new(data)
+}
diff --git a/tokio-buf/tests/string.rs b/tokio-buf/tests/string.rs
index b54d7d30..a99db209 100644
--- a/tokio-buf/tests/string.rs
+++ b/tokio-buf/tests/string.rs
@@ -2,7 +2,6 @@ extern crate bytes;
extern crate futures;
extern crate tokio_buf;
-use bytes::Buf;
use futures::Async::*;
use std::fmt;
use tokio_buf::BufStream;
diff --git a/tokio-buf/tests/support.rs b/tokio-buf/tests/support.rs
index 61fb0949..c8d7abfc 100644
--- a/tokio-buf/tests/support.rs
+++ b/tokio-buf/tests/support.rs
@@ -14,6 +14,7 @@ use std::io::Cursor;
macro_rules! assert_buf_eq {
($actual:expr, $expect:expr) => {{
+ use bytes::Buf;
match $actual {
Ok(Ready(Some(val))) => {
assert_eq!(val.remaining(), val.bytes().len());