From 0295809436d8e178a7d0528b47b9d4313b292eef Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Mon, 18 Jan 2021 14:48:59 +0100 Subject: Run `cargo fmt` Signed-off-by: Matthias Beyer --- src/endpoint/configured.rs | 372 +++++++++++++++++++++++++++++++-------------- 1 file changed, 262 insertions(+), 110 deletions(-) (limited to 'src/endpoint/configured.rs') diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 92217e6..b89b0a5 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -13,19 +13,19 @@ use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; +use anyhow::anyhow; use anyhow::Context; use anyhow::Error; use anyhow::Result; -use anyhow::anyhow; use futures::FutureExt; -use getset::{Getters, CopyGetters}; +use getset::{CopyGetters, Getters}; use log::trace; use shiplift::Container; use shiplift::Docker; use shiplift::ExecContainerOptions; use tokio::stream::StreamExt; -use tokio::sync::RwLock; use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::RwLock; use typed_builder::TypedBuilder; use crate::endpoint::EndpointConfiguration; @@ -33,8 +33,8 @@ use crate::filestore::path::ArtifactPath; use crate::filestore::StagingStore; use crate::job::JobResource; use crate::job::RunnableJob; -use crate::log::LogItem; use crate::log::buffer_stream_to_line_stream; +use crate::log::LogItem; use crate::package::Script; use crate::util::docker::ContainerHash; use crate::util::docker::ImageName; @@ -61,55 +61,72 @@ impl Debug for Endpoint { } impl Endpoint { - - pub(in super) async fn setup(epc: EndpointConfiguration) -> Result { - let ep = Endpoint::setup_endpoint(epc.endpoint()) - .with_context(|| anyhow!("Setting up endpoint: {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?; - - let versions_compat = Endpoint::check_version_compat(epc.required_docker_versions().as_ref(), &ep); - let api_versions_compat = Endpoint::check_api_version_compat(epc.required_docker_api_versions().as_ref(), &ep); - let imgs_avail = Endpoint::check_images_available(epc.required_images().as_ref(), &ep); + pub(super) async fn setup(epc: EndpointConfiguration) -> Result { + let ep = Endpoint::setup_endpoint(epc.endpoint()).with_context(|| { + anyhow!( + "Setting up endpoint: {} -> {}", + epc.endpoint().name(), + epc.endpoint().uri() + ) + })?; + + let versions_compat = + Endpoint::check_version_compat(epc.required_docker_versions().as_ref(), &ep); + let api_versions_compat = + Endpoint::check_api_version_compat(epc.required_docker_api_versions().as_ref(), &ep); + let imgs_avail = Endpoint::check_images_available(epc.required_images().as_ref(), &ep); let (versions_compat, api_versions_compat, imgs_avail) = tokio::join!(versions_compat, api_versions_compat, imgs_avail); - let _ = versions_compat - .with_context(|| anyhow!("Checking version compatibility for {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?; - let _ = api_versions_compat - .with_context(|| anyhow!("Checking API version compatibility for {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?; - let _ = imgs_avail - .with_context(|| anyhow!("Checking for available images on {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?; + let _ = versions_compat.with_context(|| { + anyhow!( + "Checking version compatibility for {} -> {}", + epc.endpoint().name(), + epc.endpoint().uri() + ) + })?; + let _ = api_versions_compat.with_context(|| { + anyhow!( + "Checking API version compatibility for {} -> {}", + epc.endpoint().name(), + epc.endpoint().uri() + ) + })?; + let _ = imgs_avail.with_context(|| { + anyhow!( + "Checking for available images on {} -> {}", + epc.endpoint().name(), + epc.endpoint().uri() + ) + })?; Ok(ep) } fn setup_endpoint(ep: &crate::config::Endpoint) -> Result { match ep.endpoint_type() { - crate::config::EndpointType::Http => { - shiplift::Uri::from_str(ep.uri()) - .map(shiplift::Docker::host) - .with_context(|| anyhow!("Connecting to {}", ep.uri())) - .map_err(Error::from) - .map(|docker| { - Endpoint::builder() - .name(ep.name().clone()) - .uri(ep.uri().clone()) - .docker(docker) - .num_max_jobs(ep.maxjobs()) - .build() - }) - } - - crate::config::EndpointType::Socket => { - Ok({ + crate::config::EndpointType::Http => shiplift::Uri::from_str(ep.uri()) + .map(shiplift::Docker::host) + .with_context(|| anyhow!("Connecting to {}", ep.uri())) + .map_err(Error::from) + .map(|docker| { Endpoint::builder() .name(ep.name().clone()) .uri(ep.uri().clone()) + .docker(docker) .num_max_jobs(ep.maxjobs()) - .docker(shiplift::Docker::unix(ep.uri())) .build() - }) - } + }), + + crate::config::EndpointType::Socket => Ok({ + Endpoint::builder() + .name(ep.name().clone()) + .uri(ep.uri().clone()) + .num_max_jobs(ep.maxjobs()) + .docker(shiplift::Docker::unix(ep.uri())) + .build() + }), } } @@ -117,14 +134,19 @@ impl Endpoint { match req { None => Ok(()), Some(v) => { - let avail = ep.docker() + let avail = ep + .docker() .version() .await .with_context(|| anyhow!("Getting version of endpoint: {}", ep.name))?; if !v.contains(&avail.version) { - Err(anyhow!("Incompatible docker version on endpoint {}: Expected: {}, Available: [{}]", - ep.name(), avail.version, v.join(", "))) + Err(anyhow!( + "Incompatible docker version on endpoint {}: Expected: {}, Available: [{}]", + ep.name(), + avail.version, + v.join(", ") + )) } else { Ok(()) } @@ -136,7 +158,8 @@ impl Endpoint { match req { None => Ok(()), Some(v) => { - let avail = ep.docker() + let avail = ep + .docker() .version() .await .with_context(|| anyhow!("Getting API version of endpoint: {}", ep.name))?; @@ -163,7 +186,8 @@ impl Endpoint { .with_context(|| anyhow!("Listing images on endpoint: {}", ep.name))? .into_iter() .map(|image_rep| { - image_rep.repo_tags + image_rep + .repo_tags .unwrap_or_default() .into_iter() .map(ImageName::from) @@ -176,7 +200,11 @@ impl Endpoint { imgs.iter() .map(|img| { if !available_names.contains(img) { - Err(anyhow!("Image '{}' missing from endpoint '{}'", img.as_ref(), ep.name)) + Err(anyhow!( + "Image '{}' missing from endpoint '{}'", + img.as_ref(), + ep.name + )) } else { Ok(()) } @@ -185,8 +213,11 @@ impl Endpoint { .map(|_| ()) } - - pub async fn prepare_container(&self, job: RunnableJob, staging: Arc>) -> Result> { + pub async fn prepare_container( + &self, + job: RunnableJob, + staging: Arc>, + ) -> Result> { PreparedContainer::new(self, job, staging).await } @@ -199,7 +230,6 @@ impl Endpoint { .map_err(Error::from) .map(|list| list.len()) } - } #[derive(Getters)] @@ -212,10 +242,14 @@ pub struct PreparedContainer<'a> { } impl<'a> PreparedContainer<'a> { - async fn new(endpoint: &'a Endpoint, job: RunnableJob, staging: Arc>) -> Result> { - let script = job.script().clone(); + async fn new( + endpoint: &'a Endpoint, + job: RunnableJob, + staging: Arc>, + ) -> Result> { + let script = job.script().clone(); let create_info = Self::build_container(endpoint, &job).await?; - let container = endpoint.docker.containers().get(&create_info.id); + let container = endpoint.docker.containers().get(&create_info.id); let (cpysrc, cpyart, cpyscr) = tokio::join!( Self::copy_source_to_container(&container, &job), @@ -223,26 +257,45 @@ impl<'a> PreparedContainer<'a> { Self::copy_script_to_container(&container, &script) ); - let _ = cpysrc - .with_context(|| anyhow!("Copying the sources to container {} on '{}'", create_info.id, endpoint.name))?; - - let _ = cpyart - .with_context(|| anyhow!("Copying the artifacts to container {} on '{}'", create_info.id, endpoint.name))?; - - let _ = cpyscr - .with_context(|| anyhow!("Copying the script to container {} on '{}'", create_info.id, endpoint.name))?; + let _ = cpysrc.with_context(|| { + anyhow!( + "Copying the sources to container {} on '{}'", + create_info.id, + endpoint.name + ) + })?; + + let _ = cpyart.with_context(|| { + anyhow!( + "Copying the artifacts to container {} on '{}'", + create_info.id, + endpoint.name + ) + })?; + + let _ = cpyscr.with_context(|| { + anyhow!( + "Copying the script to container {} on '{}'", + create_info.id, + endpoint.name + ) + })?; Ok({ PreparedContainer { endpoint, script, - create_info + create_info, } }) } - async fn build_container(endpoint: &Endpoint, job: &RunnableJob) -> Result { - let envs = job.environment() + async fn build_container( + endpoint: &Endpoint, + job: &RunnableJob, + ) -> Result { + let envs = job + .environment() .into_iter() .map(|(k, v)| format!("{}={}", k.as_ref(), v)) .chain({ @@ -270,7 +323,10 @@ impl<'a> PreparedContainer<'a> { Ok(create_info) } - async fn copy_source_to_container<'ca>(container: &Container<'ca>, job: &RunnableJob) -> Result<()> { + async fn copy_source_to_container<'ca>( + container: &Container<'ca>, + job: &RunnableJob, + ) -> Result<()> { use tokio::io::AsyncReadExt; job.package_sources() @@ -278,9 +334,16 @@ impl<'a> PreparedContainer<'a> { .map(|entry| async { let source_path = entry.path(); let destination = PathBuf::from("/inputs").join({ - source_path.file_name() + source_path + .file_name() .ok_or_else(|| anyhow!("Not a file: {}", source_path.display())) - .with_context(|| anyhow!("Copying package source from {} to container {}", source_path.display(), container.id()))? + .with_context(|| { + anyhow!( + "Copying package source from {} to container {}", + source_path.display(), + container.id() + ) + })? }); trace!("Source path = {:?}", source_path); trace!("Source dest = {:?}", destination); @@ -309,17 +372,33 @@ impl<'a> PreparedContainer<'a> { .map_err(Error::from) } - async fn copy_artifacts_to_container<'ca>(container: &Container<'ca>, job: &RunnableJob, staging: Arc>) -> Result<()> { + async fn copy_artifacts_to_container<'ca>( + container: &Container<'ca>, + job: &RunnableJob, + staging: Arc>, + ) -> Result<()> { job.resources() .iter() .filter_map(JobResource::artifact) .cloned() .map(|art| async { - let artifact_file_name = art.path().file_name() + let artifact_file_name = art + .path() + .file_name() .ok_or_else(|| anyhow!("BUG: artifact {} is not a file", art.path().display())) - .with_context(|| anyhow!("Collecting artifacts for copying to container {}", container.id()))?; + .with_context(|| { + anyhow!( + "Collecting artifacts for copying to container {}", + container.id() + ) + })?; let destination = PathBuf::from("/inputs/").join(artifact_file_name); - trace!("Copying {} to container: {}:{}", art.path().display(), container.id(), destination.display()); + trace!( + "Copying {} to container: {}:{}", + art.path().display(), + container.id(), + destination.display() + ); let buf = staging .read() .await @@ -327,11 +406,24 @@ impl<'a> PreparedContainer<'a> { .join(art.path())? .read() .await - .with_context(|| anyhow!("Reading artifact {}, so it can be copied to container", art.path().display()))?; - - let r = container.copy_file_into(&destination, &buf) + .with_context(|| { + anyhow!( + "Reading artifact {}, so it can be copied to container", + art.path().display() + ) + })?; + + let r = container + .copy_file_into(&destination, &buf) .await - .with_context(|| anyhow!("Copying artifact {} to container {} at {}", art.path().display(), container.id(), destination.display())) + .with_context(|| { + anyhow!( + "Copying artifact {} to container {} at {}", + art.path().display(), + container.id(), + destination.display() + ) + }) .map_err(Error::from); drop(art); // ensure `art` is moved into closure r @@ -344,9 +436,13 @@ impl<'a> PreparedContainer<'a> { .map(|_| ()) } - async fn copy_script_to_container<'ca>(container: &Container<'ca>, script: &Script) -> Result<()> { + async fn copy_script_to_container<'ca>( + container: &Container<'ca>, + script: &Script, + ) -> Result<()> { let script_path = PathBuf::from("/script"); - container.copy_file_into(script_path, script.as_ref().as_bytes()) + container + .copy_file_into(script_path, script.as_ref().as_bytes()) .await .with_context(|| anyhow!("Copying the script into container {}", container.id())) .map_err(Error::from) @@ -354,9 +450,20 @@ impl<'a> PreparedContainer<'a> { pub async fn start(self) -> Result> { let container = self.endpoint.docker.containers().get(&self.create_info.id); - let _ = container.start() - .inspect(|r| { trace!("Starting container {} -> {:?}", self.create_info.id, r); }) - .map(|r| r.with_context(|| anyhow!("Starting the container {} on '{}'", self.create_info.id, self.endpoint.name))) + let _ = container + .start() + .inspect(|r| { + trace!("Starting container {} -> {:?}", self.create_info.id, r); + }) + .map(|r| { + r.with_context(|| { + anyhow!( + "Starting the container {} on '{}'", + self.create_info.id, + self.endpoint.name + ) + }) + }) .await?; Ok({ @@ -376,7 +483,10 @@ pub struct StartedContainer<'a> { } impl<'a> StartedContainer<'a> { - pub async fn execute_script(self, logsink: UnboundedSender) -> Result> { + pub async fn execute_script( + self, + logsink: UnboundedSender, + ) -> Result> { let exec_opts = ExecContainerOptions::builder() .cmd(vec!["/bin/bash", "/script"]) .attach_stderr(true) @@ -386,26 +496,51 @@ impl<'a> StartedContainer<'a> { let container = self.endpoint.docker.containers().get(&self.create_info.id); - trace!("Moving logs to log sink for container {}", self.create_info.id); + trace!( + "Moving logs to log sink for container {}", + self.create_info.id + ); let stream = container.exec(&exec_opts); - let exited_successfully: Option<(bool, Option)> = buffer_stream_to_line_stream(stream) - .map(|line| { - trace!("['{}':{}] Found log line: {:?}", self.endpoint.name, self.create_info.id, line); - line.with_context(|| anyhow!("Getting log from {}:{}", self.endpoint.name, self.create_info.id)) + let exited_successfully: Option<(bool, Option)> = + buffer_stream_to_line_stream(stream) + .map(|line| { + trace!( + "['{}':{}] Found log line: {:?}", + self.endpoint.name, + self.create_info.id, + line + ); + line.with_context(|| { + anyhow!( + "Getting log from {}:{}", + self.endpoint.name, + self.create_info.id + ) + }) .map_err(Error::from) .and_then(|l| { crate::log::parser() .parse(l.as_bytes()) - .with_context(|| anyhow!("Parsing log from {}:{}: {:?}", self.endpoint.name, self.create_info.id, l)) + .with_context(|| { + anyhow!( + "Parsing log from {}:{}: {:?}", + self.endpoint.name, + self.create_info.id, + l + ) + }) .map_err(Error::from) .and_then(|item| { - let mut exited_successfully = None; { match item { - LogItem::State(Ok(_)) => exited_successfully = Some((true, None)), - LogItem::State(Err(ref msg)) => exited_successfully = Some((false, Some(msg.clone()))), + LogItem::State(Ok(_)) => { + exited_successfully = Some((true, None)) + } + LogItem::State(Err(ref msg)) => { + exited_successfully = Some((false, Some(msg.clone()))) + } _ => { // Nothing } @@ -413,25 +548,39 @@ impl<'a> StartedContainer<'a> { } trace!("Log item: {}", item.display()?); - logsink.send(item) + logsink + .send(item) .with_context(|| anyhow!("Sending log to log sink")) .map_err(Error::from) .map(|_| exited_successfully) }) }) - }) - .collect::>>() - .map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", self.create_info.id, self.endpoint.name))) - .await - .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}", self.create_info.id))? - .into_iter() - .fold(None, |accu, elem| match (accu, elem) { - (None , b) => b, - (Some((false, msg)) , _) => Some((false, msg)), - (_ , Some((false, msg))) => Some((false, msg)), - (a , None) => a, - (Some((true, _)) , Some((true, _))) => Some((true, None)), - }); + }) + .collect::>>() + .map(|r| { + r.with_context(|| { + anyhow!( + "Fetching log from container {} on {}", + self.create_info.id, + self.endpoint.name + ) + }) + }) + .await + .with_context(|| { + anyhow!( + "Copying script to container, running container and getting logs: {}", + self.create_info.id + ) + })? + .into_iter() + .fold(None, |accu, elem| match (accu, elem) { + (None, b) => b, + (Some((false, msg)), _) => Some((false, msg)), + (_, Some((false, msg))) => Some((false, msg)), + (a, None) => a, + (Some((true, _)), Some((true, _))) => Some((true, None)), + }); Ok({ ExecutedContainer { @@ -442,7 +591,6 @@ impl<'a> StartedContainer<'a> { } }) } - } pub struct ExecutedContainer<'a> { @@ -467,8 +615,13 @@ impl<'a> ExecutedContainer<'a> { let tar_stream = container .copy_from(&PathBuf::from("/outputs/")) .map(|item| { - item.with_context(|| anyhow!("Copying item from container {} to host", self.create_info.id)) - .map_err(Error::from) + item.with_context(|| { + anyhow!( + "Copying item from container {} to host", + self.create_info.id + ) + }) + .map_err(Error::from) }); let mut writelock = staging.write().await; @@ -479,7 +632,7 @@ impl<'a> ExecutedContainer<'a> { .with_context(|| anyhow!("Copying the TAR stream to the staging store"))?; let exit_info = match self.exit_info { - Some((false, msg)) => { + Some((false, msg)) => { let err = anyhow!("Error during container run:\n\tMessage: '{msg}'\n\tConnect using\n\n\t\t`docker --host {uri} exec -it {container_id} /bin/bash`\n\n\tto debug.", container_id = self.create_info.id, uri = self.endpoint.uri(), @@ -488,15 +641,16 @@ impl<'a> ExecutedContainer<'a> { // error because the container errored Err(err) - }, + } Some((true, _)) | None => { let container = self.endpoint.docker.containers().get(&self.create_info.id); - container.stop(Some(std::time::Duration::new(1, 0))) + container + .stop(Some(std::time::Duration::new(1, 0))) .await .with_context(|| anyhow!("Stopping container {}", self.create_info.id))?; Ok(()) - }, + } }; Ok({ @@ -519,5 +673,3 @@ impl FinalizedContainer { (self.artifacts, self.exit_info) } } - - -- cgit v1.2.3