summaryrefslogtreecommitdiffstats
path: root/src/transport.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport.rs')
-rw-r--r--src/transport.rs42
1 files changed, 21 insertions, 21 deletions
diff --git a/src/transport.rs b/src/transport.rs
index 112d548..59b83b5 100644
--- a/src/transport.rs
+++ b/src/transport.rs
@@ -30,6 +30,9 @@ pub fn tar() -> Mime {
"application/tar".parse().unwrap()
}
+pub(crate) type Headers = Option<Vec<(&'static str, String)>>;
+pub(crate) type Payload = Option<(Body, Mime)>;
+
/// Transports are types which define the means of communication
/// with the docker daemon
#[derive(Clone)]
@@ -70,18 +73,18 @@ impl fmt::Debug for Transport {
impl Transport {
/// Make a request and return the whole response in a `String`
- pub async fn request<B>(
+ pub async fn request<B, H>(
&self,
method: Method,
endpoint: impl AsRef<str>,
body: Option<(B, Mime)>,
+ headers: Option<H>,
) -> Result<String>
where
B: Into<Body>,
+ H: IntoIterator<Item = (&'static str, String)>,
{
- let body = self
- .get_body(method, endpoint, body, None::<iter::Empty<_>>)
- .await?;
+ let body = self.get_body(method, endpoint, body, headers).await?;
let bytes = hyper::body::to_bytes(body).await?;
let string = String::from_utf8(bytes.to_vec())?;
@@ -122,7 +125,7 @@ impl Transport {
message: Self::get_error_message(&message_body).unwrap_or_else(|| {
status
.canonical_reason()
- .unwrap_or_else(|| "unknown error code")
+ .unwrap_or("unknown error code")
.to_owned()
}),
})
@@ -146,16 +149,16 @@ impl Transport {
Ok(stream_body(body))
}
- pub fn stream_chunks<'a, H, B>(
- &'a self,
+ pub fn stream_chunks<'stream, H, B>(
+ &'stream self,
method: Method,
- endpoint: impl AsRef<str> + 'a,
+ endpoint: impl AsRef<str> + 'stream,
body: Option<(B, Mime)>,
headers: Option<H>,
- ) -> impl Stream<Item = Result<Bytes>> + 'a
+ ) -> impl Stream<Item = Result<Bytes>> + 'stream
where
- H: IntoIterator<Item = (&'static str, String)> + 'a,
- B: Into<Body> + 'a,
+ B: Into<Body> + 'stream,
+ H: IntoIterator<Item = (&'static str, String)> + 'stream,
{
self.get_chunk_stream(method, endpoint, body, headers)
.try_flatten_stream()
@@ -236,14 +239,6 @@ impl Transport {
where
B: Into<Body>,
{
- match self {
- Transport::Tcp { .. } => (),
- #[cfg(feature = "tls")]
- Transport::EncryptedTcp { .. } => (),
- #[cfg(feature = "unix-socket")]
- Transport::Unix { .. } => panic!("connection streaming is only supported over TCP"),
- };
-
let req = self
.build_request(
method,
@@ -259,7 +254,7 @@ impl Transport {
let response = self.send_request(req).await?;
match response.status() {
- StatusCode::SWITCHING_PROTOCOLS => Ok(response.into_body().on_upgrade().await?),
+ StatusCode::SWITCHING_PROTOCOLS => Ok(hyper::upgrade::on(response).await?),
_ => Err(Error::ConnectionNotUpgraded),
}
}
@@ -302,7 +297,12 @@ where
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
- self.project().tokio_multiplexer.poll_read(cx, buf)
+ let mut readbuf = tokio::io::ReadBuf::new(buf);
+ match self.project().tokio_multiplexer.poll_read(cx, &mut readbuf) {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(Ok(())) => Poll::Ready(Ok(readbuf.filled().len())),
+ Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+ }
}
}