diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/lib.rs | 57 | ||||
-rw-r--r-- | src/transport.rs | 2 |
2 files changed, 33 insertions, 26 deletions
@@ -164,25 +164,12 @@ impl<'a> Images<'a> { tarball::dir(&mut bytes, &opts.path[..])?; - let chunk_stream = self.docker.stream_post( + let value_stream = self.docker.stream_post_into_values( path.join("?"), Some((Body::from(bytes), tar())), None::<iter::Empty<_>>, ); - let value_stream = chunk_stream - .and_then(|chunk| async move { - let stream = futures_util::stream::iter( - serde_json::Deserializer::from_slice(&chunk) - .into_iter() - .collect::<Vec<_>>(), - ) - .map_err(Error::from); - - Ok(stream) - }) - .try_flatten(); - Ok(value_stream) } .try_flatten_stream(), @@ -237,11 +224,7 @@ impl<'a> Images<'a> { Box::pin( self.docker - .stream_post(path.join("?"), None, headers) - .and_then(move |chunk| { - // todo: give this a proper enum type - futures_util::future::ready(serde_json::from_slice(&chunk).map_err(Error::from)) - }), + .stream_post_into_values(path.join("?"), None, headers), ) } @@ -272,16 +255,11 @@ impl<'a> Images<'a> { tarball.read_to_end(&mut bytes)?; - let chunk_stream = self.docker.stream_post( + let value_stream = self.docker.stream_post_into_values( "/images/load", Some((Body::from(bytes), tar())), None::<iter::Empty<_>>, ); - - let value_stream = chunk_stream.and_then(|chunk| async move { - serde_json::from_slice(&chunk).map_err(Error::from) - }); - Ok(value_stream) } .try_flatten_stream(), @@ -1165,6 +1143,9 @@ impl Docker { Ok(serde_json::from_str::<T>(&string)?) } + /// Send a streaming post request. + /// + /// Use stream_post_into_values if the endpoint returns JSON values fn stream_post<'a, H>( &'a self, endpoint: impl AsRef<str> + 'a, @@ -1178,6 +1159,32 @@ impl Docker { .stream_chunks(Method::POST, endpoint, body, headers) } + /// Send a streaming post request that returns a stream of JSON values + /// + /// Assumes that each received chunk contains one or more JSON values + fn stream_post_into_values<'a, H>( + &'a self, + endpoint: impl AsRef<str> + 'a, + body: Option<(Body, Mime)>, + headers: Option<H>, + ) -> impl Stream<Item = Result<Value>> + 'a + where + H: IntoIterator<Item = (&'static str, String)> + 'a, + { + self.stream_post(endpoint, body, headers) + .and_then(|chunk| async move { + let stream = futures_util::stream::iter( + serde_json::Deserializer::from_slice(&chunk) + .into_iter() + .collect::<Vec<_>>(), + ) + .map_err(Error::from); + + Ok(stream) + }) + .try_flatten() + } + fn stream_get<'a>( &'a self, endpoint: impl AsRef<str> + Unpin + 'a, diff --git a/src/transport.rs b/src/transport.rs index 5a48f62..3ad3b9e 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -122,7 +122,7 @@ impl Transport { message: Self::get_error_message(&message_body).unwrap_or_else(|| { status .canonical_reason() - .unwrap_or_else(|| "unknown error code") + .unwrap_or("unknown error code") .to_owned() }), }) |