// Copyright 2017 rust-ipfs-api Developers
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
//
use futures::Stream;
use futures::future::{Future, IntoFuture};
use header::Trailer;
use read::{JsonLineDecoder, LineDecoder, StreamReader};
use request::{self, ApiRequest};
use response::{self, Error, ErrorKind};
use hyper::{self, Chunk, Request, Response, Uri, Method, StatusCode};
use hyper::client::{Client, Config, HttpConnector};
use hyper_multipart::client::multipart;
use serde::{Deserialize, Serialize};
use serde_json;
use std::io::Read;
use tokio_core::reactor::Handle;
use tokio_io::codec::{Decoder, FramedRead};
/// A boxed future that resolves to a value of type `T`, or fails with this
/// crate's `Error`.
///
type AsyncResponse<T> = Box<Future<Item = T, Error = Error>>;
/// A boxed stream that yields values of type `T`, or fails with this crate's
/// `Error`.
///
type AsyncStreamResponse<T> = Box<Stream<Item = T, Error = Error>>;
/// Asynchronous Ipfs client.
///
/// Pairs the base uri of an Ipfs api endpoint with the hyper client that is
/// used to send requests to it.
///
pub struct IpfsClient {
// Base uri that api paths are appended to; `new` fills this in from
// `build_base_path`, i.e. `http://{host}:{port}/api/v0`.
base: Uri,
// Hyper client over a plain `HttpConnector` whose request bodies are
// `multipart::Body` values.
client: Client<HttpConnector, multipart::Body>,
}
impl IpfsClient {
/// Creates a new `IpfsClient`.
///
#[inline]
pub fn new(
handle: &Handle,
host: &str,
port: u16,
) -> Result<IpfsClient, hyper::error::UriError> {
let base_path = IpfsClient::build_base_path(host, port)?;
Ok(IpfsClient {
base: base_path,
client: Config::default()
.body::<multipart::Body>()
.keep_alive(true)
.build(handle),
})
}
/// Creates an `IpfsClient` connected to `localhost:5001`.
///
pub fn default(handle: &Handle) -> IpfsClient {
IpfsClient::new(handle, "localhost", 5001).unwrap()
}
/// Builds the base uri of the Ipfs api, `http://{host}:{port}/api/v0`.
///
/// # Errors
///
/// Returns a `UriError` when the formatted address is not a valid uri.
///
fn build_base_path(host: &str, port: u16) -> Result<Uri, hyper::error::UriError> {
    let address = format!("http://{}:{}/api/v0", host, port);

    address.parse::<Uri>()
}
/// Builds the HTTP request for an api call.
///
/// The request uri is the client's base uri followed by `Req::path()` and
/// a query string produced by url-encoding `req`. When `form` is present
/// it is applied to the request through `set_body`.
///
/// NOTE(review): the request method is always `GET`, even when a multipart
/// form is attached -- confirm this is what the api server expects.
///
/// # Errors
///
/// Returns an `Error` when `req` cannot be url-encoded, or when the
/// assembled url does not parse as a `Uri`.
///
fn build_base_request<Req>(
    &self,
    req: &Req,
    form: Option<multipart::Form>,
) -> Result<Request<multipart::Body>, Error>
where
    Req: ApiRequest + Serialize,
{
    let query = ::serde_urlencoded::to_string(req)?;
    let raw_url = format!("{}{}?{}", self.base, Req::path(), query);
    let uri = raw_url.parse::<Uri>()?;

    let mut request = Request::new(Method::Get, uri);
    if let Some(form) = form {
        form.set_body(&mut request);
    }

    Ok(request)
}
/// Builds an `Error` from the body of a response.
///
/// The body is interpreted in order of preference:
///
/// 1. as json, producing an `ErrorKind::Api` error;
/// 2. as UTF-8 text, producing an `ErrorKind::Uncategorized` error that
///    carries the text;
/// 3. otherwise the UTF-8 decoding failure itself is converted into the
///    returned error.
///
#[inline]
fn build_error_from_body(chunk: Chunk) -> Error {
    // Preferred case: the server sent a structured api error.
    if let Ok(api_error) = serde_json::from_slice(&chunk) {
        return ErrorKind::Api(api_error).into();
    }

    // Fall back to treating the raw body as a plain text message.
    match String::from_utf8(chunk.to_vec()) {
        Ok(message) => ErrorKind::Uncategorized(message).into(),
        Err(utf8_error) => utf8_error.into(),
    }
}
/// Turns the status and body of a response into a deserialized value.
///
/// When `status` is `StatusCode::Ok` the body is deserialized from json
/// into `Res`, and a deserialization failure is converted into an `Error`.
/// For every other status the body is handed to `build_error_from_body`
/// and the resulting error is returned.
///
fn process_json_response<Res>(status: StatusCode, chunk: Chunk) -> Result<Res, Error>
where
    for<'de> Res: 'static + Deserialize<'de>,
{
    if let StatusCode::Ok = status {
        let parsed = serde_json::from_slice(&chunk)?;

        Ok(parsed)
    } else {
        Err(Self::build_error_from_body(chunk))
    }
}
/// Processes a response that returns a stream of json deserializable
/// results.
///
fn process_stream_response<D, Res>(
res: Response,
decoder: D,
) -> Box<Stream<Item = Res, Error = Error>>
where
D: 'static + Decoder<Item = Res, Error = Error>,