summaryrefslogtreecommitdiffstats
path: root/ipfs-api-prelude/src/backend.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ipfs-api-prelude/src/backend.rs')
-rw-r--r--ipfs-api-prelude/src/backend.rs232
1 files changed, 232 insertions, 0 deletions
diff --git a/ipfs-api-prelude/src/backend.rs b/ipfs-api-prelude/src/backend.rs
new file mode 100644
index 0000000..6ab1b90
--- /dev/null
+++ b/ipfs-api-prelude/src/backend.rs
@@ -0,0 +1,232 @@
+// Copyright 2021 rust-ipfs-api Developers
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
+// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
+// http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+//
+
+use crate::{
+ header::{TRAILER, X_STREAM_ERROR_KEY},
+ read::{JsonLineDecoder, StreamReader},
+ ApiError, ApiRequest,
+};
+use async_trait::async_trait;
+use bytes::Bytes;
+use common_multipart_rfc7578::client::multipart;
+use futures::{future, FutureExt, Stream, StreamExt, TryStreamExt};
+use http::{
+ header::{HeaderName, HeaderValue},
+ StatusCode,
+};
+use serde::{Deserialize, Serialize};
+use std::fmt::Display;
+use tokio_util::codec::{Decoder, FramedRead};
+
+#[async_trait(?Send)]
+pub trait Backend {
+ /// HTTP request type.
+ ///
+ type HttpRequest;
+
+ /// HTTP response type.
+ ///
+ type HttpResponse;
+
+ /// Error type for Result.
+ ///
+ type Error: Display + From<ApiError> + From<crate::Error> + 'static;
+
+ /// Builds the url for an api call.
+ ///
+ fn build_base_request<Req>(
+ &self,
+ req: &Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> Result<Self::HttpRequest, Self::Error>
+ where
+ Req: ApiRequest;
+
+ /// Get the value of a header from an HTTP response.
+ ///
+ fn get_header(res: &Self::HttpResponse, key: HeaderName) -> Option<&HeaderValue>;
+
+ /// Generates a request, and returns the unprocessed response future.
+ ///
+ async fn request_raw<Req>(
+ &self,
+ req: Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> Result<(StatusCode, Bytes), Self::Error>
+ where
+ Req: ApiRequest + Serialize;
+
+ fn response_to_byte_stream(
+ res: Self::HttpResponse,
+ ) -> Box<dyn Stream<Item = Result<Bytes, Self::Error>> + Unpin>;
+
+ /// Generic method for making a request that expects back a streaming
+ /// response.
+ ///
+ fn request_stream<Res, F, OutStream>(
+ &self,
+ req: Self::HttpRequest,
+ process: F,
+ ) -> Box<dyn Stream<Item = Result<Res, Self::Error>> + Unpin>
+ where
+ OutStream: Stream<Item = Result<Res, Self::Error>> + Unpin,
+ F: 'static + Fn(Self::HttpResponse) -> OutStream;
+
+ /// Builds an Api error from a response body.
+ ///
+ #[inline]
+ fn process_error_from_body(body: Bytes) -> Self::Error {
+ match serde_json::from_slice::<ApiError>(&body) {
+ Ok(e) => e.into(),
+ Err(_) => {
+ let err = match String::from_utf8(body.to_vec()) {
+ Ok(s) => crate::Error::UnrecognizedApiError(s),
+ Err(e) => crate::Error::from(e),
+ };
+
+ err.into()
+ }
+ }
+ }
+
+ /// Processes a response that expects a json encoded body, returning an
+ /// error or a deserialized json response.
+ ///
+ fn process_json_response<Res>(status: StatusCode, body: Bytes) -> Result<Res, Self::Error>
+ where
+ for<'de> Res: 'static + Deserialize<'de>,
+ {
+ match status {
+ StatusCode::OK => serde_json::from_slice(&body)
+ .map_err(crate::Error::from)
+ .map_err(Self::Error::from),
+ _ => Err(Self::process_error_from_body(body)),
+ }
+ }
+
+ /// Processes a response that returns a stream of json deserializable
+ /// results.
+ ///
+ fn process_stream_response<D, Res>(
+ res: Self::HttpResponse,
+ decoder: D,
+ ) -> FramedRead<StreamReader<Box<dyn Stream<Item = Result<Bytes, Self::Error>> + Unpin>>, D>
+ where
+ D: Decoder<Item = Res, Error = crate::Error>,
+ {
+ FramedRead::new(
+ StreamReader::new(Self::response_to_byte_stream(res)),
+ decoder,
+ )
+ }
+
+ /// Generic method for making a request to the Ipfs server, and getting
+ /// a deserializable response.
+ ///
+ async fn request<Req, Res>(
+ &self,
+ req: Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> Result<Res, Self::Error>
+ where
+ Req: ApiRequest + Serialize,
+ for<'de> Res: 'static + Deserialize<'de>,
+ {
+ let (status, chunk) = self.request_raw(req, form).await?;
+
+ Self::process_json_response(status, chunk)
+ }
+
+ /// Generic method for making a request to the Ipfs server, and getting
+ /// back a response with no body.
+ ///
+ async fn request_empty<Req>(
+ &self,
+ req: Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> Result<(), Self::Error>
+ where
+ Req: ApiRequest + Serialize,
+ {
+ let (status, chunk) = self.request_raw(req, form).await?;
+
+ match status {
+ StatusCode::OK => Ok(()),
+ _ => Err(Self::process_error_from_body(chunk)),
+ }
+ }
+
+ /// Generic method for making a request to the Ipfs server, and getting
+ /// back a raw String response.
+ ///
+ async fn request_string<Req>(
+ &self,
+ req: Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> Result<String, Self::Error>
+ where
+ Req: ApiRequest + Serialize,
+ {
+ let (status, chunk) = self.request_raw(req, form).await?;
+
+ match status {
+ StatusCode::OK => String::from_utf8(chunk.to_vec())
+ .map_err(crate::Error::from)
+ .map_err(Self::Error::from),
+ _ => Err(Self::process_error_from_body(chunk)),
+ }
+ }
+
+ /// Generic method for making a request to the Ipfs server, and getting
+ /// back a raw stream of bytes.
+ ///
+ fn request_stream_bytes(
+ &self,
+ req: Self::HttpRequest,
+ ) -> Box<dyn Stream<Item = Result<Bytes, Self::Error>> + Unpin> {
+ self.request_stream(req, |res| Self::response_to_byte_stream(res))
+ }
+
+ /// Generic method to return a streaming response of deserialized json
+ /// objects delineated by new line separators.
+ ///
+ fn request_stream_json<Res>(
+ &self,
+ req: Self::HttpRequest,
+ ) -> Box<dyn Stream<Item = Result<Res, Self::Error>> + Unpin>
+ where
+ for<'de> Res: 'static + Deserialize<'de>,
+ {
+ self.request_stream(req, |res| {
+ let parse_stream_error = if let Some(trailer) = Self::get_header(&res, TRAILER) {
+ // Response has the Trailer header set. The StreamError trailer
+ // is used to indicate that there was an error while streaming
+ // data with Ipfs.
+ //
+ if trailer == X_STREAM_ERROR_KEY {
+ true
+ } else {
+ let err = crate::Error::UnrecognizedTrailerHeader(
+ String::from_utf8_lossy(trailer.as_ref()).into(),
+ );
+
+ // There was an unrecognized trailer value. If that is the case,
+ // create a stream that immediately errors.
+ //
+ return future::err(err).into_stream().err_into().left_stream();
+ }
+ } else {
+ false
+ };
+
+ Self::process_stream_response(res, JsonLineDecoder::new(parse_stream_error))
+ .err_into()
+ .right_stream()
+ })
+ }
+}