summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFerris Tseng <ferristseng@fastmail.fm>2019-12-22 02:22:37 -0500
committerFerris Tseng <ferristseng@fastmail.fm>2019-12-22 02:22:37 -0500
commit27c25f96ac923bb4d6e02890eae08752f1c8228e (patch)
tree5641b5b7b466caede92f2466fda4d9bdad9a0e59
parent949042dc9dde52c90e29aecb122d8d0105639b8f (diff)
update decoder and asyncread traits for 0.2 (untested)
-rw-r--r--ipfs-api/src/read.rs125
1 files changed, 71 insertions, 54 deletions
diff --git a/ipfs-api/src/read.rs b/ipfs-api/src/read.rs
index a807893..3ae35b0 100644
--- a/ipfs-api/src/read.rs
+++ b/ipfs-api/src/read.rs
@@ -9,16 +9,15 @@
use crate::header::X_STREAM_ERROR;
use crate::response::Error;
use bytes::{Bytes, BytesMut};
-use futures::{Async, Stream};
+use futures::{
+ task::{Context, Poll},
+ Stream,
+};
use serde::Deserialize;
use serde_json;
-use std::{
- cmp,
- io::{self, Read},
- marker::PhantomData,
-};
-use tokio_codec::Decoder;
-use tokio_io::AsyncRead;
+use std::{cmp, fmt::Display, io, marker::PhantomData, pin::Pin};
+use tokio::io::AsyncRead;
+use tokio_util::codec::Decoder;
/// A decoder for a response where each line is a full json object.
///
@@ -116,6 +115,18 @@ impl Decoder for LineDecoder {
}
}
+/// Copies bytes from a Bytes chunk into a destination buffer, and returns
+/// the number of bytes that were read.
+///
+fn copy_from_chunk_to(dest: &mut [u8], chunk: &mut Bytes, chunk_start: usize) -> usize {
+ let len = cmp::min(dest.len(), chunk.len() - chunk_start);
+ let chunk_end = chunk_start + len;
+
+ dest[..len].copy_from_slice(&chunk[chunk_start..chunk_end]);
+
+ len
+}
+
/// The state of a stream returning Chunks.
///
enum ReadState {
@@ -135,9 +146,10 @@ pub struct StreamReader<S> {
state: ReadState,
}
-impl<S> StreamReader<S>
+impl<S, E> StreamReader<S>
where
- S: Stream<Item = Bytes, Error = Error>,
+ S: Stream<Item = Result<Bytes, E>>,
+ E: Display,
{
#[inline]
pub fn new(stream: S) -> StreamReader<S> {
@@ -148,58 +160,63 @@ where
}
}
-impl<S> Read for StreamReader<S>
+impl<S, E> AsyncRead for StreamReader<S>
where
- S: Stream<Item = Bytes, Error = Error>,
+ S: Stream<Item = Result<Bytes, E>> + Unpin,
+ E: Display,
{
- fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- loop {
- let ret;
-
- match self.state {
- // Stream yielded a Chunk to read.
- //
- ReadState::Ready(ref mut chunk, ref mut pos) => {
- let chunk_start = *pos;
- let len = cmp::min(buf.len(), chunk.len() - chunk_start);
- let chunk_end = chunk_start + len;
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context,
+ mut buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ match self.state {
+ // Stream yielded a Chunk to read.
+ //
+ ReadState::Ready(ref mut chunk, ref mut pos) => {
+ let bytes_read = copy_from_chunk_to(buf, chunk, *pos);
+
+ if *pos + bytes_read >= chunk.len() {
+ self.state = ReadState::NotReady;
+ } else {
+ *pos += bytes_read;
+ }
- buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]);
- *pos += len;
+ return Poll::Ready(Ok(bytes_read));
+ }
+ // Stream is not ready, and a Chunk needs to be read.
+ //
+ ReadState::NotReady => {
+ match Stream::poll_next(Pin::new(&mut self.stream), cx) {
+ // Polling stream yielded a Chunk that can be read from.
+ //
+ Poll::Ready(Some(Ok(mut chunk))) => {
+ let bytes_read = copy_from_chunk_to(&mut buf, &mut chunk, 0);
+
+ if bytes_read >= chunk.len() {
+ self.state = ReadState::NotReady;
+ } else {
+ self.state = ReadState::Ready(chunk, bytes_read);
+ }
- if *pos == chunk.len() {
- ret = len;
- } else {
- return Ok(len);
+ return Poll::Ready(Ok(bytes_read));
}
- }
- // Stream is not ready, and a Chunk needs to be read.
- //
- ReadState::NotReady => {
- match self.stream.poll() {
- // Polling stream yielded a Chunk that can be read from.
- //
- Ok(Async::Ready(Some(chunk))) => {
- self.state = ReadState::Ready(chunk, 0);
-
- continue;
- }
- // Polling stream yielded EOF.
- //
- Ok(Async::Ready(None)) => return Ok(0),
- // Stream could not be read from.
- //
- Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()),
- Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
+ Poll::Ready(Some(Err(e))) => {
+ return Poll::Ready(Err(io::Error::new(
+ io::ErrorKind::Other,
+ e.to_string(),
+ )));
}
+ // Polling stream yielded EOF.
+ //
+ Poll::Ready(None) => return Poll::Ready(Ok(0)),
+ // Stream could not be read from.
+ //
+ Poll::Pending => (),
}
}
-
- self.state = ReadState::NotReady;
-
- return Ok(ret);
}
+
+ Poll::Pending
}
}
-
-impl<S> AsyncRead for StreamReader<S> where S: Stream<Item = Bytes, Error = Error> {}