diff options
Diffstat (limited to 'src/transport.rs')
-rw-r--r-- | src/transport.rs | 42 |
1 files changed, 21 insertions, 21 deletions
diff --git a/src/transport.rs b/src/transport.rs index 112d548..59b83b5 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -30,6 +30,9 @@ pub fn tar() -> Mime { "application/tar".parse().unwrap() } +pub(crate) type Headers = Option<Vec<(&'static str, String)>>; +pub(crate) type Payload = Option<(Body, Mime)>; + /// Transports are types which define the means of communication /// with the docker daemon #[derive(Clone)] @@ -70,18 +73,18 @@ impl fmt::Debug for Transport { impl Transport { /// Make a request and return the whole response in a `String` - pub async fn request<B>( + pub async fn request<B, H>( &self, method: Method, endpoint: impl AsRef<str>, body: Option<(B, Mime)>, + headers: Option<H>, ) -> Result<String> where B: Into<Body>, + H: IntoIterator<Item = (&'static str, String)>, { - let body = self - .get_body(method, endpoint, body, None::<iter::Empty<_>>) - .await?; + let body = self.get_body(method, endpoint, body, headers).await?; let bytes = hyper::body::to_bytes(body).await?; let string = String::from_utf8(bytes.to_vec())?; @@ -122,7 +125,7 @@ impl Transport { message: Self::get_error_message(&message_body).unwrap_or_else(|| { status .canonical_reason() - .unwrap_or_else(|| "unknown error code") + .unwrap_or("unknown error code") .to_owned() }), }) @@ -146,16 +149,16 @@ impl Transport { Ok(stream_body(body)) } - pub fn stream_chunks<'a, H, B>( - &'a self, + pub fn stream_chunks<'stream, H, B>( + &'stream self, method: Method, - endpoint: impl AsRef<str> + 'a, + endpoint: impl AsRef<str> + 'stream, body: Option<(B, Mime)>, headers: Option<H>, - ) -> impl Stream<Item = Result<Bytes>> + 'a + ) -> impl Stream<Item = Result<Bytes>> + 'stream where - H: IntoIterator<Item = (&'static str, String)> + 'a, - B: Into<Body> + 'a, + B: Into<Body> + 'stream, + H: IntoIterator<Item = (&'static str, String)> + 'stream, { self.get_chunk_stream(method, endpoint, body, headers) .try_flatten_stream() @@ -236,14 +239,6 @@ impl Transport { where B: Into<Body>, { - match self { - Transport::Tcp { .. } => (), - #[cfg(feature = "tls")] - Transport::EncryptedTcp { .. } => (), - #[cfg(feature = "unix-socket")] - Transport::Unix { .. } => panic!("connection streaming is only supported over TCP"), - }; - let req = self .build_request( method, @@ -259,7 +254,7 @@ impl Transport { let response = self.send_request(req).await?; match response.status() { - StatusCode::SWITCHING_PROTOCOLS => Ok(response.into_body().on_upgrade().await?), + StatusCode::SWITCHING_PROTOCOLS => Ok(hyper::upgrade::on(response).await?), _ => Err(Error::ConnectionNotUpgraded), } } @@ -302,7 +297,12 @@ where cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>> { - self.project().tokio_multiplexer.poll_read(cx, buf) + let mut readbuf = tokio::io::ReadBuf::new(buf); + match self.project().tokio_multiplexer.poll_read(cx, &mut readbuf) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(())) => Poll::Ready(Ok(readbuf.filled().len())), + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + } } } |