diff options
author | Ferris Tseng <ferristseng@fastmail.fm> | 2017-11-25 12:11:09 -0500 |
---|---|---|
committer | Ferris Tseng <ferristseng@fastmail.fm> | 2017-11-25 12:11:09 -0500 |
commit | be4d9527b0ee4830d5434e6e70415d2f18af0180 (patch) | |
tree | bfcdd03b4df19c5148cdc35dd26f345e8e5f6bb3 /ipfs-api/src/read.rs | |
parent | b2ba32dcd849732d62eaed5940f5db5367e42fe1 (diff) |
handle streaming errors
Diffstat (limited to 'ipfs-api/src/read.rs')
-rw-r--r-- | ipfs-api/src/read.rs | 42 |
1 files changed, 37 insertions, 5 deletions
diff --git a/ipfs-api/src/read.rs b/ipfs-api/src/read.rs index 9cea164..5396f46 100644 --- a/ipfs-api/src/read.rs +++ b/ipfs-api/src/read.rs @@ -8,8 +8,10 @@ use bytes::BytesMut; use futures::{Async, Stream}; +use header::XStreamError; use hyper::Chunk; -use response::Error; +use hyper::header::{Header, Raw}; +use response::{Error, ErrorKind}; use serde::Deserialize; use serde_json; use std::cmp; @@ -22,13 +24,21 @@ use tokio_io::codec::Decoder; /// A decoder for a response where each line is a full json object. /// pub struct JsonLineDecoder<T> { + /// Set to true if the stream can contain a X-Stream-Error header, + /// which indicates an error while streaming. + /// + parse_stream_error: bool, + ty: PhantomData<T>, } impl<T> JsonLineDecoder<T> { #[inline] - pub fn new() -> JsonLineDecoder<T> { - JsonLineDecoder { ty: PhantomData } + pub fn new(parse_stream_error: bool) -> JsonLineDecoder<T> { + JsonLineDecoder { + parse_stream_error, + ty: PhantomData, + } } } @@ -48,8 +58,30 @@ where if let Some(pos) = nl_index { let slice = src.split_to(pos + 1); - - serde_json::from_slice(&slice[..slice.len() - 1]).map_err(From::from) + let slice = &slice[..slice.len() - 1]; + + match serde_json::from_slice(slice) { + Ok(json) => Ok(json), + // If a JSON object couldn't be parsed from the response, it is possible + // that a stream error trailing header was returned. If the JSON decoder + // was configured to parse these kinds of error, it should try. If a header + // couldn't be parsed, it will return the original error. + // + Err(e) => { + if self.parse_stream_error { + let raw = Raw::from(slice); + + match XStreamError::parse_header(&raw) { + Ok(stream_error) => Err( + ErrorKind::StreamError(stream_error.error).into(), + ), + Err(_) => Err(e.into()), + } + } else { + Err(e.into()) + } + } + } } else { Ok(None) } |