diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-07-10 13:06:51 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-07-10 13:06:51 +0200 |
commit | 7aa868fc3dc13445bdf3452146d3f7a7799d2da2 (patch) | |
tree | 7fc8856f173ab074c58578c6a367df720e86c60f | |
parent | 874400fa2ce416bc1d1ff82f51d47652128e89d4 (diff) | |
parent | 4514c2566b8f328b5a883e408cf09510a0a97efe (diff) |
Merge pull request #306 from matthiasbeyer/image-pull-chunked
Image pull chunked
-rw-r--r-- | src/docker.rs | 8 | ||||
-rw-r--r-- | src/image.rs | 57 |
2 files changed, 51 insertions, 14 deletions
diff --git a/src/docker.rs b/src/docker.rs index edb97e4..1b7f603 100644 --- a/src/docker.rs +++ b/src/docker.rs @@ -7,8 +7,7 @@ use std::{collections::HashMap, env, io, path::Path}; use futures_util::{stream::Stream, TryStreamExt}; use hyper::{client::HttpConnector, Body, Client, Method}; use mime::Mime; -use serde::{Deserialize, Serialize}; -use serde_json::Value; +use serde::{de, Deserialize, Serialize}; use url::form_urlencoded; use crate::{ @@ -355,14 +354,15 @@ impl Docker { /// Send a streaming post request that returns a stream of JSON values /// /// Assumes that each received chunk contains one or more JSON values - pub(crate) fn stream_post_into_values<'a, H>( + pub(crate) fn stream_post_into<'a, H, T>( &'a self, endpoint: impl AsRef<str> + 'a, body: Option<(Body, Mime)>, headers: Option<H>, - ) -> impl Stream<Item = Result<Value>> + 'a + ) -> impl Stream<Item = Result<T>> + 'a where H: IntoIterator<Item = (&'static str, String)> + 'a, + T: de::DeserializeOwned, { self.stream_post(endpoint, body, headers) .and_then(|chunk| async move { diff --git a/src/image.rs b/src/image.rs index 6c26e7c..f19dbad 100644 --- a/src/image.rs +++ b/src/image.rs @@ -7,7 +7,6 @@ use std::{collections::HashMap, io::Read, iter}; use futures_util::{stream::Stream, TryFutureExt, TryStreamExt}; use hyper::Body; use serde::{Deserialize, Serialize}; -use serde_json::Value; use url::form_urlencoded; use crate::{docker::Docker, errors::Result, tarball, transport::tar}; @@ -111,7 +110,7 @@ impl<'docker> Images<'docker> { pub fn build( &self, opts: &BuildOptions, - ) -> impl Stream<Item = Result<Value>> + Unpin + 'docker { + ) -> impl Stream<Item = Result<ImageBuildChunk>> + Unpin + 'docker { let mut endpoint = vec!["/build".to_owned()]; if let Some(query) = opts.serialize() { endpoint.push(query) @@ -131,7 +130,7 @@ impl<'docker> Images<'docker> { // Bubble up error inside the stream for backwards compatability tar_result?; - let value_stream = docker.stream_post_into_values( + let value_stream = docker.stream_post_into( endpoint.join("?"), Some((Body::from(bytes), tar())), None::<iter::Empty<_>>, @@ -191,7 +190,7 @@ impl<'docker> Images<'docker> { pub fn pull( &self, opts: &PullOptions, - ) -> impl Stream<Item = Result<Value>> + Unpin + 'docker { + ) -> impl Stream<Item = Result<ImageBuildChunk>> + Unpin + 'docker { let mut path = vec!["/images/create".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); @@ -200,10 +199,7 @@ impl<'docker> Images<'docker> { .auth_header() .map(|a| iter::once(("X-Registry-Auth", a))); - Box::pin( - self.docker - .stream_post_into_values(path.join("?"), None, headers), - ) + Box::pin(self.docker.stream_post_into(path.join("?"), None, headers)) } /// exports a collection of named images, @@ -230,7 +226,7 @@ impl<'docker> Images<'docker> { pub fn import<R>( self, mut tarball: R, - ) -> impl Stream<Item = Result<Value>> + Unpin + 'docker + ) -> impl Stream<Item = Result<ImageBuildChunk>> + Unpin + 'docker where R: Read + Send + 'docker, { @@ -240,7 +236,7 @@ impl<'docker> Images<'docker> { tarball.read_to_end(&mut bytes)?; - let value_stream = self.docker.stream_post_into_values( + let value_stream = self.docker.stream_post_into( "/images/load", Some((Body::from(bytes), tar())), None::<iter::Empty<_>>, @@ -873,6 +869,47 @@ pub enum Status { Deleted(String), } +#[derive(Serialize, Deserialize, Debug)] +#[serde(untagged)] +/// Represents a response chunk from Docker api when building, pulling or importing an image. +pub enum ImageBuildChunk { + Update { + stream: String, + }, + Error { + error: String, + #[serde(rename = "errorDetail")] + error_detail: ErrorDetail, + }, + Digest { + aux: Aux, + }, + PullStatus { + status: String, + id: Option<String>, + progress: Option<String>, + #[serde(rename = "progressDetail")] + progress_detail: Option<ProgressDetail>, + }, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Aux { + #[serde(rename = "ID")] + id: String, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct ErrorDetail { + message: String, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct ProgressDetail { + current: Option<u64>, + total: Option<u64>, +} + #[cfg(test)] mod tests { use super::*; |