summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authordoug tangren <d.tangren@gmail.com>2018-12-23 01:15:02 +0900
committerGitHub <noreply@github.com>2018-12-23 01:15:02 +0900
commit846b69eac815942d6eff2dc2ac52db8065d8eef7 (patch)
tree2b2ef2d550d68c4f0d32370d9de5f0d7c159ab35 /src
parent83d7def2900b5ff2fa736b5b84074f53a57a2e35 (diff)
update travis build (#140)
* update travis build * notes on fmting * remove quotes * comment below * rouge quote * first host.port usage * fix deprecation warning
Diffstat (limited to 'src')
-rw-r--r--src/builder.rs26
-rw-r--r--src/errors.rs5
-rw-r--r--src/lib.rs19
-rw-r--r--src/tarball.rs2
-rw-r--r--src/transport.rs30
-rw-r--r--src/tty.rs78
6 files changed, 96 insertions, 64 deletions
diff --git a/src/builder.rs b/src/builder.rs
index bee4c6e..cd086ca 100644
--- a/src/builder.rs
+++ b/src/builder.rs
@@ -515,26 +515,38 @@ impl ContainerOptionsBuilder {
}
/// Whether to attach to `stdin`.
- pub fn attach_stdin(&mut self, attach: bool) -> &mut Self {
+ pub fn attach_stdin(
+ &mut self,
+ attach: bool,
+ ) -> &mut Self {
self.params.insert("AttachStdin", json!(attach));
self.params.insert("OpenStdin", json!(attach));
self
}
/// Whether to attach to `stdout`.
- pub fn attach_stdout(&mut self, attach: bool) -> &mut Self {
+ pub fn attach_stdout(
+ &mut self,
+ attach: bool,
+ ) -> &mut Self {
self.params.insert("AttachStdout", json!(attach));
self
}
/// Whether to attach to `stderr`.
- pub fn attach_stderr(&mut self, attach: bool) -> &mut Self {
+ pub fn attach_stderr(
+ &mut self,
+ attach: bool,
+ ) -> &mut Self {
self.params.insert("AttachStderr", json!(attach));
self
}
/// Whether standard streams should be attached to a TTY.
- pub fn tty(&mut self, tty: bool) -> &mut Self {
+ pub fn tty(
+ &mut self,
+ tty: bool,
+ ) -> &mut Self {
self.params.insert("Tty", json!(tty));
self
}
@@ -1171,9 +1183,7 @@ impl NetworkCreateOptionsBuilder {
pub(crate) fn new(name: &str) -> Self {
let mut params = HashMap::new();
params.insert("Name", json!(name));
- NetworkCreateOptionsBuilder {
- params
- }
+ NetworkCreateOptionsBuilder { params }
}
pub fn driver(
@@ -1253,7 +1263,7 @@ impl ContainerConnectionOptionsBuilder {
aliases: Vec<&str>,
) -> &mut Self {
self.params
- .insert("EndpointConfig", json!({"Aliases": json!(aliases)}));
+ .insert("EndpointConfig", json!({ "Aliases": json!(aliases) }));
self
}
diff --git a/src/errors.rs b/src/errors.rs
index fa408c2..8392971 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -60,7 +60,10 @@ impl fmt::Display for Error {
write!(f, "Response doesn't have the expected format: {}", cause)
}
Error::Fault { code, .. } => write!(f, "{}", code),
- Error::ConnectionNotUpgraded => write!(f, "expected the docker host to upgrade the HTTP connection but it did not"),
+ Error::ConnectionNotUpgraded => write!(
+ f,
+ "expected the docker host to upgrade the HTTP connection but it did not"
+ ),
}
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 0932994..11a4653 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -310,11 +310,14 @@ impl<'a, 'b> Container<'a, 'b> {
/// Attaches to a running container, returning a stream that can
/// be used to interact with the standard IO streams.
- pub fn attach(&self)
- -> impl Future<Item = tty::Multiplexed, Error = Error> {
+ pub fn attach(&self) -> impl Future<Item = tty::Multiplexed, Error = Error> {
self.docker.stream_post_upgrade_multiplexed::<Body>(
- &format!("/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1", self.id),
- None)
+ &format!(
+ "/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1",
+ self.id
+ ),
+ None,
+ )
}
/// Attaches to a running container, returning a stream that can
@@ -804,7 +807,7 @@ impl Docker {
"{}://{}:{}",
host.scheme_part().map(|s| s.as_str()).unwrap(),
host.host().unwrap().to_owned(),
- host.port_part().unwrap_or(80)
+ host.port().unwrap_or(80)
);
match host.scheme_part().map(|s| s.as_str()) {
@@ -1009,7 +1012,9 @@ impl Docker {
body: Option<(B, Mime)>,
) -> impl Future<Item = tty::Multiplexed, Error = Error>
where
- B: Into<Body> + 'static {
- self.transport.stream_upgrade_multiplexed(Method::POST, endpoint, body)
+ B: Into<Body> + 'static,
+ {
+ self.transport
+ .stream_upgrade_multiplexed(Method::POST, endpoint, body)
}
}
diff --git a/src/tarball.rs b/src/tarball.rs
index d414620..acbe47a 100644
--- a/src/tarball.rs
+++ b/src/tarball.rs
@@ -54,7 +54,7 @@ where
let relativized = canonical
.to_str()
.unwrap()
- .trim_left_matches(&base_path_str[..]);
+ .trim_start_matches(&base_path_str[..]);
if path.is_dir() {
archive.append_dir(Path::new(relativized), &canonical)?
} else {
diff --git a/src/transport.rs b/src/transport.rs
index 55844c3..41d8694 100644
--- a/src/transport.rs
+++ b/src/transport.rs
@@ -143,7 +143,8 @@ impl Transport {
f: impl FnOnce(&mut ::http::request::Builder),
) -> Result<Request<Body>>
where
- B: Into<Body> {
+ B: Into<Body>,
+ {
let mut builder = Request::builder();
f(&mut builder);
@@ -197,28 +198,27 @@ impl Transport {
body: Option<(B, Mime)>,
) -> impl Future<Item = impl AsyncRead + AsyncWrite, Error = Error>
where
- B: Into<Body>
+ B: Into<Body>,
{
match self {
Transport::Tcp { .. } | Transport::EncryptedTcp { .. } => (),
_ => panic!("connection streaming is only supported over TCP"),
};
- let req = self.build_request(method, endpoint, body, |builder| {
- builder.header(header::CONNECTION, "Upgrade")
- .header(header::UPGRADE, "tcp");
- }).expect("Failed to build request!");
+ let req = self
+ .build_request(method, endpoint, body, |builder| {
+ builder
+ .header(header::CONNECTION, "Upgrade")
+ .header(header::UPGRADE, "tcp");
+ })
+ .expect("Failed to build request!");
- self.send_request(req).and_then(|res| {
- match res.status() {
+ self.send_request(req)
+ .and_then(|res| match res.status() {
StatusCode::SWITCHING_PROTOCOLS => Ok(res),
_ => Err(Error::ConnectionNotUpgraded),
- }
- }).and_then(|res| {
- res.into_body()
- .on_upgrade()
- .from_err()
- })
+ })
+ .and_then(|res| res.into_body().on_upgrade().from_err())
}
pub fn stream_upgrade_multiplexed<B>(
@@ -228,7 +228,7 @@ impl Transport {
body: Option<(B, Mime)>,
) -> impl Future<Item = ::tty::Multiplexed, Error = Error>
where
- B: Into<Body> + 'static
+ B: Into<Body> + 'static,
{
self.stream_upgrade(method, endpoint, body)
.map(|u| ::tty::Multiplexed::new(u))
diff --git a/src/tty.rs b/src/tty.rs
index a0c5146..c30a038 100644
--- a/src/tty.rs
+++ b/src/tty.rs
@@ -5,9 +5,9 @@ use std::io::Cursor;
use tokio_codec::Decoder;
use futures::{self, Async};
-use hyper::rt::{Stream, Future};
-use tokio_io::{AsyncRead, AsyncWrite};
+use hyper::rt::{Future, Stream};
use std::io;
+use tokio_io::{AsyncRead, AsyncWrite};
#[derive(Debug)]
pub struct Chunk {
@@ -25,12 +25,12 @@ pub enum StreamType {
/// A multiplexed stream.
pub struct Multiplexed {
stdin: Box<AsyncWrite>,
- chunks: Box<futures::Stream<Item=Chunk, Error=::Error>>,
+ chunks: Box<futures::Stream<Item = Chunk, Error = ::Error>>,
}
pub struct MultiplexedBlocking {
stdin: Box<AsyncWrite>,
- chunks: Box<Iterator<Item=Result<Chunk, ::Error>>>,
+ chunks: Box<Iterator<Item = Result<Chunk, ::Error>>>,
}
/// Represent the current state of the decoding of a TTY frame
@@ -144,7 +144,9 @@ impl Decoder for TtyDecoder {
impl Multiplexed {
/// Create a multiplexed stream.
pub(crate) fn new<T>(stream: T) -> Multiplexed
- where T: AsyncRead + AsyncWrite + 'static {
+ where
+ T: AsyncRead + AsyncWrite + 'static,
+ {
let (reader, stdin) = stream.split();
Multiplexed {
chunks: Box::new(chunks(reader)),
@@ -180,7 +182,10 @@ impl Iterator for MultiplexedBlocking {
macro_rules! delegate_io_write {
($ty:ty) => {
impl io::Write for $ty {
- fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
+ fn write(
+ &mut self,
+ buf: &[u8],
+ ) -> Result<usize, io::Error> {
self.stdin.write(buf)
}
@@ -194,27 +199,26 @@ macro_rules! delegate_io_write {
delegate_io_write!(Multiplexed);
delegate_io_write!(MultiplexedBlocking);
-pub fn chunks<S>(stream: S) -> impl futures::Stream<Item=Chunk, Error=::Error>
- where S: AsyncRead {
-
+pub fn chunks<S>(stream: S) -> impl futures::Stream<Item = Chunk, Error = ::Error>
+where
+ S: AsyncRead,
+{
let stream = futures::stream::unfold(stream, |stream| {
let header_future = ::tokio_io::io::read_exact(stream, vec![0; 8]);
- let fut = header_future
- .and_then(|(stream, header_bytes)| {
- let size_bytes = &header_bytes[4..];
- let data_length = BigEndian::read_u32(size_bytes);
- let stream_type = match header_bytes[0] {
- 0 => StreamType::StdIn,
- 1 => StreamType::StdOut,
- 2 => StreamType::StdErr,
- n => panic!("invalid stream number from docker daemon: '{}'", n)
- };
-
- ::tokio_io::io::read_exact(stream, vec![0; data_length as usize]).map(move |(stream, data)| {
- (Chunk { stream_type, data }, stream)
- })
- });
+ let fut = header_future.and_then(|(stream, header_bytes)| {
+ let size_bytes = &header_bytes[4..];
+ let data_length = BigEndian::read_u32(size_bytes);
+ let stream_type = match header_bytes[0] {
+ 0 => StreamType::StdIn,
+ 1 => StreamType::StdOut,
+ 2 => StreamType::StdErr,
+ n => panic!("invalid stream number from docker daemon: '{}'", n),
+ };
+
+ ::tokio_io::io::read_exact(stream, vec![0; data_length as usize])
+ .map(move |(stream, data)| (Chunk { stream_type, data }, stream))
+ });
// FIXME: when updated to futures 0.2, the future itself returns the Option((Chunk,
// stream)).
// This is much better because it would allow us to swallow the unexpected eof and
@@ -227,33 +231,43 @@ pub fn chunks<S>(stream: S) -> impl futures::Stream<Item=Chunk, Error=::Error>
}
mod util {
- use futures::{Stream, Async};
+ use futures::{Async, Stream};
pub struct StopOnError<S, F> {
stream: S,
f: F,
}
- pub fn stop_on_err<S, F>(stream: S, f: F) -> StopOnError<S, F>
- where S: Stream,
- F: FnMut(&S::Error) -> bool,
+ pub fn stop_on_err<S, F>(
+ stream: S,
+ f: F,
+ ) -> StopOnError<S, F>
+ where
+ S: Stream,
+ F: FnMut(&S::Error) -> bool,
{
StopOnError { stream, f }
}
impl<S, F> Stream for StopOnError<S, F>
- where S: Stream,
- F: FnMut(&S::Error) -> bool,
+ where
+ S: Stream,
+ F: FnMut(&S::Error) -> bool,
{
type Item = S::Item;
type Error = S::Error;
fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
match self.stream.poll() {
- Err(e) => if (self.f)(&e) { Err(e) } else { Ok(Async::Ready(None)) },
+ Err(e) => {
+ if (self.f)(&e) {
+ Err(e)
+ } else {
+ Ok(Async::Ready(None))
+ }
+ }
a => a,
}
}
}
}
-