summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorEli W. Hunter <42009212+elihunter173@users.noreply.github.com>2021-02-20 19:36:31 -0500
committerGitHub <noreply@github.com>2021-02-20 19:36:31 -0500
commit9c72d3aacb675c5bcff238ce4996dbbe138e11c8 (patch)
tree692b95faa1a8226a6c29efe1f349ae999eddc735 /src
parent5af822c5691a772cad36bd9a69e94419b03d4511 (diff)
Fix lifetimes (#272)
* Some changes * Clarify lifetimes in transport.rs * Fix remaining easy lifetimes in lib.rs * Refactor Images::build for new lifetimes to work * Fix Exec::start() * Fix Container::exec() * Make header_bytes not a Vec * Make Docker::{images,...}() lifetimes explicit * Fix Containers::get() * Remove unnecessary locals from examples * Update changelog * Appease clippy
Diffstat (limited to 'src')
-rw-r--r--src/lib.rs303
-rw-r--r--src/transport.rs16
-rw-r--r--src/tty.rs2
3 files changed, 177 insertions, 144 deletions
diff --git a/src/lib.rs b/src/lib.rs
index ac09a41..a9d9264 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -75,17 +75,17 @@ pub struct Docker {
}
/// Interface for accessing and manipulating a named docker image
-pub struct Image<'a> {
- docker: &'a Docker,
+pub struct Image<'docker> {
+ docker: &'docker Docker,
name: String,
}
-impl<'a> Image<'a> {
+impl<'docker> Image<'docker> {
/// Exports an interface for operations that may be performed against a named image
pub fn new<S>(
- docker: &Docker,
+ docker: &'docker Docker,
name: S,
- ) -> Image
+ ) -> Self
where
S: Into<String>,
{
@@ -117,7 +117,7 @@ impl<'a> Image<'a> {
}
/// Export this image to a tarball
- pub fn export(&self) -> impl Stream<Item = Result<Vec<u8>>> + Unpin + 'a {
+ pub fn export(&self) -> impl Stream<Item = Result<Vec<u8>>> + Unpin + 'docker {
Box::pin(
self.docker
.stream_get(format!("/images/{}/get", self.name))
@@ -140,34 +140,42 @@ impl<'a> Image<'a> {
}
/// Interface for docker images
-pub struct Images<'a> {
- docker: &'a Docker,
+pub struct Images<'docker> {
+ docker: &'docker Docker,
}
-impl<'a> Images<'a> {
+impl<'docker> Images<'docker> {
/// Exports an interface for interacting with docker images
- pub fn new(docker: &'a Docker) -> Images<'a> {
+ pub fn new(docker: &'docker Docker) -> Self {
Images { docker }
}
/// Builds a new image build by reading a Dockerfile in a target directory
pub fn build(
- &'a self,
- opts: &'a BuildOptions,
- ) -> impl Stream<Item = Result<Value>> + Unpin + 'a {
- Box::pin(
- async move {
- let mut path = vec!["/build".to_owned()];
- if let Some(query) = opts.serialize() {
- path.push(query)
- }
+ &self,
+ opts: &BuildOptions,
+ ) -> impl Stream<Item = Result<Value>> + Unpin + 'docker {
+ let mut endpoint = vec!["/build".to_owned()];
+ if let Some(query) = opts.serialize() {
+ endpoint.push(query)
+ }
- let mut bytes = Vec::default();
+ // To not tie the lifetime of `opts` to the 'stream, we do the tarring work outside of the
+ // stream. But for backwards compatability, we have to return the error inside of the
+ // stream.
+ let mut bytes = Vec::default();
+ let tar_result = tarball::dir(&mut bytes, opts.path.as_str());
- tarball::dir(&mut bytes, &opts.path[..])?;
+ // We must take ownership of the Docker reference. If we don't then the lifetime of 'stream
+ // is incorrectly tied to `self`.
+ let docker = self.docker;
+ Box::pin(
+ async move {
+ // Bubble up error inside the stream for backwards compatability
+ tar_result?;
- let value_stream = self.docker.stream_post_into_values(
- path.join("?"),
+ let value_stream = docker.stream_post_into_values(
+ endpoint.join("?"),
Some((Body::from(bytes), tar())),
None::<iter::Empty<_>>,
);
@@ -194,7 +202,7 @@ impl<'a> Images<'a> {
pub fn get<S>(
&self,
name: S,
- ) -> Image<'a>
+ ) -> Image<'docker>
where
S: Into<String>,
{
@@ -218,7 +226,7 @@ impl<'a> Images<'a> {
pub fn pull(
&self,
opts: &PullOptions,
- ) -> impl Stream<Item = Result<Value>> + Unpin + 'a {
+ ) -> impl Stream<Item = Result<Value>> + Unpin + 'docker {
let mut path = vec!["/images/create".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
@@ -238,7 +246,7 @@ impl<'a> Images<'a> {
pub fn export(
&self,
names: Vec<&str>,
- ) -> impl Stream<Item = Result<Vec<u8>>> + 'a {
+ ) -> impl Stream<Item = Result<Vec<u8>>> + 'docker {
let params = names.iter().map(|n| ("names", *n));
let query = form_urlencoded::Serializer::new(String::new())
.extend_pairs(params)
@@ -253,9 +261,9 @@ impl<'a> Images<'a> {
pub fn import<R>(
self,
mut tarball: R,
- ) -> impl Stream<Item = Result<Value>> + Unpin + 'a
+ ) -> impl Stream<Item = Result<Value>> + Unpin + 'docker
where
- R: Read + Send + 'a,
+ R: Read + Send + 'docker,
{
Box::pin(
async move {
@@ -276,15 +284,15 @@ impl<'a> Images<'a> {
}
/// Interface for accessing and manipulating a docker container
-pub struct Container<'a> {
- docker: &'a Docker,
+pub struct Container<'docker> {
+ docker: &'docker Docker,
id: String,
}
-impl<'a> Container<'a> {
+impl<'docker> Container<'docker> {
/// Exports an interface exposing operations against a container instance
pub fn new<S>(
- docker: &'a Docker,
+ docker: &'docker Docker,
id: S,
) -> Self
where
@@ -327,7 +335,7 @@ impl<'a> Container<'a> {
pub fn logs(
&self,
opts: &LogsOptions,
- ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'a {
+ ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'docker {
let mut path = vec![format!("/containers/{}/logs", self.id)];
if let Some(query) = opts.serialize() {
path.push(query)
@@ -339,7 +347,7 @@ impl<'a> Container<'a> {
}
/// Attaches a multiplexed TCP stream to the container that can be used to read Stdout, Stderr and write Stdin.
- async fn attach_raw(&self) -> Result<impl AsyncRead + AsyncWrite + Send + 'a> {
+ async fn attach_raw(&self) -> Result<impl AsyncRead + AsyncWrite + Send + 'docker> {
self.docker
.stream_post_upgrade(
format!(
@@ -356,7 +364,7 @@ impl<'a> Container<'a> {
/// The `[TtyMultiplexer]` implements Stream for returning Stdout and Stderr chunks. It also implements `[AsyncWrite]` for writing to Stdin.
///
/// The multiplexer can be split into its read and write halves with the `[split](TtyMultiplexer::split)` method
- pub async fn attach(&self) -> Result<TtyMultiPlexer<'a>> {
+ pub async fn attach(&self) -> Result<TtyMultiPlexer<'docker>> {
let tcp_stream = self.attach_raw().await?;
Ok(TtyMultiPlexer::new(tcp_stream))
@@ -370,14 +378,14 @@ impl<'a> Container<'a> {
}
/// Exports the current docker container into a tarball
- pub fn export(&self) -> impl Stream<Item = Result<Vec<u8>>> + 'a {
+ pub fn export(&self) -> impl Stream<Item = Result<Vec<u8>>> + 'docker {
self.docker
.stream_get(format!("/containers/{}/export", self.id))
.map_ok(|c| c.to_vec())
}
/// Returns a stream of stats specific to this container instance
- pub fn stats(&'a self) -> impl Stream<Item = Result<Stats>> + Unpin + 'a {
+ pub fn stats(&self) -> impl Stream<Item = Result<Stats>> + Unpin + 'docker {
let codec = futures_codec::LinesCodec {};
let reader = Box::pin(
@@ -521,16 +529,10 @@ impl<'a> Container<'a> {
/// Execute a command in this container
pub fn exec(
- &'a self,
- opts: &'a ExecContainerOptions,
- ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'a {
- Box::pin(
- async move {
- let id = Exec::create_id(&self.docker, &self.id, opts).await?;
- Ok(Exec::_start(&self.docker, &id))
- }
- .try_flatten_stream(),
- )
+ &self,
+ opts: &ExecContainerOptions,
+ ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'docker {
+ Exec::create_and_start(self.docker, &self.id, opts)
}
/// Copy a file/folder from the container. The resulting stream is a tarball of the extracted
@@ -544,7 +546,7 @@ impl<'a> Container<'a> {
pub fn copy_from(
&self,
path: &Path,
- ) -> impl Stream<Item = Result<Vec<u8>>> + 'a {
+ ) -> impl Stream<Item = Result<Vec<u8>>> + 'docker {
let path_arg = form_urlencoded::Serializer::new(String::new())
.append_pair("path", &path.to_string_lossy())
.finish();
@@ -608,13 +610,13 @@ impl<'a> Container<'a> {
}
/// Interface for docker containers
-pub struct Containers<'a> {
- docker: &'a Docker,
+pub struct Containers<'docker> {
+ docker: &'docker Docker,
}
-impl<'a> Containers<'a> {
+impl<'docker> Containers<'docker> {
/// Exports an interface for interacting with docker containers
- pub fn new(docker: &'a Docker) -> Containers<'a> {
+ pub fn new(docker: &'docker Docker) -> Self {
Containers { docker }
}
@@ -636,7 +638,7 @@ impl<'a> Containers<'a> {
pub fn get<S>(
&self,
name: S,
- ) -> Container
+ ) -> Container<'docker>
where
S: Into<String>,
{
@@ -665,16 +667,16 @@ impl<'a> Containers<'a> {
}
}
/// Interface for docker exec instance
-pub struct Exec<'a> {
- docker: &'a Docker,
+pub struct Exec<'docker> {
+ docker: &'docker Docker,
id: String,
}
-impl<'a> Exec<'a> {
+impl<'docker> Exec<'docker> {
fn new<S>(
- docker: &'a Docker,
+ docker: &'docker Docker,
id: S,
- ) -> Exec<'a>
+ ) -> Self
where
S: Into<String>,
{
@@ -684,12 +686,12 @@ impl<'a> Exec<'a> {
}
}
- /// Creates an exec instance in docker and returns its id
- pub(crate) async fn create_id(
- docker: &'a Docker,
+ /// Creates a new exec instance that will be executed in a container with id == container_id
+ pub async fn create(
+ docker: &'docker Docker,
container_id: &str,
opts: &ExecContainerOptions,
- ) -> Result<String> {
+ ) -> Result<Exec<'docker>> {
#[derive(serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
struct Response {
@@ -698,41 +700,68 @@ impl<'a> Exec<'a> {
let body: Body = opts.serialize()?.into();
- docker
+ let id = docker
.post_json(
- &format!("/containers/{}/exec", container_id)[..],
+ &format!("/containers/{}/exec", container_id),
Some((body, mime::APPLICATION_JSON)),
)
.await
- .map(|resp: Response| resp.id)
- }
+ .map(|resp: Response| resp.id)?;
- /// Starts an exec instance with id exec_id
- pub(crate) fn _start(
- docker: &'a Docker,
- exec_id: &str,
- ) -> impl Stream<Item = Result<tty::TtyChunk>> + 'a {
- let bytes: &[u8] = b"{}";
-
- let stream = Box::pin(docker.stream_post(
- format!("/exec/{}/start", &exec_id),
- Some((bytes.into(), mime::APPLICATION_JSON)),
- None::<iter::Empty<_>>,
- ));
-
- tty::decode(stream)
+ Ok(Exec::new(docker, id))
}
- /// Creates a new exec instance that will be executed in a container with id == container_id
- pub async fn create(
- docker: &'a Docker,
+ // This exists for Container::exec()
+ //
+ // We need to combine `Exec::create` and `Exec::start` into one method because otherwise you
+ // needlessly tie the Stream to the lifetime of `container_id` and `opts`. This is because
+ // `Exec::create` is async so it must occur inside of the `async move` block. However, this
+ // means that `container_id` and `opts` are both expected to be alive in the returned stream
+ // because we can't do the work of creating an endpoint from `container_id` or serializing
+ // `opts`. By doing this work outside of the stream, we get owned values that we can then move
+ // into the stream and have the lifetimes work out as you would expect.
+ //
+ // Yes, it is sad that we can't do the easy method and thus have some duplicated code.
+ pub(crate) fn create_and_start(
+ docker: &'docker Docker,
container_id: &str,
opts: &ExecContainerOptions,
- ) -> Result<Exec<'a>> {
- Ok(Exec::new(
- docker,
- Exec::create_id(docker, container_id, opts).await?,
- ))
+ ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'docker {
+ #[derive(serde::Deserialize)]
+ #[serde(rename_all = "PascalCase")]
+ struct Response {
+ id: String,
+ }
+
+ // To not tie the lifetime of `opts` to the stream, we do the serializing work outside of
+ // the stream. But for backwards compatability, we have to return the error inside of the
+ // stream.
+ let body_result = opts.serialize();
+
+ // To not tie the lifetime of `container_id` to the stream, we convert it to an (owned)
+ // endpoint outside of the stream.
+ let container_endpoint = format!("/containers/{}/exec", container_id);
+
+ Box::pin(
+ async move {
+ // Bubble up the error inside the stream for backwards compatability
+ let body: Body = body_result?.into();
+
+ let exec_id = docker
+ .post_json(&container_endpoint, Some((body, mime::APPLICATION_JSON)))
+ .await
+ .map(|resp: Response| resp.id)?;
+
+ let stream = Box::pin(docker.stream_post(
+ format!("/exec/{}/start", exec_id),
+ Some(("{}".into(), mime::APPLICATION_JSON)),
+ None::<iter::Empty<_>>,
+ ));
+
+ Ok(tty::decode(stream))
+ }
+ .try_flatten_stream(),
+ )
}
/// Get a reference to a set of operations available to an already created exec instance.
@@ -741,9 +770,9 @@ impl<'a> Exec<'a> {
/// exists. Use [Exec::create](Exec::create) to ensure that the exec instance is created
/// beforehand.
pub async fn get<S>(
- docker: &'a Docker,
+ docker: &'docker Docker,
id: S,
- ) -> Exec<'a>
+ ) -> Exec<'docker>
where
S: Into<String>,
{
@@ -751,14 +780,18 @@ impl<'a> Exec<'a> {
}
/// Starts this exec instance returning a multiplexed tty stream
- pub fn start(&'a self) -> impl Stream<Item = Result<tty::TtyChunk>> + 'a {
+ pub fn start(&self) -> impl Stream<Item = Result<tty::TtyChunk>> + 'docker {
+ // We must take ownership of the docker reference to not needlessly tie the stream to the
+ // lifetime of `self`.
+ let docker = self.docker;
+ // We convert `self.id` into the (owned) endpoint outside of the stream to not needlessly
+ // tie the stream to the lifetime of `self`.
+ let endpoint = format!("/exec/{}/start", &self.id);
Box::pin(
async move {
- let bytes: &[u8] = b"{}";
-
- let stream = Box::pin(self.docker.stream_post(
- format!("/exec/{}/start", &self.id),
- Some((bytes.into(), mime::APPLICATION_JSON)),
+ let stream = Box::pin(docker.stream_post(
+ endpoint,
+ Some(("{}".into(), mime::APPLICATION_JSON)),
None::<iter::Empty<_>>,
));
@@ -791,13 +824,13 @@ impl<'a> Exec<'a> {
}
/// Interface for docker network
-pub struct Networks<'a> {
- docker: &'a Docker,
+pub struct Networks<'docker> {
+ docker: &'docker Docker,
}
-impl<'a> Networks<'a> {
+impl<'docker> Networks<'docker> {
/// Exports an interface for interacting with docker Networks
- pub fn new(docker: &Docker) -> Networks {
+ pub fn new(docker: &'docker Docker) -> Self {
Networks { docker }
}
@@ -817,7 +850,7 @@ impl<'a> Networks<'a> {
pub fn get<S>(
&self,
id: S,
- ) -> Network
+ ) -> Network<'docker>
where
S: Into<String>,
{
@@ -839,17 +872,17 @@ impl<'a> Networks<'a> {
}
/// Interface for accessing and manipulating a docker network
-pub struct Network<'a> {
- docker: &'a Docker,
+pub struct Network<'docker> {
+ docker: &'docker Docker,
id: String,
}
-impl<'a> Network<'a> {
+impl<'docker> Network<'docker> {
/// Exports an interface exposing operations against a network instance
pub fn new<S>(
- docker: &Docker,
+ docker: &'docker Docker,
id: S,
- ) -> Network
+ ) -> Self
where
S: Into<String>,
{
@@ -913,13 +946,13 @@ impl<'a> Network<'a> {
}
/// Interface for docker volumes
-pub struct Volumes<'a> {
- docker: &'a Docker,
+pub struct Volumes<'docker> {
+ docker: &'docker Docker,
}
-impl<'a> Volumes<'a> {
+impl<'docker> Volumes<'docker> {
/// Exports an interface for interacting with docker volumes
- pub fn new(docker: &Docker) -> Volumes {
+ pub fn new(docker: &'docker Docker) -> Self {
Volumes { docker }
}
@@ -950,23 +983,23 @@ impl<'a> Volumes<'a> {
pub fn get(
&self,
name: &str,
- ) -> Volume {
+ ) -> Volume<'docker> {
Volume::new(self.docker, name)
}
}
/// Interface for accessing and manipulating a named docker volume
-pub struct Volume<'a> {
- docker: &'a Docker,
+pub struct Volume<'docker> {
+ docker: &'docker Docker,
name: String,
}
-impl<'a> Volume<'a> {
+impl<'docker> Volume<'docker> {
/// Exports an interface for operations that may be performed against a named volume
pub fn new<S>(
- docker: &Docker,
+ docker: &'docker Docker,
name: S,
- ) -> Volume
+ ) -> Self
where
S: Into<String>,
{
@@ -986,13 +1019,13 @@ impl<'a> Volume<'a> {
}
/// Interface for docker services
-pub struct Services<'a> {
- docker: &'a Docker,
+pub struct Services<'docker> {
+ docker: &'docker Docker,
}
-impl<'a> Services<'a> {
+impl<'docker> Services<'docker> {
/// Exports an interface for interacting with docker services
- pub fn new(docker: &Docker) -> Services {
+ pub fn new(docker: &'docker Docker) -> Self {
Services { docker }
}
@@ -1013,23 +1046,23 @@ impl<'a> Services<'a> {
pub fn get(
&self,
name: &str,
- ) -> Service {
+ ) -> Service<'docker> {
Service::new(self.docker, name)
}
}
/// Interface for accessing and manipulating a named docker volume
-pub struct Service<'a> {
- docker: &'a Docker,
+pub struct Service<'docker> {
+ docker: &'docker Docker,
name: String,
}
-impl<'a> Service<'a> {
+impl<'docker> Service<'docker> {
/// Exports an interface for operations that may be performed against a named service
pub fn new<S>(
- docker: &Docker,
+ docker: &'docker Docker,
name: S,
- ) -> Service
+ ) -> Self
where
S: Into<String>,
{
@@ -1078,7 +1111,7 @@ impl<'a> Service<'a> {
pub fn logs(
&self,
opts: &LogsOptions,
- ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'a {
+ ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'docker {
let mut path = vec![format!("/services/{}/logs", self.name)];
if let Some(query) = opts.serialize() {
path.push(query)
@@ -1220,25 +1253,25 @@ impl Docker {
}
/// Exports an interface for interacting with docker images
- pub fn images(&self) -> Images {
+ pub fn images(&'_ self) -> Images<'_> {
Images::new(self)
}
/// Exports an interface for interacting with docker containers
- pub fn containers(&self) -> Containers {
+ pub fn containers(&'_ self) -> Containers<'_> {
Containers::new(self)
}
/// Exports an interface for interacting with docker services
- pub fn services(&self) -> Services {
+ pub fn services(&'_ self) -> Services<'_> {
Services::new(self)
}
- pub fn networks(&self) -> Networks {
+ pub fn networks(&'_ self) -> Networks<'_> {
Networks::new(self)
}
- pub fn volumes(&self) -> Volumes {
+ pub fn volumes(&'_ self) -> Volumes<'_> {
Volumes::new(self)
}
@@ -1258,10 +1291,10 @@ impl Docker {
}
/// Returns a stream of docker events
- pub fn events<'a>(
- &'a self,
+ pub fn events<'docker>(
+ &'docker self,
opts: &EventsOptions,
- ) -> impl Stream<Item = Result<Event>> + Unpin + 'a {
+ ) -> impl Stream<Item = Result<Event>> + Unpin + 'docker {
let mut path = vec!["/events".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
diff --git a/src/transport.rs b/src/transport.rs
index 0fbbcc1..59b83b5 100644
--- a/src/transport.rs
+++ b/src/transport.rs
@@ -73,7 +73,7 @@ impl fmt::Debug for Transport {
impl Transport {
/// Make a request and return the whole response in a `String`
- pub async fn request<'a, B, H>(
+ pub async fn request<B, H>(
&self,
method: Method,
endpoint: impl AsRef<str>,
@@ -82,7 +82,7 @@ impl Transport {
) -> Result<String>
where
B: Into<Body>,
- H: IntoIterator<Item = (&'static str, String)> + 'a,
+ H: IntoIterator<Item = (&'static str, String)>,
{
let body = self.get_body(method, endpoint, body, headers).await?;
let bytes = hyper::body::to_bytes(body).await?;
@@ -149,16 +149,16 @@ impl Transport {
Ok(stream_body(body))
}
- pub fn stream_chunks<'a, H, B>(
- &'a self,
+ pub fn stream_chunks<'stream, H, B>(
+ &'stream self,
method: Method,
- endpoint: impl AsRef<str> + 'a,
+ endpoint: impl AsRef<str> + 'stream,
body: Option<(B, Mime)>,
headers: Option<H>,
- ) -> impl Stream<Item = Result<Bytes>> + 'a
+ ) -> impl Stream<Item = Result<Bytes>> + 'stream
where
- H: IntoIterator<Item = (&'static str, String)> + 'a,
- B: Into<Body> + 'a,
+ B: Into<Body> + 'stream,
+ H: IntoIterator<Item = (&'static str, String)> + 'stream,
{
self.get_chunk_stream(method, endpoint, body, headers)
.try_flatten_stream()
diff --git a/src/tty.rs b/src/tty.rs
index 67c72bd..b4232ad 100644
--- a/src/tty.rs
+++ b/src/tty.rs
@@ -53,7 +53,7 @@ async fn decode_chunk<S>(mut stream: S) -> Option<(Result<TtyChunk>, S)>
where
S: AsyncRead + Unpin,
{
- let mut header_bytes = vec![0u8; 8];
+ let mut header_bytes = [0u8; 8];
match stream.read_exact(&mut header_bytes).await {
Err(e) if e.kind() == futures_util::io::ErrorKind::UnexpectedEof => return None,