From 7dddbde3a1abcb96b75aa4636a07498193f7c551 Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Mon, 22 Feb 2021 20:41:44 -0500 Subject: migrate common code to ipfs-api-prelude --- ipfs-api-prelude/src/backend.rs | 220 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 220 insertions(+) create mode 100644 ipfs-api-prelude/src/backend.rs (limited to 'ipfs-api-prelude/src/backend.rs') diff --git a/ipfs-api-prelude/src/backend.rs b/ipfs-api-prelude/src/backend.rs new file mode 100644 index 0000000..bcc32e9 --- /dev/null +++ b/ipfs-api-prelude/src/backend.rs @@ -0,0 +1,220 @@ +// Copyright 2021 rust-ipfs-api Developers +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. +// + +use crate::{ + header::{TRAILER, X_STREAM_ERROR_KEY}, + read::{JsonLineDecoder, StreamReader}, + ApiError, ApiRequest, +}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::{future, FutureExt, Stream, StreamExt, TryStreamExt}; +use http::{ + header::{HeaderName, HeaderValue}, + StatusCode, +}; +use serde::{Deserialize, Serialize}; +use std::{fmt::Display, string::FromUtf8Error}; +use tokio_util::codec::{Decoder, FramedRead}; + +#[async_trait(?Send)] +pub trait Backend: Default { + /// HTTP request type. + /// + type HttpRequest; + + /// HTTP response type. + /// + type HttpResponse; + + /// HTTP multipart form type. + /// + type MultipartForm: Default; + + /// Error type for Result. + /// + type Error: Display + + From + + From + + From + + From + + 'static; + + fn build_base_request( + &self, + req: &Req, + form: Option, + ) -> Result + where + Req: ApiRequest; + + fn get_header<'a>(res: &'a Self::HttpResponse, key: HeaderName) -> Option<&'a HeaderValue>; + + async fn request_raw( + &self, + req: Req, + form: Option, + ) -> Result<(StatusCode, Bytes), Self::Error> + where + Req: ApiRequest + Serialize; + + fn response_to_byte_stream( + res: Self::HttpResponse, + ) -> Box> + Unpin>; + + fn request_stream( + &self, + req: Self::HttpRequest, + process: F, + ) -> Box> + Unpin> + where + OutStream: Stream> + Unpin, + F: 'static + Fn(Self::HttpResponse) -> OutStream; + + /// Builds an Api error from a response body. + /// + #[inline] + fn process_error_from_body(body: Bytes) -> Self::Error { + match serde_json::from_slice::(&body) { + Ok(e) => e.into(), + Err(_) => match String::from_utf8(body.to_vec()) { + Ok(s) => crate::Error::UnrecognizedApiError(s).into(), + Err(e) => e.into(), + }, + } + } + + /// Processes a response that expects a json encoded body, returning an + /// error or a deserialized json response. + /// + fn process_json_response(status: StatusCode, body: Bytes) -> Result + where + for<'de> Res: 'static + Deserialize<'de>, + { + match status { + StatusCode::OK => serde_json::from_slice(&body).map_err(From::from), + _ => Err(Self::process_error_from_body(body)), + } + } + + /// Processes a response that returns a stream of json deserializable + /// results. + /// + fn process_stream_response( + res: Self::HttpResponse, + decoder: D, + ) -> FramedRead> + Unpin>>, D> + where + D: Decoder, + { + FramedRead::new( + StreamReader::new(Self::response_to_byte_stream(res)), + decoder, + ) + } + + /// Generic method for making a request to the Ipfs server, and getting + /// a deserializable response. + /// + async fn request( + &self, + req: Req, + form: Option, + ) -> Result + where + Req: ApiRequest + Serialize, + for<'de> Res: 'static + Deserialize<'de>, + { + let (status, chunk) = self.request_raw(req, form).await?; + + Self::process_json_response(status, chunk) + } + + /// Generic method for making a request to the Ipfs server, and getting + /// back a response with no body. + /// + async fn request_empty( + &self, + req: Req, + form: Option, + ) -> Result<(), Self::Error> + where + Req: ApiRequest + Serialize, + { + let (status, chunk) = self.request_raw(req, form).await?; + + match status { + StatusCode::OK => Ok(()), + _ => Err(Self::process_error_from_body(chunk)), + } + } + + /// Generic method for making a request to the Ipfs server, and getting + /// back a raw String response. + /// + async fn request_string( + &self, + req: Req, + form: Option, + ) -> Result + where + Req: ApiRequest + Serialize, + { + let (status, chunk) = self.request_raw(req, form).await?; + + match status { + StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(Self::Error::from), + _ => Err(Self::process_error_from_body(chunk)), + } + } + + fn request_stream_bytes( + &self, + req: Self::HttpRequest, + ) -> Box> + Unpin> { + self.request_stream(req, |res| Self::response_to_byte_stream(res)) + } + + /// Generic method to return a streaming response of deserialized json + /// objects delineated by new line separators. + /// + fn request_stream_json( + &self, + req: Self::HttpRequest, + ) -> Box> + Unpin> + where + for<'de> Res: 'static + Deserialize<'de>, + { + self.request_stream(req, |res| { + let parse_stream_error = if let Some(trailer) = Self::get_header(&res, TRAILER) { + // Response has the Trailer header set. The StreamError trailer + // is used to indicate that there was an error while streaming + // data with Ipfs. + // + if trailer == X_STREAM_ERROR_KEY { + true + } else { + let err = crate::Error::UnrecognizedTrailerHeader( + String::from_utf8_lossy(trailer.as_ref()).into(), + ); + + // There was an unrecognized trailer value. If that is the case, + // create a stream that immediately errors. + // + return future::err(err).into_stream().err_into().left_stream(); + } + } else { + false + }; + + Self::process_stream_response(res, JsonLineDecoder::new(parse_stream_error)) + .err_into() + .right_stream() + }) + } +} -- cgit v1.2.3 From 4ede307efb87bad2627190665279123e588a3494 Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Mon, 22 Feb 2021 22:56:04 -0500 Subject: centralize errors in prelude as much as possible --- ipfs-api-prelude/src/backend.rs | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) (limited to 'ipfs-api-prelude/src/backend.rs') diff --git a/ipfs-api-prelude/src/backend.rs b/ipfs-api-prelude/src/backend.rs index bcc32e9..6a335f8 100644 --- a/ipfs-api-prelude/src/backend.rs +++ b/ipfs-api-prelude/src/backend.rs @@ -38,12 +38,7 @@ pub trait Backend: Default { /// Error type for Result. /// - type Error: Display - + From - + From - + From - + From - + 'static; + type Error: Display + From + From + 'static; fn build_base_request( &self, @@ -82,10 +77,14 @@ pub trait Backend: Default { fn process_error_from_body(body: Bytes) -> Self::Error { match serde_json::from_slice::(&body) { Ok(e) => e.into(), - Err(_) => match String::from_utf8(body.to_vec()) { - Ok(s) => crate::Error::UnrecognizedApiError(s).into(), - Err(e) => e.into(), - }, + Err(_) => { + let err = match String::from_utf8(body.to_vec()) { + Ok(s) => crate::Error::UnrecognizedApiError(s), + Err(e) => crate::Error::from(e), + }; + + err.into() + } } } @@ -97,7 +96,9 @@ pub trait Backend: Default { for<'de> Res: 'static + Deserialize<'de>, { match status { - StatusCode::OK => serde_json::from_slice(&body).map_err(From::from), + StatusCode::OK => serde_json::from_slice(&body) + .map_err(crate::Error::from) + .map_err(Self::Error::from), _ => Err(Self::process_error_from_body(body)), } } @@ -168,7 +169,9 @@ pub trait Backend: Default { let (status, chunk) = self.request_raw(req, form).await?; match status { - StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(Self::Error::from), + StatusCode::OK => String::from_utf8(chunk.to_vec()) + .map_err(crate::Error::from) + .map_err(Self::Error::from), _ => Err(Self::process_error_from_body(chunk)), } } -- cgit v1.2.3 From 93e8c2e2ad4a3c755a707d268ede8e661c45c68d Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Mon, 22 Feb 2021 22:56:45 -0500 Subject: use common form obj --- ipfs-api-prelude/src/backend.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) (limited to 'ipfs-api-prelude/src/backend.rs') diff --git a/ipfs-api-prelude/src/backend.rs b/ipfs-api-prelude/src/backend.rs index 6a335f8..0e5bfef 100644 --- a/ipfs-api-prelude/src/backend.rs +++ b/ipfs-api-prelude/src/backend.rs @@ -13,13 +13,14 @@ use crate::{ }; use async_trait::async_trait; use bytes::Bytes; +use common_multipart_rfc7578::client::multipart; use futures::{future, FutureExt, Stream, StreamExt, TryStreamExt}; use http::{ header::{HeaderName, HeaderValue}, StatusCode, }; use serde::{Deserialize, Serialize}; -use std::{fmt::Display, string::FromUtf8Error}; +use std::fmt::Display; use tokio_util::codec::{Decoder, FramedRead}; #[async_trait(?Send)] @@ -32,10 +33,6 @@ pub trait Backend: Default { /// type HttpResponse; - /// HTTP multipart form type. - /// - type MultipartForm: Default; - /// Error type for Result. /// type Error: Display + From + From + 'static; @@ -43,7 +40,7 @@ pub trait Backend: Default { fn build_base_request( &self, req: &Req, - form: Option, + form: Option>, ) -> Result where Req: ApiRequest; @@ -53,7 +50,7 @@ pub trait Backend: Default { async fn request_raw( &self, req: Req, - form: Option, + form: Option>, ) -> Result<(StatusCode, Bytes), Self::Error> where Req: ApiRequest + Serialize; @@ -125,7 +122,7 @@ pub trait Backend: Default { async fn request( &self, req: Req, - form: Option, + form: Option>, ) -> Result where Req: ApiRequest + Serialize, @@ -142,7 +139,7 @@ pub trait Backend: Default { async fn request_empty( &self, req: Req, - form: Option, + form: Option>, ) -> Result<(), Self::Error> where Req: ApiRequest + Serialize, @@ -161,7 +158,7 @@ pub trait Backend: Default { async fn request_string( &self, req: Req, - form: Option, + form: Option>, ) -> Result where Req: ApiRequest + Serialize, -- cgit v1.2.3 From b75c0ac5a35f87a4222aeddc11d710aa01f859e0 Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Mon, 22 Feb 2021 23:23:38 -0500 Subject: add some documentation --- ipfs-api-prelude/src/backend.rs | 3 +++ 1 file changed, 3 insertions(+) (limited to 'ipfs-api-prelude/src/backend.rs') diff --git a/ipfs-api-prelude/src/backend.rs b/ipfs-api-prelude/src/backend.rs index 0e5bfef..433cf4a 100644 --- a/ipfs-api-prelude/src/backend.rs +++ b/ipfs-api-prelude/src/backend.rs @@ -173,6 +173,9 @@ pub trait Backend: Default { } } + /// Generic method for making a request to the Ipfs server, and getting + /// back a raw stream of bytes. + /// fn request_stream_bytes( &self, req: Self::HttpRequest, -- cgit v1.2.3 From 306f307222358047ace385bb4126d9571858497f Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Tue, 23 Feb 2021 23:24:47 -0500 Subject: add some documentation --- ipfs-api-prelude/src/backend.rs | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'ipfs-api-prelude/src/backend.rs') diff --git a/ipfs-api-prelude/src/backend.rs b/ipfs-api-prelude/src/backend.rs index 433cf4a..f2505ca 100644 --- a/ipfs-api-prelude/src/backend.rs +++ b/ipfs-api-prelude/src/backend.rs @@ -37,6 +37,8 @@ pub trait Backend: Default { /// type Error: Display + From + From + 'static; + /// Builds the url for an api call. + /// fn build_base_request( &self, req: &Req, @@ -45,8 +47,12 @@ pub trait Backend: Default { where Req: ApiRequest; + /// Get the value of a header from an HTTP response. + /// fn get_header<'a>(res: &'a Self::HttpResponse, key: HeaderName) -> Option<&'a HeaderValue>; + /// Generates a request, and returns the unprocessed response future. + /// async fn request_raw( &self, req: Req, @@ -59,6 +65,9 @@ pub trait Backend: Default { res: Self::HttpResponse, ) -> Box> + Unpin>; + /// Generic method for making a request that expects back a streaming + /// response. + /// fn request_stream( &self, req: Self::HttpRequest, -- cgit v1.2.3 From 60746a30c80f6b55263ef366cf8222d7c58c0c28 Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Sun, 4 Apr 2021 18:50:23 -0400 Subject: remove Default constraint --- ipfs-api-prelude/src/backend.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'ipfs-api-prelude/src/backend.rs') diff --git a/ipfs-api-prelude/src/backend.rs b/ipfs-api-prelude/src/backend.rs index f2505ca..f70854a 100644 --- a/ipfs-api-prelude/src/backend.rs +++ b/ipfs-api-prelude/src/backend.rs @@ -24,7 +24,7 @@ use std::fmt::Display; use tokio_util::codec::{Decoder, FramedRead}; #[async_trait(?Send)] -pub trait Backend: Default { +pub trait Backend { /// HTTP request type. /// type HttpRequest; -- cgit v1.2.3 From 554debce4fcb833cce3c8d0657846d089062d6f8 Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Sun, 4 Apr 2021 19:50:13 -0400 Subject: clippy --- ipfs-api-prelude/src/backend.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'ipfs-api-prelude/src/backend.rs') diff --git a/ipfs-api-prelude/src/backend.rs b/ipfs-api-prelude/src/backend.rs index f70854a..6ab1b90 100644 --- a/ipfs-api-prelude/src/backend.rs +++ b/ipfs-api-prelude/src/backend.rs @@ -49,7 +49,7 @@ pub trait Backend { /// Get the value of a header from an HTTP response. /// - fn get_header<'a>(res: &'a Self::HttpResponse, key: HeaderName) -> Option<&'a HeaderValue>; + fn get_header(res: &Self::HttpResponse, key: HeaderName) -> Option<&HeaderValue>; /// Generates a request, and returns the unprocessed response future. /// -- cgit v1.2.3