summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFerris Tseng <ferristseng@fastmail.fm>2017-11-25 12:11:09 -0500
committerFerris Tseng <ferristseng@fastmail.fm>2017-11-25 12:11:09 -0500
commitbe4d9527b0ee4830d5434e6e70415d2f18af0180 (patch)
treebfcdd03b4df19c5148cdc35dd26f345e8e5f6bb3
parentb2ba32dcd849732d62eaed5940f5db5367e42fe1 (diff)
handle streaming errors
-rw-r--r--ipfs-api/src/client.rs37
-rw-r--r--ipfs-api/src/header.rs26
-rw-r--r--ipfs-api/src/read.rs42
-rw-r--r--ipfs-api/src/response/error.rs4
4 files changed, 80 insertions, 29 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index 157caff..486b9b4 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -6,7 +6,7 @@
// copied, modified, or distributed except according to those terms.
//
-use futures::{stream, Stream};
+use futures::Stream;
use futures::future::{Future, IntoFuture};
use header::Trailer;
use read::{JsonLineDecoder, LineDecoder, StreamReader};
@@ -140,28 +140,9 @@ impl IpfsClient {
D: 'static + Decoder<Item = Res, Error = Error>,
Res: 'static,
{
- let err: Option<Error> = if let Some(trailer) = res.headers().get() {
- // Response has the Trailer header set, which is used
- // by Ipfs to indicate an error when preparing an output
- // stream.
- //
- match trailer {
- &Trailer::StreamError => Some(ErrorKind::StreamError.into()),
- }
- } else {
- None
- };
-
let stream = FramedRead::new(StreamReader::new(res.body().from_err()), decoder);
- if let Some(inner) = err {
- // If there was an error while streaming data back, read
- // as much as possible from the stream, then return an error.
- //
- Box::new(stream.chain(stream::once(Err(inner))))
- } else {
- Box::new(stream)
- }
+ Box::new(stream)
}
/// Sends a request and returns the raw response.
@@ -302,7 +283,19 @@ impl IpfsClient {
.into_future()
.flatten()
.map(|res| {
- IpfsClient::process_stream_response(res, JsonLineDecoder::new())
+ let parse_stream_error = if let Some(trailer) = res.headers().get() {
+ // Response has the Trailer header set. The StreamError trailer
+ // is used to indicate that there was an error while streaming
+ // data with Ipfs.
+ //
+ match trailer {
+ &Trailer::StreamError => true,
+ }
+ } else {
+ false
+ };
+
+ IpfsClient::process_stream_response(res, JsonLineDecoder::new(parse_stream_error))
})
.flatten_stream();
diff --git a/ipfs-api/src/header.rs b/ipfs-api/src/header.rs
index 32941cc..486be0c 100644
--- a/ipfs-api/src/header.rs
+++ b/ipfs-api/src/header.rs
@@ -42,3 +42,29 @@ impl Header for Trailer {
f.fmt_line(&value)
}
}
+
+
+#[derive(Debug, Clone)]
+pub struct XStreamError {
+ pub error: String,
+}
+
+impl Header for XStreamError {
+ fn header_name() -> &'static str {
+ "X-Stream-Error"
+ }
+
+ fn parse_header(raw: &Raw) -> hyper::Result<XStreamError> {
+ if let Some(bytes) = raw.one() {
+ let value = String::from_utf8_lossy(bytes);
+
+ Ok(XStreamError { error: value.into_owned() })
+ } else {
+ Err(hyper::Error::Header)
+ }
+ }
+
+ fn fmt_header(&self, f: &mut header::Formatter) -> fmt::Result {
+ f.fmt_line(&self.error)
+ }
+}
diff --git a/ipfs-api/src/read.rs b/ipfs-api/src/read.rs
index 9cea164..5396f46 100644
--- a/ipfs-api/src/read.rs
+++ b/ipfs-api/src/read.rs
@@ -8,8 +8,10 @@
use bytes::BytesMut;
use futures::{Async, Stream};
+use header::XStreamError;
use hyper::Chunk;
-use response::Error;
+use hyper::header::{Header, Raw};
+use response::{Error, ErrorKind};
use serde::Deserialize;
use serde_json;
use std::cmp;
@@ -22,13 +24,21 @@ use tokio_io::codec::Decoder;
/// A decoder for a response where each line is a full json object.
///
pub struct JsonLineDecoder<T> {
+ /// Set to true if the stream can contain a X-Stream-Error header,
+ /// which indicates an error while streaming.
+ ///
+ parse_stream_error: bool,
+
ty: PhantomData<T>,
}
impl<T> JsonLineDecoder<T> {
#[inline]
- pub fn new() -> JsonLineDecoder<T> {
- JsonLineDecoder { ty: PhantomData }
+ pub fn new(parse_stream_error: bool) -> JsonLineDecoder<T> {
+ JsonLineDecoder {
+ parse_stream_error,
+ ty: PhantomData,
+ }
}
}
@@ -48,8 +58,30 @@ where
if let Some(pos) = nl_index {
let slice = src.split_to(pos + 1);
-
- serde_json::from_slice(&slice[..slice.len() - 1]).map_err(From::from)
+ let slice = &slice[..slice.len() - 1];
+
+ match serde_json::from_slice(slice) {
+ Ok(json) => Ok(json),
+ // If a JSON object couldn't be parsed from the response, it is possible
+ // that a stream error trailing header was returned. If the JSON decoder
+ // was configured to parse these kinds of error, it should try. If a header
+ // couldn't be parsed, it will return the original error.
+ //
+ Err(e) => {
+ if self.parse_stream_error {
+ let raw = Raw::from(slice);
+
+ match XStreamError::parse_header(&raw) {
+ Ok(stream_error) => Err(
+ ErrorKind::StreamError(stream_error.error).into(),
+ ),
+ Err(_) => Err(e.into()),
+ }
+ } else {
+ Err(e.into())
+ }
+ }
+ }
} else {
Ok(None)
}
diff --git a/ipfs-api/src/response/error.rs b/ipfs-api/src/response/error.rs
index f0bac60..5f67eec 100644
--- a/ipfs-api/src/response/error.rs
+++ b/ipfs-api/src/response/error.rs
@@ -40,9 +40,9 @@ error_chain! {
/// A stream error indicated in the Trailer header.
///
- StreamError {
+ StreamError(err: String) {
description("api returned a stream error"),
- display("api returned a stream error")
+ display("api returned an error while streaming: '{}'", err)
}
Uncategorized(err: String) {