diff options
author | doug tangren <d.tangren@gmail.com> | 2018-12-23 01:15:02 +0900 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-12-23 01:15:02 +0900 |
commit | 846b69eac815942d6eff2dc2ac52db8065d8eef7 (patch) | |
tree | 2b2ef2d550d68c4f0d32370d9de5f0d7c159ab35 /src/tty.rs | |
parent | 83d7def2900b5ff2fa736b5b84074f53a57a2e35 (diff) |
update travis build (#140)
* update travis build
* notes on fmting
* remove quotes
* comment below
* rouge quote
* first host.port usage
* fix deprecation warning
Diffstat (limited to 'src/tty.rs')
-rw-r--r-- | src/tty.rs | 78 |
1 files changed, 46 insertions, 32 deletions
@@ -5,9 +5,9 @@ use std::io::Cursor; use tokio_codec::Decoder; use futures::{self, Async}; -use hyper::rt::{Stream, Future}; -use tokio_io::{AsyncRead, AsyncWrite}; +use hyper::rt::{Future, Stream}; use std::io; +use tokio_io::{AsyncRead, AsyncWrite}; #[derive(Debug)] pub struct Chunk { @@ -25,12 +25,12 @@ pub enum StreamType { /// A multiplexed stream. pub struct Multiplexed { stdin: Box<AsyncWrite>, - chunks: Box<futures::Stream<Item=Chunk, Error=::Error>>, + chunks: Box<futures::Stream<Item = Chunk, Error = ::Error>>, } pub struct MultiplexedBlocking { stdin: Box<AsyncWrite>, - chunks: Box<Iterator<Item=Result<Chunk, ::Error>>>, + chunks: Box<Iterator<Item = Result<Chunk, ::Error>>>, } /// Represent the current state of the decoding of a TTY frame @@ -144,7 +144,9 @@ impl Decoder for TtyDecoder { impl Multiplexed { /// Create a multiplexed stream. pub(crate) fn new<T>(stream: T) -> Multiplexed - where T: AsyncRead + AsyncWrite + 'static { + where + T: AsyncRead + AsyncWrite + 'static, + { let (reader, stdin) = stream.split(); Multiplexed { chunks: Box::new(chunks(reader)), @@ -180,7 +182,10 @@ impl Iterator for MultiplexedBlocking { macro_rules! delegate_io_write { ($ty:ty) => { impl io::Write for $ty { - fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> { + fn write( + &mut self, + buf: &[u8], + ) -> Result<usize, io::Error> { self.stdin.write(buf) } @@ -194,27 +199,26 @@ macro_rules! delegate_io_write { delegate_io_write!(Multiplexed); delegate_io_write!(MultiplexedBlocking); -pub fn chunks<S>(stream: S) -> impl futures::Stream<Item=Chunk, Error=::Error> - where S: AsyncRead { - +pub fn chunks<S>(stream: S) -> impl futures::Stream<Item = Chunk, Error = ::Error> +where + S: AsyncRead, +{ let stream = futures::stream::unfold(stream, |stream| { let header_future = ::tokio_io::io::read_exact(stream, vec![0; 8]); - let fut = header_future - .and_then(|(stream, header_bytes)| { - let size_bytes = &header_bytes[4..]; - let data_length = BigEndian::read_u32(size_bytes); - let stream_type = match header_bytes[0] { - 0 => StreamType::StdIn, - 1 => StreamType::StdOut, - 2 => StreamType::StdErr, - n => panic!("invalid stream number from docker daemon: '{}'", n) - }; - - ::tokio_io::io::read_exact(stream, vec![0; data_length as usize]).map(move |(stream, data)| { - (Chunk { stream_type, data }, stream) - }) - }); + let fut = header_future.and_then(|(stream, header_bytes)| { + let size_bytes = &header_bytes[4..]; + let data_length = BigEndian::read_u32(size_bytes); + let stream_type = match header_bytes[0] { + 0 => StreamType::StdIn, + 1 => StreamType::StdOut, + 2 => StreamType::StdErr, + n => panic!("invalid stream number from docker daemon: '{}'", n), + }; + + ::tokio_io::io::read_exact(stream, vec![0; data_length as usize]) + .map(move |(stream, data)| (Chunk { stream_type, data }, stream)) + }); // FIXME: when updated to futures 0.2, the future itself returns the Option((Chunk, // stream)). // This is much better because it would allow us to swallow the unexpected eof and @@ -227,33 +231,43 @@ pub fn chunks<S>(stream: S) -> impl futures::Stream<Item=Chunk, Error=::Error> } mod util { - use futures::{Stream, Async}; + use futures::{Async, Stream}; pub struct StopOnError<S, F> { stream: S, f: F, } - pub fn stop_on_err<S, F>(stream: S, f: F) -> StopOnError<S, F> - where S: Stream, - F: FnMut(&S::Error) -> bool, + pub fn stop_on_err<S, F>( + stream: S, + f: F, + ) -> StopOnError<S, F> + where + S: Stream, + F: FnMut(&S::Error) -> bool, { StopOnError { stream, f } } impl<S, F> Stream for StopOnError<S, F> - where S: Stream, - F: FnMut(&S::Error) -> bool, + where + S: Stream, + F: FnMut(&S::Error) -> bool, { type Item = S::Item; type Error = S::Error; fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> { match self.stream.poll() { - Err(e) => if (self.f)(&e) { Err(e) } else { Ok(Async::Ready(None)) }, + Err(e) => { + if (self.f)(&e) { + Err(e) + } else { + Ok(Async::Ready(None)) + } + } a => a, } } } } - |