summaryrefslogtreecommitdiffstats
path: root/tokio-buf
diff options
context:
space:
mode:
authorLucio Franco <luciofranco14@gmail.com>2019-02-19 16:30:37 -0500
committerCarl Lerche <me@carllerche.com>2019-02-19 13:30:37 -0800
commitdd66096ea0ab388c7163ae1928fc29d3f20e02ea (patch)
treef8d11c1d1a6ce8b3b77d1a8b1769ee117a0c7475 /tokio-buf
parentc08e73c8d4f3353e93658aa4eb488c0080d0cab0 (diff)
buf: Add `BufStreamExt` trait and add a core feature (#897)
This change adds an extension trait to `BufStream` and puts the core trait behind a feature flag for optional use. This mainly adds the additional functions in an extension trait to allow the user to select if they want just the core trait or the fully featured version. Now the user can add the core feature to _not_ include the extension trait. By deafult, this feature is disabled.
Diffstat (limited to 'tokio-buf')
-rw-r--r--tokio-buf/Cargo.toml8
-rw-r--r--tokio-buf/README.md35
-rw-r--r--tokio-buf/src/buf_stream/bytes.rs70
-rw-r--r--tokio-buf/src/buf_stream/mod.rs163
-rw-r--r--tokio-buf/src/errors.rs (renamed from tokio-buf/src/buf_stream/errors.rs)9
-rw-r--r--tokio-buf/src/ext/chain.rs (renamed from tokio-buf/src/buf_stream/chain.rs)7
-rw-r--r--tokio-buf/src/ext/collect.rs (renamed from tokio-buf/src/buf_stream/collect.rs)3
-rw-r--r--tokio-buf/src/ext/from.rs (renamed from tokio-buf/src/buf_stream/from.rs)2
-rw-r--r--tokio-buf/src/ext/limit.rs (renamed from tokio-buf/src/buf_stream/limit.rs)18
-rw-r--r--tokio-buf/src/ext/mod.rs69
-rw-r--r--tokio-buf/src/lib.rs147
-rw-r--r--tokio-buf/src/size_hint.rs (renamed from tokio-buf/src/buf_stream/size_hint.rs)0
-rw-r--r--tokio-buf/src/str.rs (renamed from tokio-buf/src/buf_stream/str.rs)2
-rw-r--r--tokio-buf/tests/buf_stream.rs268
-rw-r--r--tokio-buf/tests/buf_stream_ext.rs158
-rw-r--r--tokio-buf/tests/support.rs133
16 files changed, 561 insertions, 531 deletions
diff --git a/tokio-buf/Cargo.toml b/tokio-buf/Cargo.toml
index d139a688..2eb938a5 100644
--- a/tokio-buf/Cargo.toml
+++ b/tokio-buf/Cargo.toml
@@ -17,6 +17,10 @@ Asynchronous stream of byte buffers
categories = ["asynchronous"]
[dependencies]
-bytes = { version = "0.4.10", features = [ "either" ] }
-either = "1.5"
+bytes = { version = "0.4.10" }
+either = { version = "1.5", optional = true}
futures = "0.1.23"
+
+[features]
+default = ["ext"]
+ext = ["bytes/either", "either"]
diff --git a/tokio-buf/README.md b/tokio-buf/README.md
index e69de29b..3f275fb5 100644
--- a/tokio-buf/README.md
+++ b/tokio-buf/README.md
@@ -0,0 +1,35 @@
+# tokio-buf
+
+Asynchronous stream of byte buffers
+
+[Documenation](https://docs.rs/tokio-buf)
+
+## Usage
+
+First, add this to your `Cargo.toml`:
+
+```toml
+[dependencies]
+tokio-buf = "0.1"
+```
+
+Next, add this to your crate:
+
+```rust
+extern crate tokio_buf;
+```
+
+You can find extensive documentation and examples about how to use this crate
+online at [https://tokio.rs](https://tokio.rs). The [API
+documentation](https://docs.rs/tokio-buf) is also a great place to get started
+for the nitty-gritty.
+
+## License
+
+This project is licensed under the [MIT license](LICENSE).
+
+### Contribution
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in Tokio by you, shall be licensed as MIT, without any additional
+terms or conditions.
diff --git a/tokio-buf/src/buf_stream/bytes.rs b/tokio-buf/src/buf_stream/bytes.rs
deleted file mode 100644
index 627e9b11..00000000
--- a/tokio-buf/src/buf_stream/bytes.rs
+++ /dev/null
@@ -1,70 +0,0 @@
-use BufStream;
-use buf_stream::errors::internal::Never;
-
-use bytes::{Bytes, BytesMut};
-use futures::Poll;
-
-use std::io;
-
-impl BufStream for Vec<u8> {
- type Item = io::Cursor<Vec<u8>>;
- type Error = Never;
-
- fn poll_buf(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- if self.is_empty() {
- return Ok(None.into());
- }
-
- poll_bytes(self)
- }
-}
-
-impl BufStream for &'static [u8] {
- type Item = io::Cursor<&'static [u8]>;
- type Error = Never;
-
- fn poll_buf(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- if self.is_empty() {
- return Ok(None.into());
- }
-
- poll_bytes(self)
- }
-}
-
-impl BufStream for Bytes {
- type Item = io::Cursor<Bytes>;
- type Error = Never;
-
- fn poll_buf(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- if self.is_empty() {
- return Ok(None.into());
- }
-
- poll_bytes(self)
- }
-}
-
-impl BufStream for BytesMut {
- type Item = io::Cursor<BytesMut>;
- type Error = Never;
-
- fn poll_buf(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- if self.is_empty() {
- return Ok(None.into());
- }
-
- poll_bytes(self)
- }
-}
-
-fn poll_bytes<T: Default>(buf: &mut T)
- -> Poll<Option<io::Cursor<T>>, Never>
-{
- use std::mem;
-
- let bytes = mem::replace(buf, Default::default());
- let buf = io::Cursor::new(bytes);
-
- Ok(Some(buf).into())
-}
diff --git a/tokio-buf/src/buf_stream/mod.rs b/tokio-buf/src/buf_stream/mod.rs
deleted file mode 100644
index bde1199b..00000000
--- a/tokio-buf/src/buf_stream/mod.rs
+++ /dev/null
@@ -1,163 +0,0 @@
-//! Types and utilities for working with `BufStream`.
-
-mod bytes;
-mod chain;
-mod collect;
-pub mod errors;
-mod from;
-mod limit;
-mod size_hint;
-mod str;
-
-pub use self::chain::Chain;
-pub use self::collect::Collect;
-pub use self::from::FromBufStream;
-pub use self::limit::Limit;
-pub use self::size_hint::SizeHint;
-
-use bytes::Buf;
-use futures::Poll;
-
-/// An asynchronous stream of bytes.
-///
-/// `BufStream` asynchronously yields values implementing `Buf`, i.e. byte
-/// buffers.
-pub trait BufStream {
- /// Values yielded by the `BufStream`.
- ///
- /// Each item is a sequence of bytes representing a chunk of the total
- /// `ByteStream`.
- type Item: Buf;
-
- /// The error type this `BufStream` might generate.
- type Error;
-
- /// Attempt to pull out the next buffer of this stream, registering the
- /// current task for wakeup if the value is not yet available, and returning
- /// `None` if the stream is exhausted.
- ///
- /// # Return value
- ///
- /// There are several possible return values, each indicating a distinct
- /// stream state:
- ///
- /// - `Ok(Async::NotReady)` means that this stream's next value is not ready
- /// yet. Implementations will ensure that the current task will be notified
- /// when the next value may be ready.
- ///
- /// - `Ok(Async::Ready(Some(buf)))` means that the stream has successfully
- /// produced a value, `buf`, and may produce further values on subsequent
- /// `poll_buf` calls.
- ///
- /// - `Ok(Async::Ready(None))` means that the stream has terminated, and
- /// `poll_buf` should not be invoked again.
- ///
- /// # Panics
- ///
- /// Once a stream is finished, i.e. `Ready(None)` has been returned, further
- /// calls to `poll_buf` may result in a panic or other "bad behavior".
- fn poll_buf(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
-
- /// Returns the bounds on the remaining length of the stream.
- ///
- /// The size hint allows the caller to perform certain optimizations that
- /// are dependent on the byte stream size. For example, `collect` uses the
- /// size hint to pre-allocate enough capacity to store the entirety of the
- /// data received from the byte stream.
- ///
- /// When `SizeHint::upper()` returns `Some` with a value equal to
- /// `SizeHint::lower()`, this represents the exact number of bytes that will
- /// be yielded by the `BufStream`.
- ///
- /// # Implementation notes
- ///
- /// While not enforced, implementations are expected to respect the values
- /// returned from `SizeHint`. Any deviation is considered an implementation
- /// bug. Consumers may rely on correctness in order to use the value as part
- /// of protocol impelmentations. For example, an HTTP library may use the
- /// size hint to set the `content-length` header.
- ///
- /// However, `size_hint` must not be trusted to omit bounds checks in unsafe
- /// code. An incorrect implementation of `size_hint()` must not lead to
- /// memory safety violations.
- fn size_hint(&self) -> SizeHint {
- SizeHint::default()
- }
-
- /// Indicates to the `BufStream` how much data the consumer is currently
- /// able to process.
- ///
- /// The consume hint allows the stream to perform certain optimizations that
- /// are dependent on the consumer's readiness. For example, the consume hint
- /// may be used to request a remote peer to start sending up to `amount`
- /// data.
- ///
- /// Calling `consume_hint` is not a requirement. If `consume_hint` is never
- /// called, the stream should assume a default behavior. When `consume_hint`
- /// is called, the stream should make a best effort to honor by the request.
- ///
- /// `amount` represents the number of bytes that the caller would like to
- /// receive at the time the function is called. For example, if
- /// `consume_hint` is called with 20, the consumer requests 20 bytes. The
- /// stream may yield less than that. If the next call to `poll_buf` returns
- /// 5 bytes, the consumer still has 15 bytes requested. At this point,
- /// invoking `consume_hint` again with 20 resets the amount requested back
- /// to 20 bytes.
- ///
- /// Calling `consume_hint` with 0 as the argument informs the stream that
- /// the caller does not intend to call `poll_buf`. If `poll_buf` **is**
- /// called, the stream may, but is not obligated to, return `NotReady` even
- /// if it could produce data at that point. If it chooses to return
- /// `NotReady`, when `consume_hint` is called with a non-zero argument, the
- /// task must be notified in order to respect the `poll_buf` contract.
- fn consume_hint(&mut self, amount: usize) {
- // By default, this function does nothing
- drop(amount);
- }
-
- /// Takes two buf streams and creates a new buf stream over both in
- /// sequence.
- ///
- /// `chain()` returns a new `BufStream` value which will first yield all
- /// data from `self` then all data from `other`.
- ///
- /// In other words, it links two buf streams together, in a chain.
- fn chain<T>(self, other: T) -> Chain<Self, T>
- where
- Self: Sized,
- T: BufStream<Error = Self::Error>,
- {
- Chain::new(self, other)
- }
-
- /// Consumes all data from `self`, storing it in byte storage of type `T`.
- ///
- /// `collect()` returns a future that buffers all data yielded from `self`
- /// into storage of type of `T`. The future completes once `self` yield
- /// `None`, returning the buffered data.
- ///
- /// The collect future will yield an error if `self` yields an error or if
- /// the collect operation errors. The collect error cases are dependent on
- /// the target storage type.
- fn collect<T>(self) -> Collect<Self, T>
- where
- Self: Sized,
- T: FromBufStream<Self::Item>,
- {
- Collect::new(self)
- }
-
- /// Limit the number of bytes that the stream can yield.
- ///
- /// `limit()` returns a new `BufStream` value which yields all the data from
- /// `self` while ensuring that at most `amount` bytes are yielded.
- ///
- /// If `self` can yield greater than `amount` bytes, the returned stream
- /// will yield an error.
- fn limit(self, amount: u64) -> Limit<Self>
- where
- Self: Sized,
- {
- Limit::new(self, amount)
- }
-}
diff --git a/tokio-buf/src/buf_stream/errors.rs b/tokio-buf/src/errors.rs
index 2a9760ff..d87f1f8c 100644
--- a/tokio-buf/src/buf_stream/errors.rs
+++ b/tokio-buf/src/errors.rs
@@ -1,8 +1,11 @@
//! Error types
-pub use super::collect::CollectError;
-pub use super::from::CollectVecError;
-pub use super::limit::LimitError;
+#[cfg(feature = "ext")]
+pub use ext::CollectError;
+#[cfg(feature = "ext")]
+pub use ext::CollectVecError;
+#[cfg(feature = "ext")]
+pub use ext::LimitError;
// Being crate-private, we should be able to swap the type out in a
// backwards compatible way.
diff --git a/tokio-buf/src/buf_stream/chain.rs b/tokio-buf/src/ext/chain.rs
index 157b0e9e..44fc4008 100644
--- a/tokio-buf/src/buf_stream/chain.rs
+++ b/tokio-buf/src/ext/chain.rs
@@ -1,4 +1,4 @@
-use super::{BufStream, SizeHint};
+use BufStream;
use either::Either;
use futures::Poll;
@@ -43,9 +43,4 @@ where
let res = try_ready!(self.right.poll_buf());
Ok(res.map(Either::Right).into())
}
-
- fn size_hint(&self) -> SizeHint {
- // TODO: Implement
- SizeHint::default()
- }
}
diff --git a/tokio-buf/src/buf_stream/collect.rs b/tokio-buf/src/ext/collect.rs
index c5731561..95da5452 100644
--- a/tokio-buf/src/buf_stream/collect.rs
+++ b/tokio-buf/src/ext/collect.rs
@@ -1,4 +1,5 @@
-use super::{BufStream, FromBufStream};
+use BufStream;
+use super::FromBufStream;
use futures::{Future, Poll};
diff --git a/tokio-buf/src/buf_stream/from.rs b/tokio-buf/src/ext/from.rs
index 6137bf82..c15057a4 100644
--- a/tokio-buf/src/buf_stream/from.rs
+++ b/tokio-buf/src/ext/from.rs
@@ -1,4 +1,4 @@
-use super::SizeHint;
+use SizeHint;
use bytes::{Buf, BufMut};
diff --git a/tokio-buf/src/buf_stream/limit.rs b/tokio-buf/src/ext/limit.rs
index 0c6baeb8..1d3e0ba3 100644
--- a/tokio-buf/src/buf_stream/limit.rs
+++ b/tokio-buf/src/ext/limit.rs
@@ -1,4 +1,4 @@
-use super::{BufStream, SizeHint};
+use BufStream;
use bytes::Buf;
use futures::Poll;
@@ -59,22 +59,6 @@ where
res
}
-
- fn size_hint(&self) -> SizeHint {
- let mut hint = self.stream.size_hint();
-
- let upper = hint.upper()
- .map(|upper| upper.min(self.remaining))
- .unwrap_or(self.remaining);
-
- hint.set_upper(upper);
- hint
- }
-
- fn consume_hint(&mut self, amount: usize) {
- // TODO: Should this be capped by `self.remaining`?
- self.stream.consume_hint(amount)
- }
}
// ===== impl LimitError =====
diff --git a/tokio-buf/src/ext/mod.rs b/tokio-buf/src/ext/mod.rs
new file mode 100644
index 00000000..f0299129
--- /dev/null
+++ b/tokio-buf/src/ext/mod.rs
@@ -0,0 +1,69 @@
+//! Types and utilities for working with `BufStream`.
+
+mod chain;
+mod collect;
+mod from;
+mod limit;
+
+pub use self::chain::Chain;
+pub use self::collect::Collect;
+pub use self::from::FromBufStream;
+pub use self::limit::Limit;
+
+pub use self::collect::CollectError;
+pub use self::from::CollectVecError;
+pub use self::limit::LimitError;
+
+use BufStream;
+
+impl<T> BufStreamExt for T where T: BufStream {}
+
+/// An extension trait for `BufStream`'s that provides a variety of convenient
+/// adapters.
+pub trait BufStreamExt: BufStream {
+ /// Takes two buf streams and creates a new buf stream over both in
+ /// sequence.
+ ///
+ /// `chain()` returns a new `BufStream` value which will first yield all
+ /// data from `self` then all data from `other`.
+ ///
+ /// In other words, it links two buf streams together, in a chain.
+ fn chain<T>(self, other: T) -> Chain<Self, T>
+ where
+ Self: Sized,
+ T: BufStream<Error = Self::Error>,
+ {
+ Chain::new(self, other)
+ }
+
+ /// Consumes all data from `self`, storing it in byte storage of type `T`.
+ ///
+ /// `collect()` returns a future that buffers all data yielded from `self`
+ /// into storage of type of `T`. The future completes once `self` yield
+ /// `None`, returning the buffered data.
+ ///
+ /// The collect future will yield an error if `self` yields an error or if
+ /// the collect operation errors. The collect error cases are dependent on
+ /// the target storage type.
+ fn collect<T>(self) -> Collect<Self, T>
+ where
+ Self: Sized,
+ T: FromBufStream<Self::Item>,
+ {
+ Collect::new(self)
+ }
+
+ /// Limit the number of bytes that the stream can yield.
+ ///
+ /// `limit()` returns a new `BufStream` value which yields all the data from
+ /// `self` while ensuring that at most `amount` bytes are yielded.
+ ///
+ /// If `self` can yield greater than `amount` bytes, the returned stream
+ /// will yield an error.
+ fn limit(self, amount: u64) -> Limit<Self>
+ where
+ Self: Sized,
+ {
+ Limit::new(self, amount)
+ }
+}
diff --git a/tokio-buf/src/lib.rs b/tokio-buf/src/lib.rs
index b706b932..d816f550 100644
--- a/tokio-buf/src/lib.rs
+++ b/tokio-buf/src/lib.rs
@@ -10,11 +10,154 @@
//! `Buf` (i.e, byte collections).
extern crate bytes;
+#[cfg(feature = "ext")]
extern crate either;
+#[allow(unused)]
#[macro_use]
extern crate futures;
-pub mod buf_stream;
+#[cfg(feature = "ext")]
+pub mod ext;
+pub mod errors;
+mod size_hint;
+mod str;
#[doc(inline)]
-pub use buf_stream::BufStream;
+#[cfg(feature = "ext")]
+pub use ext::BufStreamExt;
+pub use self::size_hint::SizeHint;
+
+use futures::Poll;
+use bytes::{Buf, Bytes, BytesMut};
+use std::io;
+use errors::internal::Never;
+
+/// An asynchronous stream of bytes.
+///
+/// `BufStream` asynchronously yields values implementing `Buf`, i.e. byte
+/// buffers.
+pub trait BufStream {
+ /// Values yielded by the `BufStream`.
+ ///
+ /// Each item is a sequence of bytes representing a chunk of the total
+ /// `ByteStream`.
+ type Item: Buf;
+
+ /// The error type this `BufStream` might generate.
+ type Error;
+
+ /// Attempt to pull out the next buffer of this stream, registering the
+ /// current task for wakeup if the value is not yet available, and returning
+ /// `None` if the stream is exhausted.
+ ///
+ /// # Return value
+ ///
+ /// There are several possible return values, each indicating a distinct
+ /// stream state:
+ ///
+ /// - `Ok(Async::NotReady)` means that this stream's next value is not ready
+ /// yet. Implementations will ensure that the current task will be notified
+ /// when the next value may be ready.
+ ///
+ /// - `Ok(Async::Ready(Some(buf)))` means that the stream has successfully
+ /// produced a value, `buf`, and may produce further values on subsequent
+ /// `poll_buf` calls.
+ ///
+ /// - `Ok(Async::Ready(None))` means that the stream has terminated, and
+ /// `poll_buf` should not be invoked again.
+ ///
+ /// # Panics
+ ///
+ /// Once a stream is finished, i.e. `Ready(None)` has been returned, further
+ /// calls to `poll_buf` may result in a panic or other "bad behavior".
+ fn poll_buf(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
+
+ /// Returns the bounds on the remaining length of the stream.
+ ///
+ /// The size hint allows the caller to perform certain optimizations that
+ /// are dependent on the byte stream size. For example, `collect` uses the
+ /// size hint to pre-allocate enough capacity to store the entirety of the
+ /// data received from the byte stream.
+ ///
+ /// When `SizeHint::upper()` returns `Some` with a value equal to
+ /// `SizeHint::lower()`, this represents the exact number of bytes that will
+ /// be yielded by the `BufStream`.
+ ///
+ /// # Implementation notes
+ ///
+ /// While not enforced, implementations are expected to respect the values
+ /// returned from `SizeHint`. Any deviation is considered an implementation
+ /// bug. Consumers may rely on correctness in order to use the value as part
+ /// of protocol impelmentations. For example, an HTTP library may use the
+ /// size hint to set the `content-length` header.
+ ///
+ /// However, `size_hint` must not be trusted to omit bounds checks in unsafe
+ /// code. An incorrect implementation of `size_hint()` must not lead to
+ /// memory safety violations.
+ fn size_hint(&self) -> SizeHint {
+ SizeHint::default()
+ }
+}
+
+impl BufStream for Vec<u8> {
+ type Item = io::Cursor<Vec<u8>>;
+ type Error = Never;
+
+ fn poll_buf(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ if self.is_empty() {
+ return Ok(None.into());
+ }
+
+ poll_bytes(self)
+ }
+}
+
+impl BufStream for &'static [u8] {
+ type Item = io::Cursor<&'static [u8]>;
+ type Error = Never;
+
+ fn poll_buf(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ if self.is_empty() {
+ return Ok(None.into());
+ }
+
+ poll_bytes(self)
+ }
+}
+
+impl BufStream for Bytes {
+ type Item = io::Cursor<Bytes>;
+ type Error = Never;
+
+ fn poll_buf(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ if self.is_empty() {
+ return Ok(None.into());
+ }
+
+ poll_bytes(self)
+ }
+}
+
+impl BufStream for BytesMut {
+ type Item = io::Cursor<BytesMut>;
+ type Error = Never;
+
+ fn poll_buf(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ if self.is_empty() {
+ return Ok(None.into());
+ }
+
+ poll_bytes(self)
+ }
+}
+
+fn poll_bytes<T: Default>(buf: &mut T)
+ -> Poll<Option<io::Cursor<T>>, Never>
+{
+ use std::mem;
+
+ let bytes = mem::replace(buf, Default::default());
+ let buf = io::Cursor::new(bytes);
+
+ Ok(Some(buf).into())
+}
diff --git a/tokio-buf/src/buf_stream/size_hint.rs b/tokio-buf/src/size_hint.rs
index 1e818fbd..1e818fbd 100644
--- a/tokio-buf/src/buf_stream/size_hint.rs
+++ b/tokio-buf/src/size_hint.rs
diff --git a/tokio-buf/src/buf_stream/str.rs b/tokio-buf/src/str.rs
index fa12d8f0..b1808080 100644
--- a/tokio-buf/src/buf_stream/str.rs
+++ b/tokio-buf/src/str.rs
@@ -1,5 +1,5 @@
use BufStream;
-use buf_stream::errors::internal::Never;
+use errors::internal::Never;
use futures::Poll;
diff --git a/tokio-buf/tests/buf_stream.rs b/tokio-buf/tests/buf_stream.rs
index e5b462b5..848a5de1 100644
--- a/tokio-buf/tests/buf_stream.rs
+++ b/tokio-buf/tests/buf_stream.rs
@@ -2,45 +2,13 @@ extern crate tokio_buf;
extern crate bytes;
extern crate futures;
-use tokio_buf::buf_stream::{BufStream, SizeHint};
+use tokio_buf::{BufStream, SizeHint};
use bytes::Buf;
-use futures::{Future, Poll};
use futures::Async::*;
-use std::collections::VecDeque;
-use std::io::Cursor;
-macro_rules! assert_buf_eq {
- ($actual:expr, $expect:expr) => {{
- match $actual {
- Ok(Ready(Some(val))) => {
- assert_eq!(val.remaining(), val.bytes().len());
- assert_eq!(val.bytes(), $expect.as_bytes());
- }
- Ok(Ready(None)) => panic!("expected value; BufStream yielded None"),
- Ok(NotReady) => panic!("expected value; BufStream is not ready"),
- Err(e) => panic!("expected value; got error = {:?}", e),
- }
- }};
-}
-
-macro_rules! assert_none {
- ($actual:expr) => {
- match $actual {
- Ok(Ready(None)) => {}
- actual => panic!("expected None; actual = {:?}", actual),
- }
- }
-}
-
-macro_rules! assert_not_ready {
- ($actual:expr) => {
- match $actual {
- Ok(NotReady) => {}
- actual => panic!("expected NotReady; actual = {:?}", actual),
- }
- }
-}
+#[macro_use]
+mod support;
// ===== test `SizeHint` =====
@@ -83,149 +51,6 @@ fn size_hint_upper_less_than_lower() {
hint.set_upper(100);
}
-// ===== test `chain()` =====
-
-#[test]
-fn chain() {
- // Chain one with one
- //
- let mut bs = one("hello").chain(one("world"));
-
- assert_buf_eq!(bs.poll_buf(), "hello");
- assert_buf_eq!(bs.poll_buf(), "world");
- assert_none!(bs.poll_buf());
-
- // Chain multi with multi
- let mut bs = list(&["foo", "bar"])
- .chain(list(&["baz", "bok"]));
-
- assert_buf_eq!(bs.poll_buf(), "foo");
- assert_buf_eq!(bs.poll_buf(), "bar");
- assert_buf_eq!(bs.poll_buf(), "baz");
- assert_buf_eq!(bs.poll_buf(), "bok");
- assert_none!(bs.poll_buf());
-
- // Chain includes a not ready call
- //
- let mut bs = new_mock(&[
- Ok(Ready("foo")),
- Ok(NotReady),
- Ok(Ready("bar"))
- ]).chain(one("baz"));
-
- assert_buf_eq!(bs.poll_buf(), "foo");
- assert_not_ready!(bs.poll_buf());
- assert_buf_eq!(bs.poll_buf(), "bar");
- assert_buf_eq!(bs.poll_buf(), "baz");
- assert_none!(bs.poll_buf());
-}
-
-// ===== Test `collect()` =====
-
-#[test]
-fn collect_vec() {
- // While unfortunate, this test makes some assumptions on vec's resizing
- // behavior.
- //
- // Collect one
- //
- let bs = one("hello world");
-
- let vec: Vec<u8> = bs.collect()
- .wait().unwrap();
-
- assert_eq!(vec, b"hello world");
- assert_eq!(vec.capacity(), 64);
-
- // Collect one, with size hint
- //
- let mut bs = one("hello world");
- bs.size_hint.set_lower(11);
-
- let vec: Vec<u8> = bs.collect()
- .wait().unwrap();
-
- assert_eq!(vec, b"hello world");
- assert_eq!(vec.capacity(), 64);
-
- // Collect one, with size hint
- //
- let mut bs = one("hello world");
- bs.size_hint.set_lower(10);
-
- let vec: Vec<u8> = bs.collect()
- .wait().unwrap();
-
- assert_eq!(vec, b"hello world");
- assert_eq!(vec.capacity(), 64);
-
- // Collect many
- //
- let bs = list(&["hello", " ", "world", ", one two three"]);
-
- let vec: Vec<u8> = bs.collect()
- .wait().unwrap();
-
- assert_eq!(vec, b"hello world, one two three");
-}
-
-// ===== Test limit() =====
-
-#[test]
-fn limit() {
- // Not limited
-
- let res = one("hello world")
- .limit(100)
- .collect::<Vec<_>>()
- .wait().unwrap();
-
- assert_eq!(res, b"hello world");
-
- let res = list(&["hello", " ", "world"])
- .limit(100)
- .collect::<Vec<_>>()
- .wait().unwrap();
-
- assert_eq!(res, b"hello world");
-
- let res = list(&["hello", " ", "world"])
- .limit(11)
- .collect::<Vec<_>>()
- .wait().unwrap();
-
- assert_eq!(res, b"hello world");
-
- // Limited
-
- let res = one("hello world")
- .limit(5)
- .collect::<Vec<_>>()
- .wait();
-
- assert!(res.is_err());
-
- let res = one("hello world")
- .limit(10)
- .collect::<Vec<_>>()
- .wait();
-
- assert!(res.is_err());
-
- let mut bs = list(&["hello", " ", "world"])
- .limit(9);
-
- assert_buf_eq!(bs.poll_buf(), "hello");
- assert_buf_eq!(bs.poll_buf(), " ");
- assert!(bs.poll_buf().is_err());
-
- let mut bs = list(&["hello", " ", "world"]);
- bs.size_hint.set_lower(11);
- let mut bs = bs.limit(9);
-
- assert!(bs.poll_buf().is_err());
-}
-
// ===== BufStream impelmentations for misc types =====
#[test]
@@ -240,90 +65,3 @@ fn str_buf_stream() {
assert!(bs.is_empty());
assert_none!(bs.poll_buf());
}
-
-// ===== Test utils =====
-
-fn one(buf: &'static str) -> Mock {
- list(&[buf])
-}
-
-fn list(bufs: &[&'static str]) -> Mock {
- let mut polls = VecDeque::new();
-
- for &buf in bufs {
- polls.push_back(Ok(Ready(buf.as_bytes())));
- }
-
- Mock {
- polls,
- size_hint: SizeHint::default(),
- }
-}
-
-fn new_mock(values: &[Poll<&'static str, ()>]) -> Mock {
- let mut polls = VecDeque::new();
-
- for &v in values {
- polls.push_back(match v {
- Ok(Ready(v)) => Ok(Ready(v.as_bytes())),
- Ok(NotReady) => Ok(NotReady),
- Err(e) => Err(e),
- });
- }
-
- Mock {
- polls,
- size_hint: SizeHint::default(),
- }
-}
-
-#[derive(Debug)]
-struct Mock {
- polls: VecDeque<Poll<&'static [u8], ()>>,
- size_hint: SizeHint,
-}
-
-#[derive(Debug)]
-struct MockBuf {
- data: Cursor<&'static [u8]>,
-}
-
-impl BufStream for Mock {
- type Item = MockBuf;
- type Error = ();
-
- fn poll_buf(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- match self.polls.pop_front() {
- Some(Ok(Ready(value))) => Ok(Ready(Some(MockBuf::new(value)))),
- Some(Ok(NotReady)) => Ok(NotReady),
- Some(Err(e)) => Err(e),
- None => Ok(Ready(None)),
- }
- }
-
- fn size_hint(&self) -> SizeHint {
- self.size_hint.clone()
- }
-}
-
-impl MockBuf {
- fn new(data: &'static [u8]) -> MockBuf {
- MockBuf {
- data: Cursor::new(data),
- }
- }
-}
-
-impl Buf for MockBuf {
- fn remaining(&self) -> usize {
- self.data.remaining()
- }
-
- fn bytes(&self) -> &[u8] {
- self.data.bytes()
- }
-
- fn advance(&mut self, cnt: usize) {
- self.data.advance(cnt)
- }
-}
diff --git a/tokio-buf/tests/buf_stream_ext.rs b/tokio-buf/tests/buf_stream_ext.rs
new file mode 100644
index 00000000..2395d20e
--- /dev/null
+++ b/tokio-buf/tests/buf_stream_ext.rs
@@ -0,0 +1,158 @@
+#![cfg(feature = "ext")]
+
+extern crate tokio_buf;
+extern crate bytes;
+extern crate futures;
+
+use tokio_buf::{BufStream, BufStreamExt};
+use futures::Future;
+use futures::Async::*;
+use bytes::Buf;
+
+#[macro_use]
+mod support;
+
+use support::*;
+
+// ===== test `chain()` =====
+
+#[test]
+fn chain() {
+ // Chain one with one
+ //
+ let mut bs = one("hello").chain(one("world"));
+
+ assert_buf_eq!(bs.poll_buf(), "hello");
+ assert_buf_eq!(bs.poll_buf(), "world");
+ assert_none!(bs.poll_buf());
+
+ // Chain multi with multi
+ let mut bs = list(&["foo", "bar"])
+ .chain(list(&["baz", "bok"]));
+
+ assert_buf_eq!(bs.poll_buf(), "foo");
+ assert_buf_eq!(bs.poll_buf(), "bar");
+ assert_buf_eq!(bs.poll_buf(), "baz");
+ assert_buf_eq!(bs.poll_buf(), "bok");
+ assert_none!(bs.poll_buf());
+
+ // Chain includes a not ready call
+ //
+ let mut bs = new_mock(&[
+ Ok(Ready("foo")),
+ Ok(NotReady),
+ Ok(Ready("bar"