From 6cd1d7f93bd6f150341582a1b54087cefffdbf87 Mon Sep 17 00:00:00 2001 From: "Eli W. Hunter" <42009212+elihunter173@users.noreply.github.com> Date: Thu, 23 Jul 2020 23:54:12 -0400 Subject: Async/Await Support (continuation of #191) (#229) * it builds! * remove unused dependencies * bump dependencies * reimplement 'exec' endpoint * update a few more examples * update remaining examples * fix doc tests, remove unused 'read' module * remove feature-gated async closures * split futures dependency to just 'futures-util' * update version and readme * make functions accepting Body generic over Into again * update changelog * reinstate 'unix-socket' feature * reinstate 'attach' endpoint * fix clippy lints * fix documentation typo * fix container copyfrom/into implementations * add convenience methods for TtyChunk struct * remove 'main' from code example to silence clippy lint * Update hyper to 0.13.1 * Add Send bounds to TtyWriter * Appease clippy * Fix examples * Update issue in changelog Co-authored-by: Daniel Eades Co-authored-by: Marc Schreiber --- CHANGELOG.md | 4 + Cargo.toml | 21 +- examples/attach.rs | 32 ++ examples/containercopyfrom.rs | 25 +- examples/containercopyinto.rs | 18 +- examples/containercreate.rs | 15 +- examples/containerdelete.rs | 14 +- examples/containerexec.rs | 37 +- examples/containerinspect.rs | 16 +- examples/containers.rs | 17 +- examples/custom_host.rs | 15 +- examples/events.rs | 19 +- examples/export.rs | 26 +- examples/imagebuild.rs | 25 +- examples/imagedelete.rs | 17 +- examples/imageinspect.rs | 16 +- examples/imagepull.rs | 23 +- examples/imagepull_auth.rs | 23 +- examples/images.rs | 19 +- examples/imagesearch.rs | 17 +- examples/imagetag.rs | 10 +- examples/import.rs | 21 +- examples/info.rs | 15 +- examples/logs.rs | 36 +- examples/networkconnect.rs | 14 +- examples/networkcreate.rs | 14 +- examples/networkdelete.rs | 14 +- examples/networkdisconnect.rs | 25 +- examples/networkinspect.rs | 16 +- examples/networks.rs | 20 +- examples/stats.rs | 21 +- examples/top.rs | 16 +- examples/version.rs | 15 +- examples/volumecreate.rs | 15 +- examples/volumedelete.rs | 14 +- examples/volumes.rs | 16 +- lib.rs | 0 src/errors.rs | 19 +- src/lib.rs | 793 ++++++++++++++++++++++-------------------- src/read.rs | 105 ------ src/transport.rs | 280 +++++++++------ src/tty.rs | 372 +++++++------------- 42 files changed, 1097 insertions(+), 1153 deletions(-) create mode 100644 examples/attach.rs create mode 100644 lib.rs delete mode 100644 src/read.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b4ad43..8e72e8c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.7.0 + +* async-await support [#229](https://github.com/softprops/shiplift/pull/229) + # 0.6.0 * add chrono as an optional feature, enabled by default [#190](https://github.com/softprops/shiplift/pull/190) diff --git a/Cargo.toml b/Cargo.toml index c85ed4e..2dad6ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,28 +18,31 @@ coveralls = { repository = "softprops/shipflit" } maintenance = { status = "actively-developed" } [dependencies] -log = "0.4" -mime = "0.3" base64 = "0.11" byteorder = "1.3" bytes = "0.4" chrono = { version = "0.4", optional = true, features = ["serde"] } flate2 = "1.0" -futures = "0.1" -hyper = "0.12" -hyper-openssl = { version = "0.7", optional = true } -hyperlocal = { version = "0.6", optional = true } +futures-util = "0.3" +futures_codec = "0.3" +hyper = "0.13" +hyper-openssl = { version = "0.8", optional = true } +hyperlocal = { version = "0.7", optional = true } +log = "0.4" +mime = "0.3" openssl = { version = "0.10", optional = true } +pin-project = "0.4" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tar = "0.4" -tokio = "0.1" -tokio-codec = "0.1" -tokio-io = "0.1" +tokio = "0.2" url = "2.1" [dev-dependencies] env_logger = "0.7" +# Required for examples to run +futures = "0.3.1" +tokio = { version = "0.2.6", features = ["macros"] } [features] default = ["chrono", "unix-socket", "tls"] diff --git a/examples/attach.rs b/examples/attach.rs new file mode 100644 index 0000000..e4ed637 --- /dev/null +++ b/examples/attach.rs @@ -0,0 +1,32 @@ +use futures::StreamExt; +use shiplift::{tty::TtyChunk, Docker}; +use std::env; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let docker = Docker::new(); + let id = env::args() + .nth(1) + .expect("You need to specify a container id"); + + let tty_multiplexer = docker.containers().get(&id).attach().await?; + + let (mut reader, _writer) = tty_multiplexer.split(); + + while let Some(tty_result) = reader.next().await { + match tty_result { + Ok(chunk) => print_chunk(chunk), + Err(e) => eprintln!("Error: {}", e), + } + } + + Ok(()) +} + +fn print_chunk(chunk: TtyChunk) { + match chunk { + TtyChunk::StdOut(bytes) => println!("Stdout: {}", std::str::from_utf8(&bytes).unwrap()), + TtyChunk::StdErr(bytes) => eprintln!("Stdout: {}", std::str::from_utf8(&bytes).unwrap()), + TtyChunk::StdIn(_) => unreachable!(), + } +} diff --git a/examples/containercopyfrom.rs b/examples/containercopyfrom.rs index 2ebeccf..acbfa19 100644 --- a/examples/containercopyfrom.rs +++ b/examples/containercopyfrom.rs @@ -1,8 +1,10 @@ +use futures::TryStreamExt; use shiplift::Docker; use std::{env, path}; -use tokio::prelude::{Future, Stream}; +use tar::Archive; -fn main() { +#[tokio::main] +async fn main() -> Result<(), Box> { let docker = Docker::new(); let id = env::args() .nth(1) @@ -10,17 +12,16 @@ fn main() { let path = env::args() .nth(2) .expect("Usage: cargo run --example containercopyfrom -- "); - let fut = docker + + let bytes = docker .containers() .get(&id) .copy_from(path::Path::new(&path)) - .collect() - .and_then(|stream| { - let tar = stream.concat(); - let mut archive = tar::Archive::new(tar.as_slice()); - archive.unpack(env::current_dir()?)?; - Ok(()) - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + .try_concat() + .await?; + + let mut archive = Archive::new(&bytes[..]); + archive.unpack(env::current_dir()?)?; + + Ok(()) } diff --git a/examples/containercopyinto.rs b/examples/containercopyinto.rs index 63f0a2d..689e7af 100644 --- a/examples/containercopyinto.rs +++ b/examples/containercopyinto.rs @@ -1,8 +1,8 @@ use shiplift::Docker; -use std::env; -use tokio::prelude::Future; +use std::{env, fs::File, io::Read}; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let path = env::args() .nth(1) @@ -11,17 +11,17 @@ fn main() { .nth(2) .expect("Usage: cargo run --example containercopyinto -- "); - use std::{fs::File, io::prelude::*}; - let mut file = File::open(&path).unwrap(); let mut bytes = Vec::new(); file.read_to_end(&mut bytes) .expect("Cannot read file on the localhost."); - let fut = docker + if let Err(e) = docker .containers() .get(&id) - .copy_file_into(path, &bytes[..]) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + .copy_file_into(path, &bytes) + .await + { + eprintln!("Error: {}", e) + } } diff --git a/examples/containercreate.rs b/examples/containercreate.rs index d061f70..ef579a6 100644 --- a/examples/containercreate.rs +++ b/examples/containercreate.rs @@ -1,16 +1,19 @@ use shiplift::{ContainerOptions, Docker}; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let image = env::args() .nth(1) .expect("You need to specify an image name"); - let fut = docker + + match docker .containers() .create(&ContainerOptions::builder(image.as_ref()).build()) - .map(|info| println!("{:?}", info)) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + .await + { + Ok(info) => println!("{:?}", info), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/containerdelete.rs b/examples/containerdelete.rs index e3c2036..86bd9c5 100644 --- a/examples/containerdelete.rs +++ b/examples/containerdelete.rs @@ -1,16 +1,14 @@ use shiplift::Docker; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let id = env::args() .nth(1) .expect("You need to specify an container id"); - let fut = docker - .containers() - .get(&id) - .delete() - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + + if let Err(e) = docker.containers().get(&id).delete().await { + eprintln!("Error: {}", e) + } } diff --git a/examples/containerexec.rs b/examples/containerexec.rs index 7f12f88..6c545a7 100644 --- a/examples/containerexec.rs +++ b/examples/containerexec.rs @@ -1,8 +1,9 @@ -use shiplift::{tty::StreamType, Docker, ExecContainerOptions}; -use std::env; -use tokio::prelude::{Future, Stream}; +use futures::StreamExt; +use shiplift::{tty::TtyChunk, Docker, ExecContainerOptions}; +use std::{env, str::from_utf8}; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let id = env::args() .nth(1) @@ -18,19 +19,19 @@ fn main() { .attach_stdout(true) .attach_stderr(true) .build(); - let fut = docker - .containers() - .get(&id) - .exec(&options) - .for_each(|chunk| { - match chunk.stream_type { - StreamType::StdOut => println!("Stdout: {}", chunk.as_string_lossy()), - StreamType::StdErr => eprintln!("Stderr: {}", chunk.as_string_lossy()), - StreamType::StdIn => unreachable!(), - } - Ok(()) - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + while let Some(exec_result) = docker.containers().get(&id).exec(&options).next().await { + match exec_result { + Ok(chunk) => print_chunk(chunk), + Err(e) => eprintln!("Error: {}", e), + } + } +} + +fn print_chunk(chunk: TtyChunk) { + match chunk { + TtyChunk::StdOut(bytes) => println!("Stdout: {}", from_utf8(&bytes).unwrap()), + TtyChunk::StdErr(bytes) => eprintln!("Stdout: {}", from_utf8(&bytes).unwrap()), + TtyChunk::StdIn(_) => unreachable!(), + } } diff --git a/examples/containerinspect.rs b/examples/containerinspect.rs index 0f853bc..8de2a67 100644 --- a/examples/containerinspect.rs +++ b/examples/containerinspect.rs @@ -1,17 +1,15 @@ use shiplift::Docker; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let id = env::args() .nth(1) .expect("Usage: cargo run --example containerinspect -- "); - let fut = docker - .containers() - .get(&id) - .inspect() - .map(|container| println!("{:#?}", container)) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + + match docker.containers().get(&id).inspect().await { + Ok(container) => println!("{:#?}", container), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/containers.rs b/examples/containers.rs index 0eb51af..72de140 100644 --- a/examples/containers.rs +++ b/examples/containers.rs @@ -1,18 +1,15 @@ use shiplift::Docker; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { env_logger::init(); let docker = Docker::new(); - let fut = docker - .containers() - .list(&Default::default()) - .map(|containers| { + match docker.containers().list(&Default::default()).await { + Ok(containers) => { for c in containers { println!("container -> {:#?}", c) } - }) - .map_err(|e| eprintln!("Error: {}", e)); - - tokio::run(fut); + } + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/custom_host.rs b/examples/custom_host.rs index 3b1fd3e..8b06eae 100644 --- a/examples/custom_host.rs +++ b/examples/custom_host.rs @@ -1,13 +1,10 @@ -use futures::Future; use shiplift::Docker; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::host("http://yourhost".parse().unwrap()); - - let fut = docker - .ping() - .map(|pong| println!("Ping: {}", pong)) - .map_err(|e| eprintln!("Error: {}", e)); - - tokio::run(fut); + match docker.ping().await { + Ok(pong) => println!("Ping: {}", pong), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/events.rs b/examples/events.rs index 0e35860..e44d68f 100644 --- a/examples/events.rs +++ b/examples/events.rs @@ -1,16 +1,15 @@ +use futures::StreamExt; use shiplift::Docker; -use tokio::prelude::{Future, Stream}; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); println!("listening for events"); - let fut = docker - .events(&Default::default()) - .for_each(|e| { - println!("event -> {:?}", e); - Ok(()) - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + while let Some(event_result) = docker.events(&Default::default()).next().await { + match event_result { + Ok(event) => println!("event -> {:?}", event), + Err(e) => eprintln!("Error: {}", e), + } + } } diff --git a/examples/export.rs b/examples/export.rs index 55d1b7b..34f460d 100644 --- a/examples/export.rs +++ b/examples/export.rs @@ -1,8 +1,10 @@ -use shiplift::{errors::Error, Docker}; +use futures::StreamExt; use std::{env, fs::OpenOptions, io::Write}; -use tokio::prelude::{Future, Stream}; -fn main() { +use shiplift::{errors::Error, Docker}; + +#[tokio::main] +async fn main() { let docker = Docker::new(); let id = env::args().nth(1).expect("You need to specify an image id"); @@ -11,17 +13,13 @@ fn main() { .create(true) .open(format!("{}.tar", &id)) .unwrap(); + let images = docker.images(); - let fut = images - .get(&id) - .export() - .for_each(move |bytes| { - export_file - .write(&bytes[..]) - .map(|n| println!("copied {} bytes", n)) - .map_err(Error::IO) - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut) + while let Some(export_result) = images.get(&id).export().next().await { + match export_result.and_then(|bytes| export_file.write(&bytes).map_err(Error::from)) { + Ok(n) => println!("copied {} bytes", n), + Err(e) => eprintln!("Error: {}", e), + } + } } diff --git a/examples/imagebuild.rs b/examples/imagebuild.rs index 6dbea78..80d825c 100644 --- a/examples/imagebuild.rs +++ b/examples/imagebuild.rs @@ -1,19 +1,22 @@ +use futures::StreamExt; use shiplift::{BuildOptions, Docker}; use std::env; -use tokio::prelude::{Future, Stream}; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let path = env::args().nth(1).expect("You need to specify a path"); - let fut = docker - .images() - .build(&BuildOptions::builder(path).tag("shiplift_test").build()) - .for_each(|output| { - println!("{:?}", output); - Ok(()) - }) - .map_err(|e| eprintln!("Error: {}", e)); + let options = BuildOptions::builder(path).tag("shiplift_test").build(); - tokio::run(fut); + let images = docker.images(); + + let mut stream = images.build(&options); + + while let Some(build_result) = stream.next().await { + match build_result { + Ok(output) => println!("{:?}", output), + Err(e) => eprintln!("Error: {}", e), + } + } } diff --git a/examples/imagedelete.rs b/examples/imagedelete.rs index efa763c..3b9bc60 100644 --- a/examples/imagedelete.rs +++ b/examples/imagedelete.rs @@ -1,21 +1,18 @@ use shiplift::Docker; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let img = env::args() .nth(1) .expect("You need to specify an image name"); - let fut = docker - .images() - .get(&img[..]) - .delete() - .map(|statuses| { + match docker.images().get(&img).delete().await { + Ok(statuses) => { for status in statuses { println!("{:?}", status); } - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + } + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/imageinspect.rs b/examples/imageinspect.rs index 494480a..b7d2f9a 100644 --- a/examples/imageinspect.rs +++ b/examples/imageinspect.rs @@ -1,17 +1,15 @@ use shiplift::Docker; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let id = env::args() .nth(1) .expect("Usage: cargo run --example imageinspect -- "); - let fut = docker - .images() - .get(&id) - .inspect() - .map(|image| println!("{:#?}", image)) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + + match docker.images().get(&id).inspect().await { + Ok(image) => println!("{:#?}", image), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/imagepull.rs b/examples/imagepull.rs index 84a6149..5b3fbf4 100644 --- a/examples/imagepull.rs +++ b/examples/imagepull.rs @@ -1,22 +1,25 @@ // cargo run --example imagepull busybox +use futures::StreamExt; use shiplift::{Docker, PullOptions}; use std::env; -use tokio::prelude::{Future, Stream}; -fn main() { +#[tokio::main] +async fn main() { env_logger::init(); let docker = Docker::new(); let img = env::args() .nth(1) .expect("You need to specify an image name"); - let fut = docker + + let mut stream = docker .images() - .pull(&PullOptions::builder().image(img).build()) - .for_each(|output| { - println!("{:?}", output); - Ok(()) - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + .pull(&PullOptions::builder().image(img).build()); + + while let Some(pull_result) = stream.next().await { + match pull_result { + Ok(output) => println!("{:?}", output), + Err(e) => eprintln!("Error: {}", e), + } + } } diff --git a/examples/imagepull_auth.rs b/examples/imagepull_auth.rs index 1c559c7..6f0ceec 100644 --- a/examples/imagepull_auth.rs +++ b/examples/imagepull_auth.rs @@ -1,10 +1,11 @@ // cargo run --example imagepull_auth busybox username password +use futures::StreamExt; use shiplift::{Docker, PullOptions, RegistryAuth}; use std::env; -use tokio::prelude::{Future, Stream}; -fn main() { +#[tokio::main] +async fn main() { env_logger::init(); let docker = Docker::new(); let img = env::args() @@ -16,13 +17,15 @@ fn main() { .username(username) .password(password) .build(); - let fut = docker + + let mut stream = docker .images() - .pull(&PullOptions::builder().image(img).auth(auth).build()) - .for_each(|output| { - println!("{:?}", output); - Ok(()) - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + .pull(&PullOptions::builder().image(img).auth(auth).build()); + + while let Some(pull_result) = stream.next().await { + match pull_result { + Ok(output) => println!("{:?}", output), + Err(e) => eprintln!("{}", e), + } + } } diff --git a/examples/images.rs b/examples/images.rs index 7a8a094..1c0852e 100644 --- a/examples/images.rs +++ b/examples/images.rs @@ -1,13 +1,14 @@ use shiplift::Docker; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); println!("docker images in stock"); - let fut = docker - .images() - .list(&Default::default()) - .map(|images| { + + let result = docker.images().list(&Default::default()).await; + + match result { + Ok(images) => { for i in images { println!( "{} {} {:?}", @@ -16,7 +17,7 @@ fn main() { i.repo_tags.unwrap_or_else(|| vec!["none".into()]) ); } - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + } + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/imagesearch.rs b/examples/imagesearch.rs index a6d6e52..e2fd110 100644 --- a/examples/imagesearch.rs +++ b/examples/imagesearch.rs @@ -1,17 +1,16 @@ use shiplift::Docker; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); println!("remote docker images in stock"); - let fut = docker - .images() - .search("rust") - .map(|results| { + + match docker.images().search("rust").await { + Ok(results) => { for result in results { println!("{} - {}", result.name, result.description); } - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + } + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/imagetag.rs b/examples/imagetag.rs index 7ae78dd..e36af76 100644 --- a/examples/imagetag.rs +++ b/examples/imagetag.rs @@ -2,9 +2,9 @@ use shiplift::{Docker, Image, TagOptions}; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { env_logger::init(); let docker = Docker::new(); let img = env::args() @@ -21,7 +21,7 @@ fn main() { let image = Image::new(&docker, img); - let fut = image.tag(&tag_opts).map_err(|e| eprintln!("Error: {}", e)); - - tokio::run(fut); + if let Err(e) = image.tag(&tag_opts).await { + eprintln!("Error: {}", e) + } } diff --git a/examples/import.rs b/examples/import.rs index 20c61e6..7ea35bd 100644 --- a/examples/import.rs +++ b/examples/import.rs @@ -1,8 +1,9 @@ +use futures::StreamExt; use shiplift::Docker; use std::{env, fs::File}; -use tokio::prelude::{Future, Stream}; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let path = env::args() .nth(1) @@ -11,14 +12,12 @@ fn main() { let reader = Box::from(f); - let fut = docker - .images() - .import(reader) - .for_each(|output| { - println!("{:?}", output); - Ok(()) - }) - .map_err(|e| eprintln!("Error: {}", e)); + let mut stream = docker.images().import(reader); - tokio::run(fut); + while let Some(import_result) = stream.next().await { + match import_result { + Ok(output) => println!("{:?}", output), + Err(e) => eprintln!("Error: {}", e), + } + } } diff --git a/examples/info.rs b/examples/info.rs index 05fdede..76036e6 100644 --- a/examples/info.rs +++ b/examples/info.rs @@ -1,12 +1,11 @@ use shiplift::Docker; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); - tokio::run( - docker - .info() - .map(|info| println!("info {:?}", info)) - .map_err(|e| eprintln!("Error: {}", e)), - ); + + match docker.info().await { + Ok(info) => println!("info {:?}", info), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/logs.rs b/examples/logs.rs index 57f12be..73c8045 100644 --- a/examples/logs.rs +++ b/examples/logs.rs @@ -1,25 +1,31 @@ -use shiplift::{tty::StreamType, Docker, LogsOptions}; +use futures::StreamExt; +use shiplift::{tty::TtyChunk, Docker, LogsOptions}; use std::env; -use tokio::prelude::{Future, Stream}; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let id = env::args() .nth(1) .expect("You need to specify a container id"); - let fut = docker + + let mut logs_stream = docker .containers() .get(&id) - .logs(&LogsOptions::builder().stdout(true).stderr(true).build()) - .for_each(|chunk| { - match chunk.stream_type { - StreamType::StdOut => println!("Stdout: {}", chunk.as_string_lossy()), - StreamType::StdErr => eprintln!("Stderr: {}", chunk.as_string_lossy()), - StreamType::StdIn => unreachable!(), - } - Ok(()) - }) - .map_err(|e| eprintln!("Error: {}", e)); + .logs(&LogsOptions::builder().stdout(true).stderr(true).build()); + + while let Some(log_result) = logs_stream.next().await { + match log_result { + Ok(chunk) => print_chunk(chunk), + Err(e) => eprintln!("Error: {}", e), + } + } +} - tokio::run(fut); +fn print_chunk(chunk: TtyChunk) { + match chunk { + TtyChunk::StdOut(bytes) => println!("Stdout: {}", std::str::from_utf8(&bytes).unwrap()), + TtyChunk::StdErr(bytes) => eprintln!("Stdout: {}", std::str::from_utf8(&bytes).unwrap()), + TtyChunk::StdIn(_) => unreachable!(), + } } diff --git a/examples/networkconnect.rs b/examples/networkconnect.rs index 0cfc8fc..a04db63 100644 --- a/examples/networkconnect.rs +++ b/examples/networkconnect.rs @@ -1,18 +1,20 @@ use shiplift::{ContainerConnectionOptions, Docker}; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let networks = docker.networks(); + match (env::args().nth(1), env::args().nth(2)) { (Some(container_id), Some(network_id)) => { - let fut = networks + if let Err(e) = networks .get(&network_id) .connect(&ContainerConnectionOptions::builder(&container_id).build()) - .map(|v| println!("{:?}", v)) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + .await + { + eprintln!("Error: {}", e) + } } _ => eprintln!("please provide a container_id and network_id"), } diff --git a/examples/networkcreate.rs b/examples/networkcreate.rs index 30bb41c..3173ab4 100644 --- a/examples/networkcreate.rs +++ b/examples/networkcreate.rs @@ -1,20 +1,22 @@ use shiplift::{Docker, NetworkCreateOptions}; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let network_name = env::args() .nth(1) .expect("You need to specify a network name"); - let fut = docker + match docker .networks() .create( &NetworkCreateOptions::builder(network_name.as_ref()) .driver("bridge") .build(), ) - .map(|info| println!("{:?}", info)) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + .await + { + Ok(info) => println!("{:?}", info), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/networkdelete.rs b/examples/networkdelete.rs index 16fc4ab..e419f90 100644 --- a/examples/networkdelete.rs +++ b/examples/networkdelete.rs @@ -1,18 +1,14 @@ use shiplift::Docker; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let id = env::args() .nth(1) .expect("You need to specify a network id"); - let fut = docker - .networks() - .get(&id) - .delete() - .map(|network| println!("{:?}", network)) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + if let Err(e) = docker.networks().get(&id).delete().await { + eprintln!("Error: {}", e) + } } diff --git a/examples/networkdisconnect.rs b/examples/networkdisconnect.rs index 9588ecc..8d58b35 100644 --- a/examples/networkdisconnect.rs +++ b/examples/networkdisconnect.rs @@ -1,18 +1,27 @@ use shiplift::{ContainerConnectionOptions, Docker}; use std::env; -use tokio::prelude::Future; -fn main() { +async fn network_disconnect( + container_id: &str, + network_id: &str, +) { let docker = Docker::new(); let networks = docker.networks(); + + if let Err(e) = networks + .get(network_id) + .disconnect(&ContainerConnectionOptions::builder(container_id).build()) + .await + { + eprintln!("Error: {}", e) + } +} + +#[tokio::main] +async fn main() { match (env::args().nth(1), env::args().nth(2)) { (Some(container_id), Some(network_id)) => { - let fut = networks - .get(&network_id) - .disconnect(&ContainerConnectionOptions::builder(&container_id).build()) - .map(|v| println!("{:?}", v)) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + network_disconnect(&container_id, &network_id).await; } _ => eprintln!("please provide a container_id and network_id"), } diff --git a/examples/networkinspect.rs b/examples/networkinspect.rs index 86a076b..143c637 100644 --- a/examples/networkinspect.rs +++ b/examples/networkinspect.rs @@ -1,17 +1,15 @@ use shiplift::Docker; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let id = env::args() .nth(1) .expect("You need to specify a network id"); - let fut = docker - .networks() - .get(&id) - .inspect() - .map(|network| println!("{:#?}", network)) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + + match docker.networks().get(&id).inspect().await { + Ok(network_info) => println!("{:#?}", network_info), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/networks.rs b/examples/networks.rs index 9ceea99..4a1dcf1 100644 --- a/examples/networks.rs +++ b/examples/networks.rs @@ -1,17 +1,17 @@ use shiplift::Docker; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { env_logger::init(); + let docker = Docker::new(); - let fut = docker - .networks() - .list(&Default::default()) - .map(|networks| { + + match docker.networks().list(&Default::default()).await { + Ok(networks) => { for network in networks { - println!("network -> {:#?}", network); + println!("network -> {:#?}", network) } - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + } + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/stats.rs b/examples/stats.rs index 9e14cf4..063f502 100644 --- a/examples/stats.rs +++ b/examples/stats.rs @@ -1,21 +1,20 @@ // cargo run --example stats -- +use futures::StreamExt; use shiplift::Docker; use std::env; -use tokio::prelude::{Future, Stream}; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let containers = docker.containers(); let id = env::args() .nth(1) .expect("Usage: cargo run --example stats -- "); - let fut = containers - .get(&id) - .stats() - .for_each(|stat| { - println!("{:?}", stat); - Ok(()) - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + + while let Some(result) = containers.get(&id).stats().next().await { + match result { + Ok(stat) => println!("{:?}", stat), + Err(e) => eprintln!("Error: {}", e), + } + } } diff --git a/examples/top.rs b/examples/top.rs index 5fc4229..39e8ea6 100644 --- a/examples/top.rs +++ b/examples/top.rs @@ -1,17 +1,15 @@ use shiplift::Docker; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let id = env::args() .nth(1) .expect("Usage: cargo run --example top -- "); - let fut = docker - .containers() - .get(&id) - .top(Default::default()) - .map(|top| println!("{:#?}", top)) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + + match docker.containers().get(&id).top(Default::default()).await { + Ok(top) => println!("{:#?}", top), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/version.rs b/examples/version.rs index 6125e56..e1ab5d2 100644 --- a/examples/version.rs +++ b/examples/version.rs @@ -1,13 +1,10 @@ use shiplift::Docker; -use tokio::prelude::Future; -fn main() { - env_logger::init(); +#[tokio::main] +async fn main() { let docker = Docker::new(); - let fut = docker - .version() - .map(|ver| println!("version -> {:#?}", ver)) - .map_err(|e| eprintln!("Error: {}", e)); - - tokio::run(fut); + match docker.version().await { + Ok(ver) => println!("version -> {:#?}", ver), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/volumecreate.rs b/examples/volumecreate.rs index 83f5045..a243be6 100644 --- a/examples/volumecreate.rs +++ b/examples/volumecreate.rs @@ -1,8 +1,8 @@ use shiplift::{builder::VolumeCreateOptions, Docker}; use std::{collections::HashMap, env}; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let volumes = docker.volumes(); @@ -13,15 +13,16 @@ fn main() { let mut labels = HashMap::new(); labels.insert("com.github.softprops", "shiplift"); - let fut = volumes + match volumes .create( &VolumeCreateOptions::builder() .name(volume_name.as_ref()) .labels(&labels) .build(), ) - .map(|info| println!("{:?}", info)) - .map_err(|e| eprintln!("Error: {}", e)); - - tokio::run(fut); + .await + { + Ok(info) => println!("{:?}", info), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/volumedelete.rs b/examples/volumedelete.rs index 3800d22..ec1da7e 100644 --- a/examples/volumedelete.rs +++ b/examples/volumedelete.rs @@ -1,8 +1,8 @@ use shiplift::Docker; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let volumes = docker.volumes(); @@ -10,11 +10,7 @@ fn main() { .nth(1) .expect("You need to specify an volume name"); - let fut = volumes - .get(&volume_name) - .delete() - .map(|info| println!("{:?}", info)) - .map_err(|e| eprintln!("Error: {}", e)); - - tokio::run(fut); + if let Err(e) = volumes.get(&volume_name).delete().await { + eprintln!("Error: {}", e) + } } diff --git a/examples/volumes.rs b/examples/volumes.rs index c5548ec..d45c00a 100644 --- a/examples/volumes.rs +++ b/examples/volumes.rs @@ -1,18 +1,16 @@ use shiplift::Docker; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let volumes = docker.volumes(); - let fut = volumes - .list() - .map(|volumes| { + match volumes.list().await { + Ok(volumes) => { for v in volumes { println!("volume -> {:#?}", v) } - }) - .map_err(|e| eprintln!("Error: {}", e)); - - tokio::run(fut); + } + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/lib.rs b/lib.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/errors.rs b/src/errors.rs index 5506198..9d01f91 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,8 +1,10 @@ //! Representations of various client errors -use hyper::{self, StatusCode}; +use hyper::{self, http, StatusCode}; use serde_json::Error as SerdeError; -use std::{error::Error as StdError, fmt, io::Error as IoError, string::FromUtf8Error}; +use std::{error::Error as StdError, fmt, string::FromUtf8Error}; + +use futures_util::io::Error as IoError; #[derive(Debug)] pub enum Error { @@ -34,12 +36,25 @@ impl From for Error { } } +impl From for Error { + fn from(error: http::uri::InvalidUri) -> Self { + let http_error: http::Error = error.into(); + http_error.into() + } +} + impl From for Error { fn from(error: IoError) -> Error { Error::IO(error) } } +impl From for Error { + fn from(error: FromUtf8Error) -> Error { + Error::Encoding(error) + } +} + impl fmt::Display for Error { fn fmt( &self, diff --git a/src/lib.rs b/src/lib.rs index 16e5625..06b31a1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,22 +3,22 @@ //! # examples //! //! ```no_run -//! use tokio::prelude::Future; -//! +//! # async { //! let docker = shiplift::Docker::new(); -//! let fut = docker.images().list(&Default::default()).map(|images| { -//! println!("docker images in stock"); -//! for i in images { -//! println!("{:?}", i.repo_tags); -//! } -//! }).map_err(|e| eprintln!("Something bad happened! {}", e)); //! -//! tokio::run(fut); +//! match docker.images().list(&Default::default()).await { +//! Ok(images) => { +//! for image in images { +//! println!("{:?}", image.repo_tags); +//! } +//! }, +//! Err(e) => eprintln!("Something bad happened! {}", e), +//! } +//! # }; //! ``` pub mod builder; pub mod errors; -pub mod read; pub mod rep; pub mod transport; pub mod tty; @@ -35,7 +35,6 @@ pub use crate::{ errors::Error, }; use crate::{ - read::StreamReader, rep::{ Change, Container as ContainerRep, ContainerCreateInfo, ContainerDetails, Event, Exit, History, Image as ImageRep, ImageDetails, Info, NetworkCreateInfo, @@ -43,9 +42,14 @@ use crate::{ Volume as VolumeRep, VolumeCreateInfo, Volumes as VolumesRep, }, transport::{tar, Transport}, - tty::TtyDecoder, + tty::Multiplexer as TtyMultiPlexer, +}; +use futures_util::{ + io::{AsyncRead, AsyncWrite}, + stream::Stream, + TryFutureExt, TryStreamExt, }; -use futures::{future::Either, Future, IntoFuture, Stream}; +// use futures::{future::Either, Future, IntoFuture, Stream}; pub use hyper::Uri; use hyper::{client::HttpConnector, Body, Client, Method}; #[cfg(feature = "tls")] @@ -56,8 +60,7 @@ use mime::Mime; #[cfg(feature = "tls")] use openssl::ssl::{SslConnector, SslFiletype, SslMethod}; use serde_json::Value; -use std::{borrow::Cow, env, io::Read, iter, path::Path, time::Duration}; -use tokio_codec::{FramedRead, LinesCodec}; +use std::{borrow::Cow, env, io, io::Read, iter, path::Path, time::Duration}; use url::form_urlencoded; /// Represents the result of all docker operations @@ -70,19 +73,19 @@ pub struct Docker { } /// Interface for accessing and manipulating a named docker image -pub struct Image<'a, 'b> { +pub struct Image<'a> { docker: &'a Docker, - name: Cow<'b, str>, + name: Cow<'a, str>, } -impl<'a, 'b> Image<'a, 'b> { +impl<'a> Image<'a> { /// Exports an interface for operations that may be performed against a named image pub fn new( docker: &'a Docker, name: S, - ) -> Image<'a, 'b> + ) -> Self where - S: Into>, + S: Into>, { Image { docker, @@ -91,40 +94,46 @@ impl<'a, 'b> Image<'a, 'b> { } /// Inspects a named image's details - pub fn inspect(&self) -> impl Future { + pub async fn inspect(&self) -> Result { self.docker .get_json(&format!("/images/{}/json", self.name)[..]) + .await } /// Lists the history of the images set of changes - pub fn history(&self) -> impl Future, Error = Error> { + pub async fn history(&self) -> Result> { self.docker .get_json(&format!("/images/{}/history", self.name)[..]) + .await } /// Deletes an image - pub fn delete(&self) -> impl Future, Error = Error> { + pub async fn delete(&self) -> Result> { self.docker .delete_json::>(&format!("/images/{}", self.name)[..]) + .await } /// Export this image to a tarball - pub fn export(&self) -> impl Stream, Error = Error> { - self.docker - .stream_get(&format!("/images/{}/get", self.name)[..]) - .map(|c| c.to_vec()) + pub fn export(&self) -> impl Stream>> + Unpin + 'a { + Box::pin( + self.docker + .stream_get(format!("/images/{}/get", self.name)) + .map_ok(|c| c.to_vec()), + ) } /// Adds a tag to an image - pub fn tag( + pub async fn tag( &self, opts: &TagOptions, - ) -> impl Future { + ) -> Result<()> { let mut path = vec![format!("/images/{}/tag", self.name)]; if let Some(query) = opts.serialize() { path.push(query) } - self.docker.post::(&path.join("?"), None).map(|_| ()) + let _ = self.docker.post(&path.join("?"), None).await?; + Ok(()) } } @@ -141,76 +150,74 @@ impl<'a> Images<'a> { /// Builds a new image build by reading a Dockerfile in a target directory pub fn build( - &self, - opts: &BuildOptions, - ) -> impl Stream { - let mut path = vec!["/build".to_owned()]; - if let Some(query) = opts.serialize() { - path.push(query) - } + &'a self, + opts: &'a BuildOptions, + ) -> impl Stream> + Unpin + 'a { + Box::pin( + async move { + let mut path = vec!["/build".to_owned()]; + if let Some(query) = opts.serialize() { + path.push(query) + } + + let mut bytes = Vec::default(); + + tarball::dir(&mut bytes, &opts.path[..])?; + + let chunk_stream = self.docker.stream_post( + path.join("?"), + Some((Body::from(bytes), tar())), + None::>, + ); - let mut bytes = vec![]; - - match tarball::dir(&mut bytes, &opts.path[..]) { - Ok(_) => Box::new( - self.docker - .stream_post( - &path.join("?"), - Some((Body::from(bytes), tar())), - None::>, - ) - .map(|r| { - futures::stream::iter_result( - serde_json::Deserializer::from_slice(&r[..]) - .into_iter::() - .collect::>(), - ) - .map_err(Error::from) - }) - .flatten(), - ) as Box + Send>, - Err(e) => Box::new(futures::future::err(Error::IO(e)).into_stream()) - as Box + Send>, - } + let value_stream = chunk_stream.and_then(|chunk| async move { + serde_json::from_slice(&chunk).map_err(Error::from) + }); + + Ok(value_stream) + } + .try_flatten_stream(), + ) } /// Lists the docker images on the current docker host - pub fn list( + pub async fn list( &self, opts: &ImageListOptions, - ) -> impl Future, Error = Error> { + ) -> Result> { let mut path = vec!["/images/json".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } - self.docker.get_json::>(&path.join("?")) + self.docker.get_json::>(&path.join("?")).await } /// Returns a reference to a set of operations available for a named image - pub fn get<'b>( + pub fn get( &self, - name: &'b str, - ) -> Image<'a, 'b> { + name: &'a str, + ) -> Image<'a> { Image::new(self.docker, name) } /// Search for docker images by term - pub fn search( + pub async fn search( &self, term: &str, - ) -> impl Future, Error = Error> { + ) -> Result> { let query = form_urlencoded::Serializer::new(String::new()) .append_pair("term", term) .finish(); self.docker .get_json::>(&format!("/images/search?{}", query)[..]) + .await } /// Pull and create a new docker images from an existing image pub fn pull( &self, opts: &PullOptions, - ) -> impl Stream { + ) -> impl Stream> + Unpin + 'a { let mut path = vec!["/images/create".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); @@ -218,18 +225,15 @@ impl<'a> Images<'a> { let headers = opts .auth_header() .map(|a| iter::once(("X-Registry-Auth", a))); - self.docker - .stream_post::(&path.join("?"), None, headers) - // todo: give this a proper enum type - .map(|r| { - futures::stream::iter_result( - serde_json::Deserializer::from_slice(&r[..]) - .into_iter::() - .collect::>(), - ) - .map_err(Error::from) - }) - .flatten() + + Box::pin( + self.docker + .stream_post(path.join("?"), None, headers) + .and_then(move |chunk| { + // todo: give this a proper enum type + futures_util::future::ready(serde_json::from_slice(&chunk).map_err(Error::from)) + }), + ) } /// exports a collection of named images, @@ -237,14 +241,14 @@ impl<'a> Images<'a> { pub fn export( &self, names: Vec<&str>, - ) -> impl Stream, Error = Error> { + ) -> impl Stream>> + 'a { let params = names.iter().map(|n| ("names", *n)); let query = form_urlencoded::Serializer::new(String::new()) .extend_pairs(params) .finish(); self.docker - .stream_get(&format!("/images/get?{}", query)[..]) - .map(|c| c.to_vec()) + .stream_get(format!("/images/get?{}", query)) + .map_ok(|c| c.to_vec()) } /// imports an image or set of images from a given tarball source @@ -252,43 +256,44 @@ impl<'a> Images<'a> { pub fn import( self, mut tarball: Box, - ) -> impl Stream { - let mut bytes = Vec::new(); - - match tarball.read_to_end(&mut bytes) { - Ok(_) => Box::new( - self.docker - .stream_post( - "/images/load", - Some((Body::from(bytes), tar())), - None::>, - ) - .and_then(|bytes| { - serde_json::from_slice::<'_, Value>(&bytes[..]) - .map_err(Error::from) - .into_future() - }), - ) as Box + Send>, - Err(e) => Box::new(futures::future::err(Error::IO(e)).into_stream()) - as Box + Send>, - } + ) -> impl Stream> + Unpin + 'a { + Box::pin( + async move { + let mut bytes = Vec::default(); + + tarball.read_to_end(&mut bytes)?; + + let chunk_stream = self.docker.stream_post( + "/images/load", + Some((Body::from(bytes), tar())), + None::>, + ); + + let value_stream = chunk_stream.and_then(|chunk| async move { + serde_json::from_slice(&chunk).map_err(Error::from) + }); + + Ok(value_stream) + } + .try_flatten_stream(), + ) } } /// Interface for accessing and manipulating a docker container -pub struct Container<'a, 'b> { +pub struct Container<'a> { docker: &'a Docker, - id: Cow<'b, str>, + id: Cow<'a, str>, } -impl<'a, 'b> Container<'a, 'b> { +impl<'a> Container<'a> { /// Exports an interface exposing operations against a container instance pub fn new( docker: &'a Docker, id: S, - ) -> Container<'a, 'b> + ) -> Self where - S: Into>, + S: Into>, { Container { docker, @@ -302,16 +307,17 @@ impl<'a, 'b> Container<'a, 'b> { } /// Inspects the current docker container instance's details - pub fn inspect(&self) -> impl Future { + pub async fn inspect(&self) -> Result { self.docker .get_json::(&format!("/containers/{}/json", self.id)[..]) + .await } /// Returns a `top` view of information about the container process - pub fn top( + pub async fn top( &self, psargs: Option<&str>, - ) -> impl Future { + ) -> Result { let mut path = vec![format!("/containers/{}/top", self.id)]; if let Some(ref args) = psargs { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -319,85 +325,95 @@ impl<'a, 'b> Container<'a, 'b> { .finish(); path.push(encoded) } - self.docker.get_json(&path.join("?")) + self.docker.get_json(&path.join("?")).await } /// Returns a stream of logs emitted but the container instance pub fn logs( &self, opts: &LogsOptions, - ) -> impl Stream { + ) -> impl Stream> + Unpin + 'a { let mut path = vec![format!("/containers/{}/logs", self.id)]; if let Some(query) = opts.serialize() { path.push(query) } - let decoder = TtyDecoder::new(); - let chunk_stream = StreamReader::new(self.docker.stream_get(&path.join("?"))); + let stream = Box::pin(self.docker.stream_get(path.join("?"))); - FramedRead::new(chunk_stream, decoder) + Box::pin(tty::decode(stream)) } - /// Attaches to a running container, returning a stream that can - /// be used to interact with the standard IO streams. - pub fn attach(&self) -> impl Future { - self.docker.stream_post_upgrade_multiplexed::( - &format!( - "/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1", - self.id - ), - None, - ) + /// Attaches a multiplexed TCP stream to the container that can be used to read Stdout, Stderr and write Stdin. + async fn attach_raw(&self) -> Result { + self.docker + .stream_post_upgrade( + format!( + "/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1", + self.id + ), + None, + ) + .await } - /// Attaches to a running container, returning a stream that can - /// be used to interact with the standard IO streams. - pub fn attach_blocking(&self) -> Result { - self.attach().map(|s| s.wait()).wait() + /// Attaches a `[TtyMultiplexer]` to the container. + /// + /// The `[TtyMultiplexer]` implements Stream for returning Stdout and Stderr chunks. It also implements `[AsyncWrite]` for writing to Stdin. + /// + /// The multiplexer can be split into its read and write halves with the `[split](TtyMultiplexer::split)` method + pub async fn attach(&self) -> Result> { + let tcp_stream = self.attach_raw().await?; + + Ok(TtyMultiPlexer::new(tcp_stream)) } /// Returns a set of changes made to the container instance - pub fn changes(&self) -> impl Future, Error = Error> { + pub async fn changes(&self) -> Result> { self.docker .get_json::>(&format!("/containers/{}/changes", self.id)[..]) + .await } /// Exports the current docker container into a tarball - pub fn export(&self) -> impl Stream, Error = Error> { + pub fn export(&self) -> impl Stream>> + 'a { self.docker - .stream_get(&format!("/containers/{}/export", self.id)[..]) - .map(|c| c.to_vec()) + .stream_get(format!("/containers/{}/export", self.id)) + .map_ok(|c| c.to_vec()) } /// Returns a stream of stats specific to this container instance - pub fn stats(&self) -> impl Stream { - let decoder = LinesCodec::new(); - let stream_of_chunks = StreamReader::new( - self.docker - .stream_get(&format!("/containers/{}/stats", self.id)[..]), - ); + pub fn stats(&'a self) -> impl Stream> + Unpin + 'a { + let codec = futures_codec::LinesCodec {}; - FramedRead::new(stream_of_chunks, decoder) - .map_err(Error::IO) - .and_then(|s| { - serde_json::from_str::(&s) - .map_err(Error::SerdeJsonError) - .into_future() - }) + let reader = Box::pin( + self.docker + .stream_get(format!("/containers/{}/stats", self.id)) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)), + ) + .into_async_read(); + + Box::pin( + futures_codec::FramedRead::new(reader, codec) + .map_err(Error::IO) + .and_then(|s: String| async move { + serde_json::from_str(&s).map_err(Error::SerdeJsonError) + }), + ) } /// Start the container instance - pub fn start(&self) -> impl Future { + pub async fn start(&self) -> Result<()> { self.docker - .post::(&format!("/containers/{}/start", self.id)[..], None) - .map(|_| ()) + .post(&format!("/containers/{}/start", self.id)[..], None) + .await?; + Ok(()) } /// Stop the container instance - pub fn stop( + pub async fn stop( &self, wait: Option, - ) -> impl Future { + ) -> Result<()> { let mut path = vec![format!("/containers/{}/stop", self.id)]; if let Some(w) = wait { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -406,14 +422,15 @@ impl<'a, 'b> Container<'a, 'b> { path.push(encoded) } - self.docker.post::(&path.join("?"), None).map(|_| ()) + self.docker.post(&path.join("?"), None).await?; + Ok(()) } /// Restart the container instance - pub fn restart( + pub async fn restart( &self, wait: Option, - ) -> impl Future { + ) -> Result<()> { let mut path = vec![format!("/containers/{}/restart", self.id)]; if let Some(w) = wait { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -421,14 +438,15 @@ impl<'a, 'b> Container<'a, 'b> { .finish(); path.push(encoded) } - self.docker.post::(&path.join("?"), None).map(|_| ()) + self.docker.post(&path.join("?"), None).await?; + Ok(()) } /// Kill the container instance - pub fn kill( + pub async fn kill( &self, signal: Option<&str>, - ) -> impl Future { + ) -> Result<()> { let mut path = vec![format!("/containers/{}/kill", self.id)]; if let Some(sig) = signal { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -436,101 +454,125 @@ impl<'a, 'b> Container<'a, 'b> { .finish(); path.push(encoded) } - self.docker.post::(&path.join("?"), None).map(|_| ()) + self.docker.post(&path.join("?"), None).await?; + Ok(()) } /// Rename the container instance - pub fn rename( + pub async fn rename( &self, name: &str, - ) -> impl Future { + ) -> Result<()> { let query = form_urlencoded::Serializer::new(String::new()) .append_pair("name", name) .finish(); self.docker - .post::( + .post( &format!("/containers/{}/rename?{}", self.id, query)[..], None, ) - .map(|_| ()) + .await?; + Ok(()) } /// Pause the container instance - pub fn pause(&self) -> impl Future { + pub async fn pause(&self) -> Result<()> { self.docker - .post::(&format!("/containers/{}/pause", self.id)[..], None) - .map(|_| ()) + .post(&format!("/containers/{}/pause", self.id)[..], None) + .await?; + Ok(()) } /// Unpause the container instance - pub fn unpause(&self) -> impl Future { + pub async fn unpause(&self) -> Result<()> { self.docker - .post::(&format!("/containers/{}/unpause", self.id)[..], None) - .map(|_| ()) + .post(&format!("/containers/{}/unpause", self.id)[..], None) + .await?; + Ok(()) } /// Wait until the container stops - pub fn wait(&self) -> impl Future { + pub async fn wait(&self) -> Result { self.docker - .post_json::(&format!("/containers/{}/wait", self.id)[..], None) + .post_json( + format!("/containers/{}/wait", self.id), + Option::<(Body, Mime)>::None, + ) + .await } /// Delete the container instance /// /// Use remove instead to use the force/v options. - pub fn delete(&self) -> impl Future { + pub async fn delete(&self) -> Result<()> { self.docker .delete(&format!("/containers/{}", self.id)[..]) - .map(|_| ()) + .await?; + Ok(()) } /// Delete the container instance (todo: force/v) - pub fn remove( + pub async fn remove( &self, opts: RmContainerOptions, - ) -> impl Future { + ) -> Result<()> { let mut path = vec![format!("/containers/{}", self.id)]; if let Some(query) = opts.serialize() { path.push(query) } - self.docker.delete(&path.join("?")).map(|_| ()) + self.docker.delete(&path.join("?")).await?; + Ok(()) } - // TODO(abusch) fix this - /// Exec the specified command in the container - pub fn exec( + async fn exec_create( &self, opts: &ExecContainerOptions, - ) -> impl Stream { - let data = opts.serialize().unwrap(); // TODO fixme - let bytes = data.into_bytes(); - let docker2 = self.docker.clone(); - self.docker - .post( + ) -> Result { + #[derive(serde::Deserialize)] + #[serde(rename_all = "PascalCase")] + struct Response { + id: String, + } + + let body: Body = opts.serialize()?.into(); + + let Response { id } = self + .docker + .post_json( &format!("/containers/{}/exec", self.id)[..], - Some((bytes, mime::APPLICATION_JSON)), + Some((body, mime::APPLICATION_JSON)), ) - .map(move |res| { - let data = "{}"; - let bytes = data.as_bytes(); - let id = serde_json::from_str::(res.as_str()) - .ok() - .and_then(|v| { - v.as_object() - .and_then(|v| v.get("Id")) - .and_then(|v| v.as_str().map(|v| v.to_string())) - }) - .unwrap(); // TODO fixme - - let decoder = TtyDecoder::new(); - let chunk_stream = StreamReader::new(docker2.stream_post( - &format!("/exec/{}/start", id)[..], - Some((bytes, mime::APPLICATION_JSON)), - None::>, - )); - FramedRead::new(chunk_stream, decoder) - }) - .flatten_stream() + .await?; + + Ok(id) + } + + fn exec_start( + &self, + id: String, + ) -> impl Stream> + 'a { + let bytes: &[u8] = b"{}"; + + let stream = Box::pin(self.docker.stream_post( + format!("/exec/{}/start", id), + Some((bytes.into(), mime::APPLICATION_JSON)), + None::>, + )); + + tty::decode(stream) + } + + pub fn exec( + &'a self, + opts: &'a ExecContainerOptions, + ) -> impl Stream> + Unpin + 'a { + Box::pin( + async move { + let id = self.exec_create(opts).await?; + Ok(self.exec_start(id)) + } + .try_flatten_stream(), + ) } /// Copy a file/folder from the container. The resulting stream is a tarball of the extracted @@ -544,24 +586,24 @@ impl<'a, 'b> Container<'a, 'b> { pub fn copy_from( &self, path: &Path, - ) -> impl Stream, Error = Error> { + ) -> impl Stream>> + 'a { let path_arg = form_urlencoded::Serializer::new(String::new()) .append_pair("path", &path.to_string_lossy()) .finish(); - self.docker - .stream_get(&format!("/containers/{}/archive?{}", self.id, path_arg)) - .map(|c| c.to_vec()) + + let endpoint = format!("/containers/{}/archive?{}", self.id, path_arg); + self.docker.stream_get(endpoint).map_ok(|c| c.to_vec()) } /// Copy a byte slice as file into (see `bytes`) the container. /// /// The file will be copied at the given location (see `path`) and will be owned by root /// with access mask 644. - pub fn copy_file_into>( + pub async fn copy_file_into>( &self, path: P, bytes: &[u8], - ) -> impl Fut