diff options
Diffstat (limited to 'tokio-util/src/codec/framed.rs')
-rw-r--r-- | tokio-util/src/codec/framed.rs | 208 |
1 files changed, 47 insertions, 161 deletions
diff --git a/tokio-util/src/codec/framed.rs b/tokio-util/src/codec/framed.rs index d2e7659e..bf05a3ae 100644 --- a/tokio-util/src/codec/framed.rs +++ b/tokio-util/src/codec/framed.rs @@ -1,10 +1,9 @@ use crate::codec::decoder::Decoder; use crate::codec::encoder::Encoder; -use crate::codec::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2}; -use crate::codec::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2}; +use crate::codec::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame}; use tokio::{ - io::{AsyncBufRead, AsyncRead, AsyncWrite}, + io::{AsyncRead, AsyncWrite}, stream::Stream, }; @@ -12,8 +11,7 @@ use bytes::BytesMut; use futures_sink::Sink; use pin_project_lite::pin_project; use std::fmt; -use std::io::{self, BufRead, Read, Write}; -use std::mem::MaybeUninit; +use std::io; use std::pin::Pin; use std::task::{Context, Poll}; @@ -30,37 +28,7 @@ pin_project! { /// [`Decoder::framed`]: crate::codec::Decoder::framed() pub struct Framed<T, U> { #[pin] - inner: FramedRead2<FramedWrite2<Fuse<T, U>>>, - } -} - -pin_project! { - pub(crate) struct Fuse<T, U> { - #[pin] - pub(crate) io: T, - pub(crate) codec: U, - } -} - -/// Abstracts over `FramedRead2` being either `FramedRead2<FramedWrite2<Fuse<T, U>>>` or -/// `FramedRead2<Fuse<T, U>>` and lets the io and codec parts be extracted in either case. -pub(crate) trait ProjectFuse { - type Io; - type Codec; - - fn project(self: Pin<&mut Self>) -> Fuse<Pin<&mut Self::Io>, &mut Self::Codec>; -} - -impl<T, U> ProjectFuse for Fuse<T, U> { - type Io = T; - type Codec = U; - - fn project(self: Pin<&mut Self>) -> Fuse<Pin<&mut Self::Io>, &mut Self::Codec> { - let self_ = self.project(); - Fuse { - io: self_.io, - codec: self_.codec, - } + inner: FramedImpl<T, U, RWFrames> } } @@ -93,7 +61,11 @@ where /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split pub fn new(inner: T, codec: U) -> Framed<T, U> { Framed { - inner: framed_read2(framed_write2(Fuse { io: inner, codec })), + inner: FramedImpl { + inner, + codec, + state: Default::default(), + }, } } @@ -123,10 +95,18 @@ where /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> { Framed { - inner: framed_read2_with_buffer( - framed_write2(Fuse { io: inner, codec }), - BytesMut::with_capacity(capacity), - ), + inner: FramedImpl { + inner, + codec, + state: RWFrames { + read: ReadFrame { + eof: false, + is_readable: false, + buffer: BytesMut::with_capacity(capacity), + }, + write: WriteFrame::default(), + }, + }, } } } @@ -161,16 +141,14 @@ impl<T, U> Framed<T, U> { /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> { Framed { - inner: framed_read2_with_buffer( - framed_write2_with_buffer( - Fuse { - io: parts.io, - codec: parts.codec, - }, - parts.write_buf, - ), - parts.read_buf, - ), + inner: FramedImpl { + inner: parts.io, + codec: parts.codec, + state: RWFrames { + read: parts.read_buf.into(), + write: parts.write_buf.into(), + }, + }, } } @@ -181,7 +159,7 @@ impl<T, U> Framed<T, U> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { - &self.inner.get_ref().get_ref().io + &self.inner.inner } /// Returns a mutable reference to the underlying I/O stream wrapped by @@ -191,7 +169,7 @@ impl<T, U> Framed<T, U> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.get_mut().get_mut().io + &mut self.inner.inner } /// Returns a reference to the underlying codec wrapped by @@ -200,7 +178,7 @@ impl<T, U> Framed<T, U> { /// Note that care should be taken to not tamper with the underlying codec /// as it may corrupt the stream of frames otherwise being worked with. pub fn codec(&self) -> &U { - &self.inner.get_ref().get_ref().codec + &self.inner.codec } /// Returns a mutable reference to the underlying codec wrapped by @@ -209,12 +187,12 @@ impl<T, U> Framed<T, U> { /// Note that care should be taken to not tamper with the underlying codec /// as it may corrupt the stream of frames otherwise being worked with. pub fn codec_mut(&mut self) -> &mut U { - &mut self.inner.get_mut().get_mut().codec + &mut self.inner.codec } /// Returns a reference to the read buffer. pub fn read_buffer(&self) -> &BytesMut { - self.inner.buffer() + &self.inner.state.read.buffer } /// Consumes the `Framed`, returning its underlying I/O stream. @@ -223,7 +201,7 @@ impl<T, U> Framed<T, U> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_inner(self) -> T { - self.inner.into_inner().into_inner().io + self.inner.inner } /// Consumes the `Framed`, returning its underlying I/O stream, the buffer @@ -233,19 +211,17 @@ impl<T, U> Framed<T, U> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_parts(self) -> FramedParts<T, U> { - let (inner, read_buf) = self.inner.into_parts(); - let (inner, write_buf) = inner.into_parts(); - FramedParts { - io: inner.io, - codec: inner.codec, - read_buf, - write_buf, + io: self.inner.inner, + codec: self.inner.codec, + read_buf: self.inner.state.read.buffer, + write_buf: self.inner.state.write.buffer, _priv: (), } } } +// This impl just defers to the underlying FramedImpl impl<T, U> Stream for Framed<T, U> where T: AsyncRead, @@ -258,6 +234,7 @@ where } } +// This impl just defers to the underlying FramedImpl impl<T, I, U> Sink<I> for Framed<T, U> where T: AsyncWrite, @@ -267,19 +244,19 @@ where type Error = U::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.project().inner.get_pin_mut().poll_ready(cx) + self.project().inner.poll_ready(cx) } fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - self.project().inner.get_pin_mut().start_send(item) + self.project().inner.start_send(item) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.project().inner.get_pin_mut().poll_flush(cx) + self.project().inner.poll_flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.project().inner.get_pin_mut().poll_close(cx) + self.project().inner.poll_close(cx) } } @@ -290,103 +267,12 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Framed") - .field("io", &self.inner.get_ref().get_ref().io) - .field("codec", &self.inner.get_ref().get_ref().codec) + .field("io", self.get_ref()) + .field("codec", self.codec()) .finish() } } -// ===== impl Fuse ===== - -impl<T: Read, U> Read for Fuse<T, U> { - fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { - self.io.read(dst) - } -} - -impl<T: BufRead, U> BufRead for Fuse<T, U> { - fn fill_buf(&mut self) -> io::Result<&[u8]> { - self.io.fill_buf() - } - - fn consume(&mut self, amt: usize) { - self.io.consume(amt) - } -} - -impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool { - self.io.prepare_uninitialized_buffer(buf) - } - - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll<Result<usize, io::Error>> { - self.project().io.poll_read(cx, buf) - } -} - -impl<T: AsyncBufRead, U> AsyncBufRead for Fuse<T, U> { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { - self.project().io.poll_fill_buf(cx) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - self.project().io.consume(amt) - } -} - -impl<T: Write, U> Write for Fuse<T, U> { - fn write(&mut self, src: &[u8]) -> io::Result<usize> { - self.io.write(src) - } - - fn flush(&mut self) -> io::Result<()> { - self.io.flush() - } -} - -impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<Result<usize, io::Error>> { - self.project().io.poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { - self.project().io.poll_flush(cx) - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { - self.project().io.poll_shutdown(cx) - } -} - -impl<T, U: Decoder> Decoder for Fuse<T, U> { - type Item = U::Item; - type Error = U::Error; - - fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { - self.codec.decode(buffer) - } - - fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { - self.codec.decode_eof(buffer) - } -} - -impl<T, I, U: Encoder<I>> Encoder<I> for Fuse<T, U> { - type Error = U::Error; - - fn encode(&mut self, item: I, dst: &mut BytesMut) -> Result<(), Self::Error> { - self.codec.encode(item, dst) - } -} - /// `FramedParts` contains an export of the data of a Framed transport. /// It can be used to construct a new [`Framed`] with a different codec. /// It contains all current buffers and the inner transport. |