summaryrefslogtreecommitdiffstats
path: root/tokio-codec
diff options
context:
space:
mode:
authorBryan Burgers <bryan@burgers.io>2018-06-04 22:36:06 -0500
committerCarl Lerche <me@carllerche.com>2018-06-04 20:36:06 -0700
commitf723d100871e025e4bdd2f47397c9b089e666ce0 (patch)
tree7d67d7f8bee561f6f04b9d1222dd02719fb47b12 /tokio-codec
parent3d7263d3a0b73ab35d63b45a6524bde7251851e8 (diff)
Create tokio-codec (#360)
Create a new tokio-codec crate with many of the contents of `tokio_io::codec`.
Diffstat (limited to 'tokio-codec')
-rw-r--r--tokio-codec/CHANGELOG.md3
-rw-r--r--tokio-codec/Cargo.toml22
-rw-r--r--tokio-codec/LICENSE25
-rw-r--r--tokio-codec/README.md35
-rw-r--r--tokio-codec/src/bytes_codec.rs37
-rw-r--r--tokio-codec/src/lib.rs32
-rw-r--r--tokio-codec/src/lines_codec.rs89
-rw-r--r--tokio-codec/tests/codecs.rs76
-rw-r--r--tokio-codec/tests/framed.rs97
-rw-r--r--tokio-codec/tests/framed_read.rs216
-rw-r--r--tokio-codec/tests/framed_write.rs134
11 files changed, 766 insertions, 0 deletions
diff --git a/tokio-codec/CHANGELOG.md b/tokio-codec/CHANGELOG.md
new file mode 100644
index 00000000..f96d5068
--- /dev/null
+++ b/tokio-codec/CHANGELOG.md
@@ -0,0 +1,3 @@
+# Unreleased
+
+* Initial release (#353)
diff --git a/tokio-codec/Cargo.toml b/tokio-codec/Cargo.toml
new file mode 100644
index 00000000..1d335b83
--- /dev/null
+++ b/tokio-codec/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "tokio-codec"
+
+# When releasing to crates.io:
+# - Update html_root_url.
+# - Update CHANGELOG.md.
+# - Create "v0.1.x" git tag.
+version = "0.1.0"
+authors = ["Carl Lerche <me@carllerche.com>", "Bryan Burgers <bryan@burgers.io>"]
+license = "MIT"
+repository = "https://github.com/tokio-rs/tokio"
+homepage = "https://tokio.rs"
+documentation = "https://docs.rs/tokio-codec/0.1"
+description = """
+Utilities for encoding and decoding frames.
+"""
+categories = ["asynchronous"]
+
+[dependencies]
+tokio-io = { version = "0.1.6", path = "../tokio-io" }
+bytes = "0.4.7"
+futures = "0.1.18"
diff --git a/tokio-codec/LICENSE b/tokio-codec/LICENSE
new file mode 100644
index 00000000..38c1e27b
--- /dev/null
+++ b/tokio-codec/LICENSE
@@ -0,0 +1,25 @@
+Copyright (c) 2018 Tokio Contributors
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/tokio-codec/README.md b/tokio-codec/README.md
new file mode 100644
index 00000000..e0c1a385
--- /dev/null
+++ b/tokio-codec/README.md
@@ -0,0 +1,35 @@
+# tokio-codec
+
+Utilities for encoding and decoding frames.
+
+[Documentation](https://docs.rs/tokio-codec)
+
+## Usage
+
+First, add this to your `Cargo.toml`:
+
+```toml
+[dependencies]
+tokio-codec = "0.1"
+```
+
+Next, add this to your crate:
+
+```rust
+extern crate tokio_codec;
+```
+
+You can find extensive documentation and examples about how to use this crate
+online at [https://tokio.rs](https://tokio.rs). The [API
+documentation](https://docs.rs/tokio-codec) is also a great place to get started
+for the nitty-gritty.
+
+## License
+
+This project is licensed under the [MIT license](LICENSE).
+
+### Contribution
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in Tokio by you, shall be licensed as MIT, without any additional
+terms or conditions.
diff --git a/tokio-codec/src/bytes_codec.rs b/tokio-codec/src/bytes_codec.rs
new file mode 100644
index 00000000..d535aef6
--- /dev/null
+++ b/tokio-codec/src/bytes_codec.rs
@@ -0,0 +1,37 @@
+use bytes::{Bytes, BufMut, BytesMut};
+use tokio_io::_tokio_codec::{Encoder, Decoder};
+use std::io;
+
+/// A simple `Codec` implementation that just ships bytes around.
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub struct BytesCodec(());
+
+impl BytesCodec {
+ /// Creates a new `BytesCodec` for shipping around raw bytes.
+ pub fn new() -> BytesCodec { BytesCodec(()) }
+}
+
+impl Decoder for BytesCodec {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> {
+ if buf.len() > 0 {
+ let len = buf.len();
+ Ok(Some(buf.split_to(len)))
+ } else {
+ Ok(None)
+ }
+ }
+}
+
+impl Encoder for BytesCodec {
+ type Item = Bytes;
+ type Error = io::Error;
+
+ fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> {
+ buf.reserve(data.len());
+ buf.put(data);
+ Ok(())
+ }
+}
diff --git a/tokio-codec/src/lib.rs b/tokio-codec/src/lib.rs
new file mode 100644
index 00000000..2b26b542
--- /dev/null
+++ b/tokio-codec/src/lib.rs
@@ -0,0 +1,32 @@
+//! Utilities for encoding and decoding frames.
+//!
+//! Contains adapters to go from streams of bytes, [`AsyncRead`] and
+//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`].
+//! Framed streams are also known as [transports].
+//!
+//! [`AsyncRead`]: #
+//! [`AsyncWrite`]: #
+//! [`Sink`]: #
+//! [`Stream`]: #
+//! [transports]: #
+
+#![deny(missing_docs, missing_debug_implementations, warnings)]
+#![doc(html_root_url = "https://docs.rs/tokio-codec/0.1.0")]
+
+extern crate bytes;
+extern crate tokio_io;
+
+mod bytes_codec;
+mod lines_codec;
+
+pub use tokio_io::_tokio_codec::{
+ Decoder,
+ Encoder,
+ Framed,
+ FramedParts,
+ FramedRead,
+ FramedWrite,
+};
+
+pub use bytes_codec::BytesCodec;
+pub use lines_codec::LinesCodec;
diff --git a/tokio-codec/src/lines_codec.rs b/tokio-codec/src/lines_codec.rs
new file mode 100644
index 00000000..bf4135b8
--- /dev/null
+++ b/tokio-codec/src/lines_codec.rs
@@ -0,0 +1,89 @@
+use bytes::{BufMut, BytesMut};
+use tokio_io::_tokio_codec::{Encoder, Decoder};
+use std::{io, str};
+
+/// A simple `Codec` implementation that splits up data into lines.
+#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub struct LinesCodec {
+ // Stored index of the next index to examine for a `\n` character.
+ // This is used to optimize searching.
+ // For example, if `decode` was called with `abc`, it would hold `3`,
+ // because that is the next index to examine.
+ // The next time `decode` is called with `abcde\n`, the method will
+ // only look at `de\n` before returning.
+ next_index: usize,
+}
+
+impl LinesCodec {
+ /// Returns a `LinesCodec` for splitting up data into lines.
+ pub fn new() -> LinesCodec {
+ LinesCodec { next_index: 0 }
+ }
+}
+
+fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
+ str::from_utf8(buf).map_err(|_|
+ io::Error::new(
+ io::ErrorKind::InvalidData,
+ "Unable to decode input as UTF8"))
+}
+
+fn without_carriage_return(s: &[u8]) -> &[u8] {
+ if let Some(&b'\r') = s.last() {
+ &s[..s.len() - 1]
+ } else {
+ s
+ }
+}
+
+impl Decoder for LinesCodec {
+ type Item = String;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
+ if let Some(newline_offset) =
+ buf[self.next_index..].iter().position(|b| *b == b'\n')
+ {
+ let newline_index = newline_offset + self.next_index;
+ let line = buf.split_to(newline_index + 1);
+ let line = &line[..line.len()-1];
+ let line = without_carriage_return(line);
+ let line = utf8(line)?;
+ self.next_index = 0;
+ Ok(Some(line.to_string()))
+ } else {
+ self.next_index = buf.len();
+ Ok(None)
+ }
+ }
+
+ fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
+ Ok(match self.decode(buf)? {
+ Some(frame) => Some(frame),
+ None => {
+ // No terminating newline - return remaining data, if any
+ if buf.is_empty() || buf == &b"\r"[..] {
+ None
+ } else {
+ let line = buf.take();
+ let line = without_carriage_return(&line);
+ let line = utf8(line)?;
+ self.next_index = 0;
+ Some(line.to_string())
+ }
+ }
+ })
+ }
+}
+
+impl Encoder for LinesCodec {
+ type Item = String;
+ type Error = io::Error;
+
+ fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> {
+ buf.reserve(line.len() + 1);
+ buf.put(line);
+ buf.put_u8(b'\n');
+ Ok(())
+ }
+}
diff --git a/tokio-codec/tests/codecs.rs b/tokio-codec/tests/codecs.rs
new file mode 100644
index 00000000..6359e7c7
--- /dev/null
+++ b/tokio-codec/tests/codecs.rs
@@ -0,0 +1,76 @@
+extern crate tokio_codec;
+extern crate bytes;
+
+use bytes::{BytesMut, Bytes, BufMut};
+use tokio_codec::{BytesCodec, LinesCodec, Decoder, Encoder};
+
+#[test]
+fn bytes_decoder() {
+ let mut codec = BytesCodec::new();
+ let buf = &mut BytesMut::new();
+ buf.put_slice(b"abc");
+ assert_eq!("abc", codec.decode(buf).unwrap().unwrap());
+ assert_eq!(None, codec.decode(buf).unwrap());
+ assert_eq!(None, codec.decode(buf).unwrap());
+ buf.put_slice(b"a");
+ assert_eq!("a", codec.decode(buf).unwrap().unwrap());
+}
+
+#[test]
+fn bytes_encoder() {
+ let mut codec = BytesCodec::new();
+
+ // Default capacity of BytesMut
+ #[cfg(target_pointer_width = "64")]
+ const INLINE_CAP: usize = 4 * 8 - 1;
+ #[cfg(target_pointer_width = "32")]
+ const INLINE_CAP: usize = 4 * 4 - 1;
+
+ let mut buf = BytesMut::new();
+ codec.encode(Bytes::from_static(&[0; INLINE_CAP + 1]), &mut buf).unwrap();
+
+ // Default capacity of Framed Read
+ const INITIAL_CAPACITY: usize = 8 * 1024;
+
+ let mut buf = BytesMut::with_capacity(INITIAL_CAPACITY);
+ codec.encode(Bytes::from_static(&[0; INITIAL_CAPACITY + 1]), &mut buf).unwrap();
+}
+
+#[test]
+fn lines_decoder() {
+ let mut codec = LinesCodec::new();
+ let buf = &mut BytesMut::new();
+ buf.reserve(200);
+ buf.put("line 1\nline 2\r\nline 3\n\r\n\r");
+ assert_eq!("line 1", codec.decode(buf).unwrap().unwrap());
+ assert_eq!("line 2", codec.decode(buf).unwrap().unwrap());
+ assert_eq!("line 3", codec.decode(buf).unwrap().unwrap());
+ assert_eq!("", codec.decode(buf).unwrap().unwrap());
+ assert_eq!(None, codec.decode(buf).unwrap());
+ assert_eq!(None, codec.decode_eof(buf).unwrap());
+ buf.put("k");
+ assert_eq!(None, codec.decode(buf).unwrap());
+ assert_eq!("\rk", codec.decode_eof(buf).unwrap().unwrap());
+ assert_eq!(None, codec.decode(buf).unwrap());
+ assert_eq!(None, codec.decode_eof(buf).unwrap());
+}
+
+#[test]
+fn lines_encoder() {
+ let mut codec = BytesCodec::new();
+
+ // Default capacity of BytesMut
+ #[cfg(target_pointer_width = "64")]
+ const INLINE_CAP: usize = 4 * 8 - 1;
+ #[cfg(target_pointer_width = "32")]
+ const INLINE_CAP: usize = 4 * 4 - 1;
+
+ let mut buf = BytesMut::new();
+ codec.encode(Bytes::from_static(&[b'a'; INLINE_CAP + 1]), &mut buf).unwrap();
+
+ // Default capacity of Framed Read
+ const INITIAL_CAPACITY: usize = 8 * 1024;
+
+ let mut buf = BytesMut::with_capacity(INITIAL_CAPACITY);
+ codec.encode(Bytes::from_static(&[b'a'; INITIAL_CAPACITY + 1]), &mut buf).unwrap();
+}
diff --git a/tokio-codec/tests/framed.rs b/tokio-codec/tests/framed.rs
new file mode 100644
index 00000000..fdedd560
--- /dev/null
+++ b/tokio-codec/tests/framed.rs
@@ -0,0 +1,97 @@
+extern crate tokio_codec;
+extern crate tokio_io;
+extern crate bytes;
+extern crate futures;
+
+use futures::{Stream, Future};
+use std::io::{self, Read};
+use tokio_codec::{Framed, FramedParts, Decoder, Encoder};
+use tokio_io::AsyncRead;
+use bytes::{BytesMut, Buf, BufMut, IntoBuf, BigEndian};
+
+const INITIAL_CAPACITY: usize = 8 * 1024;
+
+struct U32Codec;
+
+impl Decoder for U32Codec {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ if buf.len() < 4 {
+ return Ok(None);
+ }
+
+ let n = buf.split_to(4).into_buf().get_u32_be();
+ Ok(Some(n))
+ }
+}
+
+impl Encoder for U32Codec {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> {
+ // Reserve space
+ dst.reserve(4);
+ dst.put_u32_be(item);
+ Ok(())
+ }
+}
+
+struct DontReadIntoThis;
+
+impl Read for DontReadIntoThis {
+ fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
+ Err(io::Error::new(io::ErrorKind::Other,
+ "Read into something you weren't supposed to."))
+ }
+}
+
+impl AsyncRead for DontReadIntoThis {}
+
+#[test]
+fn can_read_from_existing_buf() {
+ let parts = FramedParts {
+ inner: DontReadIntoThis,
+ readbuf: vec![0, 0, 0, 42].into(),
+ writebuf: BytesMut::with_capacity(0),
+ };
+ let framed = Framed::from_parts(parts, U32Codec);
+
+ let num = framed
+ .into_future()
+ .map(|(first_num, _)| {
+ first_num.unwrap()
+ })
+ .wait()
+ .map_err(|e| e.0)
+ .unwrap();
+ assert_eq!(num, 42);
+}
+
+#[test]
+fn external_buf_grows_to_init() {
+ let parts = FramedParts {
+ inner: DontReadIntoThis,
+ readbuf: vec![0, 0, 0, 42].into(),
+ writebuf: BytesMut::with_capacity(0),
+ };
+ let framed = Framed::from_parts(parts, U32Codec);
+ let FramedParts { readbuf, .. } = framed.into_parts();
+
+ assert_eq!(readbuf.capacity(), INITIAL_CAPACITY);
+}
+
+#[test]
+fn external_buf_does_not_shrink() {
+ let parts = FramedParts {
+ inner: DontReadIntoThis,
+ readbuf: vec![0; INITIAL_CAPACITY * 2].into(),
+ writebuf: BytesMut::with_capacity(0),
+ };
+ let framed = Framed::from_parts(parts, U32Codec);
+ let FramedParts { readbuf, .. } = framed.into_parts();
+
+ assert_eq!(readbuf.capacity(), INITIAL_CAPACITY * 2);
+}
diff --git a/tokio-codec/tests/framed_read.rs b/tokio-codec/tests/framed_read.rs
new file mode 100644
index 00000000..80dfa5e5
--- /dev/null
+++ b/tokio-codec/tests/framed_read.rs
@@ -0,0 +1,216 @@
+extern crate tokio_codec;
+extern crate tokio_io;
+extern crate bytes;
+extern crate futures;
+
+use tokio_io::AsyncRead;
+use tokio_codec::{FramedRead, Decoder};
+
+use bytes::{BytesMut, Buf, IntoBuf, BigEndian};
+use futures::Stream;
+use futures::Async::{Ready, NotReady};
+
+use std::io::{self, Read};
+use std::collections::VecDeque;
+
+macro_rules! mock {
+ ($($x:expr,)*) => {{
+ let mut v = VecDeque::new();
+ v.extend(vec![$($x),*]);
+ Mock { calls: v }
+ }};
+}
+
+struct U32Decoder;
+
+impl Decoder for U32Decoder {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ if buf.len() < 4 {
+ return Ok(None);
+ }
+
+ let n = buf.split_to(4).into_buf().get_u32::<BigEndian>();
+ Ok(Some(n))
+ }
+}
+
+#[test]
+fn read_multi_frame_in_packet() {
+ let mock = mock! {
+ Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
+ };
+
+ let mut framed = FramedRead::new(mock, U32Decoder);
+ assert_eq!(Ready(Some(0)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(1)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(2)), framed.poll().unwrap());
+ assert_eq!(Ready(None), framed.poll().unwrap());
+}
+
+#[test]
+fn read_multi_frame_across_packets() {
+ let mock = mock! {
+ Ok(b"\x00\x00\x00\x00".to_vec()),
+ Ok(b"\x00\x00\x00\x01".to_vec()),
+ Ok(b"\x00\x00\x00\x02".to_vec()),
+ };
+
+ let mut framed = FramedRead::new(mock, U32Decoder);
+ assert_eq!(Ready(Some(0)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(1)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(2)), framed.poll().unwrap());
+ assert_eq!(Ready(None), framed.poll().unwrap());
+}
+
+#[test]
+fn read_not_ready() {
+ let mock = mock! {
+ Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
+ Ok(b"\x00\x00\x00\x00".to_vec()),
+ Ok(b"\x00\x00\x00\x01".to_vec()),
+ };
+
+ let mut framed = FramedRead::new(mock, U32Decoder);
+ assert_eq!(NotReady, framed.poll().unwrap());
+ assert_eq!(Ready(Some(0)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(1)), framed.poll().unwrap());
+ assert_eq!(Ready(None), framed.poll().unwrap());
+}
+
+#[test]
+fn read_partial_then_not_ready() {
+ let mock = mock! {
+ Ok(b"\x00\x00".to_vec()),
+ Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
+ Ok(b"\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
+ };
+
+ let mut framed = FramedRead::new(mock, U32Decoder);
+ assert_eq!(NotReady, framed.poll().unwrap());
+ assert_eq!(Ready(Some(0)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(1)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(2)), framed.poll().unwrap());
+ assert_eq!(Ready(None), framed.poll().unwrap());
+}
+
+#[test]
+fn read_err() {
+ let mock = mock! {
+ Err(io::Error::new(io::ErrorKind::Other, "")),
+ };
+
+ let mut framed = FramedRead::new(mock, U32Decoder);
+ assert_eq!(io::ErrorKind::Other, framed.poll().unwrap_err().kind());
+}
+
+#[test]
+fn read_partial_then_err() {
+ let mock = mock! {
+ Ok(b"\x00\x00".to_vec()),
+ Err(io::Error::new(io::ErrorKind::Other, "")),
+ };
+
+ let mut framed = FramedRead::new(mock, U32Decoder);
+ assert_eq!(io::ErrorKind::Other, framed.poll().unwrap_err().kind());
+}
+
+#[test]
+fn read_partial_would_block_then_err() {
+ let mock = mock! {
+ Ok(b"\x00\x00".to_vec()),
+ Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
+ Err(io::Error::new(io::ErrorKind::Other, "")),
+ };
+
+ let mut framed = FramedRead::new(mock, U32Decoder);
+ assert_eq!(NotReady, framed.poll().unwrap());
+ assert_eq!(io::ErrorKind::Other, framed.poll().unwrap_err().kind());
+}
+
+#[test]
+fn huge_size() {
+ let data = [0; 32 * 1024];
+
+ let mut framed = FramedRead::new(&data[..], BigDecoder);
+ assert_eq!(Ready(Some(0)), framed.poll().unwrap());
+ assert_eq!(Ready(None), framed.poll().unwrap());
+
+ struct BigDecoder;
+
+ impl Decoder for BigDecoder {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ if buf.len() < 32 * 1024 {
+ return Ok(None);
+ }
+ buf.split_to(32 * 1024);
+ Ok(Some(0))
+ }
+ }
+}
+
+#[test]
+fn data_remaining_is_error() {
+ let data = [0; 5];
+
+ let mut framed = FramedRead::new(&data[..], U32Decoder);
+ assert_eq!(Ready(Some(0)), framed.poll().unwrap());
+ assert!(framed.poll().is_err());
+}
+
+#[test]
+fn multi_frames_on_eof() {
+ struct MyDecoder(Vec<u32>);
+
+ impl Decoder for MyDecoder {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn decode(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ unreachable!();
+ }
+
+ fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ if self.0.is_empty() {
+ return Ok(None);
+ }
+
+ Ok(Some(self.0.remove(0)))
+ }
+ }
+
+ let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3]));
+ assert_eq!(Ready(Some(0)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(1)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(2)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(3)), framed.poll().unwrap());
+ assert_eq!(Ready(None), framed.poll().unwrap());
+}
+
+// ===== Mock ======
+
+struct Mock {
+ calls: VecDeque<io::Result<Vec<u8>>>,
+}
+
+impl Read for Mock {
+ fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+ match self.calls.pop_front() {
+ Some(Ok(data)) => {
+ debug_assert!(dst.len() >= data.len());
+ dst[..data.len()].copy_from_slice(&data[..]);
+ Ok(data.len())
+ }
+ Some(Err(e)) => Err(e),
+ None => Ok(0),
+ }
+ }
+}
+
+impl AsyncRead for Mock {
+}
diff --git a/tokio-codec/tests/framed_write.rs b/tokio-codec/tests/framed_write.rs
new file mode 100644
index 00000000..137fb5be
--- /dev/null
+++ b/tokio-codec/tests/framed_write.rs
@@ -0,0 +1,134 @@
+extern crate tokio_codec;
+extern crate tokio_io;
+extern crate bytes;
+extern crate futures;
+
+use tokio_io::AsyncWrite;
+use tokio_codec::{Encoder, FramedWrite};
+
+use futures::{Sink, Poll};
+use bytes::{BytesMut, BufMut, BigEndian};
+
+use std::io::{self, Write};
+use std::collections::VecDeque;
+
+macro_rules! mock {
+ ($($x:expr,)*) => {{
+ let mut v = VecDeque::new();
+ v.extend(vec![$($x),*]);
+ Mock { calls: v }
+ }};
+}
+
+struct U32Encoder;
+
+impl Encoder for U32Encoder {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> {
+ // Reserve space
+ dst.reserve(4);
+ dst.put_u32_be(item);
+ Ok(())
+ }
+}
+
+#[test]
+fn write_multi_frame_in_packet() {
+ let mock = mock! {
+ Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
+ };
+
+ let mut framed = FramedWrite::new(mock, U32Encoder);
+ assert!(framed.start_send(0).unwrap().is_ready());
+ assert!(framed.start_send(1).unwrap().is_ready());
+ assert!(framed.start_send(2).unwrap().is_ready());
+
+ // Nothing written yet
+ assert_eq!(1, framed.get_ref().calls.len());
+
+ // Flush the writes
+ assert!(framed.poll_complete().unwrap().is_ready());
+
+ assert_eq!(0, framed.get_ref().calls.len());
+}
+
+#[test]
+fn write_hits_backpressure() {
+ const ITER: usize = 2 * 1024;
+
+ let mut mock = mock! {
+ // Block the `ITER`th write
+ Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready")),
+ Ok(b"".to_vec()),
+ };
+
+ for i in 0..(ITER + 1) {
+ let mut b = BytesMut::with_capacity(4);
+ b.put_u32_be(i as u32);
+
+ // Append to the end
+ match mock.calls.back_mut().unwrap() {
+ &mut Ok(ref mut data) => {
+ // Write in 2kb chunks
+ if data.len() < ITER {
+ data.extend_from_slice(&b[..]);
+ continue;
+ }
+ }
+ _ => unreachable!(),
+ }
+
+ // Push a new new chunk
+ mock.calls.push_back(Ok(b[..].to_vec()));
+ }
+
+ let mut framed = FramedWrite::new(mock, U32Encoder);
+
+ for i in 0..ITER {
+ assert!(framed.start_send(i as u32).unwrap().is_ready());
+ }
+
+ // This should reject
+ assert!(!framed.start_send(ITER as u32).unwrap().is_ready());
+
+ // This should succeed and start flushing the buffer.
+ assert!(framed.start_send(ITER as u32).unwrap().is_ready());
+
+ // Flush the rest of the buffer
+ assert!(framed.poll_complete().unwrap().is_ready());
+
+ // Ensure the mock is empty
+ assert_eq!(0, framed.get_ref().calls.len());
+}
+
+// ===== Mock ======
+
+struct Mock {
+ calls: VecDeque<io::Result<Vec<u8>>>,
+}
+
+impl Write for Mock {
+ fn write(&mut self, src: &[u8]) -> io::Result<usize> {
+ match self.calls.pop_front() {
+ Some(Ok(data)) => {
+ assert!(src.len() >= data.len());
+ assert_eq!(&data[..], &src[..data.len()]);
+ Ok(data.len())
+ }
+ Some(Err(e)) => Err(e),
+ None => panic!("unexpected write; {:?}", src),
+ }
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+impl AsyncWrite for Mock {
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ Ok(().into())
+ }
+}