diff options
Diffstat (limited to 'ipfs-api/src/read.rs')
-rw-r--r-- | ipfs-api/src/read.rs | 226 |
1 files changed, 0 insertions, 226 deletions
diff --git a/ipfs-api/src/read.rs b/ipfs-api/src/read.rs deleted file mode 100644 index d17e396..0000000 --- a/ipfs-api/src/read.rs +++ /dev/null @@ -1,226 +0,0 @@ -// Copyright 2017 rust-ipfs-api Developers -// -// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or -// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or -// http://opensource.org/licenses/MIT>, at your option. This file may not be -// copied, modified, or distributed except according to those terms. -// - -use crate::{header::X_STREAM_ERROR, response::Error}; -use bytes::{Bytes, BytesMut}; -use futures::{ - task::{Context, Poll}, - Stream, -}; -use serde::Deserialize; -use std::{cmp, fmt::Display, io, marker::PhantomData, pin::Pin}; -use tokio::io::{AsyncRead, ReadBuf}; -use tokio_util::codec::Decoder; -use tracing::{event, instrument, Level}; - -/// A decoder for a response where each line is a full json object. -/// -pub struct JsonLineDecoder<T> { - /// Set to true if the stream can contain a X-Stream-Error header, - /// which indicates an error while streaming. - /// - parse_stream_error: bool, - - ty: PhantomData<T>, -} - -impl<T> JsonLineDecoder<T> { - #[inline] - pub fn new(parse_stream_error: bool) -> JsonLineDecoder<T> { - JsonLineDecoder { - parse_stream_error, - ty: PhantomData, - } - } -} - -impl<T> Decoder for JsonLineDecoder<T> -where - for<'de> T: Deserialize<'de>, -{ - type Item = T; - - type Error = Error; - - /// Tries to find a new line character. If it does, it will split the buffer, - /// and parse the first slice. - /// - #[instrument(skip(self, src), fields(stream_trailer = self.parse_stream_error))] - fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { - let nl_index = src.iter().position(|b| *b == b'\n'); - - if let Some(pos) = nl_index { - event!(Level::INFO, "Found new line delimeter in buffer"); - - let slice = src.split_to(pos + 1); - let slice = &slice[..slice.len() - 1]; - - match serde_json::from_slice(slice) { - Ok(json) => Ok(json), - // If a JSON object couldn't be parsed from the response, it is possible - // that a stream error trailing header was returned. If the JSON decoder - // was configured to parse these kinds of error, it should try. If a header - // couldn't be parsed, it will return the original error. - // - Err(e) => { - if self.parse_stream_error { - match slice.iter().position(|&x| x == b':') { - Some(colon) if &slice[..colon] == X_STREAM_ERROR.as_bytes() => { - let e = Error::StreamError( - String::from_utf8_lossy(&slice[colon + 2..]).into(), - ); - - Err(e) - } - _ => Err(e.into()), - } - } else { - Err(e.into()) - } - } - } - } else { - event!(Level::INFO, "Waiting for more data to decode JSON"); - - Ok(None) - } - } -} - -/// A decoder that reads a line at a time. -/// -pub struct LineDecoder; - -impl Decoder for LineDecoder { - type Item = String; - - type Error = Error; - - /// Attempts to find a new line character, and returns the entire line if - /// it finds one. - /// - fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { - let nl_index = src.iter().position(|b| *b == b'\n'); - - if let Some(pos) = nl_index { - let slice = src.split_to(pos + 1); - - Ok(Some( - String::from_utf8_lossy(&slice[..slice.len() - 1]).into_owned(), - )) - } else { - Ok(None) - } - } -} - -/// Copies bytes from a Bytes chunk into a destination buffer, and returns -/// the number of bytes that were read. -/// -fn copy_from_chunk_to(dest: &mut ReadBuf<'_>, chunk: &mut Bytes, chunk_start: usize) -> usize { - let len = cmp::min(dest.capacity(), chunk.len() - chunk_start); - let chunk_end = chunk_start + len; - - dest.put_slice(&chunk[chunk_start..chunk_end]); - - len -} - -/// The state of a stream returning Chunks. -/// -enum ReadState { - /// A chunk is ready to be read from. - /// - Ready(Bytes, usize), - - /// The next chunk isn't ready yet. - /// - NotReady, -} - -/// Reads from a stream of chunks asynchronously. -/// -pub struct StreamReader<S> { - stream: S, - state: ReadState, -} - -impl<S, E> StreamReader<S> -where - S: Stream<Item = Result<Bytes, E>>, - E: Display, -{ - #[inline] - pub fn new(stream: S) -> StreamReader<S> { - StreamReader { - stream, - state: ReadState::NotReady, - } - } -} - -impl<S, E> AsyncRead for StreamReader<S> -where - S: Stream<Item = Result<Bytes, E>> + Unpin, - E: Display, -{ - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut ReadBuf<'_>, - ) -> Poll<io::Result<()>> { - match self.state { - // Stream yielded a Chunk to read. - // - ReadState::Ready(ref mut chunk, ref mut pos) => { - let bytes_read = copy_from_chunk_to(buf, chunk, *pos); - - if *pos + bytes_read >= chunk.len() { - self.state = ReadState::NotReady; - } else { - *pos += bytes_read; - } - - return Poll::Ready(Ok(())); - } - // Stream is not ready, and a Chunk needs to be read. - // - ReadState::NotReady => { - match Stream::poll_next(Pin::new(&mut self.stream), cx) { - // Polling stream yielded a Chunk that can be read from. - // - Poll::Ready(Some(Ok(mut chunk))) => { - let bytes_read = copy_from_chunk_to(buf, &mut chunk, 0); - - if bytes_read >= chunk.len() { - self.state = ReadState::NotReady; - } else { - self.state = ReadState::Ready(chunk, bytes_read); - } - - return Poll::Ready(Ok(())); - } - Poll::Ready(Some(Err(e))) => { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::Other, - e.to_string(), - ))); - } - // Polling stream yielded EOF. - // - Poll::Ready(None) => return Poll::Ready(Ok(())), - // Stream could not be read from. - // - Poll::Pending => (), - } - } - } - - Poll::Pending - } -} |