diff options
-rw-r--r-- | ipfs-api/src/client.rs | 37 | ||||
-rw-r--r-- | ipfs-api/src/header.rs | 26 | ||||
-rw-r--r-- | ipfs-api/src/read.rs | 42 | ||||
-rw-r--r-- | ipfs-api/src/response/error.rs | 4 |
4 files changed, 80 insertions, 29 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index 157caff..486b9b4 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -6,7 +6,7 @@ // copied, modified, or distributed except according to those terms. // -use futures::{stream, Stream}; +use futures::Stream; use futures::future::{Future, IntoFuture}; use header::Trailer; use read::{JsonLineDecoder, LineDecoder, StreamReader}; @@ -140,28 +140,9 @@ impl IpfsClient { D: 'static + Decoder<Item = Res, Error = Error>, Res: 'static, { - let err: Option<Error> = if let Some(trailer) = res.headers().get() { - // Response has the Trailer header set, which is used - // by Ipfs to indicate an error when preparing an output - // stream. - // - match trailer { - &Trailer::StreamError => Some(ErrorKind::StreamError.into()), - } - } else { - None - }; - let stream = FramedRead::new(StreamReader::new(res.body().from_err()), decoder); - if let Some(inner) = err { - // If there was an error while streaming data back, read - // as much as possible from the stream, then return an error. - // - Box::new(stream.chain(stream::once(Err(inner)))) - } else { - Box::new(stream) - } + Box::new(stream) } /// Sends a request and returns the raw response. @@ -302,7 +283,19 @@ impl IpfsClient { .into_future() .flatten() .map(|res| { - IpfsClient::process_stream_response(res, JsonLineDecoder::new()) + let parse_stream_error = if let Some(trailer) = res.headers().get() { + // Response has the Trailer header set. The StreamError trailer + // is used to indicate that there was an error while streaming + // data with Ipfs. + // + match trailer { + &Trailer::StreamError => true, + } + } else { + false + }; + + IpfsClient::process_stream_response(res, JsonLineDecoder::new(parse_stream_error)) }) .flatten_stream(); diff --git a/ipfs-api/src/header.rs b/ipfs-api/src/header.rs index 32941cc..486be0c 100644 --- a/ipfs-api/src/header.rs +++ b/ipfs-api/src/header.rs @@ -42,3 +42,29 @@ impl Header for Trailer { f.fmt_line(&value) } } + + +#[derive(Debug, Clone)] +pub struct XStreamError { + pub error: String, +} + +impl Header for XStreamError { + fn header_name() -> &'static str { + "X-Stream-Error" + } + + fn parse_header(raw: &Raw) -> hyper::Result<XStreamError> { + if let Some(bytes) = raw.one() { + let value = String::from_utf8_lossy(bytes); + + Ok(XStreamError { error: value.into_owned() }) + } else { + Err(hyper::Error::Header) + } + } + + fn fmt_header(&self, f: &mut header::Formatter) -> fmt::Result { + f.fmt_line(&self.error) + } +} diff --git a/ipfs-api/src/read.rs b/ipfs-api/src/read.rs index 9cea164..5396f46 100644 --- a/ipfs-api/src/read.rs +++ b/ipfs-api/src/read.rs @@ -8,8 +8,10 @@ use bytes::BytesMut; use futures::{Async, Stream}; +use header::XStreamError; use hyper::Chunk; -use response::Error; +use hyper::header::{Header, Raw}; +use response::{Error, ErrorKind}; use serde::Deserialize; use serde_json; use std::cmp; @@ -22,13 +24,21 @@ use tokio_io::codec::Decoder; /// A decoder for a response where each line is a full json object. /// pub struct JsonLineDecoder<T> { + /// Set to true if the stream can contain a X-Stream-Error header, + /// which indicates an error while streaming. + /// + parse_stream_error: bool, + ty: PhantomData<T>, } impl<T> JsonLineDecoder<T> { #[inline] - pub fn new() -> JsonLineDecoder<T> { - JsonLineDecoder { ty: PhantomData } + pub fn new(parse_stream_error: bool) -> JsonLineDecoder<T> { + JsonLineDecoder { + parse_stream_error, + ty: PhantomData, + } } } @@ -48,8 +58,30 @@ where if let Some(pos) = nl_index { let slice = src.split_to(pos + 1); - - serde_json::from_slice(&slice[..slice.len() - 1]).map_err(From::from) + let slice = &slice[..slice.len() - 1]; + + match serde_json::from_slice(slice) { + Ok(json) => Ok(json), + // If a JSON object couldn't be parsed from the response, it is possible + // that a stream error trailing header was returned. If the JSON decoder + // was configured to parse these kinds of error, it should try. If a header + // couldn't be parsed, it will return the original error. + // + Err(e) => { + if self.parse_stream_error { + let raw = Raw::from(slice); + + match XStreamError::parse_header(&raw) { + Ok(stream_error) => Err( + ErrorKind::StreamError(stream_error.error).into(), + ), + Err(_) => Err(e.into()), + } + } else { + Err(e.into()) + } + } + } } else { Ok(None) } diff --git a/ipfs-api/src/response/error.rs b/ipfs-api/src/response/error.rs index f0bac60..5f67eec 100644 --- a/ipfs-api/src/response/error.rs +++ b/ipfs-api/src/response/error.rs @@ -40,9 +40,9 @@ error_chain! { /// A stream error indicated in the Trailer header. /// - StreamError { + StreamError(err: String) { description("api returned a stream error"), - display("api returned a stream error") + display("api returned an error while streaming: '{}'", err) } Uncategorized(err: String) { |