summaryrefslogtreecommitdiffstats
path: root/ipfs-api/src/read.rs
diff options
context:
space:
mode:
authorFerris Tseng <ferristseng@fastmail.fm>2017-11-25 12:11:09 -0500
committerFerris Tseng <ferristseng@fastmail.fm>2017-11-25 12:11:09 -0500
commitbe4d9527b0ee4830d5434e6e70415d2f18af0180 (patch)
treebfcdd03b4df19c5148cdc35dd26f345e8e5f6bb3 /ipfs-api/src/read.rs
parentb2ba32dcd849732d62eaed5940f5db5367e42fe1 (diff)
handle streaming errors
Diffstat (limited to 'ipfs-api/src/read.rs')
-rw-r--r--ipfs-api/src/read.rs42
1 files changed, 37 insertions, 5 deletions
diff --git a/ipfs-api/src/read.rs b/ipfs-api/src/read.rs
index 9cea164..5396f46 100644
--- a/ipfs-api/src/read.rs
+++ b/ipfs-api/src/read.rs
@@ -8,8 +8,10 @@
use bytes::BytesMut;
use futures::{Async, Stream};
+use header::XStreamError;
use hyper::Chunk;
-use response::Error;
+use hyper::header::{Header, Raw};
+use response::{Error, ErrorKind};
use serde::Deserialize;
use serde_json;
use std::cmp;
@@ -22,13 +24,21 @@ use tokio_io::codec::Decoder;
/// A decoder for a response where each line is a full json object.
///
pub struct JsonLineDecoder<T> {
+ /// Set to true if the stream can contain a X-Stream-Error header,
+ /// which indicates an error while streaming.
+ ///
+ parse_stream_error: bool,
+
ty: PhantomData<T>,
}
impl<T> JsonLineDecoder<T> {
#[inline]
- pub fn new() -> JsonLineDecoder<T> {
- JsonLineDecoder { ty: PhantomData }
+ pub fn new(parse_stream_error: bool) -> JsonLineDecoder<T> {
+ JsonLineDecoder {
+ parse_stream_error,
+ ty: PhantomData,
+ }
}
}
@@ -48,8 +58,30 @@ where
if let Some(pos) = nl_index {
let slice = src.split_to(pos + 1);
-
- serde_json::from_slice(&slice[..slice.len() - 1]).map_err(From::from)
+ let slice = &slice[..slice.len() - 1];
+
+ match serde_json::from_slice(slice) {
+ Ok(json) => Ok(json),
+ // If a JSON object couldn't be parsed from the response, it is possible
+ // that a stream error trailing header was returned. If the JSON decoder
+ // was configured to parse these kinds of error, it should try. If a header
+ // couldn't be parsed, it will return the original error.
+ //
+ Err(e) => {
+ if self.parse_stream_error {
+ let raw = Raw::from(slice);
+
+ match XStreamError::parse_header(&raw) {
+ Ok(stream_error) => Err(
+ ErrorKind::StreamError(stream_error.error).into(),
+ ),
+ Err(_) => Err(e.into()),
+ }
+ } else {
+ Err(e.into())
+ }
+ }
+ }
} else {
Ok(None)
}