From 930679587ae42e4df3113159ccf33fb5923dd73a Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Fri, 15 Nov 2019 08:30:07 +0100 Subject: codec: Remove Unpin requirement from Framed[Read,Write,] (#1758) cc #1252 --- tokio-util/src/codec/framed.rs | 114 +++++++++++++++++++++++------------ tokio-util/src/codec/framed_read.rs | 111 +++++++++++++++++++++++++--------- tokio-util/src/codec/framed_write.rs | 94 +++++++++++++++++++---------- tokio-util/src/codec/macros.rs | 7 --- tokio-util/src/codec/mod.rs | 3 - 5 files changed, 218 insertions(+), 111 deletions(-) delete mode 100644 tokio-util/src/codec/macros.rs (limited to 'tokio-util/src/codec') diff --git a/tokio-util/src/codec/framed.rs b/tokio-util/src/codec/framed.rs index c95d643b..214fac14 100644 --- a/tokio-util/src/codec/framed.rs +++ b/tokio-util/src/codec/framed.rs @@ -8,6 +8,7 @@ use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite}; use bytes::BytesMut; use futures_core::Stream; use futures_sink::Sink; +use pin_project::pin_project; use std::fmt; use std::io::{self, BufRead, Read, Write}; use std::pin::Pin; @@ -17,11 +18,40 @@ use std::task::{Context, Poll}; /// the `Encoder` and `Decoder` traits to encode and decode frames. /// /// You can create a `Framed` instance by using the `AsyncRead::framed` adapter. +#[pin_project] pub struct Framed { + #[pin] inner: FramedRead2>>, } -pub(crate) struct Fuse(pub(crate) T, pub(crate) U); +#[pin_project] +pub(crate) struct Fuse { + #[pin] + pub(crate) io: T, + pub(crate) codec: U, +} + +/// Abstracts over `FramedRead2` being either `FramedRead2>>` or +/// `FramedRead2>` and lets the io and codec parts be extracted in either case. +pub(crate) trait ProjectFuse { + type Io; + type Codec; + + fn project(self: Pin<&mut Self>) -> Fuse, &mut Self::Codec>; +} + +impl ProjectFuse for Fuse { + type Io = T; + type Codec = U; + + fn project(self: Pin<&mut Self>) -> Fuse, &mut Self::Codec> { + let self_ = self.project(); + Fuse { + io: self_.io, + codec: self_.codec, + } + } +} impl Framed where @@ -47,7 +77,7 @@ where /// break them into separate objects, allowing them to interact more easily. pub fn new(inner: T, codec: U) -> Framed { Framed { - inner: framed_read2(framed_write2(Fuse(inner, codec))), + inner: framed_read2(framed_write2(Fuse { io: inner, codec })), } } } @@ -76,7 +106,13 @@ impl Framed { pub fn from_parts(parts: FramedParts) -> Framed { Framed { inner: framed_read2_with_buffer( - framed_write2_with_buffer(Fuse(parts.io, parts.codec), parts.write_buf), + framed_write2_with_buffer( + Fuse { + io: parts.io, + codec: parts.codec, + }, + parts.write_buf, + ), parts.read_buf, ), } @@ -89,7 +125,7 @@ impl Framed { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { - &self.inner.get_ref().get_ref().0 + &self.inner.get_ref().get_ref().io } /// Returns a mutable reference to the underlying I/O stream wrapped by @@ -99,7 +135,7 @@ impl Framed { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.get_mut().get_mut().0 + &mut self.inner.get_mut().get_mut().io } /// Returns a reference to the underlying codec wrapped by @@ -108,7 +144,7 @@ impl Framed { /// Note that care should be taken to not tamper with the underlying codec /// as it may corrupt the stream of frames otherwise being worked with. pub fn codec(&self) -> &U { - &self.inner.get_ref().get_ref().1 + &self.inner.get_ref().get_ref().codec } /// Returns a mutable reference to the underlying codec wrapped by @@ -117,7 +153,7 @@ impl Framed { /// Note that care should be taken to not tamper with the underlying codec /// as it may corrupt the stream of frames otherwise being worked with. pub fn codec_mut(&mut self) -> &mut U { - &mut self.inner.get_mut().get_mut().1 + &mut self.inner.get_mut().get_mut().codec } /// Returns a reference to the read buffer. @@ -131,7 +167,7 @@ impl Framed { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_inner(self) -> T { - self.inner.into_inner().into_inner().0 + self.inner.into_inner().into_inner().io } /// Consumes the `Frame`, returning its underlying I/O stream, the buffer @@ -145,8 +181,8 @@ impl Framed { let (inner, write_buf) = inner.into_parts(); FramedParts { - io: inner.0, - codec: inner.1, + io: inner.io, + codec: inner.codec, read_buf, write_buf, _priv: (), @@ -156,38 +192,38 @@ impl Framed { impl Stream for Framed where - T: AsyncRead + Unpin, - U: Decoder + Unpin, + T: AsyncRead, + U: Decoder, { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - pin!(self.get_mut().inner).poll_next(cx) + self.project().inner.poll_next(cx) } } impl Sink for Framed where - T: AsyncWrite + Unpin, - U: Encoder + Unpin, + T: AsyncWrite, + U: Encoder, U::Error: From, { type Error = U::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(Pin::get_mut(self).inner.get_mut()).poll_ready(cx) + self.project().inner.get_pin_mut().poll_ready(cx) } fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - Pin::new(Pin::get_mut(self).inner.get_mut()).start_send(item) + self.project().inner.get_pin_mut().start_send(item) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(Pin::get_mut(self).inner.get_mut()).poll_flush(cx) + self.project().inner.get_pin_mut().poll_flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(Pin::get_mut(self).inner.get_mut()).poll_close(cx) + self.project().inner.get_pin_mut().poll_close(cx) } } @@ -198,8 +234,8 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Framed") - .field("io", &self.inner.get_ref().get_ref().0) - .field("codec", &self.inner.get_ref().get_ref().1) + .field("io", &self.inner.get_ref().get_ref().io) + .field("codec", &self.inner.get_ref().get_ref().codec) .finish() } } @@ -208,23 +244,23 @@ where impl Read for Fuse { fn read(&mut self, dst: &mut [u8]) -> io::Result { - self.0.read(dst) + self.io.read(dst) } } impl BufRead for Fuse { fn fill_buf(&mut self) -> io::Result<&[u8]> { - self.0.fill_buf() + self.io.fill_buf() } fn consume(&mut self, amt: usize) { - self.0.consume(amt) + self.io.consume(amt) } } -impl AsyncRead for Fuse { +impl AsyncRead for Fuse { unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.0.prepare_uninitialized_buffer(buf) + self.io.prepare_uninitialized_buffer(buf) } fn poll_read( @@ -232,45 +268,45 @@ impl AsyncRead for Fuse { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - pin!(self.get_mut().0).poll_read(cx, buf) + self.project().io.poll_read(cx, buf) } } -impl AsyncBufRead for Fuse { +impl AsyncBufRead for Fuse { fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - pin!(self.get_mut().0).poll_fill_buf(cx) + self.project().io.poll_fill_buf(cx) } fn consume(self: Pin<&mut Self>, amt: usize) { - pin!(self.get_mut().0).consume(amt) + self.project().io.consume(amt) } } impl Write for Fuse { fn write(&mut self, src: &[u8]) -> io::Result { - self.0.write(src) + self.io.write(src) } fn flush(&mut self) -> io::Result<()> { - self.0.flush() + self.io.flush() } } -impl AsyncWrite for Fuse { +impl AsyncWrite for Fuse { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - pin!(self.get_mut().0).poll_write(cx, buf) + self.project().io.poll_write(cx, buf) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - pin!(self.get_mut().0).poll_flush(cx) + self.project().io.poll_flush(cx) } fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - pin!(self.get_mut().0).poll_shutdown(cx) + self.project().io.poll_shutdown(cx) } } @@ -279,11 +315,11 @@ impl Decoder for Fuse { type Error = U::Error; fn decode(&mut self, buffer: &mut BytesMut) -> Result, Self::Error> { - self.1.decode(buffer) + self.codec.decode(buffer) } fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result, Self::Error> { - self.1.decode_eof(buffer) + self.codec.decode_eof(buffer) } } @@ -292,7 +328,7 @@ impl Encoder for Fuse { type Error = U::Error; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - self.1.encode(item, dst) + self.codec.encode(item, dst) } } diff --git a/tokio-util/src/codec/framed_read.rs b/tokio-util/src/codec/framed_read.rs index e10f2968..9e8beb03 100644 --- a/tokio-util/src/codec/framed_read.rs +++ b/tokio-util/src/codec/framed_read.rs @@ -1,4 +1,4 @@ -use crate::codec::framed::Fuse; +use crate::codec::framed::{Fuse, ProjectFuse}; use crate::codec::Decoder; use tokio::io::AsyncRead; @@ -7,16 +7,21 @@ use bytes::BytesMut; use futures_core::Stream; use futures_sink::Sink; use log::trace; +use pin_project::pin_project; use std::fmt; use std::pin::Pin; use std::task::{Context, Poll}; /// A `Stream` of messages decoded from an `AsyncRead`. +#[pin_project] pub struct FramedRead { + #[pin] inner: FramedRead2>, } +#[pin_project] pub(crate) struct FramedRead2 { + #[pin] inner: T, eof: bool, is_readable: bool, @@ -35,7 +40,10 @@ where /// Creates a new `FramedRead` with the given `decoder`. pub fn new(inner: T, decoder: D) -> FramedRead { FramedRead { - inner: framed_read2(Fuse(inner, decoder)), + inner: framed_read2(Fuse { + io: inner, + codec: decoder, + }), } } } @@ -48,7 +56,7 @@ impl FramedRead { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { - &self.inner.inner.0 + &self.inner.inner.io } /// Returns a mutable reference to the underlying I/O stream wrapped by @@ -58,7 +66,7 @@ impl FramedRead { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.inner.0 + &mut self.inner.inner.io } /// Consumes the `FramedRead`, returning its underlying I/O stream. @@ -67,17 +75,17 @@ impl FramedRead { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_inner(self) -> T { - self.inner.inner.0 + self.inner.inner.io } /// Returns a reference to the underlying decoder. pub fn decoder(&self) -> &D { - &self.inner.inner.1 + &self.inner.inner.codec } /// Returns a mutable reference to the underlying decoder. pub fn decoder_mut(&mut self) -> &mut D { - &mut self.inner.inner.1 + &mut self.inner.inner.codec } /// Returns a reference to the read buffer. @@ -88,38 +96,61 @@ impl FramedRead { impl Stream for FramedRead where - T: AsyncRead + Unpin, - D: Decoder + Unpin, + T: AsyncRead, + D: Decoder, { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - pin!(self.get_mut().inner).poll_next(cx) + self.project().inner.poll_next(cx) } } // This impl just defers to the underlying T: Sink impl Sink for FramedRead where - T: Sink + Unpin, - D: Unpin, + T: Sink, { type Error = T::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - pin!(Pin::get_mut(self).inner.inner.0).poll_ready(cx) + self.project() + .inner + .project() + .inner + .project() + .io + .poll_ready(cx) } fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - pin!(Pin::get_mut(self).inner.inner.0).start_send(item) + self.project() + .inner + .project() + .inner + .project() + .io + .start_send(item) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - pin!(Pin::get_mut(self).inner.inner.0).poll_flush(cx) + self.project() + .inner + .project() + .inner + .project() + .io + .poll_flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - pin!(Pin::get_mut(self).inner.inner.0).poll_close(cx) + self.project() + .inner + .project() + .inner + .project() + .io + .poll_close(cx) } } @@ -130,8 +161,8 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FramedRead") - .field("inner", &self.inner.inner.0) - .field("decoder", &self.inner.inner.1) + .field("inner", &self.inner.inner.io) + .field("decoder", &self.inner.inner.codec) .field("eof", &self.inner.eof) .field("is_readable", &self.inner.is_readable) .field("buffer", &self.inner.buffer) @@ -180,6 +211,10 @@ impl FramedRead2 { &mut self.inner } + pub(crate) fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> { + self.project().inner + } + pub(crate) fn buffer(&self) -> &BytesMut { &self.buffer } @@ -187,49 +222,65 @@ impl FramedRead2 { impl Stream for FramedRead2 where - T: AsyncRead + Decoder + Unpin, + T: ProjectFuse + AsyncRead, + T::Codec: Decoder, { - type Item = Result; + type Item = Result<::Item, ::Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let pinned = Pin::get_mut(self); + let mut pinned = self.project(); loop { // Repeatedly call `decode` or `decode_eof` as long as it is // "readable". Readable is defined as not having returned `None`. If // the upstream has returned EOF, and the decoder is no longer // readable, it can be assumed that the decoder will never become // readable again, at which point the stream is terminated. - if pinned.is_readable { - if pinned.eof { - let frame = pinned.inner.decode_eof(&mut pinned.buffer)?; + if *pinned.is_readable { + if *pinned.eof { + let frame = pinned + .inner + .as_mut() + .project() + .codec + .decode_eof(&mut pinned.buffer)?; return Poll::Ready(frame.map(Ok)); } trace!("attempting to decode a frame"); - if let Some(frame) = pinned.inner.decode(&mut pinned.buffer)? { + if let Some(frame) = pinned + .inner + .as_mut() + .project() + .codec + .decode(&mut pinned.buffer)? + { trace!("frame decoded from buffer"); return Poll::Ready(Some(Ok(frame))); } - pinned.is_readable = false; + *pinned.is_readable = false; } - assert!(!pinned.eof); + assert!(!*pinned.eof); // Otherwise, try to read more data and try again. Make sure we've // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF pinned.buffer.reserve(1); - let bytect = match pin!(pinned.inner).poll_read_buf(cx, &mut pinned.buffer)? { + let bytect = match pinned + .inner + .as_mut() + .poll_read_buf(cx, &mut pinned.buffer)? + { Poll::Ready(ct) => ct, Poll::Pending => return Poll::Pending, }; if bytect == 0 { - pinned.eof = true; + *pinned.eof = true; } - pinned.is_readable = true; + *pinned.is_readable = true; } } } diff --git a/tokio-util/src/codec/framed_write.rs b/tokio-util/src/codec/framed_write.rs index 3a95612c..a88d0893 100644 --- a/tokio-util/src/codec/framed_write.rs +++ b/tokio-util/src/codec/framed_write.rs @@ -1,6 +1,6 @@ use crate::codec::decoder::Decoder; use crate::codec::encoder::Encoder; -use crate::codec::framed::Fuse; +use crate::codec::framed::{Fuse, ProjectFuse}; use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite}; @@ -8,17 +8,22 @@ use bytes::BytesMut; use futures_core::{ready, Stream}; use futures_sink::Sink; use log::trace; +use pin_project::pin_project; use std::fmt; use std::io::{self, BufRead, Read}; use std::pin::Pin; use std::task::{Context, Poll}; /// A `Sink` of frames encoded to an `AsyncWrite`. +#[pin_project] pub struct FramedWrite { + #[pin] inner: FramedWrite2>, } +#[pin_project] pub(crate) struct FramedWrite2 { + #[pin] inner: T, buffer: BytesMut, } @@ -34,7 +39,10 @@ where /// Creates a new `FramedWrite` with the given `encoder`. pub fn new(inner: T, encoder: E) -> FramedWrite { FramedWrite { - inner: framed_write2(Fuse(inner, encoder)), + inner: framed_write2(Fuse { + io: inner, + codec: encoder, + }), } } } @@ -47,7 +55,7 @@ impl FramedWrite { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { - &self.inner.inner.0 + &self.inner.inner.io } /// Returns a mutable reference to the underlying I/O stream wrapped by @@ -57,7 +65,7 @@ impl FramedWrite { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.inner.0 + &mut self.inner.inner.io } /// Consumes the `FramedWrite`, returning its underlying I/O stream. @@ -66,55 +74,60 @@ impl FramedWrite { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_inner(self) -> T { - self.inner.inner.0 + self.inner.inner.io } /// Returns a reference to the underlying decoder. pub fn encoder(&self) -> &E { - &self.inner.inner.1 + &self.inner.inner.codec } /// Returns a mutable reference to the underlying decoder. pub fn encoder_mut(&mut self) -> &mut E { - &mut self.inner.inner.1 + &mut self.inner.inner.codec } } // This impl just defers to the underlying FramedWrite2 impl Sink for FramedWrite where - T: AsyncWrite + Unpin, - E: Encoder + Unpin, + T: AsyncWrite, + E: Encoder, E::Error: From, { type Error = E::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - pin!(Pin::get_mut(self).inner).poll_ready(cx) + self.project().inner.poll_ready(cx) } fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - pin!(Pin::get_mut(self).inner).start_send(item) + self.project().inner.start_send(item) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - pin!(Pin::get_mut(self).inner).poll_flush(cx) + self.project().inner.poll_flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - pin!(Pin::get_mut(self).inner).poll_close(cx) + self.project().inner.poll_close(cx) } } impl Stream for FramedWrite where - T: Stream + Unpin, - D: Unpin, + T: Stream, { type Item = T::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(Pin::get_mut(self).get_mut()).poll_next(cx) + self.project() + .inner + .project() + .inner + .project() + .io + .poll_next(cx) } } @@ -125,8 +138,8 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FramedWrite") - .field("inner", &self.inner.get_ref().0) - .field("encoder", &self.inner.get_ref().1) + .field("inner", &self.inner.get_ref().io) + .field("encoder", &self.inner.get_ref().codec) .field("buffer", &self.inner.buffer) .finish() } @@ -169,9 +182,10 @@ impl FramedWrite2 { impl Sink for FramedWrite2 where - T: AsyncWrite + Encoder + Unpin, + T: ProjectFuse + AsyncWrite, + T::Codec: Encoder, { - type Error = T::Error; + type Error = ::Error; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's @@ -191,20 +205,24 @@ where } fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - let pinned = Pin::get_mut(self); - pinned.inner.encode(item, &mut pinned.buffer)?; + let mut pinned = self.project(); + pinned + .inner + .project() + .codec + .encode(item, &mut pinned.buffer)?; Ok(()) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { trace!("flushing framed transport"); - let pinned = Pin::get_mut(self); + let mut pinned = self.project(); while !pinned.buffer.is_empty() { trace!("writing; remaining={}", pinned.buffer.len()); let buf = &pinned.buffer; - let n = ready!(pin!(pinned.inner).poll_write(cx, &buf))?; + let n = ready!(pinned.inner.as_mut().poll_write(cx, &buf))?; if n == 0 { return Poll::Ready(Err(io::Error::new( @@ -220,15 +238,15 @@ where } // Try flushing the underlying IO - ready!(pin!(pinned.inner).poll_flush(cx))?; + ready!(pinned.inner.poll_flush(cx))?; trace!("framed transport flushed"); Poll::Ready(Ok(())) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(pin!(self).poll_flush(cx))?; - ready!(pin!(self.inner).poll_shutdown(cx))?; + ready!(self.as_mut().poll_flush(cx))?; + ready!(self.project().inner.poll_shutdown(cx))?; Poll::Ready(Ok(())) } @@ -263,7 +281,7 @@ impl BufRead for FramedWrite2 { } } -impl AsyncRead for FramedWrite2 { +impl AsyncRead for FramedWrite2 { unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { self.inner.prepare_uninitialized_buffer(buf) } @@ -273,16 +291,28 @@ impl AsyncRead for FramedWrite2 { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - pin!(self.get_mut().inner).poll_read(cx, buf) + self.project().inner.poll_read(cx, buf) } } -impl AsyncBufRead for FramedWrite2 { +impl AsyncBufRead for FramedWrite2 { fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - pin!(self.get_mut().inner).poll_fill_buf(cx) + self.project().inner.poll_fill_buf(cx) } fn consume(self: Pin<&mut Self>, amt: usize) { - pin!(self.get_mut().inner).consume(amt) + self.project().inner.consume(amt) + } +} + +impl ProjectFuse for FramedWrite2 +where + T: ProjectFuse, +{ + type Io = T::Io; + type Codec = T::Codec; + + fn project(self: Pin<&mut Self>) -> Fuse, &mut Self::Codec> { + self.project().inner.project() } } diff --git a/tokio-util/src/codec/macros.rs b/tokio-util/src/codec/macros.rs deleted file mode 100644 index 902ada1c..00000000 --- a/tokio-util/src/codec/macros.rs +++ /dev/null @@ -1,7 +0,0 @@ -/// A macro to reduce some of the boilerplate for projecting from -/// `Pin<&mut T>` to `Pin<&mut T.field>` -macro_rules! pin { - ($e:expr) => { - std::pin::Pin::new(&mut $e) - }; -} diff --git a/tokio-util/src/codec/mod.rs b/tokio-util/src/codec/mod.rs index 203efae4..b162dd3a 100644 --- a/tokio-util/src/codec/mod.rs +++ b/tokio-util/src/codec/mod.rs @@ -9,9 +9,6 @@ //! [`Sink`]: https://docs.rs/futures-sink/*/futures_sink/trait.Sink.html //! [`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html -#[macro_use] -mod macros; - mod bytes_codec; pub use self::bytes_codec::BytesCodec; -- cgit v1.2.3