diff options
author | Markus Westerlind <marwes91@gmail.com> | 2019-11-15 08:30:07 +0100 |
---|---|---|
committer | Taiki Endo <te316e89@gmail.com> | 2019-11-15 16:30:07 +0900 |
commit | 930679587ae42e4df3113159ccf33fb5923dd73a (patch) | |
tree | a03a255fc40092185de040e4e825da13d26c10e4 | |
parent | 27e5b41067d01c0c9fac230c5addb58034201a63 (diff) |
codec: Remove Unpin requirement from Framed[Read,Write,] (#1758)
cc #1252
-rw-r--r-- | tokio-util/Cargo.toml | 1 | ||||
-rw-r--r-- | tokio-util/src/codec/framed.rs | 114 | ||||
-rw-r--r-- | tokio-util/src/codec/framed_read.rs | 111 | ||||
-rw-r--r-- | tokio-util/src/codec/framed_write.rs | 94 | ||||
-rw-r--r-- | tokio-util/src/codec/macros.rs | 7 | ||||
-rw-r--r-- | tokio-util/src/codec/mod.rs | 3 |
6 files changed, 219 insertions, 111 deletions
diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 034dcd02..69b804b7 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -26,6 +26,7 @@ bytes = "0.4.7" futures-core = "0.3.0" futures-sink = "0.3.0" log = "0.4" +pin-project = "0.4" [dev-dependencies] tokio = { version = "=0.2.0-alpha.6", path = "../tokio" } diff --git a/tokio-util/src/codec/framed.rs b/tokio-util/src/codec/framed.rs index c95d643b..214fac14 100644 --- a/tokio-util/src/codec/framed.rs +++ b/tokio-util/src/codec/framed.rs @@ -8,6 +8,7 @@ use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite}; use bytes::BytesMut; use futures_core::Stream; use futures_sink::Sink; +use pin_project::pin_project; use std::fmt; use std::io::{self, BufRead, Read, Write}; use std::pin::Pin; @@ -17,11 +18,40 @@ use std::task::{Context, Poll}; /// the `Encoder` and `Decoder` traits to encode and decode frames. /// /// You can create a `Framed` instance by using the `AsyncRead::framed` adapter. +#[pin_project] pub struct Framed<T, U> { + #[pin] inner: FramedRead2<FramedWrite2<Fuse<T, U>>>, } -pub(crate) struct Fuse<T, U>(pub(crate) T, pub(crate) U); +#[pin_project] +pub(crate) struct Fuse<T, U> { + #[pin] + pub(crate) io: T, + pub(crate) codec: U, +} + +/// Abstracts over `FramedRead2` being either `FramedRead2<FramedWrite2<Fuse<T, U>>>` or +/// `FramedRead2<Fuse<T, U>>` and lets the io and codec parts be extracted in either case. +pub(crate) trait ProjectFuse { + type Io; + type Codec; + + fn project(self: Pin<&mut Self>) -> Fuse<Pin<&mut Self::Io>, &mut Self::Codec>; +} + +impl<T, U> ProjectFuse for Fuse<T, U> { + type Io = T; + type Codec = U; + + fn project(self: Pin<&mut Self>) -> Fuse<Pin<&mut Self::Io>, &mut Self::Codec> { + let self_ = self.project(); + Fuse { + io: self_.io, + codec: self_.codec, + } + } +} impl<T, U> Framed<T, U> where @@ -47,7 +77,7 @@ where /// break them into separate objects, allowing them to interact more easily. pub fn new(inner: T, codec: U) -> Framed<T, U> { Framed { - inner: framed_read2(framed_write2(Fuse(inner, codec))), + inner: framed_read2(framed_write2(Fuse { io: inner, codec })), } } } @@ -76,7 +106,13 @@ impl<T, U> Framed<T, U> { pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> { Framed { inner: framed_read2_with_buffer( - framed_write2_with_buffer(Fuse(parts.io, parts.codec), parts.write_buf), + framed_write2_with_buffer( + Fuse { + io: parts.io, + codec: parts.codec, + }, + parts.write_buf, + ), parts.read_buf, ), } @@ -89,7 +125,7 @@ impl<T, U> Framed<T, U> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { - &self.inner.get_ref().get_ref().0 + &self.inner.get_ref().get_ref().io } /// Returns a mutable reference to the underlying I/O stream wrapped by @@ -99,7 +135,7 @@ impl<T, U> Framed<T, U> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.get_mut().get_mut().0 + &mut self.inner.get_mut().get_mut().io } /// Returns a reference to the underlying codec wrapped by @@ -108,7 +144,7 @@ impl<T, U> Framed<T, U> { /// Note that care should be taken to not tamper with the underlying codec /// as it may corrupt the stream of frames otherwise being worked with. pub fn codec(&self) -> &U { - &self.inner.get_ref().get_ref().1 + &self.inner.get_ref().get_ref().codec } /// Returns a mutable reference to the underlying codec wrapped by @@ -117,7 +153,7 @@ impl<T, U> Framed<T, U> { /// Note that care should be taken to not tamper with the underlying codec /// as it may corrupt the stream of frames otherwise being worked with. pub fn codec_mut(&mut self) -> &mut U { - &mut self.inner.get_mut().get_mut().1 + &mut self.inner.get_mut().get_mut().codec } /// Returns a reference to the read buffer. @@ -131,7 +167,7 @@ impl<T, U> Framed<T, U> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_inner(self) -> T { - self.inner.into_inner().into_inner().0 + self.inner.into_inner().into_inner().io } /// Consumes the `Frame`, returning its underlying I/O stream, the buffer @@ -145,8 +181,8 @@ impl<T, U> Framed<T, U> { let (inner, write_buf) = inner.into_parts(); FramedParts { - io: inner.0, - codec: inner.1, + io: inner.io, + codec: inner.codec, read_buf, write_buf, _priv: (), @@ -156,38 +192,38 @@ impl<T, U> Framed<T, U> { impl<T, U> Stream for Framed<T, U> where - T: AsyncRead + Unpin, - U: Decoder + Unpin, + T: AsyncRead, + U: Decoder, { type Item = Result<U::Item, U::Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - pin!(self.get_mut().inner).poll_next(cx) + self.project().inner.poll_next(cx) } } impl<T, I, U> Sink<I> for Framed<T, U> where - T: AsyncWrite + Unpin, - U: Encoder<Item = I> + Unpin, + T: AsyncWrite, + U: Encoder<Item = I>, U::Error: From<io::Error>, { type Error = U::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Pin::new(Pin::get_mut(self).inner.get_mut()).poll_ready(cx) + self.project().inner.get_pin_mut().poll_ready(cx) } fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - Pin::new(Pin::get_mut(self).inner.get_mut()).start_send(item) + self.project().inner.get_pin_mut().start_send(item) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Pin::new(Pin::get_mut(self).inner.get_mut()).poll_flush(cx) + self.project().inner.get_pin_mut().poll_flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Pin::new(Pin::get_mut(self).inner.get_mut()).poll_close(cx) + self.project().inner.get_pin_mut().poll_close(cx) } } @@ -198,8 +234,8 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Framed") - .field("io", &self.inner.get_ref().get_ref().0) - .field("codec", &self.inner.get_ref().get_ref().1) + .field("io", &self.inner.get_ref().get_ref().io) + .field("codec", &self.inner.get_ref().get_ref().codec) .finish() } } @@ -208,23 +244,23 @@ where impl<T: Read, U> Read for Fuse<T, U> { fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { - self.0.read(dst) + self.io.read(dst) } } impl<T: BufRead, U> BufRead for Fuse<T, U> { fn fill_buf(&mut self) -> io::Result<&[u8]> { - self.0.fill_buf() + self.io.fill_buf() } fn consume(&mut self, amt: usize) { - self.0.consume(amt) + self.io.consume(amt) } } -impl<T: AsyncRead + Unpin, U: Unpin> AsyncRead for Fuse<T, U> { +impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> { unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.0.prepare_uninitialized_buffer(buf) + self.io.prepare_uninitialized_buffer(buf) } fn poll_read( @@ -232,45 +268,45 @@ impl<T: AsyncRead + Unpin, U: Unpin> AsyncRead for Fuse<T, U> { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, io::Error>> { - pin!(self.get_mut().0).poll_read(cx, buf) + self.project().io.poll_read(cx, buf) } } -impl<T: AsyncBufRead + Unpin, U: Unpin> AsyncBufRead for Fuse<T, U> { +impl<T: AsyncBufRead, U> AsyncBufRead for Fuse<T, U> { fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { - pin!(self.get_mut().0).poll_fill_buf(cx) + self.project().io.poll_fill_buf(cx) } fn consume(self: Pin<&mut Self>, amt: usize) { - pin!(self.get_mut().0).consume(amt) + self.project().io.consume(amt) } } impl<T: Write, U> Write for Fuse<T, U> { fn write(&mut self, src: &[u8]) -> io::Result<usize> { - self.0.write(src) + self.io.write(src) } fn flush(&mut self) -> io::Result<()> { - self.0.flush() + self.io.flush() } } -impl<T: AsyncWrite + Unpin, U: Unpin> AsyncWrite for Fuse<T, U> { +impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize, io::Error>> { - pin!(self.get_mut().0).poll_write(cx, buf) + self.project().io.poll_write(cx, buf) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { - pin!(self.get_mut().0).poll_flush(cx) + self.project().io.poll_flush(cx) } fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { - pin!(self.get_mut().0).poll_shutdown(cx) + self.project().io.poll_shutdown(cx) } } @@ -279,11 +315,11 @@ impl<T, U: Decoder> Decoder for Fuse<T, U> { type Error = U::Error; fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { - self.1.decode(buffer) + self.codec.decode(buffer) } fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { - self.1.decode_eof(buffer) + self.codec.decode_eof(buffer) } } @@ -292,7 +328,7 @@ impl<T, U: Encoder> Encoder for Fuse<T, U> { type Error = U::Error; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - self.1.encode(item, dst) + self.codec.encode(item, dst) } } diff --git a/tokio-util/src/codec/framed_read.rs b/tokio-util/src/codec/framed_read.rs index e10f2968..9e8beb03 100644 --- a/tokio-util/src/codec/framed_read.rs +++ b/tokio-util/src/codec/framed_read.rs @@ -1,4 +1,4 @@ -use crate::codec::framed::Fuse; +use crate::codec::framed::{Fuse, ProjectFuse}; use crate::codec::Decoder; use tokio::io::AsyncRead; @@ -7,16 +7,21 @@ use bytes::BytesMut; use futures_core::Stream; use futures_sink::Sink; use log::trace; +use pin_project::pin_project; use std::fmt; use std::pin::Pin; use std::task::{Context, Poll}; /// A `Stream` of messages decoded from an `AsyncRead`. +#[pin_project] pub struct FramedRead<T, D> { + #[pin] inner: FramedRead2<Fuse<T, D>>, } +#[pin_project] pub(crate) struct FramedRead2<T> { + #[pin] inner: T, eof: bool, is_readable: bool, @@ -35,7 +40,10 @@ where /// Creates a new `FramedRead` with the given `decoder`. pub fn new(inner: T, decoder: D) -> FramedRead<T, D> { FramedRead { - inner: framed_read2(Fuse(inner, decoder)), + inner: framed_read2(Fuse { + io: inner, + codec: decoder, + }), } } } @@ -48,7 +56,7 @@ impl<T, D> FramedRead<T, D> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { - &self.inner.inner.0 + &self.inner.inner.io } /// Returns a mutable reference to the underlying I/O stream wrapped by @@ -58,7 +66,7 @@ impl<T, D> FramedRead<T, D> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.inner.0 + &mut self.inner.inner.io } /// Consumes the `FramedRead`, returning its underlying I/O stream. @@ -67,17 +75,17 @@ impl<T, D> FramedRead<T, D> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_inner(self) -> T { - self.inner.inner.0 + self.inner.inner.io } /// Returns a reference to the underlying decoder. pub fn decoder(&self) -> &D { - &self.inner.inner.1 + &self.inner.inner.codec } /// Returns a mutable reference to the underlying decoder. pub fn decoder_mut(&mut self) -> &mut D { - &mut self.inner.inner.1 + &mut self.inner.inner.codec } /// Returns a reference to the read buffer. @@ -88,38 +96,61 @@ impl<T, D> FramedRead<T, D> { impl<T, D> Stream for FramedRead<T, D> where - T: AsyncRead + Unpin, - D: Decoder + Unpin, + T: AsyncRead, + D: Decoder, { type Item = Result<D::Item, D::Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - pin!(self.get_mut().inner).poll_next(cx) + self.project().inner.poll_next(cx) } } // This impl just defers to the underlying T: Sink impl<T, I, D> Sink<I> for FramedRead<T, D> where - T: Sink<I> + Unpin, - D: Unpin, + T: Sink<I>, { type Error = T::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - pin!(Pin::get_mut(self).inner.inner.0).poll_ready(cx) + self.project() + .inner + .project() + .inner + .project() + .io + .poll_ready(cx) } fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - pin!(Pin::get_mut(self).inner.inner.0).start_send(item) + self.project() + .inner + .project() + .inner + .project() + .io + .start_send(item) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - pin!(Pin::get_mut(self).inner.inner.0).poll_flush(cx) + self.project() + .inner + .project() + .inner + .project() + .io + .poll_flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - pin!(Pin::get_mut(self).inner.inner.0).poll_close(cx) + self.project() + .inner + .project() + .inner + .project() + .io + .poll_close(cx) } } @@ -130,8 +161,8 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FramedRead") - .field("inner", &self.inner.inner.0) - .field("decoder", &self.inner.inner.1) + .field("inner", &self.inner.inner.io) + .field("decoder", &self.inner.inner.codec) .field("eof", &self.inner.eof) .field("is_readable", &self.inner.is_readable) .field("buffer", &self.inner.buffer) @@ -180,6 +211,10 @@ impl<T> FramedRead2<T> { &mut self.inner } + pub(crate) fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> { + self.project().inner + } + pub(crate) fn buffer(&self) -> &BytesMut { &self.buffer } @@ -187,49 +222,65 @@ impl<T> FramedRead2<T> { impl<T> Stream for FramedRead2<T> where - T: AsyncRead + Decoder + Unpin, + T: ProjectFuse + AsyncRead, + T::Codec: Decoder, { - type Item = Result<T::Item, T::Error>; + type Item = Result<<T::Codec as Decoder>::Item, <T::Codec as Decoder>::Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let pinned = Pin::get_mut(self); + let mut pinned = self.project(); loop { // Repeatedly call `decode` or `decode_eof` as long as it is // "readable". Readable is defined as not having returned `None`. If // the upstream has returned EOF, and the decoder is no longer // readable, it can be assumed that the decoder will never become // readable again, at which point the stream is terminated. - if pinned.is_readable { - if pinned.eof { - let frame = pinned.inner.decode_eof(&mut pinned.buffer)?; + if *pinned.is_readable { + if *pinned.eof { + let frame = pinned + .inner + .as_mut() + .project() + .codec + .decode_eof(&mut pinned.buffer)?; return Poll::Ready(frame.map(Ok)); } trace!("attempting to decode a frame"); - if let Some(frame) = pinned.inner.decode(&mut pinned.buffer)? { + if let Some(frame) = pinned + .inner + .as_mut() + .project() + .codec + .decode(&mut pinned.buffer)? + { trace!("frame decoded from buffer"); return Poll::Ready(Some(Ok(frame))); } - pinned.is_readable = false; + *pinned.is_readable = false; } - assert!(!pinned.eof); + assert!(!*pinned.eof); // Otherwise, try to read more data and try again. Make sure we've // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF pinned.buffer.reserve(1); - let bytect = match pin!(pinned.inner).poll_read_buf(cx, &mut pinned.buffer)? { + let bytect = match pinned + .inner + .as_mut() + .poll_read_buf(cx, &mut pinned.buffer)? + { Poll::Ready(ct) => ct, Poll::Pending => return Poll::Pending, }; if bytect == 0 { - pinned.eof = true; + *pinned.eof = true; } - pinned.is_readable = true; + *pinned.is_readable = true; } } } diff --git a/tokio-util/src/codec/framed_write.rs b/tokio-util/src/codec/framed_write.rs index 3a95612c..a88d0893 100644 --- a/tokio-util/src/codec/framed_write.rs +++ b/tokio-util/src/codec/framed_write.rs @@ -1,6 +1,6 @@ use crate::codec::decoder::Decoder; use crate::codec::encoder::Encoder; -use crate::codec::framed::Fuse; +use crate::codec::framed::{Fuse, ProjectFuse}; use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite}; @@ -8,17 +8,22 @@ use bytes::BytesMut; use futures_core::{ready, Stream}; use futures_sink::Sink; use log::trace; +use pin_project::pin_project; use std::fmt; use std::io::{self, BufRead, Read}; use std::pin::Pin; use std::task::{Context, Poll}; /// A `Sink` of frames encoded to an `AsyncWrite`. +#[pin_project] pub struct FramedWrite<T, E> { + #[pin] inner: FramedWrite2<Fuse<T, E>>, } +#[pin_project] pub(crate) struct FramedWrite2<T> { + #[pin] inner: T, buffer: BytesMut, } @@ -34,7 +39,10 @@ where /// Creates a new `FramedWrite` with the given `encoder`. pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> { FramedWrite { - inner: framed_write2(Fuse(inner, encoder)), + inner: framed_write2(Fuse { + io: inner, + codec: encoder, + }), } } } @@ -47,7 +55,7 @@ impl<T, E> FramedWrite<T, E> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { - &self.inner.inner.0 + &self.inner.inner.io } /// Returns a mutable reference to the underlying I/O stream wrapped by @@ -57,7 +65,7 @@ impl<T, E> FramedWrite<T, E> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.inner.0 + &mut self.inner.inner.io } /// Consumes the `FramedWrite`, returning its underlying I/O stream. @@ -66,55 +74,60 @@ impl<T, E> FramedWrite<T, E> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_inner(self) -> T { - self.inner.inner.0 + self.inner.inner.io } /// Returns a reference to the underlying decoder. pub fn encoder(&self) -> &E { - &self.inner.inner.1 + &self.inner.inner.codec } /// Returns a mutable reference to the underlying decoder. pub fn encoder_mut(&mut self) -> &mut E { - &mut self.inner.inner.1 + &mut self.inner.inner.codec } } // This impl just defers to the underlying FramedWrite2 impl<T, I, E> Sink<I> for FramedWrite<T, E> where - T: AsyncWrite + Unpin, - E: Encoder<Item = I> + Unpin, + T: AsyncWrite, + E: Encoder<Item = I>, E::Error: From<io::Error>, { type Error = E::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - pin!(Pin::get_mut(self).inner).poll_ready(cx) + self.project().inner.poll_ready(cx) } fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - pin!(Pin::get_mut(self).inner).start_send(item) + self.project().inner.start_send(item) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - pin!(Pin::get_mut(self).inner).poll_flush(cx) + self.project().inner.poll_flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - pin!(Pin::get_mut(self).inner).poll_close(cx) + self.project().inner.poll_close(cx) } } impl<T, D> Stream for FramedWrite<T, D> where - T: Stream + Unpin, - D: Unpin, + T: Stream, { type Item = T::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - Pin::new(Pin::get_mut(self).get_mut()).poll_next(cx) + self.project() + .inner + .project() + .inner + .project() + .io + .poll_next(cx) } } @@ -125,8 +138,8 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FramedWrite") - .field("inner", &self.inner.get_ref().0) - .field("encoder", &self.inner.get_ref().1) + .field("inner", &self.inner.get_ref().io) + .field("encoder", &self.inner.get_ref().codec) .field("buffer", &self.inner.buffer) .finish() } @@ -169,9 +182,10 @@ impl<T> FramedWrite2<T> { impl<I, T> Sink<I> for FramedWrite2<T> where - T: AsyncWrite + Encoder<Item = I> + Unpin, + T: ProjectFuse + AsyncWrite, + T::Codec: Encoder<Item = I>, { - type Error = T::Error; + type Error = <T::Codec as Encoder>::Error; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's @@ -191,20 +205,24 @@ where } fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - let pinned = Pin::get_mut(self); - pinned.inner.encode(item, &mut pinned.buffer)?; + let mut pinned = self.project(); + pinned + .inner + .project() + .codec + .encode(item, &mut pinned.buffer)?; Ok(()) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { trace!("flushing framed transport"); - let pinned = Pin::get_mut(self); + let mut pinned = self.project(); while !pinned.buffer.is_empty() { trace!("writing; remaining={}", pinned.buffer.len()); let buf = &pinned.buffer; - let n = ready!(pin!(pinned.inner).poll_write(cx, &buf))?; + let n = ready!(pinned.inner.as_mut().poll_write(cx, &buf))?; if n == 0 { return Poll::Ready(Err(io::Error::new( @@ -220,15 +238,15 @@ where } // Try flushing the underlying IO - ready!(pin!(pinned.inner).poll_flush(cx))?; + ready!(pinned.inner.poll_flush(cx))?; trace!("framed transport flushed"); Poll::Ready(Ok(())) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - ready!(pin!(self).poll_flush(cx))?; - ready!(pin!(self.inner).poll_shutdown(cx))?; + ready!(self.as_mut().poll_flush(cx))?; + ready!(self.project().inner.poll_shutdown(cx))?; Poll::Ready(Ok(())) } @@ -263,7 +281,7 @@ impl<T: BufRead> BufRead for FramedWrite2<T> { } } -impl<T: AsyncRead + Unpin> AsyncRead for FramedWrite2<T> { +impl<T: AsyncRead> AsyncRead for FramedWrite2<T> { unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { self.inner.prepare_uninitialized_buffer(buf) } @@ -273,16 +291,28 @@ impl<T: AsyncRead + Unpin> AsyncRead for FramedWrite2<T> { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, io::Error>> { - pin!(self.get_mut().inner).poll_read(cx, buf) + self.project().inner.poll_read(cx, buf) } } -impl<T: AsyncBufRead + Unpin> AsyncBufRead for FramedWrite2<T> { +impl<T: AsyncBufRead> AsyncBufRead for FramedWrite2<T> { fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { - pin!(self.get_mut().inner).poll_fill_buf(cx) + self.project().inner.poll_fill_buf(cx) } fn consume(self: Pin<&mut Self>, amt: usize) { - pin!(self.get_mut().inner).consume(amt) + self.project().inner.consume(amt) + } +} + +impl<T> ProjectFuse for FramedWrite2<T> +where + T: ProjectFuse, +{ + type Io = T::Io; + type Codec = T::Codec; + + fn project(self: Pin<&mut Self>) -> Fuse<Pin<&mut Self::Io>, &mut Self::Codec> { + self.project().inner.project() } } diff --git a/tokio-util/src/codec/macros.rs b/tokio-util/src/codec/macros.rs deleted file mode 100644 index 902ada1c..00000000 --- a/tokio-util/src/codec/macros.rs +++ /dev/null @@ -1,7 +0,0 @@ -/// A macro to reduce some of the boilerplate for projecting from -/// `Pin<&mut T>` to `Pin<&mut T.field>` -macro_rules! pin { - ($e:expr) => { - std::pin::Pin::new(&mut $e) - }; -} diff --git a/tokio-util/src/codec/mod.rs b/tokio-util/src/codec/mod.rs index 203efae4..b162dd3a 100644 --- a/tokio-util/src/codec/mod.rs +++ b/tokio-util/src/codec/mod.rs @@ -9,9 +9,6 @@ //! [`Sink`]: https://docs.rs/futures-sink/*/futures_sink/trait.Sink.html //! [`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html -#[macro_use] -mod macros; - mod bytes_codec; pub use self::bytes_codec::BytesCodec; |