From 6cd1d7f93bd6f150341582a1b54087cefffdbf87 Mon Sep 17 00:00:00 2001
From: "Eli W. Hunter" <42009212+elihunter173@users.noreply.github.com>
Date: Thu, 23 Jul 2020 23:54:12 -0400
Subject: Async/Await Support (continuation of #191) (#229)
* it builds!
* remove unused dependencies
* bump dependencies
* reimplement 'exec' endpoint
* update a few more examples
* update remaining examples
* fix doc tests, remove unused 'read' module
* remove feature-gated async closures
* split futures dependency to just 'futures-util'
* update version and readme
* make functions accepting Body generic over Into
again
* update changelog
* reinstate 'unix-socket' feature
* reinstate 'attach' endpoint
* fix clippy lints
* fix documentation typo
* fix container copyfrom/into implementations
* add convenience methods for TtyChunk struct
* remove 'main' from code example to silence clippy lint
* Update hyper to 0.13.1
* Add Send bounds to TtyWriter
* Appease clippy
* Fix examples
* Update issue in changelog
Co-authored-by: Daniel Eades
Co-authored-by: Marc Schreiber
---
CHANGELOG.md | 4 +
Cargo.toml | 21 +-
examples/attach.rs | 32 ++
examples/containercopyfrom.rs | 25 +-
examples/containercopyinto.rs | 18 +-
examples/containercreate.rs | 15 +-
examples/containerdelete.rs | 14 +-
examples/containerexec.rs | 37 +-
examples/containerinspect.rs | 16 +-
examples/containers.rs | 17 +-
examples/custom_host.rs | 15 +-
examples/events.rs | 19 +-
examples/export.rs | 26 +-
examples/imagebuild.rs | 25 +-
examples/imagedelete.rs | 17 +-
examples/imageinspect.rs | 16 +-
examples/imagepull.rs | 23 +-
examples/imagepull_auth.rs | 23 +-
examples/images.rs | 19 +-
examples/imagesearch.rs | 17 +-
examples/imagetag.rs | 10 +-
examples/import.rs | 21 +-
examples/info.rs | 15 +-
examples/logs.rs | 36 +-
examples/networkconnect.rs | 14 +-
examples/networkcreate.rs | 14 +-
examples/networkdelete.rs | 14 +-
examples/networkdisconnect.rs | 25 +-
examples/networkinspect.rs | 16 +-
examples/networks.rs | 20 +-
examples/stats.rs | 21 +-
examples/top.rs | 16 +-
examples/version.rs | 15 +-
examples/volumecreate.rs | 15 +-
examples/volumedelete.rs | 14 +-
examples/volumes.rs | 16 +-
lib.rs | 0
src/errors.rs | 19 +-
src/lib.rs | 793 ++++++++++++++++++++++--------------------
src/read.rs | 105 ------
src/transport.rs | 280 +++++++++------
src/tty.rs | 372 +++++++-------------
42 files changed, 1097 insertions(+), 1153 deletions(-)
create mode 100644 examples/attach.rs
create mode 100644 lib.rs
delete mode 100644 src/read.rs
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9b4ad43..8e72e8c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+# 0.7.0
+
+* async-await support [#229](https://github.com/softprops/shiplift/pull/229)
+
# 0.6.0
* add chrono as an optional feature, enabled by default [#190](https://github.com/softprops/shiplift/pull/190)
diff --git a/Cargo.toml b/Cargo.toml
index c85ed4e..2dad6ce 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,28 +18,31 @@ coveralls = { repository = "softprops/shipflit" }
maintenance = { status = "actively-developed" }
[dependencies]
-log = "0.4"
-mime = "0.3"
base64 = "0.11"
byteorder = "1.3"
bytes = "0.4"
chrono = { version = "0.4", optional = true, features = ["serde"] }
flate2 = "1.0"
-futures = "0.1"
-hyper = "0.12"
-hyper-openssl = { version = "0.7", optional = true }
-hyperlocal = { version = "0.6", optional = true }
+futures-util = "0.3"
+futures_codec = "0.3"
+hyper = "0.13"
+hyper-openssl = { version = "0.8", optional = true }
+hyperlocal = { version = "0.7", optional = true }
+log = "0.4"
+mime = "0.3"
openssl = { version = "0.10", optional = true }
+pin-project = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tar = "0.4"
-tokio = "0.1"
-tokio-codec = "0.1"
-tokio-io = "0.1"
+tokio = "0.2"
url = "2.1"
[dev-dependencies]
env_logger = "0.7"
+# Required for examples to run
+futures = "0.3.1"
+tokio = { version = "0.2.6", features = ["macros"] }
[features]
default = ["chrono", "unix-socket", "tls"]
diff --git a/examples/attach.rs b/examples/attach.rs
new file mode 100644
index 0000000..e4ed637
--- /dev/null
+++ b/examples/attach.rs
@@ -0,0 +1,32 @@
+use futures::StreamExt;
+use shiplift::{tty::TtyChunk, Docker};
+use std::env;
+
+#[tokio::main]
+async fn main() -> Result<(), Box> {
+ let docker = Docker::new();
+ let id = env::args()
+ .nth(1)
+ .expect("You need to specify a container id");
+
+ let tty_multiplexer = docker.containers().get(&id).attach().await?;
+
+ let (mut reader, _writer) = tty_multiplexer.split();
+
+ while let Some(tty_result) = reader.next().await {
+ match tty_result {
+ Ok(chunk) => print_chunk(chunk),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
+
+ Ok(())
+}
+
+fn print_chunk(chunk: TtyChunk) {
+ match chunk {
+ TtyChunk::StdOut(bytes) => println!("Stdout: {}", std::str::from_utf8(&bytes).unwrap()),
+ TtyChunk::StdErr(bytes) => eprintln!("Stdout: {}", std::str::from_utf8(&bytes).unwrap()),
+ TtyChunk::StdIn(_) => unreachable!(),
+ }
+}
diff --git a/examples/containercopyfrom.rs b/examples/containercopyfrom.rs
index 2ebeccf..acbfa19 100644
--- a/examples/containercopyfrom.rs
+++ b/examples/containercopyfrom.rs
@@ -1,8 +1,10 @@
+use futures::TryStreamExt;
use shiplift::Docker;
use std::{env, path};
-use tokio::prelude::{Future, Stream};
+use tar::Archive;
-fn main() {
+#[tokio::main]
+async fn main() -> Result<(), Box> {
let docker = Docker::new();
let id = env::args()
.nth(1)
@@ -10,17 +12,16 @@ fn main() {
let path = env::args()
.nth(2)
.expect("Usage: cargo run --example containercopyfrom -- ");
- let fut = docker
+
+ let bytes = docker
.containers()
.get(&id)
.copy_from(path::Path::new(&path))
- .collect()
- .and_then(|stream| {
- let tar = stream.concat();
- let mut archive = tar::Archive::new(tar.as_slice());
- archive.unpack(env::current_dir()?)?;
- Ok(())
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ .try_concat()
+ .await?;
+
+ let mut archive = Archive::new(&bytes[..]);
+ archive.unpack(env::current_dir()?)?;
+
+ Ok(())
}
diff --git a/examples/containercopyinto.rs b/examples/containercopyinto.rs
index 63f0a2d..689e7af 100644
--- a/examples/containercopyinto.rs
+++ b/examples/containercopyinto.rs
@@ -1,8 +1,8 @@
use shiplift::Docker;
-use std::env;
-use tokio::prelude::Future;
+use std::{env, fs::File, io::Read};
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let path = env::args()
.nth(1)
@@ -11,17 +11,17 @@ fn main() {
.nth(2)
.expect("Usage: cargo run --example containercopyinto -- ");
- use std::{fs::File, io::prelude::*};
-
let mut file = File::open(&path).unwrap();
let mut bytes = Vec::new();
file.read_to_end(&mut bytes)
.expect("Cannot read file on the localhost.");
- let fut = docker
+ if let Err(e) = docker
.containers()
.get(&id)
- .copy_file_into(path, &bytes[..])
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ .copy_file_into(path, &bytes)
+ .await
+ {
+ eprintln!("Error: {}", e)
+ }
}
diff --git a/examples/containercreate.rs b/examples/containercreate.rs
index d061f70..ef579a6 100644
--- a/examples/containercreate.rs
+++ b/examples/containercreate.rs
@@ -1,16 +1,19 @@
use shiplift::{ContainerOptions, Docker};
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let image = env::args()
.nth(1)
.expect("You need to specify an image name");
- let fut = docker
+
+ match docker
.containers()
.create(&ContainerOptions::builder(image.as_ref()).build())
- .map(|info| println!("{:?}", info))
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ .await
+ {
+ Ok(info) => println!("{:?}", info),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/containerdelete.rs b/examples/containerdelete.rs
index e3c2036..86bd9c5 100644
--- a/examples/containerdelete.rs
+++ b/examples/containerdelete.rs
@@ -1,16 +1,14 @@
use shiplift::Docker;
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let id = env::args()
.nth(1)
.expect("You need to specify an container id");
- let fut = docker
- .containers()
- .get(&id)
- .delete()
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+
+ if let Err(e) = docker.containers().get(&id).delete().await {
+ eprintln!("Error: {}", e)
+ }
}
diff --git a/examples/containerexec.rs b/examples/containerexec.rs
index 7f12f88..6c545a7 100644
--- a/examples/containerexec.rs
+++ b/examples/containerexec.rs
@@ -1,8 +1,9 @@
-use shiplift::{tty::StreamType, Docker, ExecContainerOptions};
-use std::env;
-use tokio::prelude::{Future, Stream};
+use futures::StreamExt;
+use shiplift::{tty::TtyChunk, Docker, ExecContainerOptions};
+use std::{env, str::from_utf8};
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let id = env::args()
.nth(1)
@@ -18,19 +19,19 @@ fn main() {
.attach_stdout(true)
.attach_stderr(true)
.build();
- let fut = docker
- .containers()
- .get(&id)
- .exec(&options)
- .for_each(|chunk| {
- match chunk.stream_type {
- StreamType::StdOut => println!("Stdout: {}", chunk.as_string_lossy()),
- StreamType::StdErr => eprintln!("Stderr: {}", chunk.as_string_lossy()),
- StreamType::StdIn => unreachable!(),
- }
- Ok(())
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ while let Some(exec_result) = docker.containers().get(&id).exec(&options).next().await {
+ match exec_result {
+ Ok(chunk) => print_chunk(chunk),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
+}
+
+fn print_chunk(chunk: TtyChunk) {
+ match chunk {
+ TtyChunk::StdOut(bytes) => println!("Stdout: {}", from_utf8(&bytes).unwrap()),
+ TtyChunk::StdErr(bytes) => eprintln!("Stdout: {}", from_utf8(&bytes).unwrap()),
+ TtyChunk::StdIn(_) => unreachable!(),
+ }
}
diff --git a/examples/containerinspect.rs b/examples/containerinspect.rs
index 0f853bc..8de2a67 100644
--- a/examples/containerinspect.rs
+++ b/examples/containerinspect.rs
@@ -1,17 +1,15 @@
use shiplift::Docker;
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let id = env::args()
.nth(1)
.expect("Usage: cargo run --example containerinspect -- ");
- let fut = docker
- .containers()
- .get(&id)
- .inspect()
- .map(|container| println!("{:#?}", container))
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+
+ match docker.containers().get(&id).inspect().await {
+ Ok(container) => println!("{:#?}", container),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/containers.rs b/examples/containers.rs
index 0eb51af..72de140 100644
--- a/examples/containers.rs
+++ b/examples/containers.rs
@@ -1,18 +1,15 @@
use shiplift::Docker;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
env_logger::init();
let docker = Docker::new();
- let fut = docker
- .containers()
- .list(&Default::default())
- .map(|containers| {
+ match docker.containers().list(&Default::default()).await {
+ Ok(containers) => {
for c in containers {
println!("container -> {:#?}", c)
}
- })
- .map_err(|e| eprintln!("Error: {}", e));
-
- tokio::run(fut);
+ }
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/custom_host.rs b/examples/custom_host.rs
index 3b1fd3e..8b06eae 100644
--- a/examples/custom_host.rs
+++ b/examples/custom_host.rs
@@ -1,13 +1,10 @@
-use futures::Future;
use shiplift::Docker;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::host("http://yourhost".parse().unwrap());
-
- let fut = docker
- .ping()
- .map(|pong| println!("Ping: {}", pong))
- .map_err(|e| eprintln!("Error: {}", e));
-
- tokio::run(fut);
+ match docker.ping().await {
+ Ok(pong) => println!("Ping: {}", pong),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/events.rs b/examples/events.rs
index 0e35860..e44d68f 100644
--- a/examples/events.rs
+++ b/examples/events.rs
@@ -1,16 +1,15 @@
+use futures::StreamExt;
use shiplift::Docker;
-use tokio::prelude::{Future, Stream};
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
println!("listening for events");
- let fut = docker
- .events(&Default::default())
- .for_each(|e| {
- println!("event -> {:?}", e);
- Ok(())
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ while let Some(event_result) = docker.events(&Default::default()).next().await {
+ match event_result {
+ Ok(event) => println!("event -> {:?}", event),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
}
diff --git a/examples/export.rs b/examples/export.rs
index 55d1b7b..34f460d 100644
--- a/examples/export.rs
+++ b/examples/export.rs
@@ -1,8 +1,10 @@
-use shiplift::{errors::Error, Docker};
+use futures::StreamExt;
use std::{env, fs::OpenOptions, io::Write};
-use tokio::prelude::{Future, Stream};
-fn main() {
+use shiplift::{errors::Error, Docker};
+
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let id = env::args().nth(1).expect("You need to specify an image id");
@@ -11,17 +13,13 @@ fn main() {
.create(true)
.open(format!("{}.tar", &id))
.unwrap();
+
let images = docker.images();
- let fut = images
- .get(&id)
- .export()
- .for_each(move |bytes| {
- export_file
- .write(&bytes[..])
- .map(|n| println!("copied {} bytes", n))
- .map_err(Error::IO)
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut)
+ while let Some(export_result) = images.get(&id).export().next().await {
+ match export_result.and_then(|bytes| export_file.write(&bytes).map_err(Error::from)) {
+ Ok(n) => println!("copied {} bytes", n),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
}
diff --git a/examples/imagebuild.rs b/examples/imagebuild.rs
index 6dbea78..80d825c 100644
--- a/examples/imagebuild.rs
+++ b/examples/imagebuild.rs
@@ -1,19 +1,22 @@
+use futures::StreamExt;
use shiplift::{BuildOptions, Docker};
use std::env;
-use tokio::prelude::{Future, Stream};
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let path = env::args().nth(1).expect("You need to specify a path");
- let fut = docker
- .images()
- .build(&BuildOptions::builder(path).tag("shiplift_test").build())
- .for_each(|output| {
- println!("{:?}", output);
- Ok(())
- })
- .map_err(|e| eprintln!("Error: {}", e));
+ let options = BuildOptions::builder(path).tag("shiplift_test").build();
- tokio::run(fut);
+ let images = docker.images();
+
+ let mut stream = images.build(&options);
+
+ while let Some(build_result) = stream.next().await {
+ match build_result {
+ Ok(output) => println!("{:?}", output),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
}
diff --git a/examples/imagedelete.rs b/examples/imagedelete.rs
index efa763c..3b9bc60 100644
--- a/examples/imagedelete.rs
+++ b/examples/imagedelete.rs
@@ -1,21 +1,18 @@
use shiplift::Docker;
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let img = env::args()
.nth(1)
.expect("You need to specify an image name");
- let fut = docker
- .images()
- .get(&img[..])
- .delete()
- .map(|statuses| {
+ match docker.images().get(&img).delete().await {
+ Ok(statuses) => {
for status in statuses {
println!("{:?}", status);
}
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ }
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/imageinspect.rs b/examples/imageinspect.rs
index 494480a..b7d2f9a 100644
--- a/examples/imageinspect.rs
+++ b/examples/imageinspect.rs
@@ -1,17 +1,15 @@
use shiplift::Docker;
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let id = env::args()
.nth(1)
.expect("Usage: cargo run --example imageinspect -- ");
- let fut = docker
- .images()
- .get(&id)
- .inspect()
- .map(|image| println!("{:#?}", image))
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+
+ match docker.images().get(&id).inspect().await {
+ Ok(image) => println!("{:#?}", image),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/imagepull.rs b/examples/imagepull.rs
index 84a6149..5b3fbf4 100644
--- a/examples/imagepull.rs
+++ b/examples/imagepull.rs
@@ -1,22 +1,25 @@
// cargo run --example imagepull busybox
+use futures::StreamExt;
use shiplift::{Docker, PullOptions};
use std::env;
-use tokio::prelude::{Future, Stream};
-fn main() {
+#[tokio::main]
+async fn main() {
env_logger::init();
let docker = Docker::new();
let img = env::args()
.nth(1)
.expect("You need to specify an image name");
- let fut = docker
+
+ let mut stream = docker
.images()
- .pull(&PullOptions::builder().image(img).build())
- .for_each(|output| {
- println!("{:?}", output);
- Ok(())
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ .pull(&PullOptions::builder().image(img).build());
+
+ while let Some(pull_result) = stream.next().await {
+ match pull_result {
+ Ok(output) => println!("{:?}", output),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
}
diff --git a/examples/imagepull_auth.rs b/examples/imagepull_auth.rs
index 1c559c7..6f0ceec 100644
--- a/examples/imagepull_auth.rs
+++ b/examples/imagepull_auth.rs
@@ -1,10 +1,11 @@
// cargo run --example imagepull_auth busybox username password
+use futures::StreamExt;
use shiplift::{Docker, PullOptions, RegistryAuth};
use std::env;
-use tokio::prelude::{Future, Stream};
-fn main() {
+#[tokio::main]
+async fn main() {
env_logger::init();
let docker = Docker::new();
let img = env::args()
@@ -16,13 +17,15 @@ fn main() {
.username(username)
.password(password)
.build();
- let fut = docker
+
+ let mut stream = docker
.images()
- .pull(&PullOptions::builder().image(img).auth(auth).build())
- .for_each(|output| {
- println!("{:?}", output);
- Ok(())
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ .pull(&PullOptions::builder().image(img).auth(auth).build());
+
+ while let Some(pull_result) = stream.next().await {
+ match pull_result {
+ Ok(output) => println!("{:?}", output),
+ Err(e) => eprintln!("{}", e),
+ }
+ }
}
diff --git a/examples/images.rs b/examples/images.rs
index 7a8a094..1c0852e 100644
--- a/examples/images.rs
+++ b/examples/images.rs
@@ -1,13 +1,14 @@
use shiplift::Docker;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
println!("docker images in stock");
- let fut = docker
- .images()
- .list(&Default::default())
- .map(|images| {
+
+ let result = docker.images().list(&Default::default()).await;
+
+ match result {
+ Ok(images) => {
for i in images {
println!(
"{} {} {:?}",
@@ -16,7 +17,7 @@ fn main() {
i.repo_tags.unwrap_or_else(|| vec!["none".into()])
);
}
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ }
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/imagesearch.rs b/examples/imagesearch.rs
index a6d6e52..e2fd110 100644
--- a/examples/imagesearch.rs
+++ b/examples/imagesearch.rs
@@ -1,17 +1,16 @@
use shiplift::Docker;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
println!("remote docker images in stock");
- let fut = docker
- .images()
- .search("rust")
- .map(|results| {
+
+ match docker.images().search("rust").await {
+ Ok(results) => {
for result in results {
println!("{} - {}", result.name, result.description);
}
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ }
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/imagetag.rs b/examples/imagetag.rs
index 7ae78dd..e36af76 100644
--- a/examples/imagetag.rs
+++ b/examples/imagetag.rs
@@ -2,9 +2,9 @@
use shiplift::{Docker, Image, TagOptions};
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
env_logger::init();
let docker = Docker::new();
let img = env::args()
@@ -21,7 +21,7 @@ fn main() {
let image = Image::new(&docker, img);
- let fut = image.tag(&tag_opts).map_err(|e| eprintln!("Error: {}", e));
-
- tokio::run(fut);
+ if let Err(e) = image.tag(&tag_opts).await {
+ eprintln!("Error: {}", e)
+ }
}
diff --git a/examples/import.rs b/examples/import.rs
index 20c61e6..7ea35bd 100644
--- a/examples/import.rs
+++ b/examples/import.rs
@@ -1,8 +1,9 @@
+use futures::StreamExt;
use shiplift::Docker;
use std::{env, fs::File};
-use tokio::prelude::{Future, Stream};
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let path = env::args()
.nth(1)
@@ -11,14 +12,12 @@ fn main() {
let reader = Box::from(f);
- let fut = docker
- .images()
- .import(reader)
- .for_each(|output| {
- println!("{:?}", output);
- Ok(())
- })
- .map_err(|e| eprintln!("Error: {}", e));
+ let mut stream = docker.images().import(reader);
- tokio::run(fut);
+ while let Some(import_result) = stream.next().await {
+ match import_result {
+ Ok(output) => println!("{:?}", output),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
}
diff --git a/examples/info.rs b/examples/info.rs
index 05fdede..76036e6 100644
--- a/examples/info.rs
+++ b/examples/info.rs
@@ -1,12 +1,11 @@
use shiplift::Docker;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
- tokio::run(
- docker
- .info()
- .map(|info| println!("info {:?}", info))
- .map_err(|e| eprintln!("Error: {}", e)),
- );
+
+ match docker.info().await {
+ Ok(info) => println!("info {:?}", info),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/logs.rs b/examples/logs.rs
index 57f12be..73c8045 100644
--- a/examples/logs.rs
+++ b/examples/logs.rs
@@ -1,25 +1,31 @@
-use shiplift::{tty::StreamType, Docker, LogsOptions};
+use futures::StreamExt;
+use shiplift::{tty::TtyChunk, Docker, LogsOptions};
use std::env;
-use tokio::prelude::{Future, Stream};
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let id = env::args()
.nth(1)
.expect("You need to specify a container id");
- let fut = docker
+
+ let mut logs_stream = docker
.containers()
.get(&id)
- .logs(&LogsOptions::builder().stdout(true).stderr(true).build())
- .for_each(|chunk| {
- match chunk.stream_type {
- StreamType::StdOut => println!("Stdout: {}", chunk.as_string_lossy()),
- StreamType::StdErr => eprintln!("Stderr: {}", chunk.as_string_lossy()),
- StreamType::StdIn => unreachable!(),
- }
- Ok(())
- })
- .map_err(|e| eprintln!("Error: {}", e));
+ .logs(&LogsOptions::builder().stdout(true).stderr(true).build());
+
+ while let Some(log_result) = logs_stream.next().await {
+ match log_result {
+ Ok(chunk) => print_chunk(chunk),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
+}
- tokio::run(fut);
+fn print_chunk(chunk: TtyChunk) {
+ match chunk {
+ TtyChunk::StdOut(bytes) => println!("Stdout: {}", std::str::from_utf8(&bytes).unwrap()),
+ TtyChunk::StdErr(bytes) => eprintln!("Stdout: {}", std::str::from_utf8(&bytes).unwrap()),
+ TtyChunk::StdIn(_) => unreachable!(),
+ }
}
diff --git a/examples/networkconnect.rs b/examples/networkconnect.rs
index 0cfc8fc..a04db63 100644
--- a/examples/networkconnect.rs
+++ b/examples/networkconnect.rs
@@ -1,18 +1,20 @@
use shiplift::{ContainerConnectionOptions, Docker};
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let networks = docker.networks();
+
match (env::args().nth(1), env::args().nth(2)) {
(Some(container_id), Some(network_id)) => {
- let fut = networks
+ if let Err(e) = networks
.get(&network_id)
.connect(&ContainerConnectionOptions::builder(&container_id).build())
- .map(|v| println!("{:?}", v))
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ .await
+ {
+ eprintln!("Error: {}", e)
+ }
}
_ => eprintln!("please provide a container_id and network_id"),
}
diff --git a/examples/networkcreate.rs b/examples/networkcreate.rs
index 30bb41c..3173ab4 100644
--- a/examples/networkcreate.rs
+++ b/examples/networkcreate.rs
@@ -1,20 +1,22 @@
use shiplift::{Docker, NetworkCreateOptions};
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let network_name = env::args()
.nth(1)
.expect("You need to specify a network name");
- let fut = docker
+ match docker
.networks()
.create(
&NetworkCreateOptions::builder(network_name.as_ref())
.driver("bridge")
.build(),
)
- .map(|info| println!("{:?}", info))
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ .await
+ {
+ Ok(info) => println!("{:?}", info),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/networkdelete.rs b/examples/networkdelete.rs
index 16fc4ab..e419f90 100644
--- a/examples/networkdelete.rs
+++ b/examples/networkdelete.rs
@@ -1,18 +1,14 @@
use shiplift::Docker;
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let id = env::args()
.nth(1)
.expect("You need to specify a network id");
- let fut = docker
- .networks()
- .get(&id)
- .delete()
- .map(|network| println!("{:?}", network))
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ if let Err(e) = docker.networks().get(&id).delete().await {
+ eprintln!("Error: {}", e)
+ }
}
diff --git a/examples/networkdisconnect.rs b/examples/networkdisconnect.rs
index 9588ecc..8d58b35 100644
--- a/examples/networkdisconnect.rs
+++ b/examples/networkdisconnect.rs
@@ -1,18 +1,27 @@
use shiplift::{ContainerConnectionOptions, Docker};
use std::env;
-use tokio::prelude::Future;
-fn main() {
+async fn network_disconnect(
+ container_id: &str,
+ network_id: &str,
+) {
let docker = Docker::new();
let networks = docker.networks();
+
+ if let Err(e) = networks
+ .get(network_id)
+ .disconnect(&ContainerConnectionOptions::builder(container_id).build())
+ .await
+ {
+ eprintln!("Error: {}", e)
+ }
+}
+
+#[tokio::main]
+async fn main() {
match (env::args().nth(1), env::args().nth(2)) {
(Some(container_id), Some(network_id)) => {
- let fut = networks
- .get(&network_id)
- .disconnect(&ContainerConnectionOptions::builder(&container_id).build())
- .map(|v| println!("{:?}", v))
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ network_disconnect(&container_id, &network_id).await;
}
_ => eprintln!("please provide a container_id and network_id"),
}
diff --git a/examples/networkinspect.rs b/examples/networkinspect.rs
index 86a076b..143c637 100644
--- a/examples/networkinspect.rs
+++ b/examples/networkinspect.rs
@@ -1,17 +1,15 @@
use shiplift::Docker;
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let id = env::args()
.nth(1)
.expect("You need to specify a network id");
- let fut = docker
- .networks()
- .get(&id)
- .inspect()
- .map(|network| println!("{:#?}", network))
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+
+ match docker.networks().get(&id).inspect().await {
+ Ok(network_info) => println!("{:#?}", network_info),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/networks.rs b/examples/networks.rs
index 9ceea99..4a1dcf1 100644
--- a/examples/networks.rs
+++ b/examples/networks.rs
@@ -1,17 +1,17 @@
use shiplift::Docker;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
env_logger::init();
+
let docker = Docker::new();
- let fut = docker
- .networks()
- .list(&Default::default())
- .map(|networks| {
+
+ match docker.networks().list(&Default::default()).await {
+ Ok(networks) => {
for network in networks {
- println!("network -> {:#?}", network);
+ println!("network -> {:#?}", network)
}
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ }
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/stats.rs b/examples/stats.rs
index 9e14cf4..063f502 100644
--- a/examples/stats.rs
+++ b/examples/stats.rs
@@ -1,21 +1,20 @@
// cargo run --example stats --
+use futures::StreamExt;
use shiplift::Docker;
use std::env;
-use tokio::prelude::{Future, Stream};
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let containers = docker.containers();
let id = env::args()
.nth(1)
.expect("Usage: cargo run --example stats -- ");
- let fut = containers
- .get(&id)
- .stats()
- .for_each(|stat| {
- println!("{:?}", stat);
- Ok(())
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+
+ while let Some(result) = containers.get(&id).stats().next().await {
+ match result {
+ Ok(stat) => println!("{:?}", stat),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
}
diff --git a/examples/top.rs b/examples/top.rs
index 5fc4229..39e8ea6 100644
--- a/examples/top.rs
+++ b/examples/top.rs
@@ -1,17 +1,15 @@
use shiplift::Docker;
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let id = env::args()
.nth(1)
.expect("Usage: cargo run --example top -- ");
- let fut = docker
- .containers()
- .get(&id)
- .top(Default::default())
- .map(|top| println!("{:#?}", top))
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+
+ match docker.containers().get(&id).top(Default::default()).await {
+ Ok(top) => println!("{:#?}", top),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/version.rs b/examples/version.rs
index 6125e56..e1ab5d2 100644
--- a/examples/version.rs
+++ b/examples/version.rs
@@ -1,13 +1,10 @@
use shiplift::Docker;
-use tokio::prelude::Future;
-fn main() {
- env_logger::init();
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
- let fut = docker
- .version()
- .map(|ver| println!("version -> {:#?}", ver))
- .map_err(|e| eprintln!("Error: {}", e));
-
- tokio::run(fut);
+ match docker.version().await {
+ Ok(ver) => println!("version -> {:#?}", ver),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/volumecreate.rs b/examples/volumecreate.rs
index 83f5045..a243be6 100644
--- a/examples/volumecreate.rs
+++ b/examples/volumecreate.rs
@@ -1,8 +1,8 @@
use shiplift::{builder::VolumeCreateOptions, Docker};
use std::{collections::HashMap, env};
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let volumes = docker.volumes();
@@ -13,15 +13,16 @@ fn main() {
let mut labels = HashMap::new();
labels.insert("com.github.softprops", "shiplift");
- let fut = volumes
+ match volumes
.create(
&VolumeCreateOptions::builder()
.name(volume_name.as_ref())
.labels(&labels)
.build(),
)
- .map(|info| println!("{:?}", info))
- .map_err(|e| eprintln!("Error: {}", e));
-
- tokio::run(fut);
+ .await
+ {
+ Ok(info) => println!("{:?}", info),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/volumedelete.rs b/examples/volumedelete.rs
index 3800d22..ec1da7e 100644
--- a/examples/volumedelete.rs
+++ b/examples/volumedelete.rs
@@ -1,8 +1,8 @@
use shiplift::Docker;
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let volumes = docker.volumes();
@@ -10,11 +10,7 @@ fn main() {
.nth(1)
.expect("You need to specify an volume name");
- let fut = volumes
- .get(&volume_name)
- .delete()
- .map(|info| println!("{:?}", info))
- .map_err(|e| eprintln!("Error: {}", e));
-
- tokio::run(fut);
+ if let Err(e) = volumes.get(&volume_name).delete().await {
+ eprintln!("Error: {}", e)
+ }
}
diff --git a/examples/volumes.rs b/examples/volumes.rs
index c5548ec..d45c00a 100644
--- a/examples/volumes.rs
+++ b/examples/volumes.rs
@@ -1,18 +1,16 @@
use shiplift::Docker;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let volumes = docker.volumes();
- let fut = volumes
- .list()
- .map(|volumes| {
+ match volumes.list().await {
+ Ok(volumes) => {
for v in volumes {
println!("volume -> {:#?}", v)
}
- })
- .map_err(|e| eprintln!("Error: {}", e));
-
- tokio::run(fut);
+ }
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/lib.rs b/lib.rs
new file mode 100644
index 0000000..e69de29
diff --git a/src/errors.rs b/src/errors.rs
index 5506198..9d01f91 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -1,8 +1,10 @@
//! Representations of various client errors
-use hyper::{self, StatusCode};
+use hyper::{self, http, StatusCode};
use serde_json::Error as SerdeError;
-use std::{error::Error as StdError, fmt, io::Error as IoError, string::FromUtf8Error};
+use std::{error::Error as StdError, fmt, string::FromUtf8Error};
+
+use futures_util::io::Error as IoError;
#[derive(Debug)]
pub enum Error {
@@ -34,12 +36,25 @@ impl From for Error {
}
}
+impl From for Error {
+ fn from(error: http::uri::InvalidUri) -> Self {
+ let http_error: http::Error = error.into();
+ http_error.into()
+ }
+}
+
impl From for Error {
fn from(error: IoError) -> Error {
Error::IO(error)
}
}
+impl From for Error {
+ fn from(error: FromUtf8Error) -> Error {
+ Error::Encoding(error)
+ }
+}
+
impl fmt::Display for Error {
fn fmt(
&self,
diff --git a/src/lib.rs b/src/lib.rs
index 16e5625..06b31a1 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,22 +3,22 @@
//! # examples
//!
//! ```no_run
-//! use tokio::prelude::Future;
-//!
+//! # async {
//! let docker = shiplift::Docker::new();
-//! let fut = docker.images().list(&Default::default()).map(|images| {
-//! println!("docker images in stock");
-//! for i in images {
-//! println!("{:?}", i.repo_tags);
-//! }
-//! }).map_err(|e| eprintln!("Something bad happened! {}", e));
//!
-//! tokio::run(fut);
+//! match docker.images().list(&Default::default()).await {
+//! Ok(images) => {
+//! for image in images {
+//! println!("{:?}", image.repo_tags);
+//! }
+//! },
+//! Err(e) => eprintln!("Something bad happened! {}", e),
+//! }
+//! # };
//! ```
pub mod builder;
pub mod errors;
-pub mod read;
pub mod rep;
pub mod transport;
pub mod tty;
@@ -35,7 +35,6 @@ pub use crate::{
errors::Error,
};
use crate::{
- read::StreamReader,
rep::{
Change, Container as ContainerRep, ContainerCreateInfo, ContainerDetails, Event, Exit,
History, Image as ImageRep, ImageDetails, Info, NetworkCreateInfo,
@@ -43,9 +42,14 @@ use crate::{
Volume as VolumeRep, VolumeCreateInfo, Volumes as VolumesRep,
},
transport::{tar, Transport},
- tty::TtyDecoder,
+ tty::Multiplexer as TtyMultiPlexer,
+};
+use futures_util::{
+ io::{AsyncRead, AsyncWrite},
+ stream::Stream,
+ TryFutureExt, TryStreamExt,
};
-use futures::{future::Either, Future, IntoFuture, Stream};
+// use futures::{future::Either, Future, IntoFuture, Stream};
pub use hyper::Uri;
use hyper::{client::HttpConnector, Body, Client, Method};
#[cfg(feature = "tls")]
@@ -56,8 +60,7 @@ use mime::Mime;
#[cfg(feature = "tls")]
use openssl::ssl::{SslConnector, SslFiletype, SslMethod};
use serde_json::Value;
-use std::{borrow::Cow, env, io::Read, iter, path::Path, time::Duration};
-use tokio_codec::{FramedRead, LinesCodec};
+use std::{borrow::Cow, env, io, io::Read, iter, path::Path, time::Duration};
use url::form_urlencoded;
/// Represents the result of all docker operations
@@ -70,19 +73,19 @@ pub struct Docker {
}
/// Interface for accessing and manipulating a named docker image
-pub struct Image<'a, 'b> {
+pub struct Image<'a> {
docker: &'a Docker,
- name: Cow<'b, str>,
+ name: Cow<'a, str>,
}
-impl<'a, 'b> Image<'a, 'b> {
+impl<'a> Image<'a> {
/// Exports an interface for operations that may be performed against a named image
pub fn new(
docker: &'a Docker,
name: S,
- ) -> Image<'a, 'b>
+ ) -> Self
where
- S: Into>,
+ S: Into>,
{
Image {
docker,
@@ -91,40 +94,46 @@ impl<'a, 'b> Image<'a, 'b> {
}
/// Inspects a named image's details
- pub fn inspect(&self) -> impl Future- {
+ pub async fn inspect(&self) -> Result {
self.docker
.get_json(&format!("/images/{}/json", self.name)[..])
+ .await
}
/// Lists the history of the images set of changes
- pub fn history(&self) -> impl Future
- , Error = Error> {
+ pub async fn history(&self) -> Result> {
self.docker
.get_json(&format!("/images/{}/history", self.name)[..])
+ .await
}
/// Deletes an image
- pub fn delete(&self) -> impl Future
- , Error = Error> {
+ pub async fn delete(&self) -> Result> {
self.docker
.delete_json::>(&format!("/images/{}", self.name)[..])
+ .await
}
/// Export this image to a tarball
- pub fn export(&self) -> impl Stream
- , Error = Error> {
- self.docker
- .stream_get(&format!("/images/{}/get", self.name)[..])
- .map(|c| c.to_vec())
+ pub fn export(&self) -> impl Stream
- >> + Unpin + 'a {
+ Box::pin(
+ self.docker
+ .stream_get(format!("/images/{}/get", self.name))
+ .map_ok(|c| c.to_vec()),
+ )
}
/// Adds a tag to an image
- pub fn tag(
+ pub async fn tag(
&self,
opts: &TagOptions,
- ) -> impl Future
- {
+ ) -> Result<()> {
let mut path = vec![format!("/images/{}/tag", self.name)];
if let Some(query) = opts.serialize() {
path.push(query)
}
- self.docker.post::(&path.join("?"), None).map(|_| ())
+ let _ = self.docker.post(&path.join("?"), None).await?;
+ Ok(())
}
}
@@ -141,76 +150,74 @@ impl<'a> Images<'a> {
/// Builds a new image build by reading a Dockerfile in a target directory
pub fn build(
- &self,
- opts: &BuildOptions,
- ) -> impl Stream
- {
- let mut path = vec!["/build".to_owned()];
- if let Some(query) = opts.serialize() {
- path.push(query)
- }
+ &'a self,
+ opts: &'a BuildOptions,
+ ) -> impl Stream
- > + Unpin + 'a {
+ Box::pin(
+ async move {
+ let mut path = vec!["/build".to_owned()];
+ if let Some(query) = opts.serialize() {
+ path.push(query)
+ }
+
+ let mut bytes = Vec::default();
+
+ tarball::dir(&mut bytes, &opts.path[..])?;
+
+ let chunk_stream = self.docker.stream_post(
+ path.join("?"),
+ Some((Body::from(bytes), tar())),
+ None::>,
+ );
- let mut bytes = vec![];
-
- match tarball::dir(&mut bytes, &opts.path[..]) {
- Ok(_) => Box::new(
- self.docker
- .stream_post(
- &path.join("?"),
- Some((Body::from(bytes), tar())),
- None::>,
- )
- .map(|r| {
- futures::stream::iter_result(
- serde_json::Deserializer::from_slice(&r[..])
- .into_iter::()
- .collect::>(),
- )
- .map_err(Error::from)
- })
- .flatten(),
- ) as Box + Send>,
- Err(e) => Box::new(futures::future::err(Error::IO(e)).into_stream())
- as Box + Send>,
- }
+ let value_stream = chunk_stream.and_then(|chunk| async move {
+ serde_json::from_slice(&chunk).map_err(Error::from)
+ });
+
+ Ok(value_stream)
+ }
+ .try_flatten_stream(),
+ )
}
/// Lists the docker images on the current docker host
- pub fn list(
+ pub async fn list(
&self,
opts: &ImageListOptions,
- ) -> impl Future
- , Error = Error> {
+ ) -> Result> {
let mut path = vec!["/images/json".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
}
- self.docker.get_json::>(&path.join("?"))
+ self.docker.get_json::>(&path.join("?")).await
}
/// Returns a reference to a set of operations available for a named image
- pub fn get<'b>(
+ pub fn get(
&self,
- name: &'b str,
- ) -> Image<'a, 'b> {
+ name: &'a str,
+ ) -> Image<'a> {
Image::new(self.docker, name)
}
/// Search for docker images by term
- pub fn search(
+ pub async fn search(
&self,
term: &str,
- ) -> impl Future
- , Error = Error> {
+ ) -> Result> {
let query = form_urlencoded::Serializer::new(String::new())
.append_pair("term", term)
.finish();
self.docker
.get_json::>(&format!("/images/search?{}", query)[..])
+ .await
}
/// Pull and create a new docker images from an existing image
pub fn pull(
&self,
opts: &PullOptions,
- ) -> impl Stream
- {
+ ) -> impl Stream
- > + Unpin + 'a {
let mut path = vec!["/images/create".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
@@ -218,18 +225,15 @@ impl<'a> Images<'a> {
let headers = opts
.auth_header()
.map(|a| iter::once(("X-Registry-Auth", a)));
- self.docker
- .stream_post::(&path.join("?"), None, headers)
- // todo: give this a proper enum type
- .map(|r| {
- futures::stream::iter_result(
- serde_json::Deserializer::from_slice(&r[..])
- .into_iter::()
- .collect::>(),
- )
- .map_err(Error::from)
- })
- .flatten()
+
+ Box::pin(
+ self.docker
+ .stream_post(path.join("?"), None, headers)
+ .and_then(move |chunk| {
+ // todo: give this a proper enum type
+ futures_util::future::ready(serde_json::from_slice(&chunk).map_err(Error::from))
+ }),
+ )
}
/// exports a collection of named images,
@@ -237,14 +241,14 @@ impl<'a> Images<'a> {
pub fn export(
&self,
names: Vec<&str>,
- ) -> impl Stream
- , Error = Error> {
+ ) -> impl Stream
- >> + 'a {
let params = names.iter().map(|n| ("names", *n));
let query = form_urlencoded::Serializer::new(String::new())
.extend_pairs(params)
.finish();
self.docker
- .stream_get(&format!("/images/get?{}", query)[..])
- .map(|c| c.to_vec())
+ .stream_get(format!("/images/get?{}", query))
+ .map_ok(|c| c.to_vec())
}
/// imports an image or set of images from a given tarball source
@@ -252,43 +256,44 @@ impl<'a> Images<'a> {
pub fn import(
self,
mut tarball: Box,
- ) -> impl Stream
- {
- let mut bytes = Vec::new();
-
- match tarball.read_to_end(&mut bytes) {
- Ok(_) => Box::new(
- self.docker
- .stream_post(
- "/images/load",
- Some((Body::from(bytes), tar())),
- None::>,
- )
- .and_then(|bytes| {
- serde_json::from_slice::<'_, Value>(&bytes[..])
- .map_err(Error::from)
- .into_future()
- }),
- ) as Box + Send>,
- Err(e) => Box::new(futures::future::err(Error::IO(e)).into_stream())
- as Box + Send>,
- }
+ ) -> impl Stream
- > + Unpin + 'a {
+ Box::pin(
+ async move {
+ let mut bytes = Vec::default();
+
+ tarball.read_to_end(&mut bytes)?;
+
+ let chunk_stream = self.docker.stream_post(
+ "/images/load",
+ Some((Body::from(bytes), tar())),
+ None::>,
+ );
+
+ let value_stream = chunk_stream.and_then(|chunk| async move {
+ serde_json::from_slice(&chunk).map_err(Error::from)
+ });
+
+ Ok(value_stream)
+ }
+ .try_flatten_stream(),
+ )
}
}
/// Interface for accessing and manipulating a docker container
-pub struct Container<'a, 'b> {
+pub struct Container<'a> {
docker: &'a Docker,
- id: Cow<'b, str>,
+ id: Cow<'a, str>,
}
-impl<'a, 'b> Container<'a, 'b> {
+impl<'a> Container<'a> {
/// Exports an interface exposing operations against a container instance
pub fn new
(
docker: &'a Docker,
id: S,
- ) -> Container<'a, 'b>
+ ) -> Self
where
- S: Into>,
+ S: Into>,
{
Container {
docker,
@@ -302,16 +307,17 @@ impl<'a, 'b> Container<'a, 'b> {
}
/// Inspects the current docker container instance's details
- pub fn inspect(&self) -> impl Future- {
+ pub async fn inspect(&self) -> Result {
self.docker
.get_json::(&format!("/containers/{}/json", self.id)[..])
+ .await
}
/// Returns a `top` view of information about the container process
- pub fn top(
+ pub async fn top(
&self,
psargs: Option<&str>,
- ) -> impl Future
- {
+ ) -> Result {
let mut path = vec![format!("/containers/{}/top", self.id)];
if let Some(ref args) = psargs {
let encoded = form_urlencoded::Serializer::new(String::new())
@@ -319,85 +325,95 @@ impl<'a, 'b> Container<'a, 'b> {
.finish();
path.push(encoded)
}
- self.docker.get_json(&path.join("?"))
+ self.docker.get_json(&path.join("?")).await
}
/// Returns a stream of logs emitted but the container instance
pub fn logs(
&self,
opts: &LogsOptions,
- ) -> impl Stream
- {
+ ) -> impl Stream
- > + Unpin + 'a {
let mut path = vec![format!("/containers/{}/logs", self.id)];
if let Some(query) = opts.serialize() {
path.push(query)
}
- let decoder = TtyDecoder::new();
- let chunk_stream = StreamReader::new(self.docker.stream_get(&path.join("?")));
+ let stream = Box::pin(self.docker.stream_get(path.join("?")));
- FramedRead::new(chunk_stream, decoder)
+ Box::pin(tty::decode(stream))
}
- /// Attaches to a running container, returning a stream that can
- /// be used to interact with the standard IO streams.
- pub fn attach(&self) -> impl Future
- {
- self.docker.stream_post_upgrade_multiplexed::(
- &format!(
- "/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1",
- self.id
- ),
- None,
- )
+ /// Attaches a multiplexed TCP stream to the container that can be used to read Stdout, Stderr and write Stdin.
+ async fn attach_raw(&self) -> Result {
+ self.docker
+ .stream_post_upgrade(
+ format!(
+ "/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1",
+ self.id
+ ),
+ None,
+ )
+ .await
}
- /// Attaches to a running container, returning a stream that can
- /// be used to interact with the standard IO streams.
- pub fn attach_blocking(&self) -> Result {
- self.attach().map(|s| s.wait()).wait()
+ /// Attaches a `[TtyMultiplexer]` to the container.
+ ///
+ /// The `[TtyMultiplexer]` implements Stream for returning Stdout and Stderr chunks. It also implements `[AsyncWrite]` for writing to Stdin.
+ ///
+ /// The multiplexer can be split into its read and write halves with the `[split](TtyMultiplexer::split)` method
+ pub async fn attach(&self) -> Result> {
+ let tcp_stream = self.attach_raw().await?;
+
+ Ok(TtyMultiPlexer::new(tcp_stream))
}
/// Returns a set of changes made to the container instance
- pub fn changes(&self) -> impl Future
- , Error = Error> {
+ pub async fn changes(&self) -> Result> {
self.docker
.get_json::>(&format!("/containers/{}/changes", self.id)[..])
+ .await
}
/// Exports the current docker container into a tarball
- pub fn export(&self) -> impl Stream
- , Error = Error> {
+ pub fn export(&self) -> impl Stream
- >> + 'a {
self.docker
- .stream_get(&format!("/containers/{}/export", self.id)[..])
- .map(|c| c.to_vec())
+ .stream_get(format!("/containers/{}/export", self.id))
+ .map_ok(|c| c.to_vec())
}
/// Returns a stream of stats specific to this container instance
- pub fn stats(&self) -> impl Stream
- {
- let decoder = LinesCodec::new();
- let stream_of_chunks = StreamReader::new(
- self.docker
- .stream_get(&format!("/containers/{}/stats", self.id)[..]),
- );
+ pub fn stats(&'a self) -> impl Stream
- > + Unpin + 'a {
+ let codec = futures_codec::LinesCodec {};
- FramedRead::new(stream_of_chunks, decoder)
- .map_err(Error::IO)
- .and_then(|s| {
- serde_json::from_str::(&s)
- .map_err(Error::SerdeJsonError)
- .into_future()
- })
+ let reader = Box::pin(
+ self.docker
+ .stream_get(format!("/containers/{}/stats", self.id))
+ .map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
+ )
+ .into_async_read();
+
+ Box::pin(
+ futures_codec::FramedRead::new(reader, codec)
+ .map_err(Error::IO)
+ .and_then(|s: String| async move {
+ serde_json::from_str(&s).map_err(Error::SerdeJsonError)
+ }),
+ )
}
/// Start the container instance
- pub fn start(&self) -> impl Future
- {
+ pub async fn start(&self) -> Result<()> {
self.docker
- .post::(&format!("/containers/{}/start", self.id)[..], None)
- .map(|_| ())
+ .post(&format!("/containers/{}/start", self.id)[..], None)
+ .await?;
+ Ok(())
}
/// Stop the container instance
- pub fn stop(
+ pub async fn stop(
&self,
wait: Option,
- ) -> impl Future
- {
+ ) -> Result<()> {
let mut path = vec![format!("/containers/{}/stop", self.id)];
if let Some(w) = wait {
let encoded = form_urlencoded::Serializer::new(String::new())
@@ -406,14 +422,15 @@ impl<'a, 'b> Container<'a, 'b> {
path.push(encoded)
}
- self.docker.post::(&path.join("?"), None).map(|_| ())
+ self.docker.post(&path.join("?"), None).await?;
+ Ok(())
}
/// Restart the container instance
- pub fn restart(
+ pub async fn restart(
&self,
wait: Option,
- ) -> impl Future
- {
+ ) -> Result<()> {
let mut path = vec![format!("/containers/{}/restart", self.id)];
if let Some(w) = wait {
let encoded = form_urlencoded::Serializer::new(String::new())
@@ -421,14 +438,15 @@ impl<'a, 'b> Container<'a, 'b> {
.finish();
path.push(encoded)
}
- self.docker.post::(&path.join("?"), None).map(|_| ())
+ self.docker.post(&path.join("?"), None).await?;
+ Ok(())
}
/// Kill the container instance
- pub fn kill(
+ pub async fn kill(
&self,
signal: Option<&str>,
- ) -> impl Future
- {
+ ) -> Result<()> {
let mut path = vec![format!("/containers/{}/kill", self.id)];
if let Some(sig) = signal {
let encoded = form_urlencoded::Serializer::new(String::new())
@@ -436,101 +454,125 @@ impl<'a, 'b> Container<'a, 'b> {
.finish();
path.push(encoded)
}
- self.docker.post::(&path.join("?"), None).map(|_| ())
+ self.docker.post(&path.join("?"), None).await?;
+ Ok(())
}
/// Rename the container instance
- pub fn rename(
+ pub async fn rename(
&self,
name: &str,
- ) -> impl Future
- {
+ ) -> Result<()> {
let query = form_urlencoded::Serializer::new(String::new())
.append_pair("name", name)
.finish();
self.docker
- .post::(
+ .post(
&format!("/containers/{}/rename?{}", self.id, query)[..],
None,
)
- .map(|_| ())
+ .await?;
+ Ok(())
}
/// Pause the container instance
- pub fn pause(&self) -> impl Future
- {
+ pub async fn pause(&self) -> Result<()> {
self.docker
- .post::(&format!("/containers/{}/pause", self.id)[..], None)
- .map(|_| ())
+ .post(&format!("/containers/{}/pause", self.id)[..], None)
+ .await?;
+ Ok(())
}
/// Unpause the container instance
- pub fn unpause(&self) -> impl Future
- {
+ pub async fn unpause(&self) -> Result<()> {
self.docker
- .post::(&format!("/containers/{}/unpause", self.id)[..], None)
- .map(|_| ())
+ .post(&format!("/containers/{}/unpause", self.id)[..], None)
+ .await?;
+ Ok(())
}
/// Wait until the container stops
- pub fn wait(&self) -> impl Future
- {
+ pub async fn wait(&self) -> Result {
self.docker
- .post_json::(&format!("/containers/{}/wait", self.id)[..], None)
+ .post_json(
+ format!("/containers/{}/wait", self.id),
+ Option::<(Body, Mime)>::None,
+ )
+ .await
}
/// Delete the container instance
///
/// Use remove instead to use the force/v options.
- pub fn delete(&self) -> impl Future
- {
+ pub async fn delete(&self) -> Result<()> {
self.docker
.delete(&format!("/containers/{}", self.id)[..])
- .map(|_| ())
+ .await?;
+ Ok(())
}
/// Delete the container instance (todo: force/v)
- pub fn remove(
+ pub async fn remove(
&self,
opts: RmContainerOptions,
- ) -> impl Future
- {
+ ) -> Result<()> {
let mut path = vec![format!("/containers/{}", self.id)];
if let Some(query) = opts.serialize() {
path.push(query)
}
- self.docker.delete(&path.join("?")).map(|_| ())
+ self.docker.delete(&path.join("?")).await?;
+ Ok(())
}
- // TODO(abusch) fix this
- /// Exec the specified command in the container
- pub fn exec(
+ async fn exec_create(
&self,
opts: &ExecContainerOptions,
- ) -> impl Stream
- {
- let data = opts.serialize().unwrap(); // TODO fixme
- let bytes = data.into_bytes();
- let docker2 = self.docker.clone();
- self.docker
- .post(
+ ) -> Result {
+ #[derive(serde::Deserialize)]
+ #[serde(rename_all = "PascalCase")]
+ struct Response {
+ id: String,
+ }
+
+ let body: Body = opts.serialize()?.into();
+
+ let Response { id } = self
+ .docker
+ .post_json(
&format!("/containers/{}/exec", self.id)[..],
- Some((bytes, mime::APPLICATION_JSON)),
+ Some((body, mime::APPLICATION_JSON)),
)
- .map(move |res| {
- let data = "{}";
- let bytes = data.as_bytes();
- let id = serde_json::from_str::(res.as_str())
- .ok()
- .and_then(|v| {
- v.as_object()
- .and_then(|v| v.get("Id"))
- .and_then(|v| v.as_str().map(|v| v.to_string()))
- })
- .unwrap(); // TODO fixme
-
- let decoder = TtyDecoder::new();
- let chunk_stream = StreamReader::new(docker2.stream_post(
- &format!("/exec/{}/start", id)[..],
- Some((bytes, mime::APPLICATION_JSON)),
- None::>,
- ));
- FramedRead::new(chunk_stream, decoder)
- })
- .flatten_stream()
+ .await?;
+
+ Ok(id)
+ }
+
+ fn exec_start(
+ &self,
+ id: String,
+ ) -> impl Stream
- > + 'a {
+ let bytes: &[u8] = b"{}";
+
+ let stream = Box::pin(self.docker.stream_post(
+ format!("/exec/{}/start", id),
+ Some((bytes.into(), mime::APPLICATION_JSON)),
+ None::>,
+ ));
+
+ tty::decode(stream)
+ }
+
+ pub fn exec(
+ &'a self,
+ opts: &'a ExecContainerOptions,
+ ) -> impl Stream
- > + Unpin + 'a {
+ Box::pin(
+ async move {
+ let id = self.exec_create(opts).await?;
+ Ok(self.exec_start(id))
+ }
+ .try_flatten_stream(),
+ )
}
/// Copy a file/folder from the container. The resulting stream is a tarball of the extracted
@@ -544,24 +586,24 @@ impl<'a, 'b> Container<'a, 'b> {
pub fn copy_from(
&self,
path: &Path,
- ) -> impl Stream
- , Error = Error> {
+ ) -> impl Stream
- >> + 'a {
let path_arg = form_urlencoded::Serializer::new(String::new())
.append_pair("path", &path.to_string_lossy())
.finish();
- self.docker
- .stream_get(&format!("/containers/{}/archive?{}", self.id, path_arg))
- .map(|c| c.to_vec())
+
+ let endpoint = format!("/containers/{}/archive?{}", self.id, path_arg);
+ self.docker.stream_get(endpoint).map_ok(|c| c.to_vec())
}
/// Copy a byte slice as file into (see `bytes`) the container.
///
/// The file will be copied at the given location (see `path`) and will be owned by root
/// with access mask 644.
- pub fn copy_file_into>(
+ pub async fn copy_file_into>(
&self,
path: P,
bytes: &[u8],
- ) -> impl Fut