From 27c25f96ac923bb4d6e02890eae08752f1c8228e Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Sun, 22 Dec 2019 02:22:37 -0500 Subject: update decoder and asyncread traits for 0.2 (untested) --- ipfs-api/src/read.rs | 125 +++++++++++++++++++++++++++++---------------------- 1 file changed, 71 insertions(+), 54 deletions(-) diff --git a/ipfs-api/src/read.rs b/ipfs-api/src/read.rs index a807893..3ae35b0 100644 --- a/ipfs-api/src/read.rs +++ b/ipfs-api/src/read.rs @@ -9,16 +9,15 @@ use crate::header::X_STREAM_ERROR; use crate::response::Error; use bytes::{Bytes, BytesMut}; -use futures::{Async, Stream}; +use futures::{ + task::{Context, Poll}, + Stream, +}; use serde::Deserialize; use serde_json; -use std::{ - cmp, - io::{self, Read}, - marker::PhantomData, -}; -use tokio_codec::Decoder; -use tokio_io::AsyncRead; +use std::{cmp, fmt::Display, io, marker::PhantomData, pin::Pin}; +use tokio::io::AsyncRead; +use tokio_util::codec::Decoder; /// A decoder for a response where each line is a full json object. /// @@ -116,6 +115,18 @@ impl Decoder for LineDecoder { } } +/// Copies bytes from a Bytes chunk into a destination buffer, and returns +/// the number of bytes that were read. +/// +fn copy_from_chunk_to(dest: &mut [u8], chunk: &mut Bytes, chunk_start: usize) -> usize { + let len = cmp::min(dest.len(), chunk.len() - chunk_start); + let chunk_end = chunk_start + len; + + dest[..len].copy_from_slice(&chunk[chunk_start..chunk_end]); + + len +} + /// The state of a stream returning Chunks. /// enum ReadState { @@ -135,9 +146,10 @@ pub struct StreamReader { state: ReadState, } -impl StreamReader +impl StreamReader where - S: Stream, + S: Stream>, + E: Display, { #[inline] pub fn new(stream: S) -> StreamReader { @@ -148,58 +160,63 @@ where } } -impl Read for StreamReader +impl AsyncRead for StreamReader where - S: Stream, + S: Stream> + Unpin, + E: Display, { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - loop { - let ret; - - match self.state { - // Stream yielded a Chunk to read. - // - ReadState::Ready(ref mut chunk, ref mut pos) => { - let chunk_start = *pos; - let len = cmp::min(buf.len(), chunk.len() - chunk_start); - let chunk_end = chunk_start + len; + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context, + mut buf: &mut [u8], + ) -> Poll> { + match self.state { + // Stream yielded a Chunk to read. + // + ReadState::Ready(ref mut chunk, ref mut pos) => { + let bytes_read = copy_from_chunk_to(buf, chunk, *pos); + + if *pos + bytes_read >= chunk.len() { + self.state = ReadState::NotReady; + } else { + *pos += bytes_read; + } - buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]); - *pos += len; + return Poll::Ready(Ok(bytes_read)); + } + // Stream is not ready, and a Chunk needs to be read. + // + ReadState::NotReady => { + match Stream::poll_next(Pin::new(&mut self.stream), cx) { + // Polling stream yielded a Chunk that can be read from. + // + Poll::Ready(Some(Ok(mut chunk))) => { + let bytes_read = copy_from_chunk_to(&mut buf, &mut chunk, 0); + + if bytes_read >= chunk.len() { + self.state = ReadState::NotReady; + } else { + self.state = ReadState::Ready(chunk, bytes_read); + } - if *pos == chunk.len() { - ret = len; - } else { - return Ok(len); + return Poll::Ready(Ok(bytes_read)); } - } - // Stream is not ready, and a Chunk needs to be read. - // - ReadState::NotReady => { - match self.stream.poll() { - // Polling stream yielded a Chunk that can be read from. - // - Ok(Async::Ready(Some(chunk))) => { - self.state = ReadState::Ready(chunk, 0); - - continue; - } - // Polling stream yielded EOF. - // - Ok(Async::Ready(None)) => return Ok(0), - // Stream could not be read from. - // - Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()), - Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e.to_string())), + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::Other, + e.to_string(), + ))); } + // Polling stream yielded EOF. + // + Poll::Ready(None) => return Poll::Ready(Ok(0)), + // Stream could not be read from. + // + Poll::Pending => (), } } - - self.state = ReadState::NotReady; - - return Ok(ret); } + + Poll::Pending } } - -impl AsyncRead for StreamReader where S: Stream {} -- cgit v1.2.3