summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Burgers <bryan@burgers.io>2018-06-04 22:36:06 -0500
committerCarl Lerche <me@carllerche.com>2018-06-04 20:36:06 -0700
commitf723d100871e025e4bdd2f47397c9b089e666ce0 (patch)
tree7d67d7f8bee561f6f04b9d1222dd02719fb47b12
parent3d7263d3a0b73ab35d63b45a6524bde7251851e8 (diff)
Create tokio-codec (#360)
Create a new tokio-codec crate with many of the contents of `tokio_io::codec`.
-rw-r--r--Cargo.toml2
-rw-r--r--examples/connect.rs6
-rw-r--r--examples/print_each_packet.rs7
-rw-r--r--examples/tinyhttp.rs7
-rw-r--r--examples/udp-codec.rs3
-rw-r--r--tests/line-frames.rs6
-rw-r--r--tokio-codec/CHANGELOG.md3
-rw-r--r--tokio-codec/Cargo.toml22
-rw-r--r--tokio-codec/LICENSE25
-rw-r--r--tokio-codec/README.md35
-rw-r--r--tokio-codec/src/bytes_codec.rs37
-rw-r--r--tokio-codec/src/lib.rs32
-rw-r--r--tokio-codec/src/lines_codec.rs89
-rw-r--r--tokio-codec/tests/codecs.rs (renamed from tokio-io/tests/codecs.rs)4
-rw-r--r--tokio-codec/tests/framed.rs (renamed from tokio-io/tests/framed.rs)3
-rw-r--r--tokio-codec/tests/framed_read.rs (renamed from tokio-io/tests/framed_read.rs)3
-rw-r--r--tokio-codec/tests/framed_write.rs (renamed from tokio-io/tests/framed_write.rs)3
-rw-r--r--tokio-fs/CHANGELOG.md4
-rw-r--r--tokio-fs/Cargo.toml1
-rw-r--r--tokio-fs/examples/std-echo.rs5
-rw-r--r--tokio-io/CHANGELOG.md4
-rw-r--r--tokio-io/src/_tokio_codec/decoder.rs3
-rw-r--r--tokio-io/src/_tokio_codec/encoder.rs3
-rw-r--r--tokio-io/src/_tokio_codec/framed.rs248
-rw-r--r--tokio-io/src/_tokio_codec/framed_read.rs214
-rw-r--r--tokio-io/src/_tokio_codec/framed_write.rs237
-rw-r--r--tokio-io/src/_tokio_codec/mod.rs36
-rw-r--r--tokio-io/src/async_read.rs3
-rw-r--r--tokio-io/src/codec/bytes_codec.rs3
-rw-r--r--tokio-io/src/codec/decoder.rs31
-rw-r--r--tokio-io/src/codec/encoder.rs3
-rw-r--r--tokio-io/src/codec/lines_codec.rs3
-rw-r--r--tokio-io/src/codec/mod.rs8
-rw-r--r--tokio-io/src/framed.rs6
-rw-r--r--tokio-io/src/framed_read.rs6
-rw-r--r--tokio-io/src/framed_write.rs6
-rw-r--r--tokio-io/src/length_delimited.rs2
-rw-r--r--tokio-io/src/lib.rs1
-rw-r--r--tokio-udp/Cargo.toml1
-rw-r--r--tokio-udp/src/frame.rs2
-rw-r--r--tokio-udp/src/lib.rs1
-rw-r--r--tokio-udp/tests/udp.rs5
42 files changed, 1100 insertions, 23 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 1569d4eb..70a215f1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,6 +23,7 @@ keywords = ["io", "async", "non-blocking", "futures"]
members = [
"./",
+ "tokio-codec",
"tokio-executor",
"tokio-fs",
"tokio-io",
@@ -39,6 +40,7 @@ travis-ci = { repository = "tokio-rs/tokio" }
appveyor = { repository = "carllerche/tokio", id = "s83yxhy9qeb58va7" }
[dependencies]
+tokio-codec = { version = "0.1.0", path = "tokio-codec" }
tokio-io = { version = "0.1.6", path = "tokio-io" }
tokio-executor = { version = "0.1.2", path = "tokio-executor" }
tokio-reactor = { version = "0.1.1", path = "tokio-reactor" }
diff --git a/examples/connect.rs b/examples/connect.rs
index f44a5c1e..5a6b5151 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -17,6 +17,7 @@
#![deny(warnings)]
extern crate tokio;
+extern crate tokio_codec;
extern crate tokio_io;
extern crate futures;
extern crate bytes;
@@ -82,7 +83,7 @@ fn main() {
mod codec {
use std::io;
use bytes::{BufMut, BytesMut};
- use tokio_io::codec::{Encoder, Decoder};
+ use tokio_codec::{Encoder, Decoder};
/// A simple `Codec` implementation that just ships bytes around.
///
@@ -120,6 +121,7 @@ mod codec {
mod tcp {
use tokio;
+ use tokio_codec::Decoder;
use tokio::net::TcpStream;
use tokio::prelude::*;
@@ -151,7 +153,7 @@ mod tcp {
// to the TCP stream. This is done to ensure that happens concurrently
// with us reading data from the stream.
Box::new(tcp.map(move |stream| {
- let (sink, stream) = stream.framed(Bytes).split();
+ let (sink, stream) = Bytes.framed(stream).split();
tokio::spawn(stdin.forward(sink).then(|result| {
if let Err(e) = result {
diff --git a/examples/print_each_packet.rs b/examples/print_each_packet.rs
index bb054948..5dc53324 100644
--- a/examples/print_each_packet.rs
+++ b/examples/print_each_packet.rs
@@ -55,9 +55,10 @@
#![deny(warnings)]
extern crate tokio;
+extern crate tokio_codec;
extern crate tokio_io;
-use tokio_io::codec::BytesCodec;
+use tokio_codec::{Decoder, BytesCodec};
use tokio::net::TcpListener;
use tokio::prelude::*;
@@ -99,8 +100,8 @@ fn main() {
// We're parsing each socket with the `BytesCodec` included in `tokio_io`,
// and then we `split` each codec into the reader/writer halves.
//
- // See https://docs.rs/tokio-io/0.1/src/tokio_io/codec/bytes_codec.rs.html
- let framed = socket.framed(BytesCodec::new());
+ // See https://docs.rs/tokio-codec/0.1/src/tokio_codec/bytes_codec.rs.html
+ let framed = BytesCodec::new().framed(socket);
let (_writer, reader) = framed.split();
let processor = reader
diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs
index 9bf3f27a..d56ff96f 100644
--- a/examples/tinyhttp.rs
+++ b/examples/tinyhttp.rs
@@ -21,6 +21,7 @@ extern crate serde_derive;
extern crate serde_json;
extern crate time;
extern crate tokio;
+extern crate tokio_codec;
extern crate tokio_io;
use std::{env, fmt, io};
@@ -29,7 +30,7 @@ use std::net::SocketAddr;
use tokio::net::{TcpStream, TcpListener};
use tokio::prelude::*;
-use tokio_io::codec::{Encoder, Decoder};
+use tokio_codec::{Encoder, Decoder};
use bytes::BytesMut;
use http::header::HeaderValue;
@@ -55,10 +56,10 @@ fn main() {
}
fn process(socket: TcpStream) {
- let (tx, rx) = socket
+ let (tx, rx) =
// Frame the socket using the `Http` protocol. This maps the TCP socket
// to a Stream + Sink of HTTP frames.
- .framed(Http)
+ Http.framed(socket)
// This splits a single `Stream + Sink` value into two separate handles
// that can be used independently (even on different tasks or threads).
.split();
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs
index 063083e5..b273a360 100644
--- a/examples/udp-codec.rs
+++ b/examples/udp-codec.rs
@@ -9,6 +9,7 @@
#![deny(warnings)]
extern crate tokio;
+extern crate tokio_codec;
extern crate tokio_io;
extern crate env_logger;
@@ -16,7 +17,7 @@ use std::net::SocketAddr;
use tokio::prelude::*;
use tokio::net::{UdpSocket, UdpFramed};
-use tokio_io::codec::BytesCodec;
+use tokio_codec::BytesCodec;
fn main() {
let _ = env_logger::init();
diff --git a/tests/line-frames.rs b/tests/line-frames.rs
index c6437d84..4d42f508 100644
--- a/tests/line-frames.rs
+++ b/tests/line-frames.rs
@@ -1,6 +1,7 @@
extern crate env_logger;
extern crate futures;
extern crate tokio;
+extern crate tokio_codec;
extern crate tokio_io;
extern crate tokio_threadpool;
extern crate bytes;
@@ -11,9 +12,8 @@ use std::net::Shutdown;
use bytes::{BytesMut, BufMut};
use futures::{Future, Stream, Sink};
use tokio::net::{TcpListener, TcpStream};
-use tokio_io::codec::{Encoder, Decoder};
+use tokio_codec::{Encoder, Decoder};
use tokio_io::io::{write_all, read};
-use tokio_io::AsyncRead;
use tokio_threadpool::Builder;
pub struct LineCodec;
@@ -61,7 +61,7 @@ fn echo() {
let addr = listener.local_addr().unwrap();
let sender = pool.sender().clone();
let srv = listener.incoming().for_each(move |socket| {
- let (sink, stream) = socket.framed(LineCodec).split();
+ let (sink, stream) = LineCodec.framed(socket).split();
sender.spawn(sink.send_all(stream).map(|_| ()).map_err(|_| ())).unwrap();
Ok(())
});
diff --git a/tokio-codec/CHANGELOG.md b/tokio-codec/CHANGELOG.md
new file mode 100644
index 00000000..f96d5068
--- /dev/null
+++ b/tokio-codec/CHANGELOG.md
@@ -0,0 +1,3 @@
+# Unreleased
+
+* Initial release (#353)
diff --git a/tokio-codec/Cargo.toml b/tokio-codec/Cargo.toml
new file mode 100644
index 00000000..1d335b83
--- /dev/null
+++ b/tokio-codec/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "tokio-codec"
+
+# When releasing to crates.io:
+# - Update html_root_url.
+# - Update CHANGELOG.md.
+# - Create "v0.1.x" git tag.
+version = "0.1.0"
+authors = ["Carl Lerche <me@carllerche.com>", "Bryan Burgers <bryan@burgers.io>"]
+license = "MIT"
+repository = "https://github.com/tokio-rs/tokio"
+homepage = "https://tokio.rs"
+documentation = "https://docs.rs/tokio-codec/0.1"
+description = """
+Utilities for encoding and decoding frames.
+"""
+categories = ["asynchronous"]
+
+[dependencies]
+tokio-io = { version = "0.1.6", path = "../tokio-io" }
+bytes = "0.4.7"
+futures = "0.1.18"
diff --git a/tokio-codec/LICENSE b/tokio-codec/LICENSE
new file mode 100644
index 00000000..38c1e27b
--- /dev/null
+++ b/tokio-codec/LICENSE
@@ -0,0 +1,25 @@
+Copyright (c) 2018 Tokio Contributors
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/tokio-codec/README.md b/tokio-codec/README.md
new file mode 100644
index 00000000..e0c1a385
--- /dev/null
+++ b/tokio-codec/README.md
@@ -0,0 +1,35 @@
+# tokio-codec
+
+Utilities for encoding and decoding frames.
+
+[Documentation](https://docs.rs/tokio-codec)
+
+## Usage
+
+First, add this to your `Cargo.toml`:
+
+```toml
+[dependencies]
+tokio-codec = "0.1"
+```
+
+Next, add this to your crate:
+
+```rust
+extern crate tokio_codec;
+```
+
+You can find extensive documentation and examples about how to use this crate
+online at [https://tokio.rs](https://tokio.rs). The [API
+documentation](https://docs.rs/tokio-codec) is also a great place to get started
+for the nitty-gritty.
+
+## License
+
+This project is licensed under the [MIT license](LICENSE).
+
+### Contribution
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in Tokio by you, shall be licensed as MIT, without any additional
+terms or conditions.
diff --git a/tokio-codec/src/bytes_codec.rs b/tokio-codec/src/bytes_codec.rs
new file mode 100644
index 00000000..d535aef6
--- /dev/null
+++ b/tokio-codec/src/bytes_codec.rs
@@ -0,0 +1,37 @@
+use bytes::{Bytes, BufMut, BytesMut};
+use tokio_io::_tokio_codec::{Encoder, Decoder};
+use std::io;
+
+/// A simple `Codec` implementation that just ships bytes around.
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub struct BytesCodec(());
+
+impl BytesCodec {
+ /// Creates a new `BytesCodec` for shipping around raw bytes.
+ pub fn new() -> BytesCodec { BytesCodec(()) }
+}
+
+impl Decoder for BytesCodec {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> {
+ if buf.len() > 0 {
+ let len = buf.len();
+ Ok(Some(buf.split_to(len)))
+ } else {
+ Ok(None)
+ }
+ }
+}
+
+impl Encoder for BytesCodec {
+ type Item = Bytes;
+ type Error = io::Error;
+
+ fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> {
+ buf.reserve(data.len());
+ buf.put(data);
+ Ok(())
+ }
+}
diff --git a/tokio-codec/src/lib.rs b/tokio-codec/src/lib.rs
new file mode 100644
index 00000000..2b26b542
--- /dev/null
+++ b/tokio-codec/src/lib.rs
@@ -0,0 +1,32 @@
+//! Utilities for encoding and decoding frames.
+//!
+//! Contains adapters to go from streams of bytes, [`AsyncRead`] and
+//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`].
+//! Framed streams are also known as [transports].
+//!
+//! [`AsyncRead`]: #
+//! [`AsyncWrite`]: #
+//! [`Sink`]: #
+//! [`Stream`]: #
+//! [transports]: #
+
+#![deny(missing_docs, missing_debug_implementations, warnings)]
+#![doc(html_root_url = "https://docs.rs/tokio-codec/0.1.0")]
+
+extern crate bytes;
+extern crate tokio_io;
+
+mod bytes_codec;
+mod lines_codec;
+
+pub use tokio_io::_tokio_codec::{
+ Decoder,
+ Encoder,
+ Framed,
+ FramedParts,
+ FramedRead,
+ FramedWrite,
+};
+
+pub use bytes_codec::BytesCodec;
+pub use lines_codec::LinesCodec;
diff --git a/tokio-codec/src/lines_codec.rs b/tokio-codec/src/lines_codec.rs
new file mode 100644
index 00000000..bf4135b8
--- /dev/null
+++ b/tokio-codec/src/lines_codec.rs
@@ -0,0 +1,89 @@
+use bytes::{BufMut, BytesMut};
+use tokio_io::_tokio_codec::{Encoder, Decoder};
+use std::{io, str};
+
+/// A simple `Codec` implementation that splits up data into lines.
+#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub struct LinesCodec {
+ // Stored index of the next index to examine for a `\n` character.
+ // This is used to optimize searching.
+ // For example, if `decode` was called with `abc`, it would hold `3`,
+ // because that is the next index to examine.
+ // The next time `decode` is called with `abcde\n`, the method will
+ // only look at `de\n` before returning.
+ next_index: usize,
+}
+
+impl LinesCodec {
+ /// Returns a `LinesCodec` for splitting up data into lines.
+ pub fn new() -> LinesCodec {
+ LinesCodec { next_index: 0 }
+ }
+}
+
+fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
+ str::from_utf8(buf).map_err(|_|
+ io::Error::new(
+ io::ErrorKind::InvalidData,
+ "Unable to decode input as UTF8"))
+}
+
+fn without_carriage_return(s: &[u8]) -> &[u8] {
+ if let Some(&b'\r') = s.last() {
+ &s[..s.len() - 1]
+ } else {
+ s
+ }
+}
+
+impl Decoder for LinesCodec {
+ type Item = String;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
+ if let Some(newline_offset) =
+ buf[self.next_index..].iter().position(|b| *b == b'\n')
+ {
+ let newline_index = newline_offset + self.next_index;
+ let line = buf.split_to(newline_index + 1);
+ let line = &line[..line.len()-1];
+ let line = without_carriage_return(line);
+ let line = utf8(line)?;
+ self.next_index = 0;
+ Ok(Some(line.to_string()))
+ } else {
+ self.next_index = buf.len();
+ Ok(None)
+ }
+ }
+
+ fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
+ Ok(match self.decode(buf)? {
+ Some(frame) => Some(frame),
+ None => {
+ // No terminating newline - return remaining data, if any
+ if buf.is_empty() || buf == &b"\r"[..] {
+ None
+ } else {
+ let line = buf.take();
+ let line = without_carriage_return(&line);
+ let line = utf8(line)?;
+ self.next_index = 0;
+ Some(line.to_string())
+ }
+ }
+ })
+ }
+}
+
+impl Encoder for LinesCodec {
+ type Item = String;
+ type Error = io::Error;
+
+ fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> {
+ buf.reserve(line.len() + 1);
+ buf.put(line);
+ buf.put_u8(b'\n');
+ Ok(())
+ }
+}
diff --git a/tokio-io/tests/codecs.rs b/tokio-codec/tests/codecs.rs
index 5d63242f..6359e7c7 100644
--- a/tokio-io/tests/codecs.rs
+++ b/tokio-codec/tests/codecs.rs
@@ -1,8 +1,8 @@
-extern crate tokio_io;
+extern crate tokio_codec;
extern crate bytes;
use bytes::{BytesMut, Bytes, BufMut};
-use tokio_io::codec::{BytesCodec, LinesCodec, Decoder, Encoder};
+use tokio_codec::{BytesCodec, LinesCodec, Decoder, Encoder};
#[test]
fn bytes_decoder() {
diff --git a/tokio-io/tests/framed.rs b/tokio-codec/tests/framed.rs
index 660cc5d0..fdedd560 100644
--- a/tokio-io/tests/framed.rs
+++ b/tokio-codec/tests/framed.rs
@@ -1,10 +1,11 @@
+extern crate tokio_codec;
extern crate tokio_io;
extern crate bytes;
extern crate futures;
use futures::{Stream, Future};
use std::io::{self, Read};
-use tokio_io::codec::{Framed, FramedParts, Decoder, Encoder};
+use tokio_codec::{Framed, FramedParts, Decoder, Encoder};
use tokio_io::AsyncRead;
use bytes::{BytesMut, Buf, BufMut, IntoBuf, BigEndian};
diff --git a/tokio-io/tests/framed_read.rs b/tokio-codec/tests/framed_read.rs
index 0dd32737..80dfa5e5 100644
--- a/tokio-io/tests/framed_read.rs
+++ b/tokio-codec/tests/framed_read.rs
@@ -1,9 +1,10 @@
+extern crate tokio_codec;
extern crate tokio_io;
extern crate bytes;
extern crate futures;
use tokio_io::AsyncRead;
-use tokio_io::codec::{FramedRead, Decoder};
+use tokio_codec::{FramedRead, Decoder};
use bytes::{BytesMut, Buf, IntoBuf, BigEndian};
use futures::Stream;
diff --git a/tokio-io/tests/framed_write.rs b/tokio-codec/tests/framed_write.rs
index 2db21b48..137fb5be 100644
--- a/tokio-io/tests/framed_write.rs
+++ b/tokio-codec/tests/framed_write.rs
@@ -1,9 +1,10 @@
+extern crate tokio_codec;
extern crate tokio_io;
extern crate bytes;
extern crate futures;
use tokio_io::AsyncWrite;
-use tokio_io::codec::{Encoder, FramedWrite};
+use tokio_codec::{Encoder, FramedWrite};
use futures::{Sink, Poll};
use bytes::{BytesMut, BufMut, BigEndian};
diff --git a/tokio-fs/CHANGELOG.md b/tokio-fs/CHANGELOG.md
index 91f5c842..fb3e7dab 100644
--- a/tokio-fs/CHANGELOG.md
+++ b/tokio-fs/CHANGELOG.md
@@ -1,3 +1,7 @@
+# Unreleased
+
+* Use `tokio-codec` in examples
+
# 0.1.0 (May 2, 2018)
* Initial release
diff --git a/tokio-fs/Cargo.toml b/tokio-fs/Cargo.toml
index 922eca16..a241e028 100644
--- a/tokio-fs/Cargo.toml
+++ b/tokio-fs/Cargo.toml
@@ -27,3 +27,4 @@ tokio-io = { version = "0.1.6", path = "../tokio-io" }
rand = "0.4.2"
tempdir = "0.3.7"
tokio-io = { version = "0.1.6", path = "../tokio-io" }
+tokio-codec = { version = "0.1.0", path = "../tokio-codec" }
diff --git a/tokio-fs/examples/std-echo.rs b/tokio-fs/examples/std-echo.rs
index cdfdf008..83efa66e 100644
--- a/tokio-fs/examples/std-echo.rs
+++ b/tokio-fs/examples/std-echo.rs
@@ -1,12 +1,13 @@
//! Echo everything received on STDIN to STDOUT.
+#![deny(deprecated, warnings)]
extern crate futures;
extern crate tokio_fs;
-extern crate tokio_io;
+extern crate tokio_codec;
extern crate tokio_threadpool;
use tokio_fs::{stdin, stdout, stderr};
-use tokio_io::codec::{FramedRead, FramedWrite, LinesCodec};
+use tokio_codec::{FramedRead, FramedWrite, LinesCodec};
use tokio_threadpool::Builder;
use futures::{Future, Stream, Sink};
diff --git a/tokio-io/CHANGELOG.md b/tokio-io/CHANGELOG.md
index c96c4d1d..428bdf00 100644
--- a/tokio-io/CHANGELOG.md
+++ b/tokio-io/CHANGELOG.md
@@ -1,3 +1,7 @@
+# Unreleased
+
+* Move `codec::{Encode, Decode, Framed*}` into `tokio-codec` (#353)
+
# 0.1.6 (March 09, 2018)
* Add native endian builder fn to length_delimited (#144)
diff --git a/tokio-io/src/_tokio_codec/decoder.rs b/tokio-io/src/_tokio_codec/decoder.rs
new file mode 100644
index 00000000..9c9fbacb
--- /dev/null
+++ b/tokio-io/src/_tokio_codec/decoder.rs
@@ -0,0 +1,3 @@
+// For now, we need to keep the implmentation of Encoder in tokio_io.
+
+pub use codec::Decoder;
diff --git a/tokio-io/src/_tokio_codec/encoder.rs b/tokio-io/src/_tokio_codec/encoder.rs
new file mode 100644
index 00000000..9cbe054d
--- /dev/null
+++ b/tokio-io/src/_tokio_codec/encoder.rs
@@ -0,0 +1,3 @@
+// For now, we need to keep the implmentation of Encoder in tokio_io.
+
+pub use codec::Encoder;
diff --git a/tokio-io/src/_tokio_codec/framed.rs b/tokio-io/src/_tokio_codec/framed.rs
new file mode 100644
index 00000000..de360dba
--- /dev/null
+++ b/tokio-io/src/_tokio_codec/framed.rs
@@ -0,0 +1,248 @@
+#![allow(deprecated)]
+
+use std::io::{self, Read, Write};
+use std::fmt;
+
+use {AsyncRead, AsyncWrite};
+use codec::{Decoder, Encoder};
+use super::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2};
+use super::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2};
+
+use futures::{Stream, Sink, StartSend, Poll};
+use bytes::{BytesMut};
+
+/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
+/// the `Encoder` and `Decoder` traits to encode and decode frames.
+///
+/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter.
+pub struct Framed<T, U> {
+ inner: FramedRead2<FramedWrite2<Fuse<T, U>>>,
+}
+
+pub struct Fuse<T, U>(pub T, pub U);
+
+/// Provides a `Stream` and `Sink` interface for reading and writing to this
+/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
+///
+/// Raw I/O objects work with byte sequences, but higher-level code usually
+/// wants to batch these into meaningful chunks, called "frames". This
+/// method layers framing on top of an I/O object, by using the `Codec`
+/// traits to handle encoding and decoding of messages frames. Note that
+/// the incoming and outgoing frame types may be distinct.
+///
+/// This function returns a *single* object that is both `Stream` and
+/// `Sink`; grouping this into a single object is often useful for layering
+/// things like gzip or TLS, which require both read and write access to the
+/// underlying object.
+///
+/// If you want to work more directly with the streams and sink, consider
+/// calling `split` on the `Framed` returned by this method, which will
+/// break them into separate objects, allowing them to interact more easily.
+pub fn framed<T, U>(inner: T, codec: U) -> Framed<T, U>
+ where T: AsyncRead + AsyncWrite,
+ U: Decoder + Encoder,
+{
+ Framed {
+ inner: framed_read2(framed_write2(Fuse(inner, codec))),
+ }
+}
+
+impl<T, U> Framed<T, U> {
+ /// Provides a `Stream` and `Sink` interface for reading and writing to this
+ /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
+ ///
+ /// Raw I/O objects work with byte sequences, but higher-level code usually
+ /// wants to batch these into meaningful chunks, called "frames". This
+ /// method layers framing on top of an I/O object, by using the `Codec`
+ /// traits to handle encoding and decoding of messages frames. Note that
+ /// the incoming and outgoing frame types may be distinct.
+ ///
+ /// This function returns a *single* object that is both `Stream` and
+ /// `Sink`; grouping this into a single object is often useful for layering
+ /// things like gzip or TLS, which require both read and write access to the
+ /// underlying object.
+ ///
+ /// This objects takes a stream and a readbuffer and a writebuffer. These field
+ /// can be obtained from an existing `Framed` with the `into_parts` method.
+ ///
+ /// If you want to work more directly with the streams and sink, consider
+ /// calling `split` on the `Framed` returned by this method, which will
+ /// break them into separate objects, allowing them to interact more easily.
+ pub fn from_parts(parts: FramedParts<T>, codec: U) -> Framed<T, U>
+ {
+ Framed {
+ inner: framed_read2_with_buffer(framed_write2_with_buffer(Fuse(parts.inner, codec), parts.writebuf), parts.readbuf),
+ }
+ }
+
+ /// Returns a reference to the underlying I/O stream wrapped by
+ /// `Frame`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_ref(&self) -> &T {
+ &self.inner.get_ref().get_ref().0
+ }
+
+ /// Returns a mutable reference to the underlying I/O stream wrapped by
+ /// `Frame`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner.get_mut().get_mut().0
+ }
+
+ /// Consumes the `Frame`, returning its underlying I/O stream.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn into_inner(self) -> T {
+ self.inner.into_inner().into_inner().0
+ }
+
+ /// Consumes the `Frame`, returning its underlying I/O stream and the buffer
+ /// with unprocessed data.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn into_parts(self) -> FramedParts<T> {
+ let (inner, readbuf) = self.inner.into_parts();
+ let (inner, writebuf) = inner.into_parts();
+ FramedParts { inner: inner.0, readbuf: readbuf, writebuf: writebuf }
+ }
+
+ /// Consumes the `Frame`, returning its underlying I/O stream and the buffer
+ /// with unprocessed data, and also the current codec state.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ ///
+ /// Note that this function will be removed once the codec has been
+ /// integrated into `FramedParts` in a new version (see
+ /// [#53](https://github.com/tokio-rs/tokio-io/pull/53)).
+ pub fn into_parts_and_codec(self) -> (FramedParts<T>, U) {
+ let (inner, readbuf) = self.inner.into_parts();
+ let (inner, writ