use crate::codec::decoder::Decoder; use crate::codec::encoder::Encoder; use tokio::{ io::{AsyncRead, AsyncWrite}, stream::Stream, }; use bytes::BytesMut; use futures_core::ready; use futures_sink::Sink; use log::trace; use pin_project_lite::pin_project; use std::borrow::{Borrow, BorrowMut}; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; pin_project! { #[derive(Debug)] pub(crate) struct FramedImpl { #[pin] pub(crate) inner: T, pub(crate) state: State, pub(crate) codec: U, } } const INITIAL_CAPACITY: usize = 8 * 1024; const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY; pub(crate) struct ReadFrame { pub(crate) eof: bool, pub(crate) is_readable: bool, pub(crate) buffer: BytesMut, } pub(crate) struct WriteFrame { pub(crate) buffer: BytesMut, } #[derive(Default)] pub(crate) struct RWFrames { pub(crate) read: ReadFrame, pub(crate) write: WriteFrame, } impl Default for ReadFrame { fn default() -> Self { Self { eof: false, is_readable: false, buffer: BytesMut::with_capacity(INITIAL_CAPACITY), } } } impl Default for WriteFrame { fn default() -> Self { Self { buffer: BytesMut::with_capacity(INITIAL_CAPACITY), } } } impl From for ReadFrame { fn from(mut buffer: BytesMut) -> Self { let size = buffer.capacity(); if size < INITIAL_CAPACITY { buffer.reserve(INITIAL_CAPACITY - size); } Self { buffer, is_readable: size > 0, eof: false, } } } impl From for WriteFrame { fn from(mut buffer: BytesMut) -> Self { let size = buffer.capacity(); if size < INITIAL_CAPACITY { buffer.reserve(INITIAL_CAPACITY - size); } Self { buffer } } } impl Borrow for RWFrames { fn borrow(&self) -> &ReadFrame { &self.read } } impl BorrowMut for RWFrames { fn borrow_mut(&mut self) -> &mut ReadFrame { &mut self.read } } impl Borrow for RWFrames { fn borrow(&self) -> &WriteFrame { &self.write } } impl BorrowMut for RWFrames { fn borrow_mut(&mut self) -> &mut WriteFrame { &mut self.write } } impl Stream for FramedImpl where T: AsyncRead, U: Decoder, R: BorrowMut, { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { use crate::util::poll_read_buf; let mut pinned = self.project(); let state: &mut ReadFrame = pinned.state.borrow_mut(); loop { // Repeatedly call `decode` or `decode_eof` as long as it is // "readable". Readable is defined as not having returned `None`. If // the upstream has returned EOF, and the decoder is no longer // readable, it can be assumed that the decoder will never become // readable again, at which point the stream is terminated. if state.is_readable { if state.eof { let frame = pinned.codec.decode_eof(&mut state.buffer)?; return Poll::Ready(frame.map(Ok)); } trace!("attempting to decode a frame"); if let Some(frame) = pinned.codec.decode(&mut state.buffer)? { trace!("frame decoded from buffer"); return Poll::Ready(Some(Ok(frame))); } state.is_readable = false; } assert!(!state.eof); // Otherwise, try to read more data and try again. Make sure we've // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF state.buffer.reserve(1); let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer)? { Poll::Ready(ct) => ct, Poll::Pending => return Poll::Pending, }; if bytect == 0 { state.eof = true; } state.is_readable = true; } } } impl Sink for FramedImpl where T: AsyncWrite, U: Encoder, U::Error: From, W: BorrowMut, { type Error = U::Error; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.state.borrow().buffer.len() >= BACKPRESSURE_BOUNDARY { self.as_mut().poll_flush(cx) } else { Poll::Ready(Ok(())) } } fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { let pinned = self.project(); pinned .codec .encode(item, &mut pinned.state.borrow_mut().buffer)?; Ok(()) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { use crate::util::poll_write_buf; trace!("flushing framed transport"); let mut pinned = self.project(); while !pinned.state.borrow_mut().buffer.is_empty() { let WriteFrame { buffer } = pinned.state.borrow_mut(); trace!("writing; remaining={}", buffer.len()); let n = ready!(poll_write_buf(pinned.inner.as_mut(), cx, buffer))?; if n == 0 { return Poll::Ready(Err(io::Error::new( io::ErrorKind::WriteZero, "failed to \ write frame to transport", ) .into())); } } // Try flushing the underlying IO ready!(pinned.inner.poll_flush(cx))?; trace!("framed transport flushed"); Poll::Ready(Ok(())) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ready!(self.as_mut().poll_flush(cx))?; ready!(self.project().inner.poll_shutdown(cx))?; Poll::Ready(Ok(())) } }