From 1a0b5a9b7710f4d17d975c46e3e44bb9cb4c0837 Mon Sep 17 00:00:00 2001 From: Tom Fay Date: Sat, 9 Jan 2021 15:46:47 +0000 Subject: Allow `Image::pull` to handle chunks with multiple JSON values (#240) * add fn for stream post that returns json values use when pulling docker images, to fix bug where shiplift would error if multiple JSON values were returned in a single HTTP chunk * fix unnecessary lazy evaluation * add comments to stream post requests --- src/lib.rs | 57 +++++++++++++++++++++++++++++++------------------------- src/transport.rs | 2 +- 2 files changed, 33 insertions(+), 26 deletions(-) (limited to 'src') diff --git a/src/lib.rs b/src/lib.rs index 7d62523..f44919a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -164,25 +164,12 @@ impl<'a> Images<'a> { tarball::dir(&mut bytes, &opts.path[..])?; - let chunk_stream = self.docker.stream_post( + let value_stream = self.docker.stream_post_into_values( path.join("?"), Some((Body::from(bytes), tar())), None::>, ); - let value_stream = chunk_stream - .and_then(|chunk| async move { - let stream = futures_util::stream::iter( - serde_json::Deserializer::from_slice(&chunk) - .into_iter() - .collect::>(), - ) - .map_err(Error::from); - - Ok(stream) - }) - .try_flatten(); - Ok(value_stream) } .try_flatten_stream(), @@ -237,11 +224,7 @@ impl<'a> Images<'a> { Box::pin( self.docker - .stream_post(path.join("?"), None, headers) - .and_then(move |chunk| { - // todo: give this a proper enum type - futures_util::future::ready(serde_json::from_slice(&chunk).map_err(Error::from)) - }), + .stream_post_into_values(path.join("?"), None, headers), ) } @@ -272,16 +255,11 @@ impl<'a> Images<'a> { tarball.read_to_end(&mut bytes)?; - let chunk_stream = self.docker.stream_post( + let value_stream = self.docker.stream_post_into_values( "/images/load", Some((Body::from(bytes), tar())), None::>, ); - - let value_stream = chunk_stream.and_then(|chunk| async move { - serde_json::from_slice(&chunk).map_err(Error::from) - }); - Ok(value_stream) } .try_flatten_stream(), @@ -1165,6 +1143,9 @@ impl Docker { Ok(serde_json::from_str::(&string)?) } + /// Send a streaming post request. + /// + /// Use stream_post_into_values if the endpoint returns JSON values fn stream_post<'a, H>( &'a self, endpoint: impl AsRef + 'a, @@ -1178,6 +1159,32 @@ impl Docker { .stream_chunks(Method::POST, endpoint, body, headers) } + /// Send a streaming post request that returns a stream of JSON values + /// + /// Assumes that each received chunk contains one or more JSON values + fn stream_post_into_values<'a, H>( + &'a self, + endpoint: impl AsRef + 'a, + body: Option<(Body, Mime)>, + headers: Option, + ) -> impl Stream> + 'a + where + H: IntoIterator + 'a, + { + self.stream_post(endpoint, body, headers) + .and_then(|chunk| async move { + let stream = futures_util::stream::iter( + serde_json::Deserializer::from_slice(&chunk) + .into_iter() + .collect::>(), + ) + .map_err(Error::from); + + Ok(stream) + }) + .try_flatten() + } + fn stream_get<'a>( &'a self, endpoint: impl AsRef + Unpin + 'a, diff --git a/src/transport.rs b/src/transport.rs index 5a48f62..3ad3b9e 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -122,7 +122,7 @@ impl Transport { message: Self::get_error_message(&message_body).unwrap_or_else(|| { status .canonical_reason() - .unwrap_or_else(|| "unknown error code") + .unwrap_or("unknown error code") .to_owned() }), }) -- cgit v1.2.3