summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-07-10 13:06:51 +0200
committerGitHub <noreply@github.com>2021-07-10 13:06:51 +0200
commit7aa868fc3dc13445bdf3452146d3f7a7799d2da2 (patch)
tree7fc8856f173ab074c58578c6a367df720e86c60f /src
parent874400fa2ce416bc1d1ff82f51d47652128e89d4 (diff)
parent4514c2566b8f328b5a883e408cf09510a0a97efe (diff)
Merge pull request #306 from matthiasbeyer/image-pull-chunked
Image pull chunked
Diffstat (limited to 'src')
-rw-r--r--src/docker.rs8
-rw-r--r--src/image.rs57
2 files changed, 51 insertions, 14 deletions
diff --git a/src/docker.rs b/src/docker.rs
index edb97e4..1b7f603 100644
--- a/src/docker.rs
+++ b/src/docker.rs
@@ -7,8 +7,7 @@ use std::{collections::HashMap, env, io, path::Path};
use futures_util::{stream::Stream, TryStreamExt};
use hyper::{client::HttpConnector, Body, Client, Method};
use mime::Mime;
-use serde::{Deserialize, Serialize};
-use serde_json::Value;
+use serde::{de, Deserialize, Serialize};
use url::form_urlencoded;
use crate::{
@@ -355,14 +354,15 @@ impl Docker {
/// Send a streaming post request that returns a stream of JSON values
///
/// Assumes that each received chunk contains one or more JSON values
- pub(crate) fn stream_post_into_values<'a, H>(
+ pub(crate) fn stream_post_into<'a, H, T>(
&'a self,
endpoint: impl AsRef<str> + 'a,
body: Option<(Body, Mime)>,
headers: Option<H>,
- ) -> impl Stream<Item = Result<Value>> + 'a
+ ) -> impl Stream<Item = Result<T>> + 'a
where
H: IntoIterator<Item = (&'static str, String)> + 'a,
+ T: de::DeserializeOwned,
{
self.stream_post(endpoint, body, headers)
.and_then(|chunk| async move {
diff --git a/src/image.rs b/src/image.rs
index 6c26e7c..f19dbad 100644
--- a/src/image.rs
+++ b/src/image.rs
@@ -7,7 +7,6 @@ use std::{collections::HashMap, io::Read, iter};
use futures_util::{stream::Stream, TryFutureExt, TryStreamExt};
use hyper::Body;
use serde::{Deserialize, Serialize};
-use serde_json::Value;
use url::form_urlencoded;
use crate::{docker::Docker, errors::Result, tarball, transport::tar};
@@ -111,7 +110,7 @@ impl<'docker> Images<'docker> {
pub fn build(
&self,
opts: &BuildOptions,
- ) -> impl Stream<Item = Result<Value>> + Unpin + 'docker {
+ ) -> impl Stream<Item = Result<ImageBuildChunk>> + Unpin + 'docker {
let mut endpoint = vec!["/build".to_owned()];
if let Some(query) = opts.serialize() {
endpoint.push(query)
@@ -131,7 +130,7 @@ impl<'docker> Images<'docker> {
// Bubble up error inside the stream for backwards compatability
tar_result?;
- let value_stream = docker.stream_post_into_values(
+ let value_stream = docker.stream_post_into(
endpoint.join("?"),
Some((Body::from(bytes), tar())),
None::<iter::Empty<_>>,
@@ -191,7 +190,7 @@ impl<'docker> Images<'docker> {
pub fn pull(
&self,
opts: &PullOptions,
- ) -> impl Stream<Item = Result<Value>> + Unpin + 'docker {
+ ) -> impl Stream<Item = Result<ImageBuildChunk>> + Unpin + 'docker {
let mut path = vec!["/images/create".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
@@ -200,10 +199,7 @@ impl<'docker> Images<'docker> {
.auth_header()
.map(|a| iter::once(("X-Registry-Auth", a)));
- Box::pin(
- self.docker
- .stream_post_into_values(path.join("?"), None, headers),
- )
+ Box::pin(self.docker.stream_post_into(path.join("?"), None, headers))
}
/// exports a collection of named images,
@@ -230,7 +226,7 @@ impl<'docker> Images<'docker> {
pub fn import<R>(
self,
mut tarball: R,
- ) -> impl Stream<Item = Result<Value>> + Unpin + 'docker
+ ) -> impl Stream<Item = Result<ImageBuildChunk>> + Unpin + 'docker
where
R: Read + Send + 'docker,
{
@@ -240,7 +236,7 @@ impl<'docker> Images<'docker> {
tarball.read_to_end(&mut bytes)?;
- let value_stream = self.docker.stream_post_into_values(
+ let value_stream = self.docker.stream_post_into(
"/images/load",
Some((Body::from(bytes), tar())),
None::<iter::Empty<_>>,
@@ -873,6 +869,47 @@ pub enum Status {
Deleted(String),
}
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(untagged)]
+/// Represents a response chunk from Docker api when building, pulling or importing an image.
+pub enum ImageBuildChunk {
+ Update {
+ stream: String,
+ },
+ Error {
+ error: String,
+ #[serde(rename = "errorDetail")]
+ error_detail: ErrorDetail,
+ },
+ Digest {
+ aux: Aux,
+ },
+ PullStatus {
+ status: String,
+ id: Option<String>,
+ progress: Option<String>,
+ #[serde(rename = "progressDetail")]
+ progress_detail: Option<ProgressDetail>,
+ },
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct Aux {
+ #[serde(rename = "ID")]
+ id: String,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ErrorDetail {
+ message: String,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ProgressDetail {
+ current: Option<u64>,
+ total: Option<u64>,
+}
+
#[cfg(test)]
mod tests {
use super::*;