From 79d65c286025c551a775c0964d168e6feb4b3409 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antoine=20B=C3=BCsch?= Date: Wed, 14 Nov 2018 20:36:14 +1100 Subject: Async api (#128) * Refactored Transport for better async use Still a bit rough, but it now builds a big future using combinators. It still does one `Runtime::block_on()` to keep the existing API, but this is a first up before making the whole API async. * Migrate most APIs to be Future-based I still need to finish a few of the more tricky ones that I've commented out for now, but most of it compiles and some examples work. In particular, `Docker::stats()` now properly returns an async stream of stats. * Fix events and containerinspect examples * Fix imageinspect, images, info and top examples * Fix containercreate, imagedelete and imagepull examples * Fix more examples * Add back debug statement in Transport::request * De-glob imports in examples * Remove unused imports in examples * Fix NetworkCreateOptions serialization * Add back error message extraction in Transport * Fix Container::create serialization of options * Add containerdelete example * Simplify result * Fix some error handling to remove unwrap() * Fix Image::export() * Fix imagebuild example * Add adapter from Stream of Chunks to AsyncRead Having an `AsyncRead` is required to be able to use the `FramedRead` and `Decoder` stuff from tokio_codec. This code is "borrowed" from https:/github.com/ferristseng/rust-ipfs-api though should probably be moved to its own crate or to tokio_codec. * Fix Container::logs() It now properly demuxes stdout/stderr, and returns a `Stream`. * Fix Container::export() * Use LineCodec for streaming JSON Although in my limited testing it seemed to work fine, there is no guarantee that 1 chunk == 1 piece of valid JSON. However, each JSON structure seems to be serialized on one line, so use LineCodec to turn the body into a stream of lines, then deserialize over this. * Fix serialization of ExecContainerOptions * Fix Container::exec() (kind of...) * Simplify deserialisation in Image::delete() * Small clean-ups * More clean ups * Fix rustdoc + remove extraneous "extern crate" * Fix doc example * Fix formatting --- src/lib.rs | 434 ++++++++++++++++++++++++++++++++++--------------------------- 1 file changed, 239 insertions(+), 195 deletions(-) (limited to 'src/lib.rs') diff --git a/src/lib.rs b/src/lib.rs index 6823ea3..19f0cff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,19 +4,27 @@ //! //! ```no_run //! extern crate shiplift; +//! extern crate tokio; +//! +//! use tokio::prelude::Future; //! //! let docker = shiplift::Docker::new(); -//! let images = docker.images().list(&Default::default()).unwrap(); -//! println!("docker images in stock"); -//! for i in images { -//! println!("{:?}", i.repo_tags); -//! } +//! let fut = docker.images().list(&Default::default()).map(|images| { +//! println!("docker images in stock"); +//! for i in images { +//! println!("{:?}", i.repo_tags); +//! } +//! }).map_err(|e| eprintln!("Something bad happened! {}", e)); +//! +//! tokio::run(fut); //! ``` #[macro_use] extern crate log; extern crate byteorder; +extern crate bytes; extern crate flate2; +extern crate futures; extern crate http; extern crate hyper; extern crate hyper_openssl; @@ -32,9 +40,12 @@ extern crate serde; #[macro_use] extern crate serde_json; extern crate tokio; +extern crate tokio_codec; +extern crate tokio_io; pub mod builder; pub mod errors; +pub mod read; pub mod rep; pub mod transport; pub mod tty; @@ -47,6 +58,7 @@ pub use builder::{ LogsOptions, NetworkCreateOptions, NetworkListOptions, PullOptions, RmContainerOptions, }; pub use errors::Error; +use futures::{future::Either, Future, IntoFuture, Stream}; use hyper::client::HttpConnector; use hyper::Body; use hyper::{Client, Method, Uri}; @@ -55,6 +67,7 @@ use hyper_openssl::HttpsConnector; use hyperlocal::UnixConnector; use mime::Mime; use openssl::ssl::{SslConnector, SslFiletype, SslMethod}; +use read::StreamReader; use rep::Image as ImageRep; use rep::{ Change, Container as ContainerRep, ContainerCreateInfo, ContainerDetails, Event, Exit, History, @@ -63,19 +76,19 @@ use rep::{ use rep::{NetworkCreateInfo, NetworkDetails as NetworkInfo}; use serde_json::Value; use std::borrow::Cow; -use std::cell::RefCell; use std::env; -use std::io::prelude::*; use std::path::Path; use std::time::Duration; +use tokio_codec::{FramedRead, LinesCodec}; use transport::{tar, Transport}; -use tty::Tty; +use tty::{TtyDecoder, TtyLine}; use url::form_urlencoded; /// Represents the result of all docker operations pub type Result = std::result::Result; /// Entrypoint interface for communicating with docker daemon +#[derive(Clone)] pub struct Docker { transport: Transport, } @@ -102,55 +115,28 @@ impl<'a, 'b> Image<'a, 'b> { } /// Inspects a named image's details - pub fn inspect(&self) -> Result { - let raw = self - .docker - .get(&format!("/images/{}/json", self.name)[..])?; - Ok(serde_json::from_str::(&raw)?) + pub fn inspect(&self) -> impl Future { + self.docker + .get_json(&format!("/images/{}/json", self.name)[..]) } /// Lists the history of the images set of changes - pub fn history(&self) -> Result> { - let raw = self - .docker - .get(&format!("/images/{}/history", self.name)[..])?; - Ok(serde_json::from_str::>(&raw)?) - } - - /// Delete's an image - pub fn delete(&self) -> Result> { - let raw = self.docker.delete(&format!("/images/{}", self.name)[..])?; - Ok(match serde_json::from_str(&raw)? { - Value::Array(ref xs) => xs.iter().map(|j| { - let obj = j.as_object().expect("expected json object"); - obj.get("Untagged") - .map(|sha| { - Status::Untagged( - sha.as_str() - .expect("expected Untagged to be a string") - .to_owned(), - ) - }) - .or_else(|| { - obj.get("Deleted").map(|sha| { - Status::Deleted( - sha.as_str() - .expect("expected Deleted to be a string") - .to_owned(), - ) - }) - }) - .expect("expected Untagged or Deleted") - }), - _ => unreachable!(), - } - .collect()) + pub fn history(&self) -> impl Future, Error = Error> { + self.docker + .get_json(&format!("/images/{}/history", self.name)[..]) + } + + /// Deletes an image + pub fn delete(&self) -> impl Future, Error = Error> { + self.docker + .delete_json::>(&format!("/images/{}", self.name)[..]) } /// Export this image to a tarball - pub fn export(&self) -> Result> { + pub fn export(&self) -> impl Stream, Error = Error> { self.docker .stream_get(&format!("/images/{}/get", self.name)[..]) + .map(|c| c.to_vec()) } } @@ -169,7 +155,7 @@ impl<'a> Images<'a> { pub fn build( &self, opts: &BuildOptions, - ) -> Result> { + ) -> impl Stream { let mut path = vec!["/build".to_owned()]; if let Some(query) = opts.serialize() { path.push(query) @@ -177,29 +163,31 @@ impl<'a> Images<'a> { let mut bytes = vec![]; - tarball::dir(&mut bytes, &opts.path[..])?; - - self.docker - .stream_post(&path.join("?"), Some((Body::from(bytes), tar()))) - .and_then(|r| { - serde_json::Deserializer::from_reader(r) - .into_iter::() - .map(|res| res.map_err(Error::from)) - .collect() - }) + match tarball::dir(&mut bytes, &opts.path[..]) { + Ok(_) => Box::new( + self.docker + .stream_post(&path.join("?"), Some((Body::from(bytes), tar()))) + .and_then(|bytes| { + serde_json::from_slice::<'_, Value>(&bytes[..]) + .map_err(Error::from) + .into_future() + }), + ) as Box + Send>, + Err(e) => Box::new(futures::future::err(Error::IO(e)).into_stream()) + as Box + Send>, + } } /// Lists the docker images on the current docker host pub fn list( &self, opts: &ImageListOptions, - ) -> Result> { + ) -> impl Future, Error = Error> { let mut path = vec!["/images/json".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } - let raw = self.docker.get(&path.join("?"))?; - Ok(serde_json::from_str::>(&raw)?) + self.docker.get_json::>(&path.join("?")) } /// Returns a reference to a set of operations available for a named image @@ -214,31 +202,26 @@ impl<'a> Images<'a> { pub fn search( &self, term: &str, - ) -> Result> { + ) -> impl Future, Error = Error> { let query = form_urlencoded::Serializer::new(String::new()) .append_pair("term", term) .finish(); - let raw = self.docker.get(&format!("/images/search?{}", query)[..])?; - Ok(serde_json::from_str::>(&raw)?) + self.docker + .get_json::>(&format!("/images/search?{}", query)[..]) } /// Pull and create a new docker images from an existing image pub fn pull( &self, opts: &PullOptions, - ) -> Result> { + ) -> impl Stream { let mut path = vec!["/images/create".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } self.docker .stream_post::(&path.join("?"), None) - .and_then(|r| { - serde_json::Deserializer::from_reader(r) - .into_iter::() - .map(|res| res.map_err(Error::from)) - .collect() - }) + .and_then(|r| serde_json::from_slice::(&r[..]).map_err(Error::from)) } /// exports a collection of named images, @@ -246,13 +229,14 @@ impl<'a> Images<'a> { pub fn export( &self, names: Vec<&str>, - ) -> Result> { + ) -> impl Stream, Error = Error> { let params = names.iter().map(|n| ("names", *n)); let query = form_urlencoded::Serializer::new(String::new()) .extend_pairs(params) .finish(); self.docker .stream_get(&format!("/images/get?{}", query)[..]) + .map(|c| c.to_vec()) } // pub fn import(self, tarball: Box) -> Result<()> { @@ -287,18 +271,16 @@ impl<'a, 'b> Container<'a, 'b> { } /// Inspects the current docker container instance's details - pub fn inspect(&self) -> Result { - let raw = self - .docker - .get(&format!("/containers/{}/json", self.id)[..])?; - Ok(serde_json::from_str::(&raw)?) + pub fn inspect(&self) -> impl Future { + self.docker + .get_json::(&format!("/containers/{}/json", self.id)[..]) } /// Returns a `top` view of information about the container process pub fn top( &self, psargs: Option<&str>, - ) -> Result { + ) -> impl Future { let mut path = vec![format!("/containers/{}/top", self.id)]; if let Some(ref args) = psargs { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -306,46 +288,57 @@ impl<'a, 'b> Container<'a, 'b> { .finish(); path.push(encoded) } - let raw = self.docker.get(&path.join("?"))?; - - Ok(serde_json::from_str::(&raw)?) + self.docker.get_json(&path.join("?")) } /// Returns a stream of logs emitted but the container instance pub fn logs( &self, opts: &LogsOptions, - ) -> Result> { + ) -> impl Stream { let mut path = vec![format!("/containers/{}/logs", self.id)]; if let Some(query) = opts.serialize() { path.push(query) } - self.docker.stream_get(&path.join("?")) + + let decoder = TtyDecoder::new(); + let chunk_stream = StreamReader::new(self.docker.stream_get(&path.join("?"))); + + FramedRead::new(chunk_stream, decoder) } /// Returns a set of changes made to the container instance - pub fn changes(&self) -> Result> { - let raw = self - .docker - .get(&format!("/containers/{}/changes", self.id)[..])?; - Ok(serde_json::from_str::>(&raw)?) + pub fn changes(&self) -> impl Future, Error = Error> { + self.docker + .get_json::>(&format!("/containers/{}/changes", self.id)[..]) } /// Exports the current docker container into a tarball - pub fn export(&self) -> Result> { + pub fn export(&self) -> impl Stream, Error = Error> { self.docker .stream_get(&format!("/containers/{}/export", self.id)[..]) + .map(|c| c.to_vec()) } /// Returns a stream of stats specific to this container instance - pub fn stats(&self) -> Result> { - self.docker - .stream_get(&format!("/containers/{}/stats", self.id)[..]) - .and_then(|r| serde_json::from_reader::<_, Vec>(r).map_err(Error::from)) + pub fn stats(&self) -> impl Stream { + let decoder = LinesCodec::new(); + let stream_of_chunks = StreamReader::new( + self.docker + .stream_get(&format!("/containers/{}/stats", self.id)[..]), + ); + + FramedRead::new(stream_of_chunks, decoder) + .map_err(Error::IO) + .and_then(|s| { + serde_json::from_str::(&s) + .map_err(Error::SerdeJsonError) + .into_future() + }) } /// Start the container instance - pub fn start(&self) -> Result<()> { + pub fn start(&self) -> impl Future { self.docker .post::(&format!("/containers/{}/start", self.id)[..], None) .map(|_| ()) @@ -355,7 +348,7 @@ impl<'a, 'b> Container<'a, 'b> { pub fn stop( &self, wait: Option, - ) -> Result<()> { + ) -> impl Future { let mut path = vec![format!("/containers/{}/stop", self.id)]; if let Some(w) = wait { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -371,7 +364,7 @@ impl<'a, 'b> Container<'a, 'b> { pub fn restart( &self, wait: Option, - ) -> Result<()> { + ) -> impl Future { let mut path = vec![format!("/containers/{}/restart", self.id)]; if let Some(w) = wait { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -386,7 +379,7 @@ impl<'a, 'b> Container<'a, 'b> { pub fn kill( &self, signal: Option<&str>, - ) -> Result<()> { + ) -> impl Future { let mut path = vec![format!("/containers/{}/kill", self.id)]; if let Some(sig) = signal { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -401,7 +394,7 @@ impl<'a, 'b> Container<'a, 'b> { pub fn rename( &self, name: &str, - ) -> Result<()> { + ) -> impl Future { let query = form_urlencoded::Serializer::new(String::new()) .append_pair("name", name) .finish(); @@ -414,31 +407,29 @@ impl<'a, 'b> Container<'a, 'b> { } /// Pause the container instance - pub fn pause(&self) -> Result<()> { + pub fn pause(&self) -> impl Future { self.docker .post::(&format!("/containers/{}/pause", self.id)[..], None) .map(|_| ()) } /// Unpause the container instance - pub fn unpause(&self) -> Result<()> { + pub fn unpause(&self) -> impl Future { self.docker .post::(&format!("/containers/{}/unpause", self.id)[..], None) .map(|_| ()) } /// Wait until the container stops - pub fn wait(&self) -> Result { - let raw = self - .docker - .post::(&format!("/containers/{}/wait", self.id)[..], None)?; - Ok(serde_json::from_str::(&raw)?) + pub fn wait(&self) -> impl Future { + self.docker + .post_json::(&format!("/containers/{}/wait", self.id)[..], None) } /// Delete the container instance /// /// Use remove instead to use the force/v options. - pub fn delete(&self) -> Result<()> { + pub fn delete(&self) -> impl Future { self.docker .delete(&format!("/containers/{}", self.id)[..]) .map(|_| ()) @@ -448,51 +439,48 @@ impl<'a, 'b> Container<'a, 'b> { pub fn remove( &self, opts: RmContainerOptions, - ) -> Result<()> { + ) -> impl Future { let mut path = vec![format!("/containers/{}", self.id)]; if let Some(query) = opts.serialize() { path.push(query) } - self.docker.delete(&path.join("?"))?; - Ok(()) + self.docker.delete(&path.join("?")).map(|_| ()) } + // TODO(abusch) fix this /// Exec the specified command in the container pub fn exec( &self, opts: &ExecContainerOptions, - ) -> Result { - let data = opts.serialize()?; + ) -> impl Stream { + let data = opts.serialize().unwrap(); // TODO fixme let bytes = data.into_bytes(); - match self.docker.post( - &format!("/containers/{}/exec", self.id)[..], - Some((bytes, mime::APPLICATION_JSON)), - ) { - Err(e) => Err(e), - Ok(res) => { + let docker2 = self.docker.clone(); + self.docker + .post( + &format!("/containers/{}/exec", self.id)[..], + Some((bytes, mime::APPLICATION_JSON)), + ) + .map(move |res| { let data = "{}"; - let mut bytes = data.as_bytes(); - let json: Value = serde_json::from_str(res.as_str())?; - - if let Value::Object(ref obj) = json { - self.docker - .stream_post( - &format!( - "/exec/{}/start", - obj - .get("Id") - .unwrap() - .as_str() - .unwrap() - )[..], - Some((bytes, mime::APPLICATION_JSON)), - ).map(Tty::new) - } else { - // TODO - panic!() - } - } - } + let bytes = data.as_bytes(); + let id = serde_json::from_str::(res.as_str()) + .ok() + .and_then(|v| { + v.as_object() + .and_then(|v| v.get("Id")) + .and_then(|v| v.as_str().map(|v| v.to_string())) + }) + .unwrap(); // TODO fixme + + let decoder = TtyDecoder::new(); + let chunk_stream = StreamReader::new(docker2.stream_post( + &format!("/exec/{}/start", id)[..], + Some((bytes, mime::APPLICATION_JSON)), + )); + FramedRead::new(chunk_stream, decoder) + }) + .flatten_stream() } // todo attach, attach/ws, copy, archive @@ -513,13 +501,12 @@ impl<'a> Containers<'a> { pub fn list( &self, opts: &ContainerListOptions, - ) -> Result> { + ) -> impl Future, Error = Error> { let mut path = vec!["/containers/json".to_owned()]; if let Some(query) = opts.serialize() { path.push(query) } - let raw = self.docker.get(&path.join("?"))?; - Ok(serde_json::from_str::>(&raw)?) + self.docker.get_json::>(&path.join("?")) } /// Returns a reference to a set of operations available to a specific container instance @@ -534,8 +521,12 @@ impl<'a> Containers<'a> { pub fn create( &self, opts: &ContainerOptions, - ) -> Result { - let data = opts.serialize()?; + ) -> impl Future { + let data = match opts.serialize() { + Ok(data) => data, + Err(e) => return Either::A(futures::future::err(e)), + }; + let bytes = data.into_bytes(); let mut path = vec!["/containers/create".to_owned()]; @@ -547,10 +538,10 @@ impl<'a> Containers<'a> { ); } - let raw = self - .docker - .post(&path.join("?"), Some((bytes, mime::APPLICATION_JSON)))?; - Ok(serde_json::from_str::(&raw)?) + Either::B( + self.docker + .post_json(&path.join("?"), Some((bytes, mime::APPLICATION_JSON))), + ) } } @@ -569,13 +560,12 @@ impl<'a> Networks<'a> { pub fn list( &self, opts: &NetworkListOptions, - ) -> Result> { + ) -> impl Future, Error = Error> { let mut path = vec!["/networks".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } - let raw = self.docker.get(&path.join("?"))?; - Ok(serde_json::from_str::>(&raw)?) + self.docker.get_json(&path.join("?")) } /// Returns a reference to a set of operations available to a specific network instance @@ -586,18 +576,22 @@ impl<'a> Networks<'a> { Network::new(self.docker, id) } + /// Create a new Network instance pub fn create( &self, opts: &NetworkCreateOptions, - ) -> Result { - let data = opts.serialize()?; + ) -> impl Future { + let data = match opts.serialize() { + Ok(data) => data, + Err(e) => return Either::A(futures::future::err(e)), + }; let bytes = data.into_bytes(); let path = vec!["/networks/create".to_owned()]; - let raw = self - .docker - .post(&path.join("?"), Some((bytes, mime::APPLICATION_JSON)))?; - Ok(serde_json::from_str::(&raw)?) + Either::B( + self.docker + .post_json(&path.join("?"), Some((bytes, mime::APPLICATION_JSON))), + ) } } @@ -628,13 +622,12 @@ impl<'a, 'b> Network<'a, 'b> { } /// Inspects the current docker network instance's details - pub fn inspect(&self) -> Result { - let raw = self.docker.get(&format!("/networks/{}", self.id)[..])?; - Ok(serde_json::from_str::(&raw)?) + pub fn inspect(&self) -> impl Future { + self.docker.get_json(&format!("/networks/{}", self.id)[..]) } /// Delete the network instance - pub fn delete(&self) -> Result<()> { + pub fn delete(&self) -> impl Future { self.docker .delete(&format!("/networks/{}", self.id)[..]) .map(|_| ()) @@ -644,7 +637,7 @@ impl<'a, 'b> Network<'a, 'b> { pub fn connect( &self, opts: &ContainerConnectionOptions, - ) -> Result<()> { + ) -> impl Future { self.do_connection("connect", opts) } @@ -652,7 +645,7 @@ impl<'a, 'b> Network<'a, 'b> { pub fn disconnect( &self, opts: &ContainerConnectionOptions, - ) -> Result<()> { + ) -> impl Future { self.do_connection("disconnect", opts) } @@ -660,16 +653,21 @@ impl<'a, 'b> Network<'a, 'b> { &self, segment: &str, opts: &ContainerConnectionOptions, - ) -> Result<()> { - let data = opts.serialize()?; + ) -> impl Future { + let data = match opts.serialize() { + Ok(data) => data, + Err(e) => return Either::A(futures::future::err(e)), + }; let bytes = data.into_bytes(); - self.docker - .post( - &format!("/networks/{}/{}", self.id, segment)[..], - Some((bytes, mime::APPLICATION_JSON)), - ) - .map(|_| ()) + Either::B( + self.docker + .post( + &format!("/networks/{}/{}", self.id, segment)[..], + Some((bytes, mime::APPLICATION_JSON)), + ) + .map(|_| ()), + ) } } @@ -700,7 +698,6 @@ impl Docker { Docker { transport: Transport::Unix { client: Client::builder().keep_alive(false).build(UnixConnector), - runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()), path: socket_path.into(), }, } @@ -720,7 +717,6 @@ impl Docker { Some("unix") => Docker { transport: Transport::Unix { client: Client::builder().build(UnixConnector), - runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()), path: host.path().to_owned(), }, }, @@ -755,7 +751,6 @@ impl Docker { transport: Transport::EncryptedTcp { client: Client::builder() .build(HttpsConnector::with_connector(http, connector).unwrap()), - runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()), host: tcp_host_str, }, } @@ -763,7 +758,6 @@ impl Docker { Docker { transport: Transport::Tcp { client: Client::builder().build(http), - runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()), host: tcp_host_str, }, } @@ -787,76 +781,126 @@ impl Docker { } /// Returns version information associated with the docker daemon - pub fn version(&self) -> Result { - let raw = self.get("/version")?; - Ok(serde_json::from_str::(&raw)?) + pub fn version(&self) -> impl Future { + self.get_json("/version") } /// Returns information associated with the docker daemon - pub fn info(&self) -> Result { - let raw = self.get("/info")?; - Ok(serde_json::from_str::(&raw)?) + pub fn info(&self) -> impl Future { + self.get_json("/info") } /// Returns a simple ping response indicating the docker daemon is accessible - pub fn ping(&self) -> Result { + pub fn ping(&self) -> impl Future { self.get("/_ping") } - /// Returns an interator over streamed docker events + /// Returns a stream of docker events pub fn events( &self, opts: &EventsOptions, - ) -> Result> { + ) -> impl Stream { let mut path = vec!["/events".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } - self.stream_get(&path.join("?")[..]) - .and_then(|r| serde_json::from_reader::<_, Vec>(r).map_err(Error::from)) + let stream_of_chunks = self.stream_get(&path.join("?")[..]); + let reader = StreamReader::new(stream_of_chunks); + FramedRead::new(reader, LinesCodec::new()) + .map_err(Error::IO) + .and_then(|line| serde_json::from_str::(&line).map_err(Error::from)) } + // + // Utility functions to make requests + // + fn get( &self, endpoint: &str, - ) -> Result { + ) -> impl Future { self.transport.request::(Method::GET, endpoint, None) } + fn get_json( + &self, + endpoint: &str, + ) -> impl Future { + self.transport + .request::(Method::GET, endpoint, None) + .and_then(|v| { + serde_json::from_str::(&v) + .map_err(Error::SerdeJsonError) + .into_future() + }) + } + fn post( &self, endpoint: &str, body: Option<(B, Mime)>, - ) -> Result + ) -> impl Future where B: Into, { self.transport.request(Method::POST, endpoint, body) } + fn post_json( + &self, + endpoint: &str, + body: Option<(B, Mime)>, + ) -> impl Future + where + B: Into, + T: serde::de::DeserializeOwned, + { + self.transport + .request(Method::POST, endpoint, body) + .and_then(|v| { + serde_json::from_str::(&v) + .map_err(Error::SerdeJsonError) + .into_future() + }) + } + fn delete( &self, endpoint: &str, - ) -> Result { + ) -> impl Future { self.transport .request::(Method::DELETE, endpoint, None) } + fn delete_json( + &self, + endpoint: &str, + ) -> impl Future { + self.transport + .request::(Method::DELETE, endpoint, None) + .and_then(|v| { + serde_json::from_str::(&v) + .map_err(Error::SerdeJsonError) + .into_future() + }) + } + fn stream_post( &self, endpoint: &str, body: Option<(B, Mime)>, - ) -> Result> + ) -> impl Stream where B: Into, { - self.transport.stream(Method::POST, endpoint, body) + self.transport.stream_chunks(Method::POST, endpoint, body) } fn stream_get( &self, endpoint: &str, - ) -> Result> { - self.transport.stream::(Method::GET, endpoint, None) + ) -> impl Stream { + self.transport + .stream_chunks::(Method::GET, endpoint, None) } } -- cgit v1.2.3