summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRoman <humbug@deeptown.org>2018-02-12 21:10:19 +0400
committerCarl Lerche <me@carllerche.com>2018-02-12 09:10:19 -0800
commit35aeabd3ffda78fea756b786bbf0fe38f7e71d83 (patch)
tree68c5ce082e8d3736a7815afb49cac3cc018e7bbf
parent0b58bded7c0c04f07dd5418fdbaa0976369985ba (diff)
Split codec code (#128)
* move src/codec.rs -> src/codec/mod.rs * Move traits Encoder and Decoder from src/framed_{read|write}.rs into src/codec/{encoder,decoder}.rs * Move LinesCodec and BytesCodec from src/codecs.rs into src/codec/{lines,bytes}_codec.rs
-rw-r--r--tokio-io/src/codec/bytes_codec.rs37
-rw-r--r--tokio-io/src/codec/decoder.rs86
-rw-r--r--tokio-io/src/codec/encoder.rs23
-rw-r--r--tokio-io/src/codec/lines_codec.rs (renamed from tokio-io/src/codecs.rs)36
-rw-r--r--tokio-io/src/codec/mod.rs (renamed from tokio-io/src/codec.rs)15
-rw-r--r--tokio-io/src/framed.rs7
-rw-r--r--tokio-io/src/framed_read.rs87
-rw-r--r--tokio-io/src/framed_write.rs23
-rw-r--r--tokio-io/src/lib.rs1
9 files changed, 166 insertions, 149 deletions
diff --git a/tokio-io/src/codec/bytes_codec.rs b/tokio-io/src/codec/bytes_codec.rs
new file mode 100644
index 00000000..ce26b9e6
--- /dev/null
+++ b/tokio-io/src/codec/bytes_codec.rs
@@ -0,0 +1,37 @@
+use bytes::{Bytes, BufMut, BytesMut};
+use codec::{Encoder, Decoder};
+use std::io;
+
+/// A simple `Codec` implementation that just ships bytes around.
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub struct BytesCodec(());
+
+impl BytesCodec {
+ /// Creates a new `BytesCodec` for shipping around raw bytes.
+ pub fn new() -> BytesCodec { BytesCodec(()) }
+}
+
+impl Decoder for BytesCodec {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> {
+ if buf.len() > 0 {
+ let len = buf.len();
+ Ok(Some(buf.split_to(len)))
+ } else {
+ Ok(None)
+ }
+ }
+}
+
+impl Encoder for BytesCodec {
+ type Item = Bytes;
+ type Error = io::Error;
+
+ fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> {
+ buf.reserve(data.len());
+ buf.put(data);
+ Ok(())
+ }
+}
diff --git a/tokio-io/src/codec/decoder.rs b/tokio-io/src/codec/decoder.rs
new file mode 100644
index 00000000..e4c3d79e
--- /dev/null
+++ b/tokio-io/src/codec/decoder.rs
@@ -0,0 +1,86 @@
+use std::io;
+use bytes::BytesMut;
+
+/// Decoding of frames via buffers.
+///
+/// This trait is used when constructing an instance of `Framed` or
+/// `FramedRead`. An implementation of `Decoder` takes a byte stream that has
+/// already been buffered in `src` and decodes the data into a stream of
+/// `Self::Item` frames.
+///
+/// Implementations are able to track state on `self`, which enables
+/// implementing stateful streaming parsers. In many cases, though, this type
+/// will simply be a unit struct (e.g. `struct HttpDecoder`).
+pub trait Decoder {
+ /// The type of decoded frames.
+ type Item;
+
+ /// The type of unrecoverable frame decoding errors.
+ ///
+ /// If an individual message is ill-formed but can be ignored without
+ /// interfering with the processing of future messages, it may be more
+ /// useful to report the failure as an `Item`.
+ ///
+ /// `From<io::Error>` is required in the interest of making `Error` suitable
+ /// for returning directly from a `FramedRead`, and to enable the default
+ /// implementation of `decode_eof` to yield an `io::Error` when the decoder
+ /// fails to consume all available data.
+ ///
+ /// Note that implementors of this trait can simply indicate `type Error =
+ /// io::Error` to use I/O errors as this type.
+ type Error: From<io::Error>;
+
+ /// Attempts to decode a frame from the provided buffer of bytes.
+ ///
+ /// This method is called by `FramedRead` whenever bytes are ready to be
+ /// parsed. The provided buffer of bytes is what's been read so far, and
+ /// this instance of `Decode` can determine whether an entire frame is in
+ /// the buffer and is ready to be returned.
+ ///
+ /// If an entire frame is available, then this instance will remove those
+ /// bytes from the buffer provided and return them as a decoded
+ /// frame. Note that removing bytes from the provided buffer doesn't always
+ /// necessarily copy the bytes, so this should be an efficient operation in
+ /// most circumstances.
+ ///
+ /// If the bytes look valid, but a frame isn't fully available yet, then
+ /// `Ok(None)` is returned. This indicates to the `Framed` instance that
+ /// it needs to read some more bytes before calling this method again.
+ ///
+ /// Note that the bytes provided may be empty. If a previous call to
+ /// `decode` consumed all the bytes in the buffer then `decode` will be
+ /// called again until it returns `None`, indicating that more bytes need to
+ /// be read.
+ ///
+ /// Finally, if the bytes in the buffer are malformed then an error is
+ /// returned indicating why. This informs `Framed` that the stream is now
+ /// corrupt and should be terminated.
+ fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;
+
+ /// A default method available to be called when there are no more bytes
+ /// available to be read from the underlying I/O.
+ ///
+ /// This method defaults to calling `decode` and returns an error if
+ /// `Ok(None)` is returned while there is unconsumed data in `buf`.
+ /// Typically this doesn't need to be implemented unless the framing
+ /// protocol differs near the end of the stream.
+ ///
+ /// Note that the `buf` argument may be empty. If a previous call to
+ /// `decode_eof` consumed all the bytes in the buffer, `decode_eof` will be
+ /// called again until it returns `None`, indicating that there are no more
+ /// frames to yield. This behavior enables returning finalization frames
+ /// that may not be based on inbound data.
+ fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+ match try!(self.decode(buf)) {
+ Some(frame) => Ok(Some(frame)),
+ None => {
+ if buf.is_empty() {
+ Ok(None)
+ } else {
+ Err(io::Error::new(io::ErrorKind::Other,
+ "bytes remaining on stream").into())
+ }
+ }
+ }
+ }
+}
diff --git a/tokio-io/src/codec/encoder.rs b/tokio-io/src/codec/encoder.rs
new file mode 100644
index 00000000..55056c1c
--- /dev/null
+++ b/tokio-io/src/codec/encoder.rs
@@ -0,0 +1,23 @@
+use std::io;
+use bytes::BytesMut;
+
+/// Trait of helper objects to write out messages as bytes, for use with
+/// `FramedWrite`.
+pub trait Encoder {
+ /// The type of items consumed by the `Encoder`
+ type Item;
+
+ /// The type of encoding errors.
+ ///
+ /// `FramedWrite` requires `Encoder`s errors to implement `From<io::Error>`
+ /// in the interest letting it return `Error`s directly.
+ type Error: From<io::Error>;
+
+ /// Encodes a frame into the buffer provided.
+ ///
+ /// This method will encode `item` into the byte buffer provided by `dst`.
+ /// The `dst` provided is an internal buffer of the `Framed` instance and
+ /// will be written out when possible.
+ fn encode(&mut self, item: Self::Item, dst: &mut BytesMut)
+ -> Result<(), Self::Error>;
+}
diff --git a/tokio-io/src/codecs.rs b/tokio-io/src/codec/lines_codec.rs
index f61448d1..2e1bea4a 100644
--- a/tokio-io/src/codecs.rs
+++ b/tokio-io/src/codec/lines_codec.rs
@@ -1,41 +1,7 @@
-use bytes::{Bytes, BufMut, BytesMut};
+use bytes::{BufMut, BytesMut};
use codec::{Encoder, Decoder};
use std::{io, str};
-/// A simple `Codec` implementation that just ships bytes around.
-#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
-pub struct BytesCodec(());
-
-impl BytesCodec {
- /// Creates a new `BytesCodec` for shipping around raw bytes.
- pub fn new() -> BytesCodec { BytesCodec(()) }
-}
-
-impl Decoder for BytesCodec {
- type Item = BytesMut;
- type Error = io::Error;
-
- fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> {
- if buf.len() > 0 {
- let len = buf.len();
- Ok(Some(buf.split_to(len)))
- } else {
- Ok(None)
- }
- }
-}
-
-impl Encoder for BytesCodec {
- type Item = Bytes;
- type Error = io::Error;
-
- fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> {
- buf.reserve(data.len());
- buf.put(data);
- Ok(())
- }
-}
-
/// A simple `Codec` implementation that splits up data into lines.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct LinesCodec {
diff --git a/tokio-io/src/codec.rs b/tokio-io/src/codec/mod.rs
index aa91f224..3a80d6a3 100644
--- a/tokio-io/src/codec.rs
+++ b/tokio-io/src/codec/mod.rs
@@ -10,10 +10,19 @@
//! [`Stream`]: #
//! [transports]: #
-pub use codecs::{BytesCodec, LinesCodec};
+mod decoder;
+mod encoder;
+mod bytes_codec;
+mod lines_codec;
+
+pub use self::decoder::Decoder;
+pub use self::encoder::Encoder;
+pub use self::bytes_codec::BytesCodec;
+pub use self::lines_codec::LinesCodec;
+
pub use framed::{Framed, FramedParts};
-pub use framed_read::{FramedRead, Decoder};
-pub use framed_write::{FramedWrite, Encoder};
+pub use framed_read::FramedRead;
+pub use framed_write::FramedWrite;
pub mod length_delimited {
//! Frame a stream of bytes based on a length prefix
diff --git a/tokio-io/src/framed.rs b/tokio-io/src/framed.rs
index e3255470..5a3c54a8 100644
--- a/tokio-io/src/framed.rs
+++ b/tokio-io/src/framed.rs
@@ -2,8 +2,9 @@ use std::io::{self, Read, Write};
use std::fmt;
use {AsyncRead, AsyncWrite};
-use framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2, Decoder};
-use framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2, Encoder};
+use codec::{Decoder, Encoder};
+use framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2};
+use framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2};
use futures::{Stream, Sink, StartSend, Poll};
use bytes::{BytesMut};
@@ -225,4 +226,4 @@ pub struct FramedParts<T>
pub readbuf: BytesMut,
/// A buffer with unprocessed data which are not written yet.
pub writebuf: BytesMut
-} \ No newline at end of file
+}
diff --git a/tokio-io/src/framed_read.rs b/tokio-io/src/framed_read.rs
index 6ccfd1aa..e04e3acf 100644
--- a/tokio-io/src/framed_read.rs
+++ b/tokio-io/src/framed_read.rs
@@ -1,95 +1,12 @@
-use std::{fmt, io};
+use std::fmt;
use AsyncRead;
+use codec::Decoder;
use framed::Fuse;
use futures::{Async, Poll, Stream, Sink, StartSend};
use bytes::BytesMut;
-/// Decoding of frames via buffers.
-///
-/// This trait is used when constructing an instance of `Framed` or
-/// `FramedRead`. An implementation of `Decoder` takes a byte stream that has
-/// already been buffered in `src` and decodes the data into a stream of
-/// `Self::Item` frames.
-///
-/// Implementations are able to track state on `self`, which enables
-/// implementing stateful streaming parsers. In many cases, though, this type
-/// will simply be a unit struct (e.g. `struct HttpDecoder`).
-pub trait Decoder {
- /// The type of decoded frames.
- type Item;
-
- /// The type of unrecoverable frame decoding errors.
- ///
- /// If an individual message is ill-formed but can be ignored without
- /// interfering with the processing of future messages, it may be more
- /// useful to report the failure as an `Item`.
- ///
- /// `From<io::Error>` is required in the interest of making `Error` suitable
- /// for returning directly from a `FramedRead`, and to enable the default
- /// implementation of `decode_eof` to yield an `io::Error` when the decoder
- /// fails to consume all available data.
- ///
- /// Note that implementors of this trait can simply indicate `type Error =
- /// io::Error` to use I/O errors as this type.
- type Error: From<io::Error>;
-
- /// Attempts to decode a frame from the provided buffer of bytes.
- ///
- /// This method is called by `FramedRead` whenever bytes are ready to be
- /// parsed. The provided buffer of bytes is what's been read so far, and
- /// this instance of `Decode` can determine whether an entire frame is in
- /// the buffer and is ready to be returned.
- ///
- /// If an entire frame is available, then this instance will remove those
- /// bytes from the buffer provided and return them as a decoded
- /// frame. Note that removing bytes from the provided buffer doesn't always
- /// necessarily copy the bytes, so this should be an efficient operation in
- /// most circumstances.
- ///
- /// If the bytes look valid, but a frame isn't fully available yet, then
- /// `Ok(None)` is returned. This indicates to the `Framed` instance that
- /// it needs to read some more bytes before calling this method again.
- ///
- /// Note that the bytes provided may be empty. If a previous call to
- /// `decode` consumed all the bytes in the buffer then `decode` will be
- /// called again until it returns `None`, indicating that more bytes need to
- /// be read.
- ///
- /// Finally, if the bytes in the buffer are malformed then an error is
- /// returned indicating why. This informs `Framed` that the stream is now
- /// corrupt and should be terminated.
- fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;
-
- /// A default method available to be called when there are no more bytes
- /// available to be read from the underlying I/O.
- ///
- /// This method defaults to calling `decode` and returns an error if
- /// `Ok(None)` is returned while there is unconsumed data in `buf`.
- /// Typically this doesn't need to be implemented unless the framing
- /// protocol differs near the end of the stream.
- ///
- /// Note that the `buf` argument may be empty. If a previous call to
- /// `decode_eof` consumed all the bytes in the buffer, `decode_eof` will be
- /// called again until it returns `None`, indicating that there are no more
- /// frames to yield. This behavior enables returning finalization frames
- /// that may not be based on inbound data.
- fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
- match try!(self.decode(buf)) {
- Some(frame) => Ok(Some(frame)),
- None => {
- if buf.is_empty() {
- Ok(None)
- } else {
- Err(io::Error::new(io::ErrorKind::Other,
- "bytes remaining on stream").into())
- }
- }
- }
- }
-}
-
/// A `Stream` of messages decoded from an `AsyncRead`.
pub struct FramedRead<T, D> {
inner: FramedRead2<Fuse<T, D>>,
diff --git a/tokio-io/src/framed_write.rs b/tokio-io/src/framed_write.rs
index f9d0edcb..e16ad3f8 100644
--- a/tokio-io/src/framed_write.rs
+++ b/tokio-io/src/framed_write.rs
@@ -2,33 +2,12 @@ use std::io::{self, Read};
use std::fmt;
use {AsyncRead, AsyncWrite};
-use codec::Decoder;
+use codec::{Decoder, Encoder};
use framed::Fuse;
use futures::{Async, AsyncSink, Poll, Stream, Sink, StartSend};
use bytes::BytesMut;
-/// Trait of helper objects to write out messages as bytes, for use with
-/// `FramedWrite`.
-pub trait Encoder {
- /// The type of items consumed by the `Encoder`
- type Item;
-
- /// The type of encoding errors.
- ///
- /// `FramedWrite` requires `Encoder`s errors to implement `From<io::Error>`
- /// in the interest letting it return `Error`s directly.
- type Error: From<io::Error>;
-
- /// Encodes a frame into the buffer provided.
- ///
- /// This method will encode `item` into the byte buffer provided by `dst`.
- /// The `dst` provided is an internal buffer of the `Framed` instance and
- /// will be written out when possible.
- fn encode(&mut self, item: Self::Item, dst: &mut BytesMut)
- -> Result<(), Self::Error>;
-}
-
/// A `Sink` of frames encoded to an `AsyncWrite`.
pub struct FramedWrite<T, E> {
inner: FramedWrite2<Fuse<T, E>>,
diff --git a/tokio-io/src/lib.rs b/tokio-io/src/lib.rs
index 4c6d67ef..58dbeedf 100644
--- a/tokio-io/src/lib.rs
+++ b/tokio-io/src/lib.rs
@@ -49,7 +49,6 @@ pub mod io;
pub mod codec;
mod allow_std;
-mod codecs;
mod copy;
mod flush;
mod framed;