diff options
author | Tom Fay <tom.fay@metaswitch.com> | 2021-01-09 15:46:47 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-09 10:46:47 -0500 |
commit | 1a0b5a9b7710f4d17d975c46e3e44bb9cb4c0837 (patch) | |
tree | 7a9707ce89239afee916377cb6184b76c2a77d02 /src | |
parent | ae0d4e8d2d4af36cc8959ef313bf1d0054b510f6 (diff) |
Allow `Image::pull` to handle chunks with multiple JSON values (#240)
* add fn for stream post that returns json values
use when pulling docker images, to fix bug where
shiplift would error if multiple JSON values were
returned in a single HTTP chunk
* fix unnecessary lazy evaluation
* add comments to stream post requests
Diffstat (limited to 'src')
-rw-r--r-- | src/lib.rs | 57 | ||||
-rw-r--r-- | src/transport.rs | 2 |
2 files changed, 33 insertions, 26 deletions
@@ -164,25 +164,12 @@ impl<'a> Images<'a> { tarball::dir(&mut bytes, &opts.path[..])?; - let chunk_stream = self.docker.stream_post( + let value_stream = self.docker.stream_post_into_values( path.join("?"), Some((Body::from(bytes), tar())), None::<iter::Empty<_>>, ); - let value_stream = chunk_stream - .and_then(|chunk| async move { - let stream = futures_util::stream::iter( - serde_json::Deserializer::from_slice(&chunk) - .into_iter() - .collect::<Vec<_>>(), - ) - .map_err(Error::from); - - Ok(stream) - }) - .try_flatten(); - Ok(value_stream) } .try_flatten_stream(), @@ -237,11 +224,7 @@ impl<'a> Images<'a> { Box::pin( self.docker - .stream_post(path.join("?"), None, headers) - .and_then(move |chunk| { - // todo: give this a proper enum type - futures_util::future::ready(serde_json::from_slice(&chunk).map_err(Error::from)) - }), + .stream_post_into_values(path.join("?"), None, headers), ) } @@ -272,16 +255,11 @@ impl<'a> Images<'a> { tarball.read_to_end(&mut bytes)?; - let chunk_stream = self.docker.stream_post( + let value_stream = self.docker.stream_post_into_values( "/images/load", Some((Body::from(bytes), tar())), None::<iter::Empty<_>>, ); - - let value_stream = chunk_stream.and_then(|chunk| async move { - serde_json::from_slice(&chunk).map_err(Error::from) - }); - Ok(value_stream) } .try_flatten_stream(), @@ -1165,6 +1143,9 @@ impl Docker { Ok(serde_json::from_str::<T>(&string)?) } + /// Send a streaming post request. + /// + /// Use stream_post_into_values if the endpoint returns JSON values fn stream_post<'a, H>( &'a self, endpoint: impl AsRef<str> + 'a, @@ -1178,6 +1159,32 @@ impl Docker { .stream_chunks(Method::POST, endpoint, body, headers) } + /// Send a streaming post request that returns a stream of JSON values + /// + /// Assumes that each received chunk contains one or more JSON values + fn stream_post_into_values<'a, H>( + &'a self, + endpoint: impl AsRef<str> + 'a, + body: Option<(Body, Mime)>, + headers: Option<H>, + ) -> impl Stream<Item = Result<Value>> + 'a + where + H: IntoIterator<Item = (&'static str, String)> + 'a, + { + self.stream_post(endpoint, body, headers) + .and_then(|chunk| async move { + let stream = futures_util::stream::iter( + serde_json::Deserializer::from_slice(&chunk) + .into_iter() + .collect::<Vec<_>>(), + ) + .map_err(Error::from); + + Ok(stream) + }) + .try_flatten() + } + fn stream_get<'a>( &'a self, endpoint: impl AsRef<str> + Unpin + 'a, diff --git a/src/transport.rs b/src/transport.rs index 5a48f62..3ad3b9e 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -122,7 +122,7 @@ impl Transport { message: Self::get_error_message(&message_body).unwrap_or_else(|| { status .canonical_reason() - .unwrap_or_else(|| "unknown error code") + .unwrap_or("unknown error code") .to_owned() }), }) |