summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorTom Fay <tom.fay@metaswitch.com>2021-01-09 15:46:47 +0000
committerGitHub <noreply@github.com>2021-01-09 10:46:47 -0500
commit1a0b5a9b7710f4d17d975c46e3e44bb9cb4c0837 (patch)
tree7a9707ce89239afee916377cb6184b76c2a77d02 /src
parentae0d4e8d2d4af36cc8959ef313bf1d0054b510f6 (diff)
Allow `Image::pull` to handle chunks with multiple JSON values (#240)
* add fn for stream post that returns json values use when pulling docker images, to fix bug where shiplift would error if multiple JSON values were returned in a single HTTP chunk * fix unnecessary lazy evaluation * add comments to stream post requests
Diffstat (limited to 'src')
-rw-r--r--src/lib.rs57
-rw-r--r--src/transport.rs2
2 files changed, 33 insertions, 26 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 7d62523..f44919a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -164,25 +164,12 @@ impl<'a> Images<'a> {
tarball::dir(&mut bytes, &opts.path[..])?;
- let chunk_stream = self.docker.stream_post(
+ let value_stream = self.docker.stream_post_into_values(
path.join("?"),
Some((Body::from(bytes), tar())),
None::<iter::Empty<_>>,
);
- let value_stream = chunk_stream
- .and_then(|chunk| async move {
- let stream = futures_util::stream::iter(
- serde_json::Deserializer::from_slice(&chunk)
- .into_iter()
- .collect::<Vec<_>>(),
- )
- .map_err(Error::from);
-
- Ok(stream)
- })
- .try_flatten();
-
Ok(value_stream)
}
.try_flatten_stream(),
@@ -237,11 +224,7 @@ impl<'a> Images<'a> {
Box::pin(
self.docker
- .stream_post(path.join("?"), None, headers)
- .and_then(move |chunk| {
- // todo: give this a proper enum type
- futures_util::future::ready(serde_json::from_slice(&chunk).map_err(Error::from))
- }),
+ .stream_post_into_values(path.join("?"), None, headers),
)
}
@@ -272,16 +255,11 @@ impl<'a> Images<'a> {
tarball.read_to_end(&mut bytes)?;
- let chunk_stream = self.docker.stream_post(
+ let value_stream = self.docker.stream_post_into_values(
"/images/load",
Some((Body::from(bytes), tar())),
None::<iter::Empty<_>>,
);
-
- let value_stream = chunk_stream.and_then(|chunk| async move {
- serde_json::from_slice(&chunk).map_err(Error::from)
- });
-
Ok(value_stream)
}
.try_flatten_stream(),
@@ -1165,6 +1143,9 @@ impl Docker {
Ok(serde_json::from_str::<T>(&string)?)
}
+ /// Send a streaming post request.
+ ///
+ /// Use stream_post_into_values if the endpoint returns JSON values
fn stream_post<'a, H>(
&'a self,
endpoint: impl AsRef<str> + 'a,
@@ -1178,6 +1159,32 @@ impl Docker {
.stream_chunks(Method::POST, endpoint, body, headers)
}
+ /// Send a streaming post request that returns a stream of JSON values
+ ///
+ /// Assumes that each received chunk contains one or more JSON values
+ fn stream_post_into_values<'a, H>(
+ &'a self,
+ endpoint: impl AsRef<str> + 'a,
+ body: Option<(Body, Mime)>,
+ headers: Option<H>,
+ ) -> impl Stream<Item = Result<Value>> + 'a
+ where
+ H: IntoIterator<Item = (&'static str, String)> + 'a,
+ {
+ self.stream_post(endpoint, body, headers)
+ .and_then(|chunk| async move {
+ let stream = futures_util::stream::iter(
+ serde_json::Deserializer::from_slice(&chunk)
+ .into_iter()
+ .collect::<Vec<_>>(),
+ )
+ .map_err(Error::from);
+
+ Ok(stream)
+ })
+ .try_flatten()
+ }
+
fn stream_get<'a>(
&'a self,
endpoint: impl AsRef<str> + Unpin + 'a,
diff --git a/src/transport.rs b/src/transport.rs
index 5a48f62..3ad3b9e 100644
--- a/src/transport.rs
+++ b/src/transport.rs
@@ -122,7 +122,7 @@ impl Transport {
message: Self::get_error_message(&message_body).unwrap_or_else(|| {
status
.canonical_reason()
- .unwrap_or_else(|| "unknown error code")
+ .unwrap_or("unknown error code")
.to_owned()
}),
})