diff options
author | Roman <humbug@deeptown.org> | 2018-02-12 21:10:19 +0400 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2018-02-12 09:10:19 -0800 |
commit | 35aeabd3ffda78fea756b786bbf0fe38f7e71d83 (patch) | |
tree | 68c5ce082e8d3736a7815afb49cac3cc018e7bbf | |
parent | 0b58bded7c0c04f07dd5418fdbaa0976369985ba (diff) |
Split codec code (#128)
* move src/codec.rs -> src/codec/mod.rs
* Move traits Encoder and Decoder from src/framed_{read|write}.rs into src/codec/{encoder,decoder}.rs
* Move LinesCodec and BytesCodec from src/codecs.rs into src/codec/{lines,bytes}_codec.rs
-rw-r--r-- | tokio-io/src/codec/bytes_codec.rs | 37 | ||||
-rw-r--r-- | tokio-io/src/codec/decoder.rs | 86 | ||||
-rw-r--r-- | tokio-io/src/codec/encoder.rs | 23 | ||||
-rw-r--r-- | tokio-io/src/codec/lines_codec.rs (renamed from tokio-io/src/codecs.rs) | 36 | ||||
-rw-r--r-- | tokio-io/src/codec/mod.rs (renamed from tokio-io/src/codec.rs) | 15 | ||||
-rw-r--r-- | tokio-io/src/framed.rs | 7 | ||||
-rw-r--r-- | tokio-io/src/framed_read.rs | 87 | ||||
-rw-r--r-- | tokio-io/src/framed_write.rs | 23 | ||||
-rw-r--r-- | tokio-io/src/lib.rs | 1 |
9 files changed, 166 insertions, 149 deletions
diff --git a/tokio-io/src/codec/bytes_codec.rs b/tokio-io/src/codec/bytes_codec.rs new file mode 100644 index 00000000..ce26b9e6 --- /dev/null +++ b/tokio-io/src/codec/bytes_codec.rs @@ -0,0 +1,37 @@ +use bytes::{Bytes, BufMut, BytesMut}; +use codec::{Encoder, Decoder}; +use std::io; + +/// A simple `Codec` implementation that just ships bytes around. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct BytesCodec(()); + +impl BytesCodec { + /// Creates a new `BytesCodec` for shipping around raw bytes. + pub fn new() -> BytesCodec { BytesCodec(()) } +} + +impl Decoder for BytesCodec { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> { + if buf.len() > 0 { + let len = buf.len(); + Ok(Some(buf.split_to(len))) + } else { + Ok(None) + } + } +} + +impl Encoder for BytesCodec { + type Item = Bytes; + type Error = io::Error; + + fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> { + buf.reserve(data.len()); + buf.put(data); + Ok(()) + } +} diff --git a/tokio-io/src/codec/decoder.rs b/tokio-io/src/codec/decoder.rs new file mode 100644 index 00000000..e4c3d79e --- /dev/null +++ b/tokio-io/src/codec/decoder.rs @@ -0,0 +1,86 @@ +use std::io; +use bytes::BytesMut; + +/// Decoding of frames via buffers. +/// +/// This trait is used when constructing an instance of `Framed` or +/// `FramedRead`. An implementation of `Decoder` takes a byte stream that has +/// already been buffered in `src` and decodes the data into a stream of +/// `Self::Item` frames. +/// +/// Implementations are able to track state on `self`, which enables +/// implementing stateful streaming parsers. In many cases, though, this type +/// will simply be a unit struct (e.g. `struct HttpDecoder`). +pub trait Decoder { + /// The type of decoded frames. + type Item; + + /// The type of unrecoverable frame decoding errors. + /// + /// If an individual message is ill-formed but can be ignored without + /// interfering with the processing of future messages, it may be more + /// useful to report the failure as an `Item`. + /// + /// `From<io::Error>` is required in the interest of making `Error` suitable + /// for returning directly from a `FramedRead`, and to enable the default + /// implementation of `decode_eof` to yield an `io::Error` when the decoder + /// fails to consume all available data. + /// + /// Note that implementors of this trait can simply indicate `type Error = + /// io::Error` to use I/O errors as this type. + type Error: From<io::Error>; + + /// Attempts to decode a frame from the provided buffer of bytes. + /// + /// This method is called by `FramedRead` whenever bytes are ready to be + /// parsed. The provided buffer of bytes is what's been read so far, and + /// this instance of `Decode` can determine whether an entire frame is in + /// the buffer and is ready to be returned. + /// + /// If an entire frame is available, then this instance will remove those + /// bytes from the buffer provided and return them as a decoded + /// frame. Note that removing bytes from the provided buffer doesn't always + /// necessarily copy the bytes, so this should be an efficient operation in + /// most circumstances. + /// + /// If the bytes look valid, but a frame isn't fully available yet, then + /// `Ok(None)` is returned. This indicates to the `Framed` instance that + /// it needs to read some more bytes before calling this method again. + /// + /// Note that the bytes provided may be empty. If a previous call to + /// `decode` consumed all the bytes in the buffer then `decode` will be + /// called again until it returns `None`, indicating that more bytes need to + /// be read. + /// + /// Finally, if the bytes in the buffer are malformed then an error is + /// returned indicating why. This informs `Framed` that the stream is now + /// corrupt and should be terminated. + fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>; + + /// A default method available to be called when there are no more bytes + /// available to be read from the underlying I/O. + /// + /// This method defaults to calling `decode` and returns an error if + /// `Ok(None)` is returned while there is unconsumed data in `buf`. + /// Typically this doesn't need to be implemented unless the framing + /// protocol differs near the end of the stream. + /// + /// Note that the `buf` argument may be empty. If a previous call to + /// `decode_eof` consumed all the bytes in the buffer, `decode_eof` will be + /// called again until it returns `None`, indicating that there are no more + /// frames to yield. This behavior enables returning finalization frames + /// that may not be based on inbound data. + fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { + match try!(self.decode(buf)) { + Some(frame) => Ok(Some(frame)), + None => { + if buf.is_empty() { + Ok(None) + } else { + Err(io::Error::new(io::ErrorKind::Other, + "bytes remaining on stream").into()) + } + } + } + } +} diff --git a/tokio-io/src/codec/encoder.rs b/tokio-io/src/codec/encoder.rs new file mode 100644 index 00000000..55056c1c --- /dev/null +++ b/tokio-io/src/codec/encoder.rs @@ -0,0 +1,23 @@ +use std::io; +use bytes::BytesMut; + +/// Trait of helper objects to write out messages as bytes, for use with +/// `FramedWrite`. +pub trait Encoder { + /// The type of items consumed by the `Encoder` + type Item; + + /// The type of encoding errors. + /// + /// `FramedWrite` requires `Encoder`s errors to implement `From<io::Error>` + /// in the interest letting it return `Error`s directly. + type Error: From<io::Error>; + + /// Encodes a frame into the buffer provided. + /// + /// This method will encode `item` into the byte buffer provided by `dst`. + /// The `dst` provided is an internal buffer of the `Framed` instance and + /// will be written out when possible. + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) + -> Result<(), Self::Error>; +} diff --git a/tokio-io/src/codecs.rs b/tokio-io/src/codec/lines_codec.rs index f61448d1..2e1bea4a 100644 --- a/tokio-io/src/codecs.rs +++ b/tokio-io/src/codec/lines_codec.rs @@ -1,41 +1,7 @@ -use bytes::{Bytes, BufMut, BytesMut}; +use bytes::{BufMut, BytesMut}; use codec::{Encoder, Decoder}; use std::{io, str}; -/// A simple `Codec` implementation that just ships bytes around. -#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] -pub struct BytesCodec(()); - -impl BytesCodec { - /// Creates a new `BytesCodec` for shipping around raw bytes. - pub fn new() -> BytesCodec { BytesCodec(()) } -} - -impl Decoder for BytesCodec { - type Item = BytesMut; - type Error = io::Error; - - fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> { - if buf.len() > 0 { - let len = buf.len(); - Ok(Some(buf.split_to(len))) - } else { - Ok(None) - } - } -} - -impl Encoder for BytesCodec { - type Item = Bytes; - type Error = io::Error; - - fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> { - buf.reserve(data.len()); - buf.put(data); - Ok(()) - } -} - /// A simple `Codec` implementation that splits up data into lines. #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] pub struct LinesCodec { diff --git a/tokio-io/src/codec.rs b/tokio-io/src/codec/mod.rs index aa91f224..3a80d6a3 100644 --- a/tokio-io/src/codec.rs +++ b/tokio-io/src/codec/mod.rs @@ -10,10 +10,19 @@ //! [`Stream`]: # //! [transports]: # -pub use codecs::{BytesCodec, LinesCodec}; +mod decoder; +mod encoder; +mod bytes_codec; +mod lines_codec; + +pub use self::decoder::Decoder; +pub use self::encoder::Encoder; +pub use self::bytes_codec::BytesCodec; +pub use self::lines_codec::LinesCodec; + pub use framed::{Framed, FramedParts}; -pub use framed_read::{FramedRead, Decoder}; -pub use framed_write::{FramedWrite, Encoder}; +pub use framed_read::FramedRead; +pub use framed_write::FramedWrite; pub mod length_delimited { //! Frame a stream of bytes based on a length prefix diff --git a/tokio-io/src/framed.rs b/tokio-io/src/framed.rs index e3255470..5a3c54a8 100644 --- a/tokio-io/src/framed.rs +++ b/tokio-io/src/framed.rs @@ -2,8 +2,9 @@ use std::io::{self, Read, Write}; use std::fmt; use {AsyncRead, AsyncWrite}; -use framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2, Decoder}; -use framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2, Encoder}; +use codec::{Decoder, Encoder}; +use framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2}; +use framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2}; use futures::{Stream, Sink, StartSend, Poll}; use bytes::{BytesMut}; @@ -225,4 +226,4 @@ pub struct FramedParts<T> pub readbuf: BytesMut, /// A buffer with unprocessed data which are not written yet. pub writebuf: BytesMut -}
\ No newline at end of file +} diff --git a/tokio-io/src/framed_read.rs b/tokio-io/src/framed_read.rs index 6ccfd1aa..e04e3acf 100644 --- a/tokio-io/src/framed_read.rs +++ b/tokio-io/src/framed_read.rs @@ -1,95 +1,12 @@ -use std::{fmt, io}; +use std::fmt; use AsyncRead; +use codec::Decoder; use framed::Fuse; use futures::{Async, Poll, Stream, Sink, StartSend}; use bytes::BytesMut; -/// Decoding of frames via buffers. -/// -/// This trait is used when constructing an instance of `Framed` or -/// `FramedRead`. An implementation of `Decoder` takes a byte stream that has -/// already been buffered in `src` and decodes the data into a stream of -/// `Self::Item` frames. -/// -/// Implementations are able to track state on `self`, which enables -/// implementing stateful streaming parsers. In many cases, though, this type -/// will simply be a unit struct (e.g. `struct HttpDecoder`). -pub trait Decoder { - /// The type of decoded frames. - type Item; - - /// The type of unrecoverable frame decoding errors. - /// - /// If an individual message is ill-formed but can be ignored without - /// interfering with the processing of future messages, it may be more - /// useful to report the failure as an `Item`. - /// - /// `From<io::Error>` is required in the interest of making `Error` suitable - /// for returning directly from a `FramedRead`, and to enable the default - /// implementation of `decode_eof` to yield an `io::Error` when the decoder - /// fails to consume all available data. - /// - /// Note that implementors of this trait can simply indicate `type Error = - /// io::Error` to use I/O errors as this type. - type Error: From<io::Error>; - - /// Attempts to decode a frame from the provided buffer of bytes. - /// - /// This method is called by `FramedRead` whenever bytes are ready to be - /// parsed. The provided buffer of bytes is what's been read so far, and - /// this instance of `Decode` can determine whether an entire frame is in - /// the buffer and is ready to be returned. - /// - /// If an entire frame is available, then this instance will remove those - /// bytes from the buffer provided and return them as a decoded - /// frame. Note that removing bytes from the provided buffer doesn't always - /// necessarily copy the bytes, so this should be an efficient operation in - /// most circumstances. - /// - /// If the bytes look valid, but a frame isn't fully available yet, then - /// `Ok(None)` is returned. This indicates to the `Framed` instance that - /// it needs to read some more bytes before calling this method again. - /// - /// Note that the bytes provided may be empty. If a previous call to - /// `decode` consumed all the bytes in the buffer then `decode` will be - /// called again until it returns `None`, indicating that more bytes need to - /// be read. - /// - /// Finally, if the bytes in the buffer are malformed then an error is - /// returned indicating why. This informs `Framed` that the stream is now - /// corrupt and should be terminated. - fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>; - - /// A default method available to be called when there are no more bytes - /// available to be read from the underlying I/O. - /// - /// This method defaults to calling `decode` and returns an error if - /// `Ok(None)` is returned while there is unconsumed data in `buf`. - /// Typically this doesn't need to be implemented unless the framing - /// protocol differs near the end of the stream. - /// - /// Note that the `buf` argument may be empty. If a previous call to - /// `decode_eof` consumed all the bytes in the buffer, `decode_eof` will be - /// called again until it returns `None`, indicating that there are no more - /// frames to yield. This behavior enables returning finalization frames - /// that may not be based on inbound data. - fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { - match try!(self.decode(buf)) { - Some(frame) => Ok(Some(frame)), - None => { - if buf.is_empty() { - Ok(None) - } else { - Err(io::Error::new(io::ErrorKind::Other, - "bytes remaining on stream").into()) - } - } - } - } -} - /// A `Stream` of messages decoded from an `AsyncRead`. pub struct FramedRead<T, D> { inner: FramedRead2<Fuse<T, D>>, diff --git a/tokio-io/src/framed_write.rs b/tokio-io/src/framed_write.rs index f9d0edcb..e16ad3f8 100644 --- a/tokio-io/src/framed_write.rs +++ b/tokio-io/src/framed_write.rs @@ -2,33 +2,12 @@ use std::io::{self, Read}; use std::fmt; use {AsyncRead, AsyncWrite}; -use codec::Decoder; +use codec::{Decoder, Encoder}; use framed::Fuse; use futures::{Async, AsyncSink, Poll, Stream, Sink, StartSend}; use bytes::BytesMut; -/// Trait of helper objects to write out messages as bytes, for use with -/// `FramedWrite`. -pub trait Encoder { - /// The type of items consumed by the `Encoder` - type Item; - - /// The type of encoding errors. - /// - /// `FramedWrite` requires `Encoder`s errors to implement `From<io::Error>` - /// in the interest letting it return `Error`s directly. - type Error: From<io::Error>; - - /// Encodes a frame into the buffer provided. - /// - /// This method will encode `item` into the byte buffer provided by `dst`. - /// The `dst` provided is an internal buffer of the `Framed` instance and - /// will be written out when possible. - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) - -> Result<(), Self::Error>; -} - /// A `Sink` of frames encoded to an `AsyncWrite`. pub struct FramedWrite<T, E> { inner: FramedWrite2<Fuse<T, E>>, diff --git a/tokio-io/src/lib.rs b/tokio-io/src/lib.rs index 4c6d67ef..58dbeedf 100644 --- a/tokio-io/src/lib.rs +++ b/tokio-io/src/lib.rs @@ -49,7 +49,6 @@ pub mod io; pub mod codec; mod allow_std; -mod codecs; mod copy; mod flush; mod framed; |