summaryrefslogtreecommitdiffstats
path: root/src/tty.rs
diff options
context:
space:
mode:
authordoug tangren <d.tangren@gmail.com>2018-12-23 01:15:02 +0900
committerGitHub <noreply@github.com>2018-12-23 01:15:02 +0900
commit846b69eac815942d6eff2dc2ac52db8065d8eef7 (patch)
tree2b2ef2d550d68c4f0d32370d9de5f0d7c159ab35 /src/tty.rs
parent83d7def2900b5ff2fa736b5b84074f53a57a2e35 (diff)
update travis build (#140)
* update travis build * notes on fmting * remove quotes * comment below * rouge quote * first host.port usage * fix deprecation warning
Diffstat (limited to 'src/tty.rs')
-rw-r--r--src/tty.rs78
1 files changed, 46 insertions, 32 deletions
diff --git a/src/tty.rs b/src/tty.rs
index a0c5146..c30a038 100644
--- a/src/tty.rs
+++ b/src/tty.rs
@@ -5,9 +5,9 @@ use std::io::Cursor;
use tokio_codec::Decoder;
use futures::{self, Async};
-use hyper::rt::{Stream, Future};
-use tokio_io::{AsyncRead, AsyncWrite};
+use hyper::rt::{Future, Stream};
use std::io;
+use tokio_io::{AsyncRead, AsyncWrite};
#[derive(Debug)]
pub struct Chunk {
@@ -25,12 +25,12 @@ pub enum StreamType {
/// A multiplexed stream.
pub struct Multiplexed {
stdin: Box<AsyncWrite>,
- chunks: Box<futures::Stream<Item=Chunk, Error=::Error>>,
+ chunks: Box<futures::Stream<Item = Chunk, Error = ::Error>>,
}
pub struct MultiplexedBlocking {
stdin: Box<AsyncWrite>,
- chunks: Box<Iterator<Item=Result<Chunk, ::Error>>>,
+ chunks: Box<Iterator<Item = Result<Chunk, ::Error>>>,
}
/// Represent the current state of the decoding of a TTY frame
@@ -144,7 +144,9 @@ impl Decoder for TtyDecoder {
impl Multiplexed {
/// Create a multiplexed stream.
pub(crate) fn new<T>(stream: T) -> Multiplexed
- where T: AsyncRead + AsyncWrite + 'static {
+ where
+ T: AsyncRead + AsyncWrite + 'static,
+ {
let (reader, stdin) = stream.split();
Multiplexed {
chunks: Box::new(chunks(reader)),
@@ -180,7 +182,10 @@ impl Iterator for MultiplexedBlocking {
macro_rules! delegate_io_write {
($ty:ty) => {
impl io::Write for $ty {
- fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
+ fn write(
+ &mut self,
+ buf: &[u8],
+ ) -> Result<usize, io::Error> {
self.stdin.write(buf)
}
@@ -194,27 +199,26 @@ macro_rules! delegate_io_write {
delegate_io_write!(Multiplexed);
delegate_io_write!(MultiplexedBlocking);
-pub fn chunks<S>(stream: S) -> impl futures::Stream<Item=Chunk, Error=::Error>
- where S: AsyncRead {
-
+pub fn chunks<S>(stream: S) -> impl futures::Stream<Item = Chunk, Error = ::Error>
+where
+ S: AsyncRead,
+{
let stream = futures::stream::unfold(stream, |stream| {
let header_future = ::tokio_io::io::read_exact(stream, vec![0; 8]);
- let fut = header_future
- .and_then(|(stream, header_bytes)| {
- let size_bytes = &header_bytes[4..];
- let data_length = BigEndian::read_u32(size_bytes);
- let stream_type = match header_bytes[0] {
- 0 => StreamType::StdIn,
- 1 => StreamType::StdOut,
- 2 => StreamType::StdErr,
- n => panic!("invalid stream number from docker daemon: '{}'", n)
- };
-
- ::tokio_io::io::read_exact(stream, vec![0; data_length as usize]).map(move |(stream, data)| {
- (Chunk { stream_type, data }, stream)
- })
- });
+ let fut = header_future.and_then(|(stream, header_bytes)| {
+ let size_bytes = &header_bytes[4..];
+ let data_length = BigEndian::read_u32(size_bytes);
+ let stream_type = match header_bytes[0] {
+ 0 => StreamType::StdIn,
+ 1 => StreamType::StdOut,
+ 2 => StreamType::StdErr,
+ n => panic!("invalid stream number from docker daemon: '{}'", n),
+ };
+
+ ::tokio_io::io::read_exact(stream, vec![0; data_length as usize])
+ .map(move |(stream, data)| (Chunk { stream_type, data }, stream))
+ });
// FIXME: when updated to futures 0.2, the future itself returns the Option((Chunk,
// stream)).
// This is much better because it would allow us to swallow the unexpected eof and
@@ -227,33 +231,43 @@ pub fn chunks<S>(stream: S) -> impl futures::Stream<Item=Chunk, Error=::Error>
}
mod util {
- use futures::{Stream, Async};
+ use futures::{Async, Stream};
pub struct StopOnError<S, F> {
stream: S,
f: F,
}
- pub fn stop_on_err<S, F>(stream: S, f: F) -> StopOnError<S, F>
- where S: Stream,
- F: FnMut(&S::Error) -> bool,
+ pub fn stop_on_err<S, F>(
+ stream: S,
+ f: F,
+ ) -> StopOnError<S, F>
+ where
+ S: Stream,
+ F: FnMut(&S::Error) -> bool,
{
StopOnError { stream, f }
}
impl<S, F> Stream for StopOnError<S, F>
- where S: Stream,
- F: FnMut(&S::Error) -> bool,
+ where
+ S: Stream,
+ F: FnMut(&S::Error) -> bool,
{
type Item = S::Item;
type Error = S::Error;
fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
match self.stream.poll() {
- Err(e) => if (self.f)(&e) { Err(e) } else { Ok(Async::Ready(None)) },
+ Err(e) => {
+ if (self.f)(&e) {
+ Err(e)
+ } else {
+ Ok(Async::Ready(None))
+ }
+ }
a => a,
}
}
}
}
-