From 9c72d3aacb675c5bcff238ce4996dbbe138e11c8 Mon Sep 17 00:00:00 2001 From: "Eli W. Hunter" <42009212+elihunter173@users.noreply.github.com> Date: Sat, 20 Feb 2021 19:36:31 -0500 Subject: Fix lifetimes (#272) * Some changes * Clarify lifetimes in transport.rs * Fix remaining easy lifetimes in lib.rs * Refactor Images::build for new lifetimes to work * Fix Exec::start() * Fix Container::exec() * Make header_bytes not a Vec * Make Docker::{images,...}() lifetimes explicit * Fix Containers::get() * Remove unnecessary locals from examples * Update changelog * Appease clippy --- src/lib.rs | 303 ++++++++++++++++++++++++++++++------------------------- src/transport.rs | 16 +-- src/tty.rs | 2 +- 3 files changed, 177 insertions(+), 144 deletions(-) (limited to 'src') diff --git a/src/lib.rs b/src/lib.rs index ac09a41..a9d9264 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -75,17 +75,17 @@ pub struct Docker { } /// Interface for accessing and manipulating a named docker image -pub struct Image<'a> { - docker: &'a Docker, +pub struct Image<'docker> { + docker: &'docker Docker, name: String, } -impl<'a> Image<'a> { +impl<'docker> Image<'docker> { /// Exports an interface for operations that may be performed against a named image pub fn new( - docker: &Docker, + docker: &'docker Docker, name: S, - ) -> Image + ) -> Self where S: Into, { @@ -117,7 +117,7 @@ impl<'a> Image<'a> { } /// Export this image to a tarball - pub fn export(&self) -> impl Stream>> + Unpin + 'a { + pub fn export(&self) -> impl Stream>> + Unpin + 'docker { Box::pin( self.docker .stream_get(format!("/images/{}/get", self.name)) @@ -140,34 +140,42 @@ impl<'a> Image<'a> { } /// Interface for docker images -pub struct Images<'a> { - docker: &'a Docker, +pub struct Images<'docker> { + docker: &'docker Docker, } -impl<'a> Images<'a> { +impl<'docker> Images<'docker> { /// Exports an interface for interacting with docker images - pub fn new(docker: &'a Docker) -> Images<'a> { + pub fn new(docker: &'docker Docker) -> Self { Images { docker } } /// Builds a new image build by reading a Dockerfile in a target directory pub fn build( - &'a self, - opts: &'a BuildOptions, - ) -> impl Stream> + Unpin + 'a { - Box::pin( - async move { - let mut path = vec!["/build".to_owned()]; - if let Some(query) = opts.serialize() { - path.push(query) - } + &self, + opts: &BuildOptions, + ) -> impl Stream> + Unpin + 'docker { + let mut endpoint = vec!["/build".to_owned()]; + if let Some(query) = opts.serialize() { + endpoint.push(query) + } - let mut bytes = Vec::default(); + // To not tie the lifetime of `opts` to the 'stream, we do the tarring work outside of the + // stream. But for backwards compatability, we have to return the error inside of the + // stream. + let mut bytes = Vec::default(); + let tar_result = tarball::dir(&mut bytes, opts.path.as_str()); - tarball::dir(&mut bytes, &opts.path[..])?; + // We must take ownership of the Docker reference. If we don't then the lifetime of 'stream + // is incorrectly tied to `self`. + let docker = self.docker; + Box::pin( + async move { + // Bubble up error inside the stream for backwards compatability + tar_result?; - let value_stream = self.docker.stream_post_into_values( - path.join("?"), + let value_stream = docker.stream_post_into_values( + endpoint.join("?"), Some((Body::from(bytes), tar())), None::>, ); @@ -194,7 +202,7 @@ impl<'a> Images<'a> { pub fn get( &self, name: S, - ) -> Image<'a> + ) -> Image<'docker> where S: Into, { @@ -218,7 +226,7 @@ impl<'a> Images<'a> { pub fn pull( &self, opts: &PullOptions, - ) -> impl Stream> + Unpin + 'a { + ) -> impl Stream> + Unpin + 'docker { let mut path = vec!["/images/create".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); @@ -238,7 +246,7 @@ impl<'a> Images<'a> { pub fn export( &self, names: Vec<&str>, - ) -> impl Stream>> + 'a { + ) -> impl Stream>> + 'docker { let params = names.iter().map(|n| ("names", *n)); let query = form_urlencoded::Serializer::new(String::new()) .extend_pairs(params) @@ -253,9 +261,9 @@ impl<'a> Images<'a> { pub fn import( self, mut tarball: R, - ) -> impl Stream> + Unpin + 'a + ) -> impl Stream> + Unpin + 'docker where - R: Read + Send + 'a, + R: Read + Send + 'docker, { Box::pin( async move { @@ -276,15 +284,15 @@ impl<'a> Images<'a> { } /// Interface for accessing and manipulating a docker container -pub struct Container<'a> { - docker: &'a Docker, +pub struct Container<'docker> { + docker: &'docker Docker, id: String, } -impl<'a> Container<'a> { +impl<'docker> Container<'docker> { /// Exports an interface exposing operations against a container instance pub fn new( - docker: &'a Docker, + docker: &'docker Docker, id: S, ) -> Self where @@ -327,7 +335,7 @@ impl<'a> Container<'a> { pub fn logs( &self, opts: &LogsOptions, - ) -> impl Stream> + Unpin + 'a { + ) -> impl Stream> + Unpin + 'docker { let mut path = vec![format!("/containers/{}/logs", self.id)]; if let Some(query) = opts.serialize() { path.push(query) @@ -339,7 +347,7 @@ impl<'a> Container<'a> { } /// Attaches a multiplexed TCP stream to the container that can be used to read Stdout, Stderr and write Stdin. - async fn attach_raw(&self) -> Result { + async fn attach_raw(&self) -> Result { self.docker .stream_post_upgrade( format!( @@ -356,7 +364,7 @@ impl<'a> Container<'a> { /// The `[TtyMultiplexer]` implements Stream for returning Stdout and Stderr chunks. It also implements `[AsyncWrite]` for writing to Stdin. /// /// The multiplexer can be split into its read and write halves with the `[split](TtyMultiplexer::split)` method - pub async fn attach(&self) -> Result> { + pub async fn attach(&self) -> Result> { let tcp_stream = self.attach_raw().await?; Ok(TtyMultiPlexer::new(tcp_stream)) @@ -370,14 +378,14 @@ impl<'a> Container<'a> { } /// Exports the current docker container into a tarball - pub fn export(&self) -> impl Stream>> + 'a { + pub fn export(&self) -> impl Stream>> + 'docker { self.docker .stream_get(format!("/containers/{}/export", self.id)) .map_ok(|c| c.to_vec()) } /// Returns a stream of stats specific to this container instance - pub fn stats(&'a self) -> impl Stream> + Unpin + 'a { + pub fn stats(&self) -> impl Stream> + Unpin + 'docker { let codec = futures_codec::LinesCodec {}; let reader = Box::pin( @@ -521,16 +529,10 @@ impl<'a> Container<'a> { /// Execute a command in this container pub fn exec( - &'a self, - opts: &'a ExecContainerOptions, - ) -> impl Stream> + Unpin + 'a { - Box::pin( - async move { - let id = Exec::create_id(&self.docker, &self.id, opts).await?; - Ok(Exec::_start(&self.docker, &id)) - } - .try_flatten_stream(), - ) + &self, + opts: &ExecContainerOptions, + ) -> impl Stream> + Unpin + 'docker { + Exec::create_and_start(self.docker, &self.id, opts) } /// Copy a file/folder from the container. The resulting stream is a tarball of the extracted @@ -544,7 +546,7 @@ impl<'a> Container<'a> { pub fn copy_from( &self, path: &Path, - ) -> impl Stream>> + 'a { + ) -> impl Stream>> + 'docker { let path_arg = form_urlencoded::Serializer::new(String::new()) .append_pair("path", &path.to_string_lossy()) .finish(); @@ -608,13 +610,13 @@ impl<'a> Container<'a> { } /// Interface for docker containers -pub struct Containers<'a> { - docker: &'a Docker, +pub struct Containers<'docker> { + docker: &'docker Docker, } -impl<'a> Containers<'a> { +impl<'docker> Containers<'docker> { /// Exports an interface for interacting with docker containers - pub fn new(docker: &'a Docker) -> Containers<'a> { + pub fn new(docker: &'docker Docker) -> Self { Containers { docker } } @@ -636,7 +638,7 @@ impl<'a> Containers<'a> { pub fn get( &self, name: S, - ) -> Container + ) -> Container<'docker> where S: Into, { @@ -665,16 +667,16 @@ impl<'a> Containers<'a> { } } /// Interface for docker exec instance -pub struct Exec<'a> { - docker: &'a Docker, +pub struct Exec<'docker> { + docker: &'docker Docker, id: String, } -impl<'a> Exec<'a> { +impl<'docker> Exec<'docker> { fn new( - docker: &'a Docker, + docker: &'docker Docker, id: S, - ) -> Exec<'a> + ) -> Self where S: Into, { @@ -684,12 +686,12 @@ impl<'a> Exec<'a> { } } - /// Creates an exec instance in docker and returns its id - pub(crate) async fn create_id( - docker: &'a Docker, + /// Creates a new exec instance that will be executed in a container with id == container_id + pub async fn create( + docker: &'docker Docker, container_id: &str, opts: &ExecContainerOptions, - ) -> Result { + ) -> Result> { #[derive(serde::Deserialize)] #[serde(rename_all = "PascalCase")] struct Response { @@ -698,41 +700,68 @@ impl<'a> Exec<'a> { let body: Body = opts.serialize()?.into(); - docker + let id = docker .post_json( - &format!("/containers/{}/exec", container_id)[..], + &format!("/containers/{}/exec", container_id), Some((body, mime::APPLICATION_JSON)), ) .await - .map(|resp: Response| resp.id) - } + .map(|resp: Response| resp.id)?; - /// Starts an exec instance with id exec_id - pub(crate) fn _start( - docker: &'a Docker, - exec_id: &str, - ) -> impl Stream> + 'a { - let bytes: &[u8] = b"{}"; - - let stream = Box::pin(docker.stream_post( - format!("/exec/{}/start", &exec_id), - Some((bytes.into(), mime::APPLICATION_JSON)), - None::>, - )); - - tty::decode(stream) + Ok(Exec::new(docker, id)) } - /// Creates a new exec instance that will be executed in a container with id == container_id - pub async fn create( - docker: &'a Docker, + // This exists for Container::exec() + // + // We need to combine `Exec::create` and `Exec::start` into one method because otherwise you + // needlessly tie the Stream to the lifetime of `container_id` and `opts`. This is because + // `Exec::create` is async so it must occur inside of the `async move` block. However, this + // means that `container_id` and `opts` are both expected to be alive in the returned stream + // because we can't do the work of creating an endpoint from `container_id` or serializing + // `opts`. By doing this work outside of the stream, we get owned values that we can then move + // into the stream and have the lifetimes work out as you would expect. + // + // Yes, it is sad that we can't do the easy method and thus have some duplicated code. + pub(crate) fn create_and_start( + docker: &'docker Docker, container_id: &str, opts: &ExecContainerOptions, - ) -> Result> { - Ok(Exec::new( - docker, - Exec::create_id(docker, container_id, opts).await?, - )) + ) -> impl Stream> + Unpin + 'docker { + #[derive(serde::Deserialize)] + #[serde(rename_all = "PascalCase")] + struct Response { + id: String, + } + + // To not tie the lifetime of `opts` to the stream, we do the serializing work outside of + // the stream. But for backwards compatability, we have to return the error inside of the + // stream. + let body_result = opts.serialize(); + + // To not tie the lifetime of `container_id` to the stream, we convert it to an (owned) + // endpoint outside of the stream. + let container_endpoint = format!("/containers/{}/exec", container_id); + + Box::pin( + async move { + // Bubble up the error inside the stream for backwards compatability + let body: Body = body_result?.into(); + + let exec_id = docker + .post_json(&container_endpoint, Some((body, mime::APPLICATION_JSON))) + .await + .map(|resp: Response| resp.id)?; + + let stream = Box::pin(docker.stream_post( + format!("/exec/{}/start", exec_id), + Some(("{}".into(), mime::APPLICATION_JSON)), + None::>, + )); + + Ok(tty::decode(stream)) + } + .try_flatten_stream(), + ) } /// Get a reference to a set of operations available to an already created exec instance. @@ -741,9 +770,9 @@ impl<'a> Exec<'a> { /// exists. Use [Exec::create](Exec::create) to ensure that the exec instance is created /// beforehand. pub async fn get( - docker: &'a Docker, + docker: &'docker Docker, id: S, - ) -> Exec<'a> + ) -> Exec<'docker> where S: Into, { @@ -751,14 +780,18 @@ impl<'a> Exec<'a> { } /// Starts this exec instance returning a multiplexed tty stream - pub fn start(&'a self) -> impl Stream> + 'a { + pub fn start(&self) -> impl Stream> + 'docker { + // We must take ownership of the docker reference to not needlessly tie the stream to the + // lifetime of `self`. + let docker = self.docker; + // We convert `self.id` into the (owned) endpoint outside of the stream to not needlessly + // tie the stream to the lifetime of `self`. + let endpoint = format!("/exec/{}/start", &self.id); Box::pin( async move { - let bytes: &[u8] = b"{}"; - - let stream = Box::pin(self.docker.stream_post( - format!("/exec/{}/start", &self.id), - Some((bytes.into(), mime::APPLICATION_JSON)), + let stream = Box::pin(docker.stream_post( + endpoint, + Some(("{}".into(), mime::APPLICATION_JSON)), None::>, )); @@ -791,13 +824,13 @@ impl<'a> Exec<'a> { } /// Interface for docker network -pub struct Networks<'a> { - docker: &'a Docker, +pub struct Networks<'docker> { + docker: &'docker Docker, } -impl<'a> Networks<'a> { +impl<'docker> Networks<'docker> { /// Exports an interface for interacting with docker Networks - pub fn new(docker: &Docker) -> Networks { + pub fn new(docker: &'docker Docker) -> Self { Networks { docker } } @@ -817,7 +850,7 @@ impl<'a> Networks<'a> { pub fn get( &self, id: S, - ) -> Network + ) -> Network<'docker> where S: Into, { @@ -839,17 +872,17 @@ impl<'a> Networks<'a> { } /// Interface for accessing and manipulating a docker network -pub struct Network<'a> { - docker: &'a Docker, +pub struct Network<'docker> { + docker: &'docker Docker, id: String, } -impl<'a> Network<'a> { +impl<'docker> Network<'docker> { /// Exports an interface exposing operations against a network instance pub fn new( - docker: &Docker, + docker: &'docker Docker, id: S, - ) -> Network + ) -> Self where S: Into, { @@ -913,13 +946,13 @@ impl<'a> Network<'a> { } /// Interface for docker volumes -pub struct Volumes<'a> { - docker: &'a Docker, +pub struct Volumes<'docker> { + docker: &'docker Docker, } -impl<'a> Volumes<'a> { +impl<'docker> Volumes<'docker> { /// Exports an interface for interacting with docker volumes - pub fn new(docker: &Docker) -> Volumes { + pub fn new(docker: &'docker Docker) -> Self { Volumes { docker } } @@ -950,23 +983,23 @@ impl<'a> Volumes<'a> { pub fn get( &self, name: &str, - ) -> Volume { + ) -> Volume<'docker> { Volume::new(self.docker, name) } } /// Interface for accessing and manipulating a named docker volume -pub struct Volume<'a> { - docker: &'a Docker, +pub struct Volume<'docker> { + docker: &'docker Docker, name: String, } -impl<'a> Volume<'a> { +impl<'docker> Volume<'docker> { /// Exports an interface for operations that may be performed against a named volume pub fn new( - docker: &Docker, + docker: &'docker Docker, name: S, - ) -> Volume + ) -> Self where S: Into, { @@ -986,13 +1019,13 @@ impl<'a> Volume<'a> { } /// Interface for docker services -pub struct Services<'a> { - docker: &'a Docker, +pub struct Services<'docker> { + docker: &'docker Docker, } -impl<'a> Services<'a> { +impl<'docker> Services<'docker> { /// Exports an interface for interacting with docker services - pub fn new(docker: &Docker) -> Services { + pub fn new(docker: &'docker Docker) -> Self { Services { docker } } @@ -1013,23 +1046,23 @@ impl<'a> Services<'a> { pub fn get( &self, name: &str, - ) -> Service { + ) -> Service<'docker> { Service::new(self.docker, name) } } /// Interface for accessing and manipulating a named docker volume -pub struct Service<'a> { - docker: &'a Docker, +pub struct Service<'docker> { + docker: &'docker Docker, name: String, } -impl<'a> Service<'a> { +impl<'docker> Service<'docker> { /// Exports an interface for operations that may be performed against a named service pub fn new( - docker: &Docker, + docker: &'docker Docker, name: S, - ) -> Service + ) -> Self where S: Into, { @@ -1078,7 +1111,7 @@ impl<'a> Service<'a> { pub fn logs( &self, opts: &LogsOptions, - ) -> impl Stream> + Unpin + 'a { + ) -> impl Stream> + Unpin + 'docker { let mut path = vec![format!("/services/{}/logs", self.name)]; if let Some(query) = opts.serialize() { path.push(query) @@ -1220,25 +1253,25 @@ impl Docker { } /// Exports an interface for interacting with docker images - pub fn images(&self) -> Images { + pub fn images(&'_ self) -> Images<'_> { Images::new(self) } /// Exports an interface for interacting with docker containers - pub fn containers(&self) -> Containers { + pub fn containers(&'_ self) -> Containers<'_> { Containers::new(self) } /// Exports an interface for interacting with docker services - pub fn services(&self) -> Services { + pub fn services(&'_ self) -> Services<'_> { Services::new(self) } - pub fn networks(&self) -> Networks { + pub fn networks(&'_ self) -> Networks<'_> { Networks::new(self) } - pub fn volumes(&self) -> Volumes { + pub fn volumes(&'_ self) -> Volumes<'_> { Volumes::new(self) } @@ -1258,10 +1291,10 @@ impl Docker { } /// Returns a stream of docker events - pub fn events<'a>( - &'a self, + pub fn events<'docker>( + &'docker self, opts: &EventsOptions, - ) -> impl Stream> + Unpin + 'a { + ) -> impl Stream> + Unpin + 'docker { let mut path = vec!["/events".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); diff --git a/src/transport.rs b/src/transport.rs index 0fbbcc1..59b83b5 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -73,7 +73,7 @@ impl fmt::Debug for Transport { impl Transport { /// Make a request and return the whole response in a `String` - pub async fn request<'a, B, H>( + pub async fn request( &self, method: Method, endpoint: impl AsRef, @@ -82,7 +82,7 @@ impl Transport { ) -> Result where B: Into, - H: IntoIterator + 'a, + H: IntoIterator, { let body = self.get_body(method, endpoint, body, headers).await?; let bytes = hyper::body::to_bytes(body).await?; @@ -149,16 +149,16 @@ impl Transport { Ok(stream_body(body)) } - pub fn stream_chunks<'a, H, B>( - &'a self, + pub fn stream_chunks<'stream, H, B>( + &'stream self, method: Method, - endpoint: impl AsRef + 'a, + endpoint: impl AsRef + 'stream, body: Option<(B, Mime)>, headers: Option, - ) -> impl Stream> + 'a + ) -> impl Stream> + 'stream where - H: IntoIterator + 'a, - B: Into + 'a, + B: Into + 'stream, + H: IntoIterator + 'stream, { self.get_chunk_stream(method, endpoint, body, headers) .try_flatten_stream() diff --git a/src/tty.rs b/src/tty.rs index 67c72bd..b4232ad 100644 --- a/src/tty.rs +++ b/src/tty.rs @@ -53,7 +53,7 @@ async fn decode_chunk(mut stream: S) -> Option<(Result, S)> where S: AsyncRead + Unpin, { - let mut header_bytes = vec![0u8; 8]; + let mut header_bytes = [0u8; 8]; match stream.read_exact(&mut header_bytes).await { Err(e) if e.kind() == futures_util::io::ErrorKind::UnexpectedEof => return None, -- cgit v1.2.3