summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEli W. Hunter <42009212+elihunter173@users.noreply.github.com>2020-07-23 23:54:12 -0400
committerGitHub <noreply@github.com>2020-07-23 23:54:12 -0400
commit6cd1d7f93bd6f150341582a1b54087cefffdbf87 (patch)
tree88c109ec79e679d5aa041b20f074cf7b57d97cda
parenta4cd2185976ad56b880d5a10374c4dee6d116e6a (diff)
downloadshiplift-6cd1d7f93bd6f150341582a1b54087cefffdbf87.tar.gz
shiplift-6cd1d7f93bd6f150341582a1b54087cefffdbf87.tar.xz
Async/Await Support (continuation of #191) (#229)
* it builds! * remove unused dependencies * bump dependencies * reimplement 'exec' endpoint * update a few more examples * update remaining examples * fix doc tests, remove unused 'read' module * remove feature-gated async closures * split futures dependency to just 'futures-util' * update version and readme * make functions accepting Body generic over Into<Body> again * update changelog * reinstate 'unix-socket' feature * reinstate 'attach' endpoint * fix clippy lints * fix documentation typo * fix container copyfrom/into implementations * add convenience methods for TtyChunk struct * remove 'main' from code example to silence clippy lint * Update hyper to 0.13.1 * Add Send bounds to TtyWriter * Appease clippy * Fix examples * Update issue in changelog Co-authored-by: Daniel Eades <danieleades@hotmail.com> Co-authored-by: Marc Schreiber <marc.schreiber@aixigo.de>
-rw-r--r--CHANGELOG.md4
-rw-r--r--Cargo.toml21
-rw-r--r--examples/attach.rs32
-rw-r--r--examples/containercopyfrom.rs25
-rw-r--r--examples/containercopyinto.rs18
-rw-r--r--examples/containercreate.rs15
-rw-r--r--examples/containerdelete.rs14
-rw-r--r--examples/containerexec.rs37
-rw-r--r--examples/containerinspect.rs16
-rw-r--r--examples/containers.rs17
-rw-r--r--examples/custom_host.rs15
-rw-r--r--examples/events.rs19
-rw-r--r--examples/export.rs26
-rw-r--r--examples/imagebuild.rs25
-rw-r--r--examples/imagedelete.rs17
-rw-r--r--examples/imageinspect.rs16
-rw-r--r--examples/imagepull.rs23
-rw-r--r--examples/imagepull_auth.rs23
-rw-r--r--examples/images.rs19
-rw-r--r--examples/imagesearch.rs17
-rw-r--r--examples/imagetag.rs10
-rw-r--r--examples/import.rs21
-rw-r--r--examples/info.rs15
-rw-r--r--examples/logs.rs36
-rw-r--r--examples/networkconnect.rs14
-rw-r--r--examples/networkcreate.rs14
-rw-r--r--examples/networkdelete.rs14
-rw-r--r--examples/networkdisconnect.rs25
-rw-r--r--examples/networkinspect.rs16
-rw-r--r--examples/networks.rs20
-rw-r--r--examples/stats.rs21
-rw-r--r--examples/top.rs16
-rw-r--r--examples/version.rs15
-rw-r--r--examples/volumecreate.rs15
-rw-r--r--examples/volumedelete.rs14
-rw-r--r--examples/volumes.rs16
-rw-r--r--lib.rs0
-rw-r--r--src/errors.rs19
-rw-r--r--src/lib.rs793
-rw-r--r--src/read.rs105
-rw-r--r--src/transport.rs280
-rw-r--r--src/tty.rs372
42 files changed, 1097 insertions, 1153 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9b4ad43..8e72e8c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+# 0.7.0
+
+* async-await support [#229](https://github.com/softprops/shiplift/pull/229)
+
# 0.6.0
* add chrono as an optional feature, enabled by default [#190](https://github.com/softprops/shiplift/pull/190)
diff --git a/Cargo.toml b/Cargo.toml
index c85ed4e..2dad6ce 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,28 +18,31 @@ coveralls = { repository = "softprops/shipflit" }
maintenance = { status = "actively-developed" }
[dependencies]
-log = "0.4"
-mime = "0.3"
base64 = "0.11"
byteorder = "1.3"
bytes = "0.4"
chrono = { version = "0.4", optional = true, features = ["serde"] }
flate2 = "1.0"
-futures = "0.1"
-hyper = "0.12"
-hyper-openssl = { version = "0.7", optional = true }
-hyperlocal = { version = "0.6", optional = true }
+futures-util = "0.3"
+futures_codec = "0.3"
+hyper = "0.13"
+hyper-openssl = { version = "0.8", optional = true }
+hyperlocal = { version = "0.7", optional = true }
+log = "0.4"
+mime = "0.3"
openssl = { version = "0.10", optional = true }
+pin-project = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tar = "0.4"
-tokio = "0.1"
-tokio-codec = "0.1"
-tokio-io = "0.1"
+tokio = "0.2"
url = "2.1"
[dev-dependencies]
env_logger = "0.7"
+# Required for examples to run
+futures = "0.3.1"
+tokio = { version = "0.2.6", features = ["macros"] }
[features]
default = ["chrono", "unix-socket", "tls"]
diff --git a/examples/attach.rs b/examples/attach.rs
new file mode 100644
index 0000000..e4ed637
--- /dev/null
+++ b/examples/attach.rs
@@ -0,0 +1,32 @@
+use futures::StreamExt;
+use shiplift::{tty::TtyChunk, Docker};
+use std::env;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+ let docker = Docker::new();
+ let id = env::args()
+ .nth(1)
+ .expect("You need to specify a container id");
+
+ let tty_multiplexer = docker.containers().get(&id).attach().await?;
+
+ let (mut reader, _writer) = tty_multiplexer.split();
+
+ while let Some(tty_result) = reader.next().await {
+ match tty_result {
+ Ok(chunk) => print_chunk(chunk),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
+
+ Ok(())
+}
+
+fn print_chunk(chunk: TtyChunk) {
+ match chunk {
+ TtyChunk::StdOut(bytes) => println!("Stdout: {}", std::str::from_utf8(&bytes).unwrap()),
+ TtyChunk::StdErr(bytes) => eprintln!("Stderr: {}", std::str::from_utf8(&bytes).unwrap()),
+ TtyChunk::StdIn(_) => unreachable!(),
+ }
+}
diff --git a/examples/containercopyfrom.rs b/examples/containercopyfrom.rs
index 2ebeccf..acbfa19 100644
--- a/examples/containercopyfrom.rs
+++ b/examples/containercopyfrom.rs
@@ -1,8 +1,10 @@
+use futures::TryStreamExt;
use shiplift::Docker;
use std::{env, path};
-use tokio::prelude::{Future, Stream};
+use tar::Archive;
-fn main() {
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
let docker = Docker::new();
let id = env::args()
.nth(1)
@@ -10,17 +12,16 @@ fn main() {
let path = env::args()
.nth(2)
.expect("Usage: cargo run --example containercopyfrom -- <container> <path in container>");
- let fut = docker
+
+ let bytes = docker
.containers()
.get(&id)
.copy_from(path::Path::new(&path))
- .collect()
- .and_then(|stream| {
- let tar = stream.concat();
- let mut archive = tar::Archive::new(tar.as_slice());
- archive.unpack(env::current_dir()?)?;
- Ok(())
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ .try_concat()
+ .await?;
+
+ let mut archive = Archive::new(&bytes[..]);
+ archive.unpack(env::current_dir()?)?;
+
+ Ok(())
}
diff --git a/examples/containercopyinto.rs b/examples/containercopyinto.rs
index 63f0a2d..689e7af 100644
--- a/examples/containercopyinto.rs
+++ b/examples/containercopyinto.rs
@@ -1,8 +1,8 @@
use shiplift::Docker;
-use std::env;
-use tokio::prelude::Future;
+use std::{env, fs::File, io::Read};
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let path = env::args()
.nth(1)
@@ -11,17 +11,17 @@ fn main() {
.nth(2)
.expect("Usage: cargo run --example containercopyinto -- <local path> <container>");
- use std::{fs::File, io::prelude::*};
-
let mut file = File::open(&path).unwrap();
let mut bytes = Vec::new();
file.read_to_end(&mut bytes)
.expect("Cannot read file on the localhost.");
- let fut = docker
+ if let Err(e) = docker
.containers()
.get(&id)
- .copy_file_into(path, &bytes[..])
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ .copy_file_into(path, &bytes)
+ .await
+ {
+ eprintln!("Error: {}", e)
+ }
}
diff --git a/examples/containercreate.rs b/examples/containercreate.rs
index d061f70..ef579a6 100644
--- a/examples/containercreate.rs
+++ b/examples/containercreate.rs
@@ -1,16 +1,19 @@
use shiplift::{ContainerOptions, Docker};
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let image = env::args()
.nth(1)
.expect("You need to specify an image name");
- let fut = docker
+
+ match docker
.containers()
.create(&ContainerOptions::builder(image.as_ref()).build())
- .map(|info| println!("{:?}", info))
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ .await
+ {
+ Ok(info) => println!("{:?}", info),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/containerdelete.rs b/examples/containerdelete.rs
index e3c2036..86bd9c5 100644
--- a/examples/containerdelete.rs
+++ b/examples/containerdelete.rs
@@ -1,16 +1,14 @@
use shiplift::Docker;
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let id = env::args()
.nth(1)
.expect("You need to specify an container id");
- let fut = docker
- .containers()
- .get(&id)
- .delete()
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+
+ if let Err(e) = docker.containers().get(&id).delete().await {
+ eprintln!("Error: {}", e)
+ }
}
diff --git a/examples/containerexec.rs b/examples/containerexec.rs
index 7f12f88..6c545a7 100644
--- a/examples/containerexec.rs
+++ b/examples/containerexec.rs
@@ -1,8 +1,9 @@
-use shiplift::{tty::StreamType, Docker, ExecContainerOptions};
-use std::env;
-use tokio::prelude::{Future, Stream};
+use futures::StreamExt;
+use shiplift::{tty::TtyChunk, Docker, ExecContainerOptions};
+use std::{env, str::from_utf8};
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let id = env::args()
.nth(1)
@@ -18,19 +19,19 @@ fn main() {
.attach_stdout(true)
.attach_stderr(true)
.build();
- let fut = docker
- .containers()
- .get(&id)
- .exec(&options)
- .for_each(|chunk| {
- match chunk.stream_type {
- StreamType::StdOut => println!("Stdout: {}", chunk.as_string_lossy()),
- StreamType::StdErr => eprintln!("Stderr: {}", chunk.as_string_lossy()),
- StreamType::StdIn => unreachable!(),
- }
- Ok(())
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ while let Some(exec_result) = docker.containers().get(&id).exec(&options).next().await {
+ match exec_result {
+ Ok(chunk) => print_chunk(chunk),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
+}
+
+fn print_chunk(chunk: TtyChunk) {
+ match chunk {
+ TtyChunk::StdOut(bytes) => println!("Stdout: {}", from_utf8(&bytes).unwrap()),
+ TtyChunk::StdErr(bytes) => eprintln!("Stderr: {}", from_utf8(&bytes).unwrap()),
+ TtyChunk::StdIn(_) => unreachable!(),
+ }
}
diff --git a/examples/containerinspect.rs b/examples/containerinspect.rs
index 0f853bc..8de2a67 100644
--- a/examples/containerinspect.rs
+++ b/examples/containerinspect.rs
@@ -1,17 +1,15 @@
use shiplift::Docker;
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let id = env::args()
.nth(1)
.expect("Usage: cargo run --example containerinspect -- <container>");
- let fut = docker
- .containers()
- .get(&id)
- .inspect()
- .map(|container| println!("{:#?}", container))
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+
+ match docker.containers().get(&id).inspect().await {
+ Ok(container) => println!("{:#?}", container),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/containers.rs b/examples/containers.rs
index 0eb51af..72de140 100644
--- a/examples/containers.rs
+++ b/examples/containers.rs
@@ -1,18 +1,15 @@
use shiplift::Docker;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
env_logger::init();
let docker = Docker::new();
- let fut = docker
- .containers()
- .list(&Default::default())
- .map(|containers| {
+ match docker.containers().list(&Default::default()).await {
+ Ok(containers) => {
for c in containers {
println!("container -> {:#?}", c)
}
- })
- .map_err(|e| eprintln!("Error: {}", e));
-
- tokio::run(fut);
+ }
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/custom_host.rs b/examples/custom_host.rs
index 3b1fd3e..8b06eae 100644
--- a/examples/custom_host.rs
+++ b/examples/custom_host.rs
@@ -1,13 +1,10 @@
-use futures::Future;
use shiplift::Docker;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::host("http://yourhost".parse().unwrap());
-
- let fut = docker
- .ping()
- .map(|pong| println!("Ping: {}", pong))
- .map_err(|e| eprintln!("Error: {}", e));
-
- tokio::run(fut);
+ match docker.ping().await {
+ Ok(pong) => println!("Ping: {}", pong),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/events.rs b/examples/events.rs
index 0e35860..e44d68f 100644
--- a/examples/events.rs
+++ b/examples/events.rs
@@ -1,16 +1,15 @@
+use futures::StreamExt;
use shiplift::Docker;
-use tokio::prelude::{Future, Stream};
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
println!("listening for events");
- let fut = docker
- .events(&Default::default())
- .for_each(|e| {
- println!("event -> {:?}", e);
- Ok(())
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ while let Some(event_result) = docker.events(&Default::default()).next().await {
+ match event_result {
+ Ok(event) => println!("event -> {:?}", event),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
}
diff --git a/examples/export.rs b/examples/export.rs
index 55d1b7b..34f460d 100644
--- a/examples/export.rs
+++ b/examples/export.rs
@@ -1,8 +1,10 @@
-use shiplift::{errors::Error, Docker};
+use futures::StreamExt;
use std::{env, fs::OpenOptions, io::Write};
-use tokio::prelude::{Future, Stream};
-fn main() {
+use shiplift::{errors::Error, Docker};
+
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let id = env::args().nth(1).expect("You need to specify an image id");
@@ -11,17 +13,13 @@ fn main() {
.create(true)
.open(format!("{}.tar", &id))
.unwrap();
+
let images = docker.images();
- let fut = images
- .get(&id)
- .export()
- .for_each(move |bytes| {
- export_file
- .write(&bytes[..])
- .map(|n| println!("copied {} bytes", n))
- .map_err(Error::IO)
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut)
+ while let Some(export_result) = images.get(&id).export().next().await {
+ match export_result.and_then(|bytes| export_file.write(&bytes).map_err(Error::from)) {
+ Ok(n) => println!("copied {} bytes", n),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
}
diff --git a/examples/imagebuild.rs b/examples/imagebuild.rs
index 6dbea78..80d825c 100644
--- a/examples/imagebuild.rs
+++ b/examples/imagebuild.rs
@@ -1,19 +1,22 @@
+use futures::StreamExt;
use shiplift::{BuildOptions, Docker};
use std::env;
-use tokio::prelude::{Future, Stream};
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let path = env::args().nth(1).expect("You need to specify a path");
- let fut = docker
- .images()
- .build(&BuildOptions::builder(path).tag("shiplift_test").build())
- .for_each(|output| {
- println!("{:?}", output);
- Ok(())
- })
- .map_err(|e| eprintln!("Error: {}", e));
+ let options = BuildOptions::builder(path).tag("shiplift_test").build();
- tokio::run(fut);
+ let images = docker.images();
+
+ let mut stream = images.build(&options);
+
+ while let Some(build_result) = stream.next().await {
+ match build_result {
+ Ok(output) => println!("{:?}", output),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
}
diff --git a/examples/imagedelete.rs b/examples/imagedelete.rs
index efa763c..3b9bc60 100644
--- a/examples/imagedelete.rs
+++ b/examples/imagedelete.rs
@@ -1,21 +1,18 @@
use shiplift::Docker;
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let img = env::args()
.nth(1)
.expect("You need to specify an image name");
- let fut = docker
- .images()
- .get(&img[..])
- .delete()
- .map(|statuses| {
+ match docker.images().get(&img).delete().await {
+ Ok(statuses) => {
for status in statuses {
println!("{:?}", status);
}
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ }
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/imageinspect.rs b/examples/imageinspect.rs
index 494480a..b7d2f9a 100644
--- a/examples/imageinspect.rs
+++ b/examples/imageinspect.rs
@@ -1,17 +1,15 @@
use shiplift::Docker;
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let id = env::args()
.nth(1)
.expect("Usage: cargo run --example imageinspect -- <image>");
- let fut = docker
- .images()
- .get(&id)
- .inspect()
- .map(|image| println!("{:#?}", image))
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+
+ match docker.images().get(&id).inspect().await {
+ Ok(image) => println!("{:#?}", image),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/imagepull.rs b/examples/imagepull.rs
index 84a6149..5b3fbf4 100644
--- a/examples/imagepull.rs
+++ b/examples/imagepull.rs
@@ -1,22 +1,25 @@
// cargo run --example imagepull busybox
+use futures::StreamExt;
use shiplift::{Docker, PullOptions};
use std::env;
-use tokio::prelude::{Future, Stream};
-fn main() {
+#[tokio::main]
+async fn main() {
env_logger::init();
let docker = Docker::new();
let img = env::args()
.nth(1)
.expect("You need to specify an image name");
- let fut = docker
+
+ let mut stream = docker
.images()
- .pull(&PullOptions::builder().image(img).build())
- .for_each(|output| {
- println!("{:?}", output);
- Ok(())
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ .pull(&PullOptions::builder().image(img).build());
+
+ while let Some(pull_result) = stream.next().await {
+ match pull_result {
+ Ok(output) => println!("{:?}", output),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
}
diff --git a/examples/imagepull_auth.rs b/examples/imagepull_auth.rs
index 1c559c7..6f0ceec 100644
--- a/examples/imagepull_auth.rs
+++ b/examples/imagepull_auth.rs
@@ -1,10 +1,11 @@
// cargo run --example imagepull_auth busybox username password
+use futures::StreamExt;
use shiplift::{Docker, PullOptions, RegistryAuth};
use std::env;
-use tokio::prelude::{Future, Stream};
-fn main() {
+#[tokio::main]
+async fn main() {
env_logger::init();
let docker = Docker::new();
let img = env::args()
@@ -16,13 +17,15 @@ fn main() {
.username(username)
.password(password)
.build();
- let fut = docker
+
+ let mut stream = docker
.images()
- .pull(&PullOptions::builder().image(img).auth(auth).build())
- .for_each(|output| {
- println!("{:?}", output);
- Ok(())
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ .pull(&PullOptions::builder().image(img).auth(auth).build());
+
+ while let Some(pull_result) = stream.next().await {
+ match pull_result {
+ Ok(output) => println!("{:?}", output),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
}
diff --git a/examples/images.rs b/examples/images.rs
index 7a8a094..1c0852e 100644
--- a/examples/images.rs
+++ b/examples/images.rs
@@ -1,13 +1,14 @@
use shiplift::Docker;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
println!("docker images in stock");
- let fut = docker
- .images()
- .list(&Default::default())
- .map(|images| {
+
+ let result = docker.images().list(&Default::default()).await;
+
+ match result {
+ Ok(images) => {
for i in images {
println!(
"{} {} {:?}",
@@ -16,7 +17,7 @@ fn main() {
i.repo_tags.unwrap_or_else(|| vec!["none".into()])
);
}
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ }
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/imagesearch.rs b/examples/imagesearch.rs
index a6d6e52..e2fd110 100644
--- a/examples/imagesearch.rs
+++ b/examples/imagesearch.rs
@@ -1,17 +1,16 @@
use shiplift::Docker;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
println!("remote docker images in stock");
- let fut = docker
- .images()
- .search("rust")
- .map(|results| {
+
+ match docker.images().search("rust").await {
+ Ok(results) => {
for result in results {
println!("{} - {}", result.name, result.description);
}
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ }
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/imagetag.rs b/examples/imagetag.rs
index 7ae78dd..e36af76 100644
--- a/examples/imagetag.rs
+++ b/examples/imagetag.rs
@@ -2,9 +2,9 @@
use shiplift::{Docker, Image, TagOptions};
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
env_logger::init();
let docker = Docker::new();
let img = env::args()
@@ -21,7 +21,7 @@ fn main() {
let image = Image::new(&docker, img);
- let fut = image.tag(&tag_opts).map_err(|e| eprintln!("Error: {}", e));
-
- tokio::run(fut);
+ if let Err(e) = image.tag(&tag_opts).await {
+ eprintln!("Error: {}", e)
+ }
}
diff --git a/examples/import.rs b/examples/import.rs
index 20c61e6..7ea35bd 100644
--- a/examples/import.rs
+++ b/examples/import.rs
@@ -1,8 +1,9 @@
+use futures::StreamExt;
use shiplift::Docker;
use std::{env, fs::File};
-use tokio::prelude::{Future, Stream};
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let path = env::args()
.nth(1)
@@ -11,14 +12,12 @@ fn main() {
let reader = Box::from(f);
- let fut = docker
- .images()
- .import(reader)
- .for_each(|output| {
- println!("{:?}", output);
- Ok(())
- })
- .map_err(|e| eprintln!("Error: {}", e));
+ let mut stream = docker.images().import(reader);
- tokio::run(fut);
+ while let Some(import_result) = stream.next().await {
+ match import_result {
+ Ok(output) => println!("{:?}", output),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
}
diff --git a/examples/info.rs b/examples/info.rs
index 05fdede..76036e6 100644
--- a/examples/info.rs
+++ b/examples/info.rs
@@ -1,12 +1,11 @@
use shiplift::Docker;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
- tokio::run(
- docker
- .info()
- .map(|info| println!("info {:?}", info))
- .map_err(|e| eprintln!("Error: {}", e)),
- );
+
+ match docker.info().await {
+ Ok(info) => println!("info {:?}", info),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/logs.rs b/examples/logs.rs
index 57f12be..73c8045 100644
--- a/examples/logs.rs
+++ b/examples/logs.rs
@@ -1,25 +1,31 @@
-use shiplift::{tty::StreamType, Docker, LogsOptions};
+use futures::StreamExt;
+use shiplift::{tty::TtyChunk, Docker, LogsOptions};
use std::env;
-use tokio::prelude::{Future, Stream};
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let id = env::args()
.nth(1)
.expect("You need to specify a container id");
- let fut = docker
+
+ let mut logs_stream = docker
.containers()
.get(&id)
- .logs(&LogsOptions::builder().stdout(true).stderr(true).build())
- .for_each(|chunk| {
- match chunk.stream_type {
- StreamType::StdOut => println!("Stdout: {}", chunk.as_string_lossy()),
- StreamType::StdErr => eprintln!("Stderr: {}", chunk.as_string_lossy()),
- StreamType::StdIn => unreachable!(),
- }
- Ok(())
- })
- .map_err(|e| eprintln!("Error: {}", e));
+ .logs(&LogsOptions::builder().stdout(true).stderr(true).build());
+
+ while let Some(log_result) = logs_stream.next().await {
+ match log_result {
+ Ok(chunk) => print_chunk(chunk),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
+}
- tokio::run(fut);
+fn print_chunk(chunk: TtyChunk) {
+ match chunk {
+ TtyChunk::StdOut(bytes) => println!("Stdout: {}", std::str::from_utf8(&bytes).unwrap()),
+ TtyChunk::StdErr(bytes) => eprintln!("Stderr: {}", std::str::from_utf8(&bytes).unwrap()),
+ TtyChunk::StdIn(_) => unreachable!(),
+ }
}
diff --git a/examples/networkconnect.rs b/examples/networkconnect.rs
index 0cfc8fc..a04db63 100644
--- a/examples/networkconnect.rs
+++ b/examples/networkconnect.rs
@@ -1,18 +1,20 @@
use shiplift::{ContainerConnectionOptions, Docker};
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let networks = docker.networks();
+
match (env::args().nth(1), env::args().nth(2)) {
(Some(container_id), Some(network_id)) => {
- let fut = networks
+ if let Err(e) = networks
.get(&network_id)
.connect(&ContainerConnectionOptions::builder(&container_id).build())
- .map(|v| println!("{:?}", v))
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ .await
+ {
+ eprintln!("Error: {}", e)
+ }
}
_ => eprintln!("please provide a container_id and network_id"),
}
diff --git a/examples/networkcreate.rs b/examples/networkcreate.rs
index 30bb41c..3173ab4 100644
--- a/examples/networkcreate.rs
+++ b/examples/networkcreate.rs
@@ -1,20 +1,22 @@
use shiplift::{Docker, NetworkCreateOptions};
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let network_name = env::args()
.nth(1)
.expect("You need to specify a network name");
- let fut = docker
+ match docker
.networks()
.create(
&NetworkCreateOptions::builder(network_name.as_ref())
.driver("bridge")
.build(),
)
- .map(|info| println!("{:?}", info))
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ .await
+ {
+ Ok(info) => println!("{:?}", info),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/networkdelete.rs b/examples/networkdelete.rs
index 16fc4ab..e419f90 100644
--- a/examples/networkdelete.rs
+++ b/examples/networkdelete.rs
@@ -1,18 +1,14 @@
use shiplift::Docker;
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let id = env::args()
.nth(1)
.expect("You need to specify a network id");
- let fut = docker
- .networks()
- .get(&id)
- .delete()
- .map(|network| println!("{:?}", network))
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ if let Err(e) = docker.networks().get(&id).delete().await {
+ eprintln!("Error: {}", e)
+ }
}
diff --git a/examples/networkdisconnect.rs b/examples/networkdisconnect.rs
index 9588ecc..8d58b35 100644
--- a/examples/networkdisconnect.rs
+++ b/examples/networkdisconnect.rs
@@ -1,18 +1,27 @@
use shiplift::{ContainerConnectionOptions, Docker};
use std::env;
-use tokio::prelude::Future;
-fn main() {
+async fn network_disconnect(
+ container_id: &str,
+ network_id: &str,
+) {
let docker = Docker::new();
let networks = docker.networks();
+
+ if let Err(e) = networks
+ .get(network_id)
+ .disconnect(&ContainerConnectionOptions::builder(container_id).build())
+ .await
+ {
+ eprintln!("Error: {}", e)
+ }
+}
+
+#[tokio::main]
+async fn main() {
match (env::args().nth(1), env::args().nth(2)) {
(Some(container_id), Some(network_id)) => {
- let fut = networks
- .get(&network_id)
- .disconnect(&ContainerConnectionOptions::builder(&container_id).build())
- .map(|v| println!("{:?}", v))
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ network_disconnect(&container_id, &network_id).await;
}
_ => eprintln!("please provide a container_id and network_id"),
}
diff --git a/examples/networkinspect.rs b/examples/networkinspect.rs
index 86a076b..143c637 100644
--- a/examples/networkinspect.rs
+++ b/examples/networkinspect.rs
@@ -1,17 +1,15 @@
use shiplift::Docker;
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let id = env::args()
.nth(1)
.expect("You need to specify a network id");
- let fut = docker
- .networks()
- .get(&id)
- .inspect()
- .map(|network| println!("{:#?}", network))
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+
+ match docker.networks().get(&id).inspect().await {
+ Ok(network_info) => println!("{:#?}", network_info),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/networks.rs b/examples/networks.rs
index 9ceea99..4a1dcf1 100644
--- a/examples/networks.rs
+++ b/examples/networks.rs
@@ -1,17 +1,17 @@
use shiplift::Docker;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
env_logger::init();
+
let docker = Docker::new();
- let fut = docker
- .networks()
- .list(&Default::default())
- .map(|networks| {
+
+ match docker.networks().list(&Default::default()).await {
+ Ok(networks) => {
for network in networks {
- println!("network -> {:#?}", network);
+ println!("network -> {:#?}", network)
}
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+ }
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/stats.rs b/examples/stats.rs
index 9e14cf4..063f502 100644
--- a/examples/stats.rs
+++ b/examples/stats.rs
@@ -1,21 +1,20 @@
// cargo run --example stats -- <container>
+use futures::StreamExt;
use shiplift::Docker;
use std::env;
-use tokio::prelude::{Future, Stream};
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let containers = docker.containers();
let id = env::args()
.nth(1)
.expect("Usage: cargo run --example stats -- <container>");
- let fut = containers
- .get(&id)
- .stats()
- .for_each(|stat| {
- println!("{:?}", stat);
- Ok(())
- })
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+
+ while let Some(result) = containers.get(&id).stats().next().await {
+ match result {
+ Ok(stat) => println!("{:?}", stat),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+ }
}
diff --git a/examples/top.rs b/examples/top.rs
index 5fc4229..39e8ea6 100644
--- a/examples/top.rs
+++ b/examples/top.rs
@@ -1,17 +1,15 @@
use shiplift::Docker;
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let id = env::args()
.nth(1)
.expect("Usage: cargo run --example top -- <container>");
- let fut = docker
- .containers()
- .get(&id)
- .top(Default::default())
- .map(|top| println!("{:#?}", top))
- .map_err(|e| eprintln!("Error: {}", e));
- tokio::run(fut);
+
+ match docker.containers().get(&id).top(Default::default()).await {
+ Ok(top) => println!("{:#?}", top),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/version.rs b/examples/version.rs
index 6125e56..e1ab5d2 100644
--- a/examples/version.rs
+++ b/examples/version.rs
@@ -1,13 +1,10 @@
use shiplift::Docker;
-use tokio::prelude::Future;
-fn main() {
- env_logger::init();
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
- let fut = docker
- .version()
- .map(|ver| println!("version -> {:#?}", ver))
- .map_err(|e| eprintln!("Error: {}", e));
-
- tokio::run(fut);
+ match docker.version().await {
+ Ok(ver) => println!("version -> {:#?}", ver),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/volumecreate.rs b/examples/volumecreate.rs
index 83f5045..a243be6 100644
--- a/examples/volumecreate.rs
+++ b/examples/volumecreate.rs
@@ -1,8 +1,8 @@
use shiplift::{builder::VolumeCreateOptions, Docker};
use std::{collections::HashMap, env};
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let volumes = docker.volumes();
@@ -13,15 +13,16 @@ fn main() {
let mut labels = HashMap::new();
labels.insert("com.github.softprops", "shiplift");
- let fut = volumes
+ match volumes
.create(
&VolumeCreateOptions::builder()
.name(volume_name.as_ref())
.labels(&labels)
.build(),
)
- .map(|info| println!("{:?}", info))
- .map_err(|e| eprintln!("Error: {}", e));
-
- tokio::run(fut);
+ .await
+ {
+ Ok(info) => println!("{:?}", info),
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/examples/volumedelete.rs b/examples/volumedelete.rs
index 3800d22..ec1da7e 100644
--- a/examples/volumedelete.rs
+++ b/examples/volumedelete.rs
@@ -1,8 +1,8 @@
use shiplift::Docker;
use std::env;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let volumes = docker.volumes();
@@ -10,11 +10,7 @@ fn main() {
.nth(1)
.expect("You need to specify an volume name");
- let fut = volumes
- .get(&volume_name)
- .delete()
- .map(|info| println!("{:?}", info))
- .map_err(|e| eprintln!("Error: {}", e));
-
- tokio::run(fut);
+ if let Err(e) = volumes.get(&volume_name).delete().await {
+ eprintln!("Error: {}", e)
+ }
}
diff --git a/examples/volumes.rs b/examples/volumes.rs
index c5548ec..d45c00a 100644
--- a/examples/volumes.rs
+++ b/examples/volumes.rs
@@ -1,18 +1,16 @@
use shiplift::Docker;
-use tokio::prelude::Future;
-fn main() {
+#[tokio::main]
+async fn main() {
let docker = Docker::new();
let volumes = docker.volumes();
- let fut = volumes
- .list()
- .map(|volumes| {
+ match volumes.list().await {
+ Ok(volumes) => {
for v in volumes {
println!("volume -> {:#?}", v)
}
- })
- .map_err(|e| eprintln!("Error: {}", e));
-
- tokio::run(fut);
+ }
+ Err(e) => eprintln!("Error: {}", e),
+ }
}
diff --git a/lib.rs b/lib.rs
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib.rs
diff --git a/src/errors.rs b/src/errors.rs
index 5506198..9d01f91 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -1,8 +1,10 @@
//! Representations of various client errors
-use hyper::{self, StatusCode};
+use hyper::{self, http, StatusCode};
use serde_json::Error as SerdeError;
-use std::{error::Error as StdError, fmt, io::Error as IoError, string::FromUtf8Error};
+use std::{error::Error as StdError, fmt, string::FromUtf8Error};
+
+use futures_util::io::Error as IoError;
#[derive(Debug)]
pub enum Error {
@@ -34,12 +36,25 @@ impl From<hyper::http::Error> for Error {
}
}
+impl From<http::uri::InvalidUri> for Error {
+ fn from(error: http::uri::InvalidUri) -> Self {
+ let http_error: http::Error = error.into();
+ http_error.into()
+ }
+}
+
impl From<IoError> for Error {
fn from(error: IoError) -> Error {
Error::IO(error)
}
}
+impl From<FromUtf8Error> for Error {
+ fn from(error: FromUtf8Error) -> Error {
+ Error::Encoding(error)
+ }
+}
+
impl fmt::Display for Error {
fn fmt(
&self,
diff --git a/src/lib.rs b/src/lib.rs
index 16e5625..06b31a1 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,22 +3,22 @@
//! # examples
//!
//! ```no_run
-//! use tokio::prelude::Future;
-//!
+//! # async {
//! let docker = shiplift::Docker::new();
-//! let fut = docker.images().list(&Default::default()).map(|images| {
-//! println!("docker images in stock");
-//! for i in images {
-//! println!("{:?}", i.repo_tags);
-//! }
-//! }).map_err(|e| eprintln!("Something bad happened! {}", e));
//!
-//! tokio::run(fut);
+//! match docker.images().list(&Default::default()).await {
+//! Ok(images) => {
+//! for image in images {
+//! println!("{:?}", image.repo_tags);
+//! }
+//! },
+//! Err(e) => eprintln!("Something bad happened! {}", e),
+//! }
+//! # };
//! ```
pub mod builder;
pub mod errors;
-pub mod read;
pub mod rep;
pub mod transport;
pub mod tty;
@@ -35,7 +35,6 @@ pub use crate::{
errors::Error,
};
use crate::{
- read::StreamReader,
rep::{
Change, Container as ContainerRep, ContainerCreateInfo, ContainerDetails, Event, Exit,
History, Image as ImageRep, ImageDetails, Info, NetworkCreateInfo,
@@ -43,9 +42,14 @@ use crate::{
Volume as VolumeRep, VolumeCreateInfo, Volumes as VolumesRep,
},
transport::{tar, Transport},
- tty::TtyDecoder,
+ tty::Multiplexer as TtyMultiPlexer,
+};
+use futures_util::{
+ io::{AsyncRead, AsyncWrite},
+ stream::Stream,
+ TryFutureExt, TryStreamExt,
};
-use futures::{future::Either, Future, IntoFuture, Stream};
+// use futures::{future::Either, Future, IntoFuture, Stream};
pub use hyper::Uri;
use hyper::{client::HttpConnector, Body, Client, Method};
#[cfg(feature = "tls")]
@@ -56,8 +60,7 @@ use mime::Mime;
#[cfg(feature = "tls")]
use openssl::ssl::{SslConnector, SslFiletype, SslMethod};
use serde_json::Value;
-use std::{borrow::Cow, env, io::Read, iter, path::Path, time::Duration};
-use tokio_codec::{FramedRead, LinesCodec};
+use std::{borrow::Cow, env, io, io::Read, iter, path::Path, time::Duration};
use url::form_urlencoded;
/// Represents the result of all docker operations
@@ -70,19 +73,19 @@ pub struct Docker {
}
/// Interface for accessing and manipulating a named docker image
-pub struct Image<'a, 'b> {
+pub struct Image<'a> {
docker: &'a Docker,
- name: Cow<'b, str>,
+ name: Cow<'a, str>,
}
-impl<'a, 'b> Image<'a, 'b> {
+impl<'a> Image<'a> {
/// Exports an interface for operations that may be performed against a named image
pub fn new<S>(
docker: &'a Docker,
name: S,
- ) -> Image<'a, 'b>
+ ) -> Self
where
- S: Into<Cow<'b, str>>,
+ S: Into<Cow<'a, str>>,
{
Image {
docker,
@@ -91,40 +94,46 @@ impl<'a, 'b> Image<'a, 'b> {
}
/// Inspects a named image's details
- pub fn inspect(&self) -> impl Future<Item = ImageDetails, Error = Error> {
+ pub async fn inspect(&self) -> Result<ImageDetails> {
self.docker
.get_json(&format!("/images/{}/json", self.name)[..])
+ .await
}
/// Lists the history of the images set of changes
- pub fn history(&self) -> impl Future<Item = Vec<History>, Error = Error> {
+ pub async fn history(&self) -> Result<Vec<History>> {
self.docker
.get_json(&format!("/images/{}/history", self.name)[..])
+ .await
}
/// Deletes an image
- pub fn delete(&self) -> impl Future<Item = Vec<Status>, Error = Error> {
+ pub async fn delete(&self) -> Result<Vec<Status>> {
self.docker
.delete_json::<Vec<Status>>(&format!("/images/{}", self.name)[..])
+ .await
}
/// Export this image to a tarball
- pub fn export(&self) -> impl Stream<Item = Vec<u8>, Error = Error> {
- self.docker
- .stream_get(&format!("/images/{}/get", self.name)[..])
- .map(|c| c.to_vec())
+ pub fn export(&self) -> impl Stream<Item = Result<Vec<u8>>> + Unpin + 'a {
+ Box::pin(
+ self.docker
+ .stream_get(format!("/images/{}/get", self.name))
+ .map_ok(|c| c.to_vec()),
+ )
}
/// Adds a tag to an image
- pub fn tag(
+ pub async fn tag(
&self,
opts: &TagOptions,
- ) -> impl Future<Item = (), Error = Error> {
+ ) -> Result<()> {
let mut path = vec![format!("/images/{}/tag", self.name)];
if let Some(query) = opts.serialize() {
path.push(query)
}
- self.docker.post::<Body>(&path.join("?"), None).map(|_| ())
+ let _ = self.docker.post(&path.join("?"), None).await?;
+ Ok(())
}
}
@@ -141,76 +150,74 @@ impl<'a> Images<'a> {
/// Builds a new image build by reading a Dockerfile in a target directory
pub fn build(
- &self,
- opts: &BuildOptions,
- ) -> impl Stream<Item = Value, Error = Error> {
- let mut path = vec!["/build".to_owned()];
- if let Some(query) = opts.serialize() {
- path.push(query)
- }
+ &'a self,
+ opts: &'a BuildOptions,
+ ) -> impl Stream<Item = Result<Value>> + Unpin + 'a {
+ Box::pin(
+ async move {
+ let mut path = vec!["/build".to_owned()];
+ if let Some(query) = opts.serialize() {
+ path.push(query)
+ }
+
+ let mut bytes = Vec::default();
+
+ tarball::dir(&mut bytes, &opts.path[..])?;
+
+ let chunk_stream = self.docker.stream_post(
+ path.join("?"),
+ Some((Body::from(bytes), tar())),
+ None::<iter::Empty<_>>,
+ );
- let mut bytes = vec![];
-
- match tarball::dir(&mut bytes, &opts.path[..]) {
- Ok(_) => Box::new(
- self.docker
- .stream_post(
- &path.join("?"),
- Some((Body::from(bytes), tar())),
- None::<iter::Empty<_>>,
- )
- .map(|r| {
- futures::stream::iter_result(
- serde_json::Deserializer::from_slice(&r[..])
- .into_iter::<Value>()
- .collect::<Vec<_>>(),
- )
- .map_err(Error::from)
- })
- .flatten(),
- ) as Box<dyn Stream<Item = Value, Error = Error> + Send>,
- Err(e) => Box::new(futures::future::err(Error::IO(e)).into_stream())
- as Box<dyn Stream<Item = Value, Error = Error> + Send>,
- }
+ let value_stream = chunk_stream.and_then(|chunk| async move {
+ serde_json::from_slice(&chunk).map_err(Error::from)
+ });
+
+ Ok(value_stream)
+ }
+ .try_flatten_stream(),
+ )
}
/// Lists the docker images on the current docker host
- pub fn list(
+ pub async fn list(
&self,
opts: &ImageListOptions,
- ) -> impl Future<Item = Vec<ImageRep>, Error = Error> {
+ ) -> Result<Vec<ImageRep>> {
let mut path = vec!["/images/json".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
}
- self.docker.get_json::<Vec<ImageRep>>(&path.join("?"))
+ self.docker.get_json::<Vec<ImageRep>>(&path.join("?")).await
}
/// Returns a reference to a set of operations available for a named image
- pub fn get<'b>(
+ pub fn get(
&self,
- name: &'b str,
- ) -> Image<'a, 'b> {
+ name: &'a str,
+ ) -> Image<'a> {
Image::new(self.docker, name)
}
/// Search for docker images by term
- pub fn search(
+ pub async fn search(
&self,
term: &str,
- ) -> impl Future<Item = Vec<SearchResult>, Error = Error> {
+ ) -> Result<Vec<SearchResult>> {
let query = form_urlencoded::Serializer::new(String::new())
.append_pair("term", term)
.finish();
self.docker
.get_json::<Vec<SearchResult>>(&format!("/images/search?{}", query)[..])
+ .await
}
/// Pull and create a new docker images from an existing image
pub fn pull(
&self,
opts: &PullOptions,
- ) -> impl Stream<Item = Value, Error = Error> {
+ ) -> impl Stream<Item = Result<Value>> + Unpin + 'a {
let mut path = vec!["/images/create".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
@@ -218,18 +225,15 @@ impl<'a> Images<'a> {
let headers = opts
.auth_header()
.map(|a| iter::once(("X-Registry-Auth", a)));
- self.docker
- .stream_post::<Body, _>(&path.join("?"), None, headers)
- // todo: give this a proper enum type
- .map(|r| {
- futures::stream::iter_result(
- serde_json::Deserializer::from_slice(&r[..])
- .into_iter::<Value>()
- .collect::<Vec<_>>(),
- )
- .map_err(Error::from)
- })
- .flatten()
+
+ Box::pin(
+ self.docker
+ .stream_post(path.join("?"), None, headers)
+ .and_then(move |chunk| {
+ // todo: give this a proper enum type
+ futures_util::future::ready(serde_json::from_slice(&chunk).map_err(Error::from))
+ }),
+ )
}
/// exports a collection of named images,
@@ -237,14 +241,14 @@ impl<'a> Images<'a> {
pub fn export(
&self,
names: Vec<&str>,
- ) -> impl Stream<Item = Vec<u8>, Error = Error> {
+ ) -> impl Stream<Item = Result<Vec<u8>>> + 'a {
let params = names.iter().map(|n| ("names", *n));
let query = form_urlencoded::Serializer::new(String::new())
.extend_pairs(params)
.finish();
self.docker
- .stream_get(&format!("/images/get?{}", query)[..])
- .map(|c| c.to_vec())
+ .stream_get(format!("/images/get?{}", query))
+ .map_ok(|c| c.to_vec())
}
/// imports an image or set of images from a given tarball source
@@ -252,43 +256,44 @@ impl<'a> Images<'a> {
pub fn import(
self,
mut tarball: Box<dyn Read>,
- ) -> impl Stream<Item = Value, Error = Error> {
- let mut bytes = Vec::new();
-
- match tarball.read_to_end(&mut bytes) {
- Ok(_) => Box::new(
- self.docker
- .stream_post(
- "/images/load",
- Some((Body::from(bytes), tar())),
- None::<iter::Empty<_>>,
- )
- .and_then(|bytes| {
- serde_json::from_slice::<'_, Value>(&bytes[..])
- .map_err(Error::from)
- .into_future()
- }),
- ) as Box<dyn Stream<Item = Value, Error = Error> + Send>,
- Err(e) => Box::new(futures::future::err(Error::IO(e)).into_stream())
- as Box<dyn Stream<Item = Value, Error = Error> + Send>,
- }
+ ) -> impl Stream<Item = Result<Value>> + Unpin + 'a {
+ Box::pin(
+ async move {
+ let mut bytes = Vec::default();
+
+ tarball.read_to_end(&mut bytes)?;
+
+ let chunk_stream = self.docker.stream_post(
+ "/images/load",
+ Some((Body::from(bytes), tar())),
+ None::<iter::Empty<_>>,
+ );
+
+ let value_stream = chunk_stream.and_then(|chunk| async move {
+ serde_json::from_slice(&chunk).map_err(Error::from)
+ });
+
+ Ok(value_stream)
+ }
+ .try_flatten_stream(),
+ )
}
}
/// Interface for accessing and manipulating a docker container
-pub struct Container<'a, 'b> {
+pub struct Container<'a> {
docker: &'a Docker,
- id: Cow<'b, str>,
+ id: Cow<'a, str>,
}
-impl<'a, 'b> Container<'a, 'b> {
+impl<'a> Container<'a> {
/// Exports an interface exposing operations against a container instance
pub fn new<S>(
docker: &'a Docker,
id: S,
- ) -> Container<'a, 'b>
+ ) -> Self
where
- S: Into<Cow<'b, str>>,
+ S: Into<Cow<'a, str>>,
{
Container {
docker,
@@ -302,16 +307,17 @@ impl<'a, 'b> Container<'a, 'b> {
}
/// Inspects the current docker container instance's details
- pub fn inspect(&self) -> impl Future<Item = ContainerDetails, Error = Error> {
+ pub async fn inspect(&self) -> Result<ContainerDetails> {
self.docker
.get_json::<ContainerDetails>(&format!("/containers/{}/json", self.id)[..])
+ .await
}
/// Returns a `top` view of information about the container process
- pub fn top(
+ pub async fn top(
&self,
psargs: Option<&str>,
- ) -> impl Future<Item = Top, Error = Error> {
+ ) -> Result<Top> {
let mut path = vec![format!("/containers/{}/top", self.id)];
if let Some(ref args) = psargs {
let encoded = form_urlencoded::Serializer::new(String::new())
@@ -319,85 +325,95 @@ impl<'a, 'b> Container<'a, 'b> {
.finish();
path.push(encoded)
}
- self.docker.get_json(&path.join("?"))
+ self.docker.get_json(&path.join("?")).await
}
/// Returns a stream of logs emitted but the container instance
pub fn logs(
&self,
opts: &LogsOptions,
- ) -> impl Stream<Item = tty::Chunk, Error = Error> {
+ ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'a {
let mut path = vec![format!("/containers/{}/logs", self.id)];
if let Some(query) = opts.serialize() {
path.push(query)
}
- let decoder = TtyDecoder::new();
- let chunk_stream = StreamReader::new(self.docker.stream_get(&path.join("?")));
+ let stream = Box::pin(self.docker.stream_get(path.join("?")));
- FramedRead::new(chunk_stream, decoder)
+ Box::pin(tty::decode(stream))
}
- /// Attaches to a running container, returning a stream that can
- /// be used to interact with the standard IO streams.
- pub fn attach(&self) -> impl Future<Item = tty::Multiplexed, Error = Error> {
- self.docker.stream_post_upgrade_multiplexed::<Body>(
- &format!(
- "/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1",
- self.id
- ),
- None,
- )
+ /// Attaches a multiplexed TCP stream to the container that can be used to read Stdout, Stderr and write Stdin.
+ async fn attach_raw(&self) -> Result<impl AsyncRead + AsyncWrite + Send + 'a> {
+ self.docker
+ .stream_post_upgrade(
+ format!(
+ "/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1",
+ self.id
+ ),
+ None,
+ )
+ .await
}
- /// Attaches to a running container, returning a stream that can
- /// be used to interact with the standard IO streams.
- pub fn attach_blocking(&self) -> Result<tty::MultiplexedBlocking> {
- self.attach().map(|s| s.wait()).wait()
+    /// Attaches a [`TtyMultiplexer`] to the container.
+    ///
+    /// The [`TtyMultiplexer`] implements Stream for returning Stdout and Stderr chunks. It also implements [`AsyncWrite`] for writing to Stdin.
+    ///
+    /// The multiplexer can be split into its read and write halves with the [`split`](TtyMultiplexer::split) method.
+ pub async fn attach(&self) -> Result<TtyMultiPlexer<'a>> {
+ let tcp_stream = self.attach_raw().await?;
+
+ Ok(TtyMultiPlexer::new(tcp_stream))
}
/// Returns a set of changes made to the container instance
- pub fn changes(&self) -> impl Future<Item = Vec<Change>, Error = Error> {
+ pub async fn changes(&self) -> Result<Vec<Change>> {
self.docker
.get_json::<Vec<Change>>(&format!("/containers/{}/changes", self.id)[..])
+ .await
}
/// Exports the current docker container into a tarball
- pub fn export(&self) -> impl Stream<Item = Vec<u8>, Error = Error> {
+ pub fn export(&self) -> impl Stream<Item = Result<Vec<u8>>> + 'a {
self.docker
- .stream_get(&format!("/containers/{}/export", self.id)[..])
- .map(|c| c.to_vec())
+ .stream_get(format!("/containers/{}/export", self.id))
+ .map_ok(|c| c.to_vec())
}
/// Returns a stream of stats specific to this container instance
- pub fn stats(&self) -> impl Stream<Item = Stats, Error = Error> {
- let decoder = LinesCodec::new();
- let stream_of_chunks = StreamReader::new(
- self.docker
- .stream_get(&format!("/containers/{}/stats", self.id)[..]),
- );
+ pub fn stats(&'a self) -> impl Stream<Item = Result<Stats>> + Unpin + 'a {
+ let codec = futures_codec::LinesCodec {};
- FramedRead::new(stream_of_chunks, decoder)
- .map_err(Error::IO)
- .and_then(|s| {
- serde_json::from_str::<Stats>(&s)
- .map_err(Error::SerdeJsonError)
- .into_future()
- })
+ let reader = Box::pin(
+ self.docker
+ .stream_get(format!("/containers/{}/stats", self.id))
+ .map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
+ )
+ .into_async_read();
+
+ Box::pin(
+ futures_codec::FramedRead::new(reader, codec)
+ .map_err(Error::IO)
+ .and_then(|s: String| async move {
+ serde_json::from_str(&s).map_err(Error::SerdeJsonError)
+ }),
+ )
}
/// Start the container instance
- pub fn start(&self) -> impl Future<Item = (), Error = Error> {
+ pub async fn start(&self) -> Result<()> {
self.docker
- .post::<Body>(&format!("/containers/{}/start", self.id)[..], None)
- .map(|_| ())
+ .post(&format!("/containers/{}/start", self.id)[..], None)
+ .await?;
+ Ok(())
}
/// Stop the container instance
- pub fn stop(
+ pub async fn stop(
&self,
wait: Option<Duration>,
- ) -> impl Future<Item = (), Error = Error> {
+ ) -> Result<()> {
let mut path = vec![format!("/containers/{}/stop", self.id)];
if let Some(w) = wait {
let encoded = form_urlencoded::Serializer::new(String::new())
@@ -406,14 +422,15 @@ impl<'a, 'b> Container<'a, 'b> {
path.push(encoded)
}
- self.docker.post::<Body>(&path.join("?"), None).map(|_| ())
+ self.docker.post(&path.join("?"), None).await?;
+ Ok(())
}
/// Restart the container instance
- pub fn restart(
+ pub async fn restart(
&self,
wait: Option<Duration>,
- ) -> impl Future<Item = (), Error = Error> {
+ ) -> Result<()> {
let mut path = vec![format!("/containers/{}/restart", self.id)];
if let Some(w) = wait {
let encoded = form_urlencoded::Serializer::new(String::new())
@@ -421,14 +438,15 @@ impl<'a, 'b> Container<'a, 'b> {
.finish();
path.push(encoded)
}
- self.docker.post::<Body>(&path.join("?"), None).map(|_| ())
+ self.docker.post(&path.join("?"), None).await?;
+ Ok(())
}
/// Kill the container instance
- pub fn kill(
+ pub async fn kill(
&self,
signal: Option<&str>,
- ) -> impl Future<Item = (), Error = Error> {
+ ) -> Result<()> {
let mut path = vec![format!("/containers/{}/kill", self.id)];
if let Some(sig) = signal {
let encoded = form_urlencoded::Serializer::new(String::new())
@@ -436,101 +454,125 @@ impl<'a, 'b> Container<'a, 'b> {
.finish();
path.push(encoded)
}
- self.docker.post::<Body>(&path.join("?"), None).map(|_| ())
+ self.docker.post(&path.join("?"), None).await?;
+ Ok(())
}
/// Rename the container instance
- pub fn rename(
+ pub async fn rename(
&self,
name: &str,
- ) -> impl Future<Item = (), Error = Error> {
+ ) -> Result<()> {
let query = form_urlencoded::Serializer::new(String::new())
.append_pair("name", name)
.finish();
self.docker
- .post::<Body>(
+ .post(
&format!("/containers/{}/rename?{}", self.id, query)[..],
None,
)
- .map(|_| ())
+ .await?;
+ Ok(())
}
/// Pause the container instance
- pub fn pause(&self) -> impl Future<Item = (), Error = Error> {
+ pub async fn pause(&self) -> Result<()> {
self.docker
- .post::<Body>(&format!("/containers/{}/pause", self.id)[..], None)
- .map(|_| ())
+ .post(&format!("/containers/{}/pause", self.id)[..], None)
+ .await?;
+ Ok(())
}
/// Unpause the container instance
- pub fn unpause(&self) -> impl Future<Item = (), Error = Error> {
+ pub async fn unpause(&self) -> Result<()> {
self.docker
- .post::<Body>(&format!("/containers/{}/unpause", self.id)[..], None)
- .map(|_| ())
+ .post(&format!("/containers/{}/unpause", self.id)[..], None)
+ .await?;
+ Ok(())
}
/// Wait until the container stops
- pub fn wait(&self) -> impl Future<Item = Exit, Error = Error> {
+ pub async fn wait(&self) -> Result<Exit> {
self.docker
- .post_json::<Body, Exit>(&format!("/containers/{}/wait", self.id)[..], None)
+ .post_json(
+ format!("/containers/{}/wait", self.id),
+ Option::<(Body, Mime)>::None,
+ )
+ .await
}
/// Delete the container instance
///
/// Use remove instead to use the force/v options.
- pub fn delete(&self) -> impl Future<Item = (), Error = Error> {
+ pub async fn delete(&self) -> Result<()> {
self.docker
.delete(&format!("/containers/{}", self.id)[..])
- .map(|_| ())
+ .await?;
+ Ok(())
}
/// Delete the container instance (todo: force/v)
- pub fn remove(
+ pub async fn remove(
&self,
opts: RmContainerOptions,
- ) -> impl Future<Item = (), Error = Error> {
+ ) -> Result<()> {
let mut path = vec![format!("/containers/{}", self.id)];
if let Some(query) = opts.serialize() {
path.push(query)
}
- self.docker.delete(&path.join("?")).map(|_| ())
+ self.docker.delete(&path.join("?")).await?;
+ Ok(())
}
- // TODO(abusch) fix this
- /// Exec the specified command in the container
- pub fn exec(
+ async fn exec_create(
&self,
opts: &ExecContainerOptions,
- ) -> impl Stream<Item = tty::Chunk, Error = Error> {
- let data = opts.serialize().unwrap(); // TODO fixme
- let bytes = data.into_bytes();
- let docker2 = self.docker.clone();
- self.docker
- .post(
+ ) -> Result<String> {
+ #[derive(serde::Deserialize)]
+ #[serde(rename_all = "PascalCase")]
+ struct Response {
+ id: String,
+ }
+
+ let body: Body = opts.serialize()?.into();
+
+ let Response { id } = self
+ .docker
+ .post_json(
&format!("/containers/{}/exec", self.id)[..],
- Some((bytes, mime::APPLICATION_JSON)),
+ Some((body, mime::APPLICATION_JSON)),
)
- .map(move |res| {
- let data = "{}";
- let bytes = data.as_bytes();
- let id = serde_json::from_str::<Value>(res.as_str())
- .ok()
- .and_then(|v| {
- v.as_object()
- .and_then(|v| v.get("Id"))
- .and_then(|v| v.as_str().map(|v| v.to_string()))
- })
- .unwrap(); // TODO fixme
-
- let decoder = TtyDecoder::new();
- let chunk_stream = StreamReader::new(docker2.stream_post(
- &format!("/exec/{}/start", id)[..],
- Some((bytes, mime::APPLICATION_JSON)),
- None::<iter::Empty<_>>,
- ));
- FramedRead::new(chunk_stream, decoder)
- })
- .flatten_stream()
+ .await?;
+
+ Ok(id)
+ }
+
+ fn exec_start(
+ &self,
+ id: String,
+ ) -> impl Stream<Item = Result<tty::TtyChunk>> + 'a {
+ let bytes: &[u8] = b"{}";
+
+ let stream = Box::pin(self.docker.stream_post(
+ format!("/exec/{}/start", id),
+ Some((bytes.into(), mime::APPLICATION_JSON)),
+ None::<iter::Empty<_>>,
+ ));
+
+ tty::decode(stream)
+ }
+
+ pub fn exec(
+ &'a self,
+ opts: &'a ExecContainerOptions,
+ ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'a {
+ Box::pin(
+ async move {
+ let id = self.exec_create(opts).await?;
+ Ok(self.exec_start(id))
+ }
+ .try_flatten_stream(),
+ )
}
/// Copy a file/folder from the container. The resulting stream is a tarball of the extracted
@@ -544,24 +586,24 @@ impl<'a, 'b> Container<'a, 'b> {
pub fn copy_from(
&self,
path: &Path,
- ) -> impl Stream<Item = Vec<u8>, Error = Error> {
+ ) -> impl Stream<Item = Result<Vec<u8>>> + 'a {
let path_arg = form_urlencoded::Serializer::new(String::new())
.append_pair("path", &path.to_string_lossy())
.finish();
- self.docker
- .stream_get(&format!("/containers/{}/archive?{}", self.id, path_arg))
- .map(|c| c.to_vec())
+
+ let endpoint = format!("/containers/{}/archive?{}", self.id, path_arg);
+ self.docker.stream_get(endpoint).map_ok(|c| c.to_vec())
}
/// Copy a byte slice as file into (see `bytes`) the container.
///
/// The file will be copied at the given location (see `path`) and will be owned by root
/// with access mask 644.
- pub fn copy_file_into<P: AsRef<Path>>(
+ pub async fn copy_file_into<P: AsRef<Path>>(
&self,
path: P,
bytes: &[u8],
- ) -> impl Future<Item = (), Error = Error> {
+ ) -> Result<()> {
let path = path.as_ref();
let mut ar = tar::Builder::new(Vec::new());
@@ -588,9 +630,10 @@ impl<'a, 'b> Container<'a, 'b> {
self.docker
.put(
&format!("/containers/{}/archive?{}", self.id, path_arg),
- body,
+ body.map(|(body, mime)| (body.into(), mime)),
)
- .map(|_| ())
+ .await?;
+ Ok(())
}
}
@@ -606,36 +649,33 @@ impl<'a> Containers<'a> {
}
/// Lists the container instances on the docker host
- pub fn list(
+ pub async fn list(
&self,
opts: &ContainerListOptions,
- ) -> impl Future<Item = Vec<ContainerRep>, Error = Error> {
+ ) -> Result<Vec<ContainerRep>> {
let mut path = vec!["/containers/json".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query)
}
- self.docker.get_json::<Vec<ContainerRep>>(&path.join("?"))
+ self.docker
+ .get_json::<Vec<ContainerRep>>(&path.join("?"))
+ .await
}
/// Returns a reference to a set of operations available to a specific container instance
- pub fn get<'b>(
+ pub fn get(
&self,
- name: &'b str,
- ) -> Container<'a, 'b> {
+ name: &'a str,
+ ) -> Container<'a> {
Container::new(self.docker, name)
}
/// Returns a builder interface for creating a new container instance
- pub fn create(
+ pub async fn create(
&self,
opts: &ContainerOptions,
- ) -> impl Future<Item = ContainerCreateInfo, Error = Error> {
- let data = match opts.serialize() {
- Ok(data) => data,
- Err(e) => return Either::A(futures::future::err(e)),
- };
-
- let bytes = data.into_bytes();
+ ) -> Result<ContainerCreateInfo> {
+ let body: Body = opts.serialize()?.into();
let mut path = vec!["/containers/create".to_owned()];
if let Some(ref name) = opts.name {
@@ -646,10 +686,9 @@ impl<'a> Containers<'a> {
);
}
- Either::B(
- self.docker
- .post_json(&path.join("?"), Some((bytes, mime::APPLICATION_JSON))),
- )
+ self.docker
+ .post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON)))
+ .await
}
}
@@ -665,15 +704,15 @@ impl<'a> Networks<'a> {
}
/// List the docker networks on the current docker host
- pub fn list(
+ pub async fn list(
&self,
opts: &NetworkListOptions,
- ) -> impl Future<Item = Vec<NetworkInfo>, Error = Error> {
+ ) -> Result<Vec<NetworkInfo>> {
let mut path = vec!["/networks".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
}
- self.docker.get_json(&path.join("?"))
+ self.docker.get_json(&path.join("?")).await
}
/// Returns a reference to a set of operations available to a specific network instance
@@ -685,21 +724,16 @@ impl<'a> Networks<'a> {
}
/// Create a new Network instance
- pub fn create(
+ pub async fn create(
&self,
opts: &NetworkCreateOptions,
- ) -> impl Future<Item = NetworkCreateInfo, Error = Error> {
- let data = match opts.serialize() {
- Ok(data) => data,
- Err(e) => return Either::A(futures::future::err(e)),
- };
- let bytes = data.into_bytes();
+ ) -> Result<NetworkCreateInfo> {
+ let body: Body = opts.serialize()?.into();
let path = vec!["/networks/create".to_owned()];
- Either::B(
- self.docker
- .post_json(&path.join("?"), Some((bytes, mime::APPLICATION_JSON))),
- )
+ self.docker
+ .post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON)))
+ .await
}
}
@@ -730,52 +764,50 @@ impl<'a, 'b> Network<'a, 'b> {
}
/// Inspects the current docker network instance's details
- pub fn inspect(&self) -> impl Future<Item = NetworkInfo, Error = Error> {
- self.docker.get_json(&format!("/networks/{}", self.id)[..])
+ pub async fn inspect(&self) -> Result<NetworkInfo> {
+ self.docker
+ .get_json(&format!("/networks/{}", self.id)[..])
+ .await
}
/// Delete the network instance
- pub fn delete(&self) -> impl Future<Item = (), Error = Error> {
+ pub async fn delete(&self) -> Result<()> {
self.docker
.delete(&format!("/networks/{}", self.id)[..])
- .map(|_| ())
+ .await?;
+ Ok(())
}
/// Connect container to network
- pub fn connect(
+ pub async fn connect(
&self,
opts: &ContainerConnectionOptions,
- ) -> impl Future<Item = (), Error = Error> {
- self.do_connection("connect", opts)
+ ) -> Result<()> {
+ self.do_connection("connect", opts).await
}
/// Disconnect container to network
- pub fn disconnect(
+ pub async fn disconnect(
&self,
opts: &ContainerConnectionOptions,
- ) -> impl Future<Item = (), Error = Error> {
- self.do_connection("disconnect", opts)
+ ) -> Result<()> {
+ self.do_connection("disconnect", opts).await
}
- fn do_connection(
+ async fn do_connection(
&self,
segment: &str,
opts: &ContainerConnectionOptions,
- ) -> impl Future<Item = (), Error = Error> {
- let data = match opts.serialize() {
- Ok(data) => data,
- Err(e) => return Either::A(futures::future::err(e)),
- };
- let bytes = data.into_bytes();
+ ) -> Result<()> {
+ let body: Body = opts.serialize()?.into();
- Either::B(
- self.docker
- .post(
- &format!("/networks/{}/{}", self.id, segment)[..],
- Some((bytes, mime::APPLICATION_JSON)),
- )
- .map(|_| ()),
- )
+ self.docker
+ .post(
+ &format!("/networks/{}/{}", self.id, segment)[..],
+ Some((body, mime::APPLICATION_JSON)),
+ )
+ .await?;
+ Ok(())
}
}
@@ -790,34 +822,27 @@ impl<'a> Volumes<'a> {
Volumes { docker }
}
- pub fn create(
+ pub async fn create(
&self,
opts: &VolumeCreateOptions,
- ) -> impl Future<Item = VolumeCreateInfo, Error = Error> {
- let data = match opts.serialize() {
- Ok(data) => data,
- Err(e) => return Either::A(futures::future::err(e)),
- };
-
- let bytes = data.into_bytes();
+ ) -> Result<VolumeCreateInfo> {
+ let body: Body = opts.serialize()?.into();
let path = vec!["/volumes/create".to_owned()];
- Either::B(
- self.docker
- .post_json(&path.join("?"), Some((bytes, mime::APPLICATION_JSON))),
- )
+ self.docker
+ .post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON)))
+ .await
}
/// Lists the docker volumes on the current docker host
- pub fn list(&self) -> impl Future<Item = Vec<VolumeRep>, Error = Error> {
+ pub async fn list(&self) -> Result<Vec<VolumeRep>> {
let path = vec!["/volumes".to_owned()];
- self.docker
- .get_json::<VolumesRep>(&path.join("?"))
- .map(|volumes: VolumesRep| match volumes.volumes {
- Some(volumes) => volumes,
- None => vec![],
- })
+ let volumes_rep = self.docker.get_json::<VolumesRep>(&path.join("?")).await?;
+ Ok(match volumes_rep.volumes {
+ Some(volumes) => volumes,
+ None => vec![],
+ })
}
/// Returns a reference to a set of operations available for a named volume
@@ -851,15 +876,16 @@ impl<'a, 'b> Volume<'a, 'b> {
}
/// Deletes a volume
- pub fn delete(&self) -> impl Future<Item = (), Error = Error> {
+ pub async fn delete(&self) -> Result<()> {
self.docker
.delete(&format!("/volumes/{}", self.name)[..])
- .map(|_| ())
+ .await?;
+ Ok(())
}
}
fn get_http_connector() -> HttpConnector {
- let mut http = HttpConnector::new(1);
+ let mut http = HttpConnector::new();
http.enforce_http(false);
http
@@ -950,7 +976,9 @@ impl Docker {
{
Docker {
transport: Transport::Unix {
- client: Client::builder().keep_alive(false).build(UnixConnector),
+ client: Client::builder()
+ .pool_max_idle_per_host(0)
+ .build(UnixConnector),
path: socket_path.into(),
},
}
@@ -960,12 +988,12 @@ impl Docker {
pub fn host(host: Uri) -> Docker {
let tcp_host_str = format!(
"{}://{}:{}",
- host.scheme_part().map(|s| s.as_str()).unwrap(),
+ host.scheme_str().unwrap(),
host.host().unwrap().to_owned(),
host.port_u16().unwrap_or(80)
);
- match host.scheme_part().map(|s| s.as_str()) {
+ match host.scheme_str() {
#[cfg(feature = "unix-socket")]
Some("unix") => Docker {
transport: Transport::Unix {
@@ -1000,153 +1028,152 @@ impl Docker {
}
/// Returns version information associated with the docker daemon
- pub fn version(&self) -> impl Future<Item = Version, Error = Error> {
- self.get_json("/version")
+ pub async fn version(&self) -> Result<Version> {
+ self.get_json("/version").await
}
/// Returns information associated with the docker daemon
- pub fn info(&self) -> impl Future<Item = Info, Error = Error> {
- self.get_json("/info")
+ pub async fn info(&self) -> Result<Info> {
+ self.get_json("/info").await
}
/// Returns a simple ping response indicating the docker daemon is accessible
- pub fn ping(&self) -> impl Future<Item = String, Error = Error> {
- self.get("/_ping")
+ pub async fn ping(&self) -> Result<String> {
+ self.get("/_ping").await
}
/// Returns a stream of docker events
- pub fn events(
- &self,
- opts: &EventsOptions,
- ) -> impl Stream<Item = Event, Error = Error> {
+ pub fn events<'a>(
+ &'a self,
+ opts: &'a EventsOptions,
+ ) -> impl Stream<Item = Result<Event>> + Unpin + 'a {
let mut path = vec!["/events".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
}
- let stream_of_chunks = self.stream_get(&path.join("?")[..]);
- let reader = StreamReader::new(stream_of_chunks);
- FramedRead::new(reader, LinesCodec::new())
- .map_err(Error::IO)
- .and_then(|line| serde_json::from_str::<Event>(&line).map_err(Error::from))
+ let reader = Box::pin(
+ self.stream_get(path.join("?"))
+ .map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
+ )
+ .into_async_read();
+
+ let codec = futures_codec::LinesCodec {};
+
+ Box::pin(
+ futures_codec::FramedRead::new(reader, codec)
+ .map_err(Error::IO)
+ .and_then(|s: String| async move {
+ serde_json::from_str(&s).map_err(Error::SerdeJsonError)
+ }),
+ )
}
//
// Utility functions to make requests
//
- fn get(
+ async fn get(
&self,
endpoint: &str,
- ) -> impl Future<Item = String, Error = Error> {
- self.transport.request::<Body>(Method::GET, endpoint, None)
+ ) -> Result<String> {
+ self.transport
+ .request(Method::GET, endpoint, Option::<(Body, Mime)>::None)
+ .await
}
- fn get_json<T: serde::de::DeserializeOwned>(
+ async fn get_json<T: serde::de::DeserializeOwned>(
&self,
endpoint: &str,
- ) -> impl Future<Item = T, Error = Error> {
- self.transport
- .request::<Body>(Method::GET, endpoint, None)
- .and_then(|v| {
- serde_json::from_str::<T>(&v)
- .map_err(Error::SerdeJsonError)
- .into_future()
- })
+ ) -> Result<T> {
+ let raw_string = self
+ .transport
+ .request(Method::GET, endpoint, Option::<(Body, Mime)>::None)
+ .await?;
+
+ Ok(serde_json::from_str::<T>(&raw_string)?)
}
- fn post<B>(
+ async fn post(
&self,
endpoint: &str,
- body: Option<(B, Mime)>,
- ) -> impl Future<Item = String, Error = Error>
- where
- B: Into<Body>,
- {
- self.transport.request(Method::POST, endpoint, body)
+ body: Option<(Body, Mime)>,
+ ) -> Result<String> {
+ self.transport.request(Method::POST, endpoint, body).await
}
- fn put<B>(
+ async fn put(
&self,
endpoint: &str,
- body: Option<(B, Mime)>,
- ) -> impl Future<Item = String, Error = Error>
- where
- B: Into<Body>,
- {
- self.transport.request(Method::PUT, endpoint, body)
+ body: Option<(Body, Mime)>,
+ ) -> Result<String> {
+ self.transport.request(Method::PUT, endpoint, body).await
}
- fn post_json<B, T>(
+ async fn post_json<T, B>(
&self,
- endpoint: &str,
+ endpoint: impl AsRef<str>,
body: Option<(B, Mime)>,
- ) -> impl Future<Item = T, Error = Error>
+ ) -> Result<T>
where
- B: Into<Body>,
T: serde::de::DeserializeOwned,
+ B: Into<Body>,
{
- self.transport
- .request(Method::POST, endpoint, body)
- .and_then(|v| {
- serde_json::from_str::<T>(&v)
- .map_err(Error::SerdeJsonError)
- .into_future()
- })
+ let string = self.transport.request(Method::POST, endpoint, body).await?;
+
+ Ok(serde_json::from_str::<T>(&string)?)
}
- fn delete(
+ async fn delete(
&self,
endpoint: &str,
- ) -> impl Future<Item = String, Error = Error> {
+ ) -> Result<String> {
self.transport
- .request::<Body>(Method::DELETE, endpoint, None)
+ .request(Method::DELETE, endpoint, Option::<(Body, Mime)>::None)
+ .await
}
- fn delete_json<T: serde::de::DeserializeOwned>(
+ async fn delete_json<T: serde::de::DeserializeOwned>(
&self,
endpoint: &str,
- ) -> impl Future<Item = T, Error = Error> {
- self.transport
- .request::<Body>(Method::DELETE, endpoint, None)
- .and_then(|v| {
- serde_json::from_str::<T>(&v)
- .map_err(Error::SerdeJsonError)
- .into_future()
- })
+ ) -> Result<T> {
+ let string = self
+ .transport
+ .request(Method::DELETE, endpoint, Option::<(Body, Mime)>::None)
+ .await?;
+
+ Ok(serde_json::from_str::<T>(&string)?)
}
- fn stream_post<B, H>(
- &self,
- endpoint: &str,
- body: Option<(B, Mime)>,
+ fn stream_post<'a, H>(
+ &'a self,
+ endpoint: impl AsRef<str> + 'a,
+ body: Option<(Body, Mime)>,
headers: Option<H>,
- ) -> impl Stream<Item = hyper::Chunk, Error = Error>
+ ) -> impl Stream<Item = Result<hyper::body::Bytes>> + 'a
where
- B: Into<Body>,
- H: IntoIterator<Item = (&'static str, String)>,
+ H: IntoIterator<Item = (&'static str, String)> + 'a,
{
self.transport
.stream_chunks(Method::POST, endpoint, body, headers)
}
- fn stream_get(
- &self,
- endpoint: &str,
- ) -> impl Stream<Item = hyper::Chunk, Error = Error> {
+ fn stream_get<'a>(
+ &'a self,
+ endpoint: impl AsRef<str> + Unpin + 'a,
+ ) -> impl Stream<Item = Result<hyper::body::Bytes>> + 'a {
+ let headers = Some(Vec::default());
self.transport
- .stream_chunks::<Body, iter::Empty<_>>(Method::GET, endpoint, None, None)
+ .stream_chunks(Method::GET, endpoint, Option::<(Body, Mime)>::None, headers)
}
- fn stream_post_upgrade_multiplexed<B>(
- &self,
- endpoint: &str,
- body: Option<(B, Mime)>,
- ) -> impl Future<Item = tty::Multiplexed, Error = Error>
- where
- B: Into<Body> + 'static,
- {
+ async fn stream_post_upgrade<'a>(
+ &'a self,
+ endpoint: impl AsRef<str> + 'a,
+ body: Option<(Body, Mime)>,
+ ) -> Result<impl futures_util::io::AsyncRead + futures_util::io::AsyncWrite + 'a> {
self.transport
- .stream_upgrade_multiplexed(Method::POST, endpoint, body)
+ .stream_upgrade(Method::POST, endpoint, body)
+ .await
}
}
diff --git a/src/read.rs b/src/read.rs
deleted file mode 100644
index b9dc5ef..0000000
--- a/src/read.rs
+++ /dev/null
@@ -1,105 +0,0 @@
-use crate::errors::Error;
-use futures::{Async, Stream};
-use hyper::Chunk;
-use std::{
- cmp,
- io::{self, Read},
-};
-use tokio_io::AsyncRead;
-
-/*
- * The following is taken from
- * https://github.com/ferristseng/rust-ipfs-api/blob/master/ipfs-api/src/read.rs.
- * TODO: see with upstream author to move to a separate crate.
- */
-
-/// The state of a stream returning Chunks.
-///
-enum ReadState {
- /// A chunk is ready to be read from.
- ///
- Ready(Chunk, usize),
-
- /// The next chunk isn't ready yet.
- ///
- NotReady,
-}
-
-/// Reads from a stream of chunks asynchronously.
-///
-pub struct StreamReader<S> {
- stream: S,
- state: ReadState,
-}
-
-impl<S> StreamReader<S>
-where
- S: Stream<Item = Chunk, Error = Error>,
-{
- #[inline]
- pub fn new(stream: S) -> StreamReader<S> {
- StreamReader {
- stream,
- state: ReadState::NotReady,
- }
- }
-}
-
-impl<S> Read for StreamReader<S>
-where
- S: Stream<Item = Chunk, Error = Error>,
-{
- fn read(
- &mut self,
- buf: &mut [u8],
- ) -> io::Result<usize> {
- loop {
- let ret;
-
- match self.state {
- // Stream yielded a Chunk to read.
- //
- ReadState::Ready(ref mut chunk, ref mut pos) => {
- let chunk_start = *pos;
- let len = cmp::min(buf.len(), chunk.len() - chunk_start);
- let chunk_end = chunk_start + len;
-
- buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]);
- *pos += len;
-
- if *pos == chunk.len() {
- ret = len;
- } else {
- return Ok(len);
- }
- }
- // Stream is not ready, and a Chunk needs to be read.
- //
- ReadState::NotReady => {
- match self.stream.poll() {
- // Polling stream yielded a Chunk that can be read from.
- //
- Ok(Async::Ready(Some(chunk))) => {
- self.state = ReadState::Ready(chunk, 0);
-
- continue;
- }
- // Polling stream yielded EOF.
- //
- Ok(Async::Ready(None)) => return Ok(0),
- // Stream could not be read from.
- //
- Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()),
- Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
- }
- }
- }
-
- self.state = ReadState::NotReady;
-
- return Ok(ret);
- }
- }
-}
-
-impl<S> AsyncRead for StreamReader<S> where S: Stream<Item = Chunk, Error = Error> {}
diff --git a/src/transport.rs b/src/transport.rs
index 7cc1fb1..112d548 100644
--- a/src/transport.rs
+++ b/src/transport.rs
@@ -1,13 +1,15 @@
//! Transports for communicating with the docker daemon
use crate::{Error, Result};
-use futures::{
- future::{self, Either},
- Future, IntoFuture, Stream,
+use futures_util::{
+ io::{AsyncRead, AsyncWrite},
+ stream::Stream,
+ StreamExt, TryFutureExt,
};
use hyper::{
+ body::Bytes,
client::{Client, HttpConnector},
- header, Body, Chunk, Method, Request, StatusCode,
+ header, Body, Method, Request, StatusCode,
};
#[cfg(feature = "tls")]
use hyper_openssl::HttpsConnector;
@@ -15,12 +17,14 @@ use hyper_openssl::HttpsConnector;
use hyperlocal::UnixConnector;
#[cfg(feature = "unix-socket")]
use hyperlocal::Uri as DomainUri;
-use log::debug;
use mime::Mime;
+use pin_project::pin_project;
use serde::{Deserialize, Serialize};
-use serde_json;
-use std::{fmt, iter};
-use tokio_io::{AsyncRead, AsyncWrite};
+use std::{
+ fmt, io, iter,
+ pin::Pin,
+ task::{Context, Poll},
+};
pub fn tar() -> Mime {
"application/tar".parse().unwrap()
@@ -66,116 +70,133 @@ impl fmt::Debug for Transport {
impl Transport {
/// Make a request and return the whole response in a `String`
- pub fn request<B>(
+ pub async fn request<B>(
&self,
method: Method,
- endpoint: &str,
+ endpoint: impl AsRef<str>,
body: Option<(B, Mime)>,
- ) -> impl Future<Item = String, Error = Error>
+ ) -> Result<String>
where
B: Into<Body>,
{
- let endpoint = endpoint.to_string();
- self.stream_chunks(method, &endpoint, body, None::<iter::Empty<_>>)
- .concat2()
- .and_then(|v| {
- String::from_utf8(v.to_vec())
- .map_err(Error::Encoding)
- .into_future()
- })
- .inspect(move |body| debug!("{} raw response: {}", endpoint, body))
+ let body = self
+ .get_body(method, endpoint, body, None::<iter::Empty<_>>)
+ .await?;
+ let bytes = hyper::body::to_bytes(body).await?;
+ let string = String::from_utf8(bytes.to_vec())?;
+
+ Ok(string)
}
- /// Make a request and return a `Stream` of `Chunks` as they are returned.
- pub fn stream_chunks<B, H>(
+ async fn get_body<B, H>(
&self,
method: Method,
- endpoint: &str,
+ endpoint: impl AsRef<str>,
body: Option<(B, Mime)>,
headers: Option<H>,
- ) -> impl Stream<Item = Chunk, Error = Error>
+ ) -> Result<Body>
where
B: Into<Body>,
H: IntoIterator<Item = (&'static str, String)>,
{
let req = self
- .build_request(method, endpoint, body, headers, |_| ())
+ .build_request(method, endpoint, body, headers, Request::builder())
.expect("Failed to build request!");
- self.send_request(req)
- .and_then(|res| {
- let status = res.status();
- match status {
- // Success case: pass on the response
- StatusCode::OK
- | StatusCode::CREATED
- | StatusCode::SWITCHING_PROTOCOLS
- | StatusCode::NO_CONTENT => Either::A(future::ok(res)),
- // Error case: parse the body to try to extract the error message
- _ => Either::B(
- res.into_body()
- .concat2()
- .map_err(Error::Hyper)
- .and_then(|v| {
- String::from_utf8(v.into_iter().collect::<Vec<u8>>())
- .map_err(Error::Encoding)
- })
- .and_then(move |body| {
- future::err(Error::Fault {
- code: status,
- message: Self::get_error_message(&body).unwrap_or_else(|| {
- status
- .canonical_reason()
- .unwrap_or_else(|| "unknown error code")
- .to_owned()
- }),
- })
- }),
- ),
- }
- })
- .map(|r| {
- // Convert the response body into a stream of chunks
- r.into_body().map_err(Error::Hyper)
- })
- .flatten_stream()
+ let response = self.send_request(req).await?;
+
+ let status = response.status();
+
+ match status {
+ // Success case: pass on the response
+ StatusCode::OK
+ | StatusCode::CREATED
+ | StatusCode::SWITCHING_PROTOCOLS
+ | StatusCode::NO_CONTENT => Ok(response.into_body()),
+ _ => {
+ let bytes = hyper::body::to_bytes(response.into_body()).await?;
+ let message_body = String::from_utf8(bytes.to_vec())?;
+
+ Err(Error::Fault {
+ code: status,
+ message: Self::get_error_message(&message_body).unwrap_or_else(|| {
+ status
+ .canonical_reason()
+ .unwrap_or_else(|| "unknown error code")
+ .to_owned()
+ }),
+ })
+ }
+ }
+ }
+
+ async fn get_chunk_stream<B, H>(
+ &self,
+ method: Method,
+ endpoint: impl AsRef<str>,
+ body: Option<(B, Mime)>,
+ headers: Option<H>,
+ ) -> Result<impl Stream<Item = Result<Bytes>>>
+ where
+ B: Into<Body>,
+ H: IntoIterator<Item = (&'static str, String)>,
+ {
+ let body = self.get_body(method, endpoint, body, headers).await?;
+
+ Ok(stream_body(body))
+ }
+
+ pub fn stream_chunks<'a, H, B>(
+ &'a self,
+ method: Method,
+ endpoint: impl AsRef<str> + 'a,
+ body: Option<(B, Mime)>,
+ headers: Option<H>,
+ ) -> impl Stream<Item = Result<Bytes>> + 'a
+ where
+ H: IntoIterator<Item = (&'static str, String)> + 'a,
+ B: Into<Body> + 'a,
+ {
+ self.get_chunk_stream(method, endpoint, body, headers)
+ .try_flatten_stream()
}
/// Builds an HTTP request.
fn build_request<B, H>(
&self,
method: Method,
- endpoint: &str,
+ endpoint: impl AsRef<str>,
body: Option<(B, Mime)>,
headers: Option<H>,
- f: impl FnOnce(&mut ::hyper::http::request::Builder),
+ builder: hyper::http::request::Builder,
) -> Result<Request<Body>>
where
B: Into<Body>,
H: IntoIterator<Item = (&'static str, String)>,
{
- let mut builder = Request::builder();
- f(&mut builder);
-
let req = match *self {
Transport::Tcp { ref host, .. } => {
- builder.method(method).uri(&format!("{}{}", host, endpoint))
+ builder
+ .method(method)
+ .uri(&format!("{}{}", host, endpoint.as_ref()))
}
#[cfg(feature = "tls")]
Transport::EncryptedTcp { ref host, .. } => {
- builder.method(method).uri(&format!("{}{}", host, endpoint))
+ builder
+ .method(method)
+ .uri(&format!("{}{}", host, endpoint.as_ref()))
}
#[cfg(feature = "unix-socket")]
Transport::Unix { ref path, .. } => {
- let uri: hyper::Uri = DomainUri::new(&path, endpoint).into();
- builder.method(method).uri(&uri.to_string())
+ let uri = DomainUri::new(&path, endpoint.as_ref());
+ builder.method(method).uri(uri)
}
};
- let req = req.header(header::HOST, "");
+ let mut req = req.header(header::HOST, "");
if let Some(h) = headers {
for (k, v) in h.into_iter() {
- req.header(k, v);
+ req = req.header(k, v);
}
}
@@ -188,19 +209,17 @@ impl Transport {
}
/// Send the given request to the docker daemon and return a Future of the response.
- fn send_request(
+ async fn send_request(
&self,
req: Request<hyper::Body>,
- ) -> impl Future<Item = hyper::Response<Body>, Error = Error> {
- let req = match self {
- Transport::Tcp { ref client, .. } => client.request(req),
+ ) -> Result<hyper::Response<Body>> {
+ match self {
+ Transport::Tcp { ref client, .. } => Ok(client.request(req).await?),
#[cfg(feature = "tls")]
- Transport::EncryptedTcp { ref client, .. } => client.request(req),
+ Transport::EncryptedTcp { ref client, .. } => Ok(client.request(req).await?),
#[cfg(feature = "unix-socket")]
- Transport::Unix { ref client, .. } => client.request(req),
- };
-
- req.map_err(Error::Hyper)
+ Transport::Unix { ref client, .. } => Ok(client.request(req).await?),
+ }
}
/// Makes an HTTP request, upgrading the connection to a TCP
@@ -208,12 +227,12 @@ impl Transport {
///
/// This method can be used for operations such as viewing
/// docker container logs interactively.
- pub fn stream_upgrade<B>(
+ async fn stream_upgrade_tokio<B>(
&self,
method: Method,
- endpoint: &str,
+ endpoint: impl AsRef<str>,
body: Option<(B, Mime)>,
- ) -> impl Future<Item = impl AsyncRead + AsyncWrite, Error = Error>
+ ) -> Result<hyper::upgrade::Upgraded>
where
B: Into<Body>,
{
@@ -226,32 +245,37 @@ impl Transport {
};
let req = self
- .build_request(method, endpoint, body, None::<iter::Empty<_>>, |builder| {
- builder
+ .build_request(
+ method,
+ endpoint,
+ body,
+ None::<iter::Empty<_>>,
+ Request::builder()
.header(header::CONNECTION, "Upgrade")
- .header(header::UPGRADE, "tcp");
- })
+ .header(header::UPGRADE, "tcp"),
+ )
.expect("Failed to build request!");
- self.send_request(req)
- .and_then(|res| match res.status() {
- StatusCode::SWITCHING_PROTOCOLS => Ok(res),
- _ => Err(Error::ConnectionNotUpgraded),
- })
- .and_then(|res| res.into_body().on_upgrade().from_err())
+ let response = self.send_request(req).await?;
+
+ match response.status() {
+ StatusCode::SWITCHING_PROTOCOLS => Ok(response.into_body().on_upgrade().await?),
+ _ => Err(Error::ConnectionNotUpgraded),
+ }
}
- pub fn stream_upgrade_multiplexed<B>(
+ pub async fn stream_upgrade<B>(
&self,
method: Method,
- endpoint: &str,
+ endpoint: impl AsRef<str>,
body: Option<(B, Mime)>,
- ) -> impl Future<Item = crate::tty::Multiplexed, Error = Error>
+ ) -> Result<impl AsyncRead + AsyncWrite>
where
- B: Into<Body> + 'static,
+ B: Into<Body>,
{
- self.stream_upgrade(method, endpoint, body)
- .map(crate::tty::Multiplexed::new)
+ let tokio_multiplexer = self.stream_upgrade_tokio(method, endpoint, body).await?;
+
+ Ok(Compat { tokio_multiplexer })
}
/// Extract the error message content from an HTTP response that
@@ -263,7 +287,61 @@ impl Transport {
}
}
+#[pin_project]
+struct Compat<S> {
+ #[pin]
+ tokio_multiplexer: S,
+}
+
+impl<S> AsyncRead for Compat<S>
+where
+ S: tokio::io::AsyncRead,
+{
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ self.project().tokio_multiplexer.poll_read(cx, buf)
+ }
+}
+
+impl<S> AsyncWrite for Compat<S>
+where
+ S: tokio::io::AsyncWrite,
+{
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.project().tokio_multiplexer.poll_write(cx, buf)
+ }
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<()>> {
+ self.project().tokio_multiplexer.poll_flush(cx)
+ }
+ fn poll_close(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<()>> {
+ self.project().tokio_multiplexer.poll_shutdown(cx)
+ }
+}
+
#[derive(Serialize, Deserialize)]
struct ErrorResponse {
message: String,
}
+
+fn stream_body(body: Body) -> impl Stream<Item = Result<Bytes>> {
+ async fn unfold(mut body: Body) -> Option<(Result<Bytes>, Body)> {
+ let chunk_result = body.next().await?.map_err(Error::from);
+
+ Some((chunk_result, body))
+ }
+
+ futures_util::stream::unfold(body, unfold)
+}
diff --git a/src/tty.rs b/src/tty.rs
index a4ab27f..a26846f 100644
--- a/src/tty.rs
+++ b/src/tty.rs
@@ -1,278 +1,172 @@
-use crate::errors::Error;
-use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
-use bytes::BytesMut;
-use futures::{self, Async};
-use hyper::rt::{Future, Stream};
-use log::trace;
-use std::io::{self, Cursor};
-use tokio_codec::Decoder;
-use tokio_io::{AsyncRead, AsyncWrite};
-
-#[derive(Debug)]
-pub struct Chunk {
- pub stream_type: StreamType,
- pub data: Vec<u8>,
-}
-
-#[derive(Debug, Clone, Copy)]
-pub enum StreamType {
- StdIn,
- StdOut,
- StdErr,
-}
-
-/// A multiplexed stream.
-pub struct Multiplexed {
- stdin: Box<dyn AsyncWrite>,
- chunks: Box<dyn futures::Stream<Item = Chunk, Error = crate::Error>>,
-}
-
-pub struct MultiplexedBlocking {
- stdin: Box<dyn AsyncWrite>,
- chunks: Box<dyn Iterator<Item = Result<Chunk, crate::Error>>>,
-}
-
-/// Represent the current state of the decoding of a TTY frame
-enum TtyDecoderState {
- /// We have yet to read a frame header
- WaitingHeader,
- /// We have read a header and extracted the payload size and stream type,
- /// and are now waiting to read the corresponding payload
- WaitingPayload(usize, StreamType),
-}
-
-pub struct TtyDecoder {
- state: TtyDecoderState,
-}
-
-impl Chunk {
- /// Interprets the raw bytes as a string.
- ///
- /// Returns `None` if the raw bytes do not represent
- /// a valid UTF-8 string.
- pub fn as_string(&self) -> Option<String> {
- String::from_utf8(self.data.clone()).ok()
- }
-
- /// Unconditionally interprets the raw bytes as a string.
- ///
- /// Inserts placeholder symbols for all non-character bytes.
- pub fn as_string_lossy(&self) -> String {
- String::from_utf8_lossy(&self.data).into_owned()
+//! Types for working with docker TTY streams
+
+use crate::{Error, Result};
+use bytes::{BigEndian, ByteOrder};
+use futures_util::{
+ io::{AsyncRead, AsyncReadExt, AsyncWrite},
+ stream::{Stream, TryStreamExt},
+};
+use pin_project::pin_project;
+use std::io;
+
+/// An enum representing a chunk of TTY text streamed from a Docker container.
+///
+/// For convenience, this type can deref to the contained `Vec<u8>`.
+#[derive(Debug, Clone)]
+pub enum TtyChunk {
+ StdIn(Vec<u8>),
+ StdOut(Vec<u8>),
+ StdErr(Vec<u8>),
+}
+
+impl From<TtyChunk> for Vec<u8> {
+ fn from(tty_chunk: TtyChunk) -> Self {
+ match tty_chunk {
+ TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes,
+ }
}
}
-impl TtyDecoder {
- pub fn new() -> Self {
- Self {
- state: TtyDecoderState::WaitingHeader,
+impl AsRef<Vec<u8>> for TtyChunk {
+ fn as_ref(&self) -> &Vec<u8> {
+ match self {
+ TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes,
}
}
}
-impl Default for TtyDecoder {
- fn default() -> Self {
- Self::new()
+impl std::ops::Deref for TtyChunk {
+ type Target = Vec<u8>;
+ fn deref(&self) -> &Self::Target {
+ self.as_ref()
}
}
-impl Decoder for TtyDecoder {
- type Item = Chunk;
- type Error = Error;
-
- fn decode(
- &mut self,
- src: &mut BytesMut,
- ) -> Result<Option<Self::Item>, Self::Error> {
- loop {
- match self.state {
- TtyDecoderState::WaitingHeader => {
- if src.len() < 8 {
- trace!("Not enough data to read a header");
- return Ok(None);
- } else {
- trace!("Reading header");
- let header_bytes = src.split_to(8);
- let payload_size: Vec<u8> = header_bytes[4..8].to_vec();
- let stream_type = match header_bytes[0] {
- 0 => {
- return Err(Error::InvalidResponse(
- "Unsupported stream of type stdin".to_string(),
- ));
- }
- 1 => StreamType::StdOut,
- 2 => StreamType::StdErr,
- n => {
- return Err(Error::InvalidResponse(format!(
- "Unsupported stream of type {}",
- n
- )));
- }
- };
-
- let length =
- Cursor::new(&payload_size).read_u32::<BigEndian>().unwrap() as usize;
- trace!(
- "Read header: length = {}, stream_type = {:?}",
- length,
- stream_type
- );
- // We've successfully read a header, now we wait for the payload
- self.state = TtyDecoderState::WaitingPayload(length, stream_type);
- continue;
- }
- }
- TtyDecoderState::WaitingPayload(len, stream_type) => {
- if src.len() < len {
- trace!(
- "Not enough data to read payload. Need {} but only {} available",
- len,
- src.len()
- );
- return Ok(None);
- } else {
- trace!("Reading payload");
- let data = src.split_to(len)[..].to_owned();
- let tty_chunk = Chunk { stream_type, data };
-
- // We've successfully read a full frame, now we go back to waiting for the next
- // header
- self.state = TtyDecoderState::WaitingHeader;
- return Ok(Some(tty_chunk));
- }
- }
- }
+impl std::ops::DerefMut for TtyChunk {
+ fn deref_mut(&mut self) -> &mut Vec<u8> {
+ match self {
+ TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes,
}
}
}
-impl Multiplexed {
- /// Create a multiplexed stream.
- pub(crate) fn new<T>(stream: T) -> Multiplexed
- where
- T: AsyncRead + AsyncWrite + 'static,
- {
- let (reader, stdin) = stream.split();
- Multiplexed {
- chunks: Box::new(chunks(reader)),
- stdin: Box::new(stdin),
- }
- }
+async fn decode_chunk<S>(mut stream: S) -> Option<(Result<TtyChunk>, S)>
+where
+ S: AsyncRead + Unpin,
+{
+ let mut header_bytes = vec![0u8; 8];
- pub fn wait(self) -> MultiplexedBlocking {
- MultiplexedBlocking {
- stdin: self.stdin,
- chunks: Box::new(self.chunks.wait()),
- }
+ match stream.read_exact(&mut header_bytes).await {
+ Err(e) if e.kind() == futures_util::io::ErrorKind::UnexpectedEof => return None,
+ Err(e) => return Some((Err(Error::IO(e)), stream)),
+ _ => (),
}
-}
-
-impl futures::Stream for Multiplexed {
- type Item = Chunk;
- type Error = crate::Error;
- fn poll(&mut self) -> Result<Async<Option<Chunk>>, crate::Error> {
- self.chunks.poll()
- }
-}
+ let size_bytes = &header_bytes[4..];
+ let data_length = BigEndian::read_u32(size_bytes);
-impl Iterator for MultiplexedBlocking {
- type Item = Result<Chunk, crate::Error>;
+ let mut data = vec![0u8; data_length as usize];
- fn next(&mut self) -> Option<Result<Chunk, crate::Error>> {
- self.chunks.next()
+ if stream.read_exact(&mut data).await.is_err() {
+ return None;
}
-}
-
-macro_rules! delegate_io_write {
- ($ty:ty) => {
- impl io::Write for $ty {
- fn write(
- &mut self,
- buf: &[u8],
- ) -> Result<usize, io::Error> {
- self.stdin.write(buf)
- }
- fn flush(&mut self) -> Result<(), io::Error> {
- self.stdin.flush()
- }
- }
+ let chunk = match header_bytes[0] {
+ 0 => TtyChunk::StdIn(data),
+ 1 => TtyChunk::StdOut(data),
+ 2 => TtyChunk::StdErr(data),
+ n => panic!("invalid stream number from docker daemon: '{}'", n),
};
-}
-delegate_io_write!(Multiplexed);
-delegate_io_write!(MultiplexedBlocking);
+ Some((Ok(chunk), stream))
+}
-pub fn chunks<S>(stream: S) -> impl futures::Stream<Item = Chunk, Error = crate::Error>
+pub(crate) fn decode<S>(hyper_chunk_stream: S) -> impl Stream<Item = Result<TtyChunk>>
where
- S: AsyncRead,
+ S: Stream<Item = Result<hyper::body::Bytes>> + Unpin,
{
- let stream = futures::stream::unfold(stream, |stream| {
- let header_future = ::tokio_io::io::read_exact(stream, vec![0; 8]);
+ let stream = hyper_chunk_stream
+ .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+ .into_async_read();
- let fut = header_future.and_then(|(stream, header_bytes)| {
- let size_bytes = &header_bytes[4..];
- let data_length = BigEndian::read_u32(size_bytes);
- let stream_type = match header_bytes[0] {
- 0 => StreamType::StdIn,
- 1 => StreamType::StdOut,
- 2 => StreamType::StdErr,
- n => panic!("invalid stream number from docker daemon: '{}'", n),
- };
+ futures_util::stream::unfold(stream, decode_chunk)
+}
- ::tokio_io::io::read_exact(stream, vec![0; data_length as usize])
- .map(move |(stream, data)| (Chunk { stream_type, data }, stream))
- });
- // FIXME: when updated to futures 0.2, the future itself returns the Option((Chunk,
- // stream)).
- // This is much better because it would allow us to swallow the unexpected eof and
- // stop the stream much cleaner than writing a custom stream filter.
- Some(fut)
- });
+type TtyReader<'a> = Pin<Box<dyn Stream<Item = Result<TtyChunk>> + Send + 'a>>;
+type TtyWriter<'a> = Pin<Box<dyn AsyncWrite + Send + 'a>>;
- util::stop_on_err(stream, |e| e.kind() != io::ErrorKind::UnexpectedEof)
- .map_err(crate::Error::from)
+/// TTY multiplexer returned by the `attach` method.
+///
+/// This object can emit a stream of `TtyChunk`s and also implements `AsyncWrite` for streaming bytes to Stdin.
+#[pin_project]
+pub struct Multiplexer<'a> {
+ #[pin]
+ reader: TtyReader<'a>,
+ #[pin]
+ writer: TtyWriter<'a>,
}
-mod util {
- use futures::{Async, Stream};
+impl<'a> Multiplexer<'a> {
+ pub(crate) fn new<T>(tcp_connection: T) -> Self
+ where
+ T: AsyncRead + AsyncWrite + Send + 'a,
+ {
+ let (reader, writer) = tcp_connection.split();
- pub struct StopOnError<S, F> {
- stream: S,
- f: F,
+ Self {
+ reader: Box::pin(futures_util::stream::unfold(reader, |reader| {
+ decode_chunk(reader)
+ })),
+ writer: Box::pin(writer),
+ }
}
+}
- pub fn stop_on_err<S, F>(
- stream: S,
- f: F,
- ) -> StopOnError<S, F>
- where
- S: Stream,
- F: FnMut(&S::Error) -> bool,
- {
- StopOnError { stream, f }
+use std::{
+ pin::Pin,
+ task::{Context, Poll},
+};
+
+impl<'a> Stream for Multiplexer<'a> {
+ type Item = Result<TtyChunk>;
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ self.project().reader.poll_next(cx)
}
+}
- impl<S, F> Stream for StopOnError<S, F>
- where
- S: Stream,
- F: FnMut(&S::Error) -> bool,
- {
- type Item = S::Item;
- type Error = S::Error;
+impl<'a> AsyncWrite for Multiplexer<'a> {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.project().writer.poll_write(cx, buf)
+ }
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<()>> {
+ self.project().writer.poll_flush(cx)
+ }
+ fn poll_close(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<()>> {
+ self.project().writer.poll_close(cx)
+ }
+}
- fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
- match self.stream.poll() {
- Err(e) => {
- if (self.f)(&e) {
- Err(e)
- } else {
- Ok(Async::Ready(None))
- }
- }
- a => a,
- }
- }
+impl<'a> Multiplexer<'a> {
+ /// Split the `Multiplexer` into the component `Stream` and `AsyncWrite` parts
+ pub fn split(
+ self
+ ) -> (
+ impl Stream<Item = Result<TtyChunk>> + 'a,
+ impl AsyncWrite + Send + 'a,
+ ) {
+ (self.reader, self.writer)
}
}