summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFerris Tseng <ferristseng@fastmail.fm>2021-02-22 20:42:28 -0500
committerFerris Tseng <ferristseng@fastmail.fm>2021-02-22 20:42:28 -0500
commit89519b4c5ce4839a362ab9ec9c6359f3c6aa844b (patch)
treeff74ea88dc1b32585728543dd6fc2f49c8d25d79
parentcc0131c9c6bea175853c995768fc817b5b066be7 (diff)
add actix backend
-rw-r--r--ipfs-api-backend-actix/Cargo.toml26
-rw-r--r--ipfs-api-backend-actix/src/backend.rs140
-rw-r--r--ipfs-api-backend-actix/src/error.rs46
-rw-r--r--ipfs-api-backend-actix/src/lib.rs14
4 files changed, 226 insertions, 0 deletions
diff --git a/ipfs-api-backend-actix/Cargo.toml b/ipfs-api-backend-actix/Cargo.toml
new file mode 100644
index 0000000..3a616a6
--- /dev/null
+++ b/ipfs-api-backend-actix/Cargo.toml
@@ -0,0 +1,26 @@
+[package]
+name = "ipfs-api-backend-actix"
+description = "Actix implementation of IPFS HTTP API"
+authors = ["Ferris Tseng <ferristseng@fastmail.fm>"]
+edition = "2018"
+documentation = "https://docs.rs/ipfs-api"
+repository = "https://github.com/ferristseng/rust-ipfs-api"
+keywords = ["ipfs"]
+categories = ["filesystem", "web-programming"]
+version = "0.1.0"
+readme = "../README.md"
+license = "MIT OR Apache-2.0"
+
+[dependencies]
+actix-http = "2.2"
+actix-multipart-rfc7578 = "0.4"
+awc = "2.0"
+async-trait = "0.1"
+bytes = "1.0"
+futures = "0.3"
+http = "0.2"
+ipfs-api-prelude = { version = "0.1.0", path = "../ipfs-api-prelude" }
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+serde_urlencoded = "0.7"
+thiserror = "1.0"
diff --git a/ipfs-api-backend-actix/src/backend.rs b/ipfs-api-backend-actix/src/backend.rs
new file mode 100644
index 0000000..6aa8f50
--- /dev/null
+++ b/ipfs-api-backend-actix/src/backend.rs
@@ -0,0 +1,140 @@
+use crate::error::Error;
+use async_trait::async_trait;
+use awc::Client;
+use bytes::Bytes;
+use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
+use http::{
+ header::{HeaderName, HeaderValue},
+ uri::Scheme,
+ StatusCode, Uri,
+};
+use ipfs_api_prelude::{ApiRequest, Backend, TryFromUri};
+use serde::Serialize;
+use std::time::Duration;
+
+const ACTIX_REQUEST_TIMEOUT: Duration = Duration::from_secs(90);
+
+pub struct ActixBackend {
+ base: Uri,
+ client: Client,
+}
+
+impl Default for ActixBackend {
+ fn default() -> Self {
+ Self::from_ipfs_config()
+ .unwrap_or_else(|| Self::from_host_and_port(Scheme::HTTP, "localhost", 5001).unwrap())
+ }
+}
+
+impl TryFromUri for ActixBackend {
+ fn build_with_base_uri(base: Uri) -> Self {
+ let client = Client::default();
+
+ ActixBackend { base, client }
+ }
+}
+
+#[async_trait(?Send)]
+impl Backend for ActixBackend {
+ type HttpRequest = awc::SendClientRequest;
+
+ type HttpResponse = awc::ClientResponse<
+ actix_http::encoding::Decoder<actix_http::Payload<actix_http::PayloadStream>>,
+ >;
+
+ type MultipartForm = multipart::client::multipart::Form<'static>;
+
+ type Error = Error;
+
+ fn build_base_request<Req>(
+ &self,
+ req: &Req,
+ form: Option<Self::MultipartForm>,
+ ) -> Result<Self::HttpRequest, Error>
+ where
+ Req: ApiRequest,
+ {
+ req.absolute_url(&self.base).and_then(|url| {
+ let req = if let Some(form) = form {
+ self.client
+ .post(url)
+ .timeout(ACTIX_REQUEST_TIMEOUT)
+ .content_type(form.content_type())
+ .send_body(multipart::client::multipart::Body::from(form))
+ } else {
+ self.client.post(url).timeout(ACTIX_REQUEST_TIMEOUT).send()
+ };
+
+ Ok(req)
+ })
+ }
+
+ fn get_header<'a>(res: &'a Self::HttpResponse, key: HeaderName) -> Option<&'a HeaderValue> {
+ res.headers().get(key)
+ }
+
+ async fn request_raw<Req>(
+ &self,
+ req: Req,
+ form: Option<Self::MultipartForm>,
+ ) -> Result<(StatusCode, Bytes), Self::Error>
+ where
+ Req: ApiRequest + Serialize,
+ {
+ let req = self.build_base_request(&req, form)?;
+ let mut res = req.await?;
+ let status = res.status();
+ let body = res.body().await?;
+
+ // FIXME: Actix compat with bytes 1.0
+ Ok((status, Bytes::copy_from_slice(body.as_ref())))
+ }
+
+ fn response_to_byte_stream(
+ res: Self::HttpResponse,
+ ) -> Box<dyn Stream<Item = Result<Bytes, Self::Error>> + Unpin> {
+ let stream = res
+ .map_ok(|bytes| Bytes::copy_from_slice(bytes.as_ref()))
+ .err_into();
+
+ Box::new(stream)
+ }
+
+ fn request_stream<Res, F, OutStream>(
+ &self,
+ req: Self::HttpRequest,
+ process: F,
+ ) -> Box<dyn Stream<Item = Result<Res, Self::Error>> + Unpin>
+ where
+ OutStream: Stream<Item = Result<Res, Self::Error>> + Unpin,
+ F: 'static + Fn(Self::HttpResponse) -> OutStream,
+ {
+ let stream = req
+ .err_into()
+ .map_ok(move |mut res| {
+ match res.status() {
+ StatusCode::OK => process(res).right_stream(),
+ // If the server responded with an error status code, the body
+ // still needs to be read so an error can be built. This block will
+ // read the entire body stream, then immediately return an error.
+ //
+ _ => res
+ .body()
+ .map(|maybe_body| match maybe_body {
+ Ok(body) => {
+ // FIXME: Actix compat with bytes 1.0
+ let body = Bytes::copy_from_slice(body.as_ref());
+
+ Err(Self::process_error_from_body(body))
+ }
+ Err(e) => Err(e.into()),
+ })
+ .into_stream()
+ .left_stream(),
+ }
+ })
+ .try_flatten_stream();
+
+ Box::new(stream)
+ }
+}
diff --git a/ipfs-api-backend-actix/src/error.rs b/ipfs-api-backend-actix/src/error.rs
new file mode 100644
index 0000000..4cd9a25
--- /dev/null
+++ b/ipfs-api-backend-actix/src/error.rs
@@ -0,0 +1,46 @@
+// Copyright 2017 rust-ipfs-api Developers
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
+// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
+// http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+//
+
+use std::string::FromUtf8Error;
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum Error {
+ #[error("api returned error `{0}`")]
+ Api(ipfs_api_prelude::ApiError),
+
+ #[error("actix client payload error `{0}`")]
+ ClientPayload(#[from] awc::error::PayloadError),
+
+ #[error("actix client send request error `{0}`")]
+ ClientSend(#[from] awc::error::SendRequestError),
+
+ #[error("http error `{0}`")]
+ Http(#[from] http::Error),
+
+ #[error("json parse error `{0}`")]
+ Parse(#[from] serde_json::Error),
+
+ #[error("utf8 decoding error `{0}`")]
+ ParseUtf8(#[from] FromUtf8Error),
+
+ #[error("uri error `{0}`")]
+ Url(#[from] http::uri::InvalidUri),
+
+ #[error("url encoding error `{0}`")]
+ EncodeUrl(#[from] serde_urlencoded::ser::Error),
+
+ #[error("ipfs client error `{0}`")]
+ IpfsClientError(#[from] ipfs_api_prelude::Error),
+}
+
+impl From<ipfs_api_prelude::ApiError> for Error {
+ fn from(err: ipfs_api_prelude::ApiError) -> Self {
+ Error::Api(err)
+ }
+}
diff --git a/ipfs-api-backend-actix/src/lib.rs b/ipfs-api-backend-actix/src/lib.rs
new file mode 100644
index 0000000..414aff8
--- /dev/null
+++ b/ipfs-api-backend-actix/src/lib.rs
@@ -0,0 +1,14 @@
+// Copyright 2019 rust-ipfs-api Developers
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
+// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
+// http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+//
+
+extern crate actix_multipart_rfc7578 as multipart;
+
+mod backend;
+mod error;
+
+pub use crate::{backend::ActixBackend as IpfsApi, error::Error};