summaryrefslogtreecommitdiffstats
path: root/ipfs-api-prelude/src/read.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ipfs-api-prelude/src/read.rs')
-rw-r--r--ipfs-api-prelude/src/read.rs226
1 files changed, 226 insertions, 0 deletions
diff --git a/ipfs-api-prelude/src/read.rs b/ipfs-api-prelude/src/read.rs
new file mode 100644
index 0000000..118ac47
--- /dev/null
+++ b/ipfs-api-prelude/src/read.rs
@@ -0,0 +1,226 @@
+// Copyright 2017 rust-ipfs-api Developers
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
+// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
+// http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+//
+
+use crate::header::X_STREAM_ERROR;
+use bytes::{Bytes, BytesMut};
+use futures::{
+ task::{Context, Poll},
+ Stream,
+};
+use serde::Deserialize;
+use std::{cmp, fmt::Display, io, marker::PhantomData, pin::Pin};
+use tokio::io::{AsyncRead, ReadBuf};
+use tokio_util::codec::Decoder;
+use tracing::{event, instrument, Level};
+
/// A decoder for a response where each line is a full json object.
///
pub struct JsonLineDecoder<T> {
    /// Set to true if the stream can contain a X-Stream-Error header,
    /// which indicates an error while streaming.
    ///
    parse_stream_error: bool,

    /// Zero-sized marker tying this decoder to the item type `T` it
    /// deserializes; no value of `T` is ever stored.
    ty: PhantomData<T>,
}
+
+impl<T> JsonLineDecoder<T> {
+ #[inline]
+ pub fn new(parse_stream_error: bool) -> JsonLineDecoder<T> {
+ JsonLineDecoder {
+ parse_stream_error,
+ ty: PhantomData,
+ }
+ }
+}
+
+impl<T> Decoder for JsonLineDecoder<T>
+where
+ for<'de> T: Deserialize<'de>,
+{
+ type Item = T;
+
+ type Error = crate::Error;
+
+ /// Tries to find a new line character. If it does, it will split the buffer,
+ /// and parse the first slice.
+ ///
+ #[instrument(skip(self, src), fields(stream_trailer = self.parse_stream_error))]
+ fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+ let nl_index = src.iter().position(|b| *b == b'\n');
+
+ if let Some(pos) = nl_index {
+ event!(Level::INFO, "Found new line delimeter in buffer");
+
+ let slice = src.split_to(pos + 1);
+ let slice = &slice[..slice.len() - 1];
+
+ match serde_json::from_slice(slice) {
+ Ok(json) => Ok(json),
+ // If a JSON object couldn't be parsed from the response, it is possible
+ // that a stream error trailing header was returned. If the JSON decoder
+ // was configured to parse these kinds of error, it should try. If a header
+ // couldn't be parsed, it will return the original error.
+ //
+ Err(e) => {
+ if self.parse_stream_error {
+ match slice.iter().position(|&x| x == b':') {
+ Some(colon) if &slice[..colon] == X_STREAM_ERROR.as_bytes() => {
+ let e = crate::Error::StreamError(
+ String::from_utf8_lossy(&slice[colon + 2..]).into(),
+ );
+
+ Err(e)
+ }
+ _ => Err(e.into()),
+ }
+ } else {
+ Err(e.into())
+ }
+ }
+ }
+ } else {
+ event!(Level::INFO, "Waiting for more data to decode JSON");
+
+ Ok(None)
+ }
+ }
+}
+
/// A decoder that reads a line at a time.
///
/// Each decoded item is one line with the trailing `'\n'` removed (a
/// preceding `'\r'`, if present, is kept).
pub struct LineDecoder;
+
+impl Decoder for LineDecoder {
+ type Item = String;
+
+ type Error = crate::Error;
+
+ /// Attempts to find a new line character, and returns the entire line if
+ /// it finds one.
+ ///
+ fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+ let nl_index = src.iter().position(|b| *b == b'\n');
+
+ if let Some(pos) = nl_index {
+ let slice = src.split_to(pos + 1);
+
+ Ok(Some(
+ String::from_utf8_lossy(&slice[..slice.len() - 1]).into_owned(),
+ ))
+ } else {
+ Ok(None)
+ }
+ }
+}
+
+/// Copies bytes from a Bytes chunk into a destination buffer, and returns
+/// the number of bytes that were read.
+///
+fn copy_from_chunk_to(dest: &mut ReadBuf<'_>, chunk: &mut Bytes, chunk_start: usize) -> usize {
+ let len = cmp::min(dest.capacity(), chunk.len() - chunk_start);
+ let chunk_end = chunk_start + len;
+
+ dest.put_slice(&chunk[chunk_start..chunk_end]);
+
+ len
+}
+
/// The state of a stream returning Chunks.
///
enum ReadState {
    /// A chunk is ready to be read from.
    ///
    /// The `usize` is the offset of the first byte of the chunk that has not
    /// yet been copied out to a reader.
    Ready(Bytes, usize),

    /// The next chunk isn't ready yet.
    ///
    NotReady,
}
+
/// Reads from a stream of chunks asynchronously.
///
/// Adapts a `Stream` of `Result<Bytes, E>` into an `AsyncRead`, carrying at
/// most one partially consumed chunk over between reads.
pub struct StreamReader<S> {
    /// The underlying chunk stream.
    stream: S,

    /// Whether a partially read chunk is held over from a previous poll.
    state: ReadState,
}
+
+impl<S, E> StreamReader<S>
+where
+ S: Stream<Item = Result<Bytes, E>>,
+ E: Display,
+{
+ #[inline]
+ pub fn new(stream: S) -> StreamReader<S> {
+ StreamReader {
+ stream,
+ state: ReadState::NotReady,
+ }
+ }
+}
+
+impl<S, E> AsyncRead for StreamReader<S>
+where
+ S: Stream<Item = Result<Bytes, E>> + Unpin,
+ E: Display,
+{
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ match self.state {
+ // Stream yielded a Chunk to read.
+ //
+ ReadState::Ready(ref mut chunk, ref mut pos) => {
+ let bytes_read = copy_from_chunk_to(buf, chunk, *pos);
+
+ if *pos + bytes_read >= chunk.len() {
+ self.state = ReadState::NotReady;
+ } else {
+ *pos += bytes_read;
+ }
+
+ return Poll::Ready(Ok(()));
+ }
+ // Stream is not ready, and a Chunk needs to be read.
+ //
+ ReadState::NotReady => {
+ match Stream::poll_next(Pin::new(&mut self.stream), cx) {
+ // Polling stream yielded a Chunk that can be read from.
+ //
+ Poll::Ready(Some(Ok(mut chunk))) => {
+ let bytes_read = copy_from_chunk_to(buf, &mut chunk, 0);
+
+ if bytes_read >= chunk.len() {
+ self.state = ReadState::NotReady;
+ } else {
+ self.state = ReadState::Ready(chunk, bytes_read);
+ }
+
+ return Poll::Ready(Ok(()));
+ }
+ Poll::Ready(Some(Err(e))) => {
+ return Poll::Ready(Err(io::Error::new(
+ io::ErrorKind::Other,
+ e.to_string(),
+ )));
+ }
+ // Polling stream yielded EOF.
+ //
+ Poll::Ready(None) => return Poll::Ready(Ok(())),
+ // Stream could not be read from.
+ //
+ Poll::Pending => (),
+ }
+ }
+ }
+
+ Poll::Pending
+ }
+}