diff options
author | Bryan Burgers <bryan@burgers.io> | 2018-06-04 22:36:06 -0500 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2018-06-04 20:36:06 -0700 |
commit | f723d100871e025e4bdd2f47397c9b089e666ce0 (patch) | |
tree | 7d67d7f8bee561f6f04b9d1222dd02719fb47b12 | |
parent | 3d7263d3a0b73ab35d63b45a6524bde7251851e8 (diff) |
Create tokio-codec (#360)
Create a new tokio-codec crate with many of the contents of
`tokio_io::codec`.
42 files changed, 1100 insertions, 23 deletions
@@ -23,6 +23,7 @@ keywords = ["io", "async", "non-blocking", "futures"] members = [ "./", + "tokio-codec", "tokio-executor", "tokio-fs", "tokio-io", @@ -39,6 +40,7 @@ travis-ci = { repository = "tokio-rs/tokio" } appveyor = { repository = "carllerche/tokio", id = "s83yxhy9qeb58va7" } [dependencies] +tokio-codec = { version = "0.1.0", path = "tokio-codec" } tokio-io = { version = "0.1.6", path = "tokio-io" } tokio-executor = { version = "0.1.2", path = "tokio-executor" } tokio-reactor = { version = "0.1.1", path = "tokio-reactor" } diff --git a/examples/connect.rs b/examples/connect.rs index f44a5c1e..5a6b5151 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -17,6 +17,7 @@ #![deny(warnings)] extern crate tokio; +extern crate tokio_codec; extern crate tokio_io; extern crate futures; extern crate bytes; @@ -82,7 +83,7 @@ fn main() { mod codec { use std::io; use bytes::{BufMut, BytesMut}; - use tokio_io::codec::{Encoder, Decoder}; + use tokio_codec::{Encoder, Decoder}; /// A simple `Codec` implementation that just ships bytes around. /// @@ -120,6 +121,7 @@ mod codec { mod tcp { use tokio; + use tokio_codec::Decoder; use tokio::net::TcpStream; use tokio::prelude::*; @@ -151,7 +153,7 @@ mod tcp { // to the TCP stream. This is done to ensure that happens concurrently // with us reading data from the stream. Box::new(tcp.map(move |stream| { - let (sink, stream) = stream.framed(Bytes).split(); + let (sink, stream) = Bytes.framed(stream).split(); tokio::spawn(stdin.forward(sink).then(|result| { if let Err(e) = result { diff --git a/examples/print_each_packet.rs b/examples/print_each_packet.rs index bb054948..5dc53324 100644 --- a/examples/print_each_packet.rs +++ b/examples/print_each_packet.rs @@ -55,9 +55,10 @@ #![deny(warnings)] extern crate tokio; +extern crate tokio_codec; extern crate tokio_io; -use tokio_io::codec::BytesCodec; +use tokio_codec::{Decoder, BytesCodec}; use tokio::net::TcpListener; use tokio::prelude::*; @@ -99,8 +100,8 @@ fn main() { // We're parsing each socket with the `BytesCodec` included in `tokio_io`, // and then we `split` each codec into the reader/writer halves. // - // See https://docs.rs/tokio-io/0.1/src/tokio_io/codec/bytes_codec.rs.html - let framed = socket.framed(BytesCodec::new()); + // See https://docs.rs/tokio-codec/0.1/src/tokio_codec/bytes_codec.rs.html + let framed = BytesCodec::new().framed(socket); let (_writer, reader) = framed.split(); let processor = reader diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs index 9bf3f27a..d56ff96f 100644 --- a/examples/tinyhttp.rs +++ b/examples/tinyhttp.rs @@ -21,6 +21,7 @@ extern crate serde_derive; extern crate serde_json; extern crate time; extern crate tokio; +extern crate tokio_codec; extern crate tokio_io; use std::{env, fmt, io}; @@ -29,7 +30,7 @@ use std::net::SocketAddr; use tokio::net::{TcpStream, TcpListener}; use tokio::prelude::*; -use tokio_io::codec::{Encoder, Decoder}; +use tokio_codec::{Encoder, Decoder}; use bytes::BytesMut; use http::header::HeaderValue; @@ -55,10 +56,10 @@ fn main() { } fn process(socket: TcpStream) { - let (tx, rx) = socket + let (tx, rx) = // Frame the socket using the `Http` protocol. This maps the TCP socket // to a Stream + Sink of HTTP frames. - .framed(Http) + Http.framed(socket) // This splits a single `Stream + Sink` value into two separate handles // that can be used independently (even on different tasks or threads). .split(); diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index 063083e5..b273a360 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -9,6 +9,7 @@ #![deny(warnings)] extern crate tokio; +extern crate tokio_codec; extern crate tokio_io; extern crate env_logger; @@ -16,7 +17,7 @@ use std::net::SocketAddr; use tokio::prelude::*; use tokio::net::{UdpSocket, UdpFramed}; -use tokio_io::codec::BytesCodec; +use tokio_codec::BytesCodec; fn main() { let _ = env_logger::init(); diff --git a/tests/line-frames.rs b/tests/line-frames.rs index c6437d84..4d42f508 100644 --- a/tests/line-frames.rs +++ b/tests/line-frames.rs @@ -1,6 +1,7 @@ extern crate env_logger; extern crate futures; extern crate tokio; +extern crate tokio_codec; extern crate tokio_io; extern crate tokio_threadpool; extern crate bytes; @@ -11,9 +12,8 @@ use std::net::Shutdown; use bytes::{BytesMut, BufMut}; use futures::{Future, Stream, Sink}; use tokio::net::{TcpListener, TcpStream}; -use tokio_io::codec::{Encoder, Decoder}; +use tokio_codec::{Encoder, Decoder}; use tokio_io::io::{write_all, read}; -use tokio_io::AsyncRead; use tokio_threadpool::Builder; pub struct LineCodec; @@ -61,7 +61,7 @@ fn echo() { let addr = listener.local_addr().unwrap(); let sender = pool.sender().clone(); let srv = listener.incoming().for_each(move |socket| { - let (sink, stream) = socket.framed(LineCodec).split(); + let (sink, stream) = LineCodec.framed(socket).split(); sender.spawn(sink.send_all(stream).map(|_| ()).map_err(|_| ())).unwrap(); Ok(()) }); diff --git a/tokio-codec/CHANGELOG.md b/tokio-codec/CHANGELOG.md new file mode 100644 index 00000000..f96d5068 --- /dev/null +++ b/tokio-codec/CHANGELOG.md @@ -0,0 +1,3 @@ +# Unreleased + +* Initial release (#353) diff --git a/tokio-codec/Cargo.toml b/tokio-codec/Cargo.toml new file mode 100644 index 00000000..1d335b83 --- /dev/null +++ b/tokio-codec/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "tokio-codec" + +# When releasing to crates.io: +# - Update html_root_url. +# - Update CHANGELOG.md. +# - Create "v0.1.x" git tag. +version = "0.1.0" +authors = ["Carl Lerche <me@carllerche.com>", "Bryan Burgers <bryan@burgers.io>"] +license = "MIT" +repository = "https://github.com/tokio-rs/tokio" +homepage = "https://tokio.rs" +documentation = "https://docs.rs/tokio-codec/0.1" +description = """ +Utilities for encoding and decoding frames. +""" +categories = ["asynchronous"] + +[dependencies] +tokio-io = { version = "0.1.6", path = "../tokio-io" } +bytes = "0.4.7" +futures = "0.1.18" diff --git a/tokio-codec/LICENSE b/tokio-codec/LICENSE new file mode 100644 index 00000000..38c1e27b --- /dev/null +++ b/tokio-codec/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2018 Tokio Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/tokio-codec/README.md b/tokio-codec/README.md new file mode 100644 index 00000000..e0c1a385 --- /dev/null +++ b/tokio-codec/README.md @@ -0,0 +1,35 @@ +# tokio-codec + +Utilities for encoding and decoding frames. + +[Documentation](https://docs.rs/tokio-codec) + +## Usage + +First, add this to your `Cargo.toml`: + +```toml +[dependencies] +tokio-codec = "0.1" +``` + +Next, add this to your crate: + +```rust +extern crate tokio_codec; +``` + +You can find extensive documentation and examples about how to use this crate +online at [https://tokio.rs](https://tokio.rs). The [API +documentation](https://docs.rs/tokio-codec) is also a great place to get started +for the nitty-gritty. + +## License + +This project is licensed under the [MIT license](LICENSE). + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in Tokio by you, shall be licensed as MIT, without any additional +terms or conditions. diff --git a/tokio-codec/src/bytes_codec.rs b/tokio-codec/src/bytes_codec.rs new file mode 100644 index 00000000..d535aef6 --- /dev/null +++ b/tokio-codec/src/bytes_codec.rs @@ -0,0 +1,37 @@ +use bytes::{Bytes, BufMut, BytesMut}; +use tokio_io::_tokio_codec::{Encoder, Decoder}; +use std::io; + +/// A simple `Codec` implementation that just ships bytes around. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct BytesCodec(()); + +impl BytesCodec { + /// Creates a new `BytesCodec` for shipping around raw bytes. + pub fn new() -> BytesCodec { BytesCodec(()) } +} + +impl Decoder for BytesCodec { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> { + if buf.len() > 0 { + let len = buf.len(); + Ok(Some(buf.split_to(len))) + } else { + Ok(None) + } + } +} + +impl Encoder for BytesCodec { + type Item = Bytes; + type Error = io::Error; + + fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> { + buf.reserve(data.len()); + buf.put(data); + Ok(()) + } +} diff --git a/tokio-codec/src/lib.rs b/tokio-codec/src/lib.rs new file mode 100644 index 00000000..2b26b542 --- /dev/null +++ b/tokio-codec/src/lib.rs @@ -0,0 +1,32 @@ +//! Utilities for encoding and decoding frames. +//! +//! Contains adapters to go from streams of bytes, [`AsyncRead`] and +//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. +//! Framed streams are also known as [transports]. +//! +//! [`AsyncRead`]: # +//! [`AsyncWrite`]: # +//! [`Sink`]: # +//! [`Stream`]: # +//! [transports]: # + +#![deny(missing_docs, missing_debug_implementations, warnings)] +#![doc(html_root_url = "https://docs.rs/tokio-codec/0.1.0")] + +extern crate bytes; +extern crate tokio_io; + +mod bytes_codec; +mod lines_codec; + +pub use tokio_io::_tokio_codec::{ + Decoder, + Encoder, + Framed, + FramedParts, + FramedRead, + FramedWrite, +}; + +pub use bytes_codec::BytesCodec; +pub use lines_codec::LinesCodec; diff --git a/tokio-codec/src/lines_codec.rs b/tokio-codec/src/lines_codec.rs new file mode 100644 index 00000000..bf4135b8 --- /dev/null +++ b/tokio-codec/src/lines_codec.rs @@ -0,0 +1,89 @@ +use bytes::{BufMut, BytesMut}; +use tokio_io::_tokio_codec::{Encoder, Decoder}; +use std::{io, str}; + +/// A simple `Codec` implementation that splits up data into lines. +#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct LinesCodec { + // Stored index of the next index to examine for a `\n` character. + // This is used to optimize searching. + // For example, if `decode` was called with `abc`, it would hold `3`, + // because that is the next index to examine. + // The next time `decode` is called with `abcde\n`, the method will + // only look at `de\n` before returning. + next_index: usize, +} + +impl LinesCodec { + /// Returns a `LinesCodec` for splitting up data into lines. + pub fn new() -> LinesCodec { + LinesCodec { next_index: 0 } + } +} + +fn utf8(buf: &[u8]) -> Result<&str, io::Error> { + str::from_utf8(buf).map_err(|_| + io::Error::new( + io::ErrorKind::InvalidData, + "Unable to decode input as UTF8")) +} + +fn without_carriage_return(s: &[u8]) -> &[u8] { + if let Some(&b'\r') = s.last() { + &s[..s.len() - 1] + } else { + s + } +} + +impl Decoder for LinesCodec { + type Item = String; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> { + if let Some(newline_offset) = + buf[self.next_index..].iter().position(|b| *b == b'\n') + { + let newline_index = newline_offset + self.next_index; + let line = buf.split_to(newline_index + 1); + let line = &line[..line.len()-1]; + let line = without_carriage_return(line); + let line = utf8(line)?; + self.next_index = 0; + Ok(Some(line.to_string())) + } else { + self.next_index = buf.len(); + Ok(None) + } + } + + fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> { + Ok(match self.decode(buf)? { + Some(frame) => Some(frame), + None => { + // No terminating newline - return remaining data, if any + if buf.is_empty() || buf == &b"\r"[..] { + None + } else { + let line = buf.take(); + let line = without_carriage_return(&line); + let line = utf8(line)?; + self.next_index = 0; + Some(line.to_string()) + } + } + }) + } +} + +impl Encoder for LinesCodec { + type Item = String; + type Error = io::Error; + + fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> { + buf.reserve(line.len() + 1); + buf.put(line); + buf.put_u8(b'\n'); + Ok(()) + } +} diff --git a/tokio-io/tests/codecs.rs b/tokio-codec/tests/codecs.rs index 5d63242f..6359e7c7 100644 --- a/tokio-io/tests/codecs.rs +++ b/tokio-codec/tests/codecs.rs @@ -1,8 +1,8 @@ -extern crate tokio_io; +extern crate tokio_codec; extern crate bytes; use bytes::{BytesMut, Bytes, BufMut}; -use tokio_io::codec::{BytesCodec, LinesCodec, Decoder, Encoder}; +use tokio_codec::{BytesCodec, LinesCodec, Decoder, Encoder}; #[test] fn bytes_decoder() { diff --git a/tokio-io/tests/framed.rs b/tokio-codec/tests/framed.rs index 660cc5d0..fdedd560 100644 --- a/tokio-io/tests/framed.rs +++ b/tokio-codec/tests/framed.rs @@ -1,10 +1,11 @@ +extern crate tokio_codec; extern crate tokio_io; extern crate bytes; extern crate futures; use futures::{Stream, Future}; use std::io::{self, Read}; -use tokio_io::codec::{Framed, FramedParts, Decoder, Encoder}; +use tokio_codec::{Framed, FramedParts, Decoder, Encoder}; use tokio_io::AsyncRead; use bytes::{BytesMut, Buf, BufMut, IntoBuf, BigEndian}; diff --git a/tokio-io/tests/framed_read.rs b/tokio-codec/tests/framed_read.rs index 0dd32737..80dfa5e5 100644 --- a/tokio-io/tests/framed_read.rs +++ b/tokio-codec/tests/framed_read.rs @@ -1,9 +1,10 @@ +extern crate tokio_codec; extern crate tokio_io; extern crate bytes; extern crate futures; use tokio_io::AsyncRead; -use tokio_io::codec::{FramedRead, Decoder}; +use tokio_codec::{FramedRead, Decoder}; use bytes::{BytesMut, Buf, IntoBuf, BigEndian}; use futures::Stream; diff --git a/tokio-io/tests/framed_write.rs b/tokio-codec/tests/framed_write.rs index 2db21b48..137fb5be 100644 --- a/tokio-io/tests/framed_write.rs +++ b/tokio-codec/tests/framed_write.rs @@ -1,9 +1,10 @@ +extern crate tokio_codec; extern crate tokio_io; extern crate bytes; extern crate futures; use tokio_io::AsyncWrite; -use tokio_io::codec::{Encoder, FramedWrite}; +use tokio_codec::{Encoder, FramedWrite}; use futures::{Sink, Poll}; use bytes::{BytesMut, BufMut, BigEndian}; diff --git a/tokio-fs/CHANGELOG.md b/tokio-fs/CHANGELOG.md index 91f5c842..fb3e7dab 100644 --- a/tokio-fs/CHANGELOG.md +++ b/tokio-fs/CHANGELOG.md @@ -1,3 +1,7 @@ +# Unreleased + +* Use `tokio-codec` in examples + # 0.1.0 (May 2, 2018) * Initial release diff --git a/tokio-fs/Cargo.toml b/tokio-fs/Cargo.toml index 922eca16..a241e028 100644 --- a/tokio-fs/Cargo.toml +++ b/tokio-fs/Cargo.toml @@ -27,3 +27,4 @@ tokio-io = { version = "0.1.6", path = "../tokio-io" } rand = "0.4.2" tempdir = "0.3.7" tokio-io = { version = "0.1.6", path = "../tokio-io" } +tokio-codec = { version = "0.1.0", path = "../tokio-codec" } diff --git a/tokio-fs/examples/std-echo.rs b/tokio-fs/examples/std-echo.rs index cdfdf008..83efa66e 100644 --- a/tokio-fs/examples/std-echo.rs +++ b/tokio-fs/examples/std-echo.rs @@ -1,12 +1,13 @@ //! Echo everything received on STDIN to STDOUT. +#![deny(deprecated, warnings)] extern crate futures; extern crate tokio_fs; -extern crate tokio_io; +extern crate tokio_codec; extern crate tokio_threadpool; use tokio_fs::{stdin, stdout, stderr}; -use tokio_io::codec::{FramedRead, FramedWrite, LinesCodec}; +use tokio_codec::{FramedRead, FramedWrite, LinesCodec}; use tokio_threadpool::Builder; use futures::{Future, Stream, Sink}; diff --git a/tokio-io/CHANGELOG.md b/tokio-io/CHANGELOG.md index c96c4d1d..428bdf00 100644 --- a/tokio-io/CHANGELOG.md +++ b/tokio-io/CHANGELOG.md @@ -1,3 +1,7 @@ +# Unreleased + +* Move `codec::{Encode, Decode, Framed*}` into `tokio-codec` (#353) + # 0.1.6 (March 09, 2018) * Add native endian builder fn to length_delimited (#144) diff --git a/tokio-io/src/_tokio_codec/decoder.rs b/tokio-io/src/_tokio_codec/decoder.rs new file mode 100644 index 00000000..9c9fbacb --- /dev/null +++ b/tokio-io/src/_tokio_codec/decoder.rs @@ -0,0 +1,3 @@ +// For now, we need to keep the implmentation of Encoder in tokio_io. + +pub use codec::Decoder; diff --git a/tokio-io/src/_tokio_codec/encoder.rs b/tokio-io/src/_tokio_codec/encoder.rs new file mode 100644 index 00000000..9cbe054d --- /dev/null +++ b/tokio-io/src/_tokio_codec/encoder.rs @@ -0,0 +1,3 @@ +// For now, we need to keep the implmentation of Encoder in tokio_io. + +pub use codec::Encoder; diff --git a/tokio-io/src/_tokio_codec/framed.rs b/tokio-io/src/_tokio_codec/framed.rs new file mode 100644 index 00000000..de360dba --- /dev/null +++ b/tokio-io/src/_tokio_codec/framed.rs @@ -0,0 +1,248 @@ +#![allow(deprecated)] + +use std::io::{self, Read, Write}; +use std::fmt; + +use {AsyncRead, AsyncWrite}; +use codec::{Decoder, Encoder}; +use super::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2}; +use super::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2}; + +use futures::{Stream, Sink, StartSend, Poll}; +use bytes::{BytesMut}; + +/// A unified `Stream` and `Sink` interface to an underlying I/O object, using +/// the `Encoder` and `Decoder` traits to encode and decode frames. +/// +/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter. +pub struct Framed<T, U> { + inner: FramedRead2<FramedWrite2<Fuse<T, U>>>, +} + +pub struct Fuse<T, U>(pub T, pub U); + +/// Provides a `Stream` and `Sink` interface for reading and writing to this +/// `Io` object, using `Decode` and `Encode` to read and write the raw data. +/// +/// Raw I/O objects work with byte sequences, but higher-level code usually +/// wants to batch these into meaningful chunks, called "frames". This +/// method layers framing on top of an I/O object, by using the `Codec` +/// traits to handle encoding and decoding of messages frames. Note that +/// the incoming and outgoing frame types may be distinct. +/// +/// This function returns a *single* object that is both `Stream` and +/// `Sink`; grouping this into a single object is often useful for layering +/// things like gzip or TLS, which require both read and write access to the +/// underlying object. +/// +/// If you want to work more directly with the streams and sink, consider +/// calling `split` on the `Framed` returned by this method, which will +/// break them into separate objects, allowing them to interact more easily. +pub fn framed<T, U>(inner: T, codec: U) -> Framed<T, U> + where T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, +{ + Framed { + inner: framed_read2(framed_write2(Fuse(inner, codec))), + } +} + +impl<T, U> Framed<T, U> { + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// `Io` object, using `Decode` and `Encode` to read and write the raw data. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Codec` + /// traits to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things like gzip or TLS, which require both read and write access to the + /// underlying object. + /// + /// This objects takes a stream and a readbuffer and a writebuffer. These field + /// can be obtained from an existing `Framed` with the `into_parts` method. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `Framed` returned by this method, which will + /// break them into separate objects, allowing them to interact more easily. + pub fn from_parts(parts: FramedParts<T>, codec: U) -> Framed<T, U> + { + Framed { + inner: framed_read2_with_buffer(framed_write2_with_buffer(Fuse(parts.inner, codec), parts.writebuf), parts.readbuf), + } + } + + /// Returns a reference to the underlying I/O stream wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.get_ref().get_ref().0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.get_mut().get_mut().0 + } + + /// Consumes the `Frame`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.into_inner().into_inner().0 + } + + /// Consumes the `Frame`, returning its underlying I/O stream and the buffer + /// with unprocessed data. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_parts(self) -> FramedParts<T> { + let (inner, readbuf) = self.inner.into_parts(); + let (inner, writebuf) = inner.into_parts(); + FramedParts { inner: inner.0, readbuf: readbuf, writebuf: writebuf } + } + + /// Consumes the `Frame`, returning its underlying I/O stream and the buffer + /// with unprocessed data, and also the current codec state. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + /// + /// Note that this function will be removed once the codec has been + /// integrated into `FramedParts` in a new version (see + /// [#53](https://github.com/tokio-rs/tokio-io/pull/53)). + pub fn into_parts_and_codec(self) -> (FramedParts<T>, U) { + let (inner, readbuf) = self.inner.into_parts(); + let (inner, writ |