// // Copyright (c) 2020-2021 science+computing ag and other contributors // // This program and the accompanying materials are made // available under the terms of the Eclipse Public License 2.0 // which is available at https://www.eclipse.org/legal/epl-2.0/ // // SPDX-License-Identifier: EPL-2.0 // use std::fmt::{Debug, Formatter}; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; use anyhow::anyhow; use anyhow::Context; use anyhow::Error; use anyhow::Result; use futures::FutureExt; use getset::{CopyGetters, Getters}; use log::trace; use shiplift::Container; use shiplift::Docker; use shiplift::ExecContainerOptions; use tokio::sync::RwLock; use tokio::sync::mpsc::UnboundedSender; use tokio_stream::StreamExt; use typed_builder::TypedBuilder; use crate::endpoint::EndpointConfiguration; use crate::filestore::path::ArtifactPath; use crate::filestore::StagingStore; use crate::job::JobResource; use crate::job::RunnableJob; use crate::log::buffer_stream_to_line_stream; use crate::log::LogItem; use crate::package::Script; use crate::util::docker::ContainerHash; use crate::util::docker::ImageName; #[derive(Getters, CopyGetters, TypedBuilder)] pub struct Endpoint { #[getset(get = "pub")] name: String, #[getset(get = "pub")] docker: Docker, #[getset(get_copy = "pub")] num_max_jobs: usize, #[getset(get = "pub")] network_mode: Option, #[getset(get = "pub")] uri: String, } impl Debug for Endpoint { fn fmt(&self, f: &mut Formatter) -> std::result::Result<(), std::fmt::Error> { write!(f, "Endpoint({}, max: {})", self.name, self.num_max_jobs) } } impl Endpoint { pub(super) async fn setup(epc: EndpointConfiguration) -> Result { let ep = Endpoint::setup_endpoint(epc.endpoint()).with_context(|| { anyhow!( "Setting up endpoint: {} -> {}", epc.endpoint().name(), epc.endpoint().uri() ) })?; let versions_compat = Endpoint::check_version_compat(epc.required_docker_versions().as_ref(), &ep); let api_versions_compat = Endpoint::check_api_version_compat(epc.required_docker_api_versions().as_ref(), &ep); let imgs_avail = Endpoint::check_images_available(epc.required_images().as_ref(), &ep); let (versions_compat, api_versions_compat, imgs_avail) = tokio::join!(versions_compat, api_versions_compat, imgs_avail); let _ = versions_compat.with_context(|| { anyhow!( "Checking version compatibility for {} -> {}", epc.endpoint().name(), epc.endpoint().uri() ) })?; let _ = api_versions_compat.with_context(|| { anyhow!( "Checking API version compatibility for {} -> {}", epc.endpoint().name(), epc.endpoint().uri() ) })?; let _ = imgs_avail.with_context(|| { anyhow!( "Checking for available images on {} -> {}", epc.endpoint().name(), epc.endpoint().uri() ) })?; Ok(ep) } fn setup_endpoint(ep: &crate::config::Endpoint) -> Result { match ep.endpoint_type() { crate::config::EndpointType::Http => shiplift::Uri::from_str(ep.uri()) .map(shiplift::Docker::host) .with_context(|| anyhow!("Connecting to {}", ep.uri())) .map_err(Error::from) .map(|docker| { Endpoint::builder() .name(ep.name().clone()) .uri(ep.uri().clone()) .docker(docker) .num_max_jobs(ep.maxjobs()) .network_mode(ep.network_mode().clone()) .build() }), crate::config::EndpointType::Socket => Ok({ Endpoint::builder() .name(ep.name().clone()) .uri(ep.uri().clone()) .num_max_jobs(ep.maxjobs()) .network_mode(ep.network_mode().clone()) .docker(shiplift::Docker::unix(ep.uri())) .build() }), } } async fn check_version_compat(req: Option<&Vec>, ep: &Endpoint) -> Result<()> { match req { None => Ok(()), Some(v) => { let avail = ep .docker() .version() .await .with_context(|| anyhow!("Getting version of endpoint: {}", ep.name))?; if !v.contains(&avail.version) { Err(anyhow!( "Incompatible docker version on endpoint {}: Expected: {}, Available: [{}]", ep.name(), avail.version, v.join(", ") )) } else { Ok(()) } } } } async fn check_api_version_compat(req: Option<&Vec>, ep: &Endpoint) -> Result<()> { match req { None => Ok(()), Some(v) => { let avail = ep .docker() .version() .await .with_context(|| anyhow!("Getting API version of endpoint: {}", ep.name))?; if !v.contains(&avail.api_version) { Err(anyhow!("Incompatible docker API version on endpoint {}: Exepected: {}, Available: [{}]", ep.name(), avail.api_version, v.join(", "))) } else { Ok(()) } } } } async fn check_images_available(imgs: &[ImageName], ep: &Endpoint) -> Result<()> { use shiplift::ImageListOptions; trace!("Checking availability of images: {:?}", imgs); let available_names = ep .docker() .images() .list(&ImageListOptions::builder().all().build()) .await .with_context(|| anyhow!("Listing images on endpoint: {}", ep.name))? .into_iter() .map(|image_rep| { image_rep .repo_tags .unwrap_or_default() .into_iter() .map(ImageName::from) }) .flatten() .collect::>(); trace!("Available images = {:?}", available_names); imgs.iter() .map(|img| { if !available_names.contains(img) { Err(anyhow!( "Image '{}' missing from endpoint '{}'", img.as_ref(), ep.name )) } else { Ok(()) } }) .collect::>>() .map(|_| ()) } pub async fn prepare_container( &self, job: RunnableJob, staging: Arc>, ) -> Result> { PreparedContainer::new(self, job, staging).await } pub async fn number_of_running_containers(&self) -> Result { self.docker .containers() .list(&Default::default()) .await .with_context(|| anyhow!("Getting number of running containers on {}", self.name)) .map_err(Error::from) .map(|list| list.len()) } } #[derive(Getters)] pub struct PreparedContainer<'a> { endpoint: &'a Endpoint, script: Script, #[getset(get = "pub")] create_info: shiplift::rep::ContainerCreateInfo, } impl<'a> PreparedContainer<'a> { async fn new( endpoint: &'a Endpoint, job: RunnableJob, staging: Arc>, ) -> Result> { let script = job.script().clone(); let create_info = Self::build_container(endpoint, &job).await?; let container = endpoint.docker.containers().get(&create_info.id); let (cpysrc, cpyart, cpyscr) = tokio::join!( Self::copy_source_to_container(&container, &job), Self::copy_artifacts_to_container(&container, &job, staging), Self::copy_script_to_container(&container, &script) ); let _ = cpysrc.with_context(|| { anyhow!( "Copying the sources to container {} on '{}'", create_info.id, endpoint.name ) })?; let _ = cpyart.with_context(|| { anyhow!( "Copying the artifacts to container {} on '{}'", create_info.id, endpoint.name ) })?; let _ = cpyscr.with_context(|| { anyhow!( "Copying the script to container {} on '{}'", create_info.id, endpoint.name ) })?; Ok({ PreparedContainer { endpoint, script, create_info, } }) } async fn build_container( endpoint: &Endpoint, job: &RunnableJob, ) -> Result { let envs = job .environment() .into_iter() .map(|(k, v)| format!("{}={}", k.as_ref(), v)) .collect::>(); trace!("Job resources: Environment variables = {:?}", envs); let builder_opts = { let mut builder_opts = shiplift::ContainerOptions::builder(job.image().as_ref()); builder_opts.env(envs.iter().map(AsRef::as_ref).collect::>()); builder_opts.cmd(vec!["/bin/bash"]); // we start the container with /bin/bash, but exec() the script in it later builder_opts.attach_stdin(true); // we have to attach, otherwise bash exits if let Some(network_mode) = endpoint.network_mode().as_ref() { builder_opts.network_mode(network_mode); } builder_opts.build() }; trace!("Builder options = {:?}", builder_opts); let create_info = endpoint .docker .containers() .create(&builder_opts) .await .with_context(|| anyhow!("Creating container on '{}'", endpoint.name))?; trace!("Create info = {:?}", create_info); Ok(create_info) } async fn copy_source_to_container<'ca>( container: &Container<'ca>, job: &RunnableJob, ) -> Result<()> { use tokio::io::AsyncReadExt; job.package_sources() .into_iter() .map(|entry| async { let source_path = entry.path(); let destination = PathBuf::from("/inputs").join({ source_path .file_name() .ok_or_else(|| anyhow!("Not a file: {}", source_path.display())) .with_context(|| { anyhow!( "Copying package source from {} to container {}", source_path.display(), container.id() ) })? }); trace!("Source path = {:?}", source_path); trace!("Source dest = {:?}", destination); let mut buf = vec![]; tokio::fs::OpenOptions::new() .create(false) .create_new(false) .append(false) .write(false) .read(true) .open(&source_path) .await .with_context(|| anyhow!("Getting source file: {}", source_path.display()))? .read_to_end(&mut buf) .await .with_context(|| anyhow!("Reading file {}", source_path.display()))?; drop(entry); let _ = container.copy_file_into(destination, &buf).await?; Ok(()) }) .collect::>() .collect::>() .await .with_context(|| anyhow!("Copying sources to container {}", container.id())) .map_err(Error::from) } async fn copy_artifacts_to_container<'ca>( container: &Container<'ca>, job: &RunnableJob, staging: Arc>, ) -> Result<()> { job.resources() .iter() .filter_map(JobResource::artifact) .cloned() .map(|art| async { let artifact_file_name = art .path() .file_name() .ok_or_else(|| anyhow!("BUG: artifact {} is not a file", art.path().display())) .with_context(|| { anyhow!( "Collecting artifacts for copying to container {}", container.id() ) })?; let destination = PathBuf::from("/inputs/").join(artifact_file_name); trace!( "Copying {} to container: {}:{}", art.path().display(), container.id(), destination.display() ); let buf = staging .read() .await .root_path() .join(art.path())? .read() .await .with_context(|| { anyhow!( "Reading artifact {}, so it can be copied to container", art.path().display() ) })?; let r = container .copy_file_into(&destination, &buf) .await .with_context(|| { anyhow!( "Copying artifact {} to container {} at {}", art.path().display(), container.id(), destination.display() ) }) .map_err(Error::from); drop(art); // ensure `art` is moved into closure r }) .collect::>() .collect::>>() .await .with_context(|| anyhow!("Copying artifacts to container {}", container.id())) .map_err(Error::from) .map(|_| ()) } async fn copy_script_to_container<'ca>( container: &Container<'ca>, script: &Script, ) -> Result<()> { let script_path = PathBuf::from("/script"); container .copy_file_into(script_path, script.as_ref().as_bytes()) .await .with_context(|| anyhow!("Copying the script into container {}", container.id())) .map_err(Error::from) } pub async fn start(self) -> Result> { let container = self.endpoint.docker.containers().get(&self.create_info.id); let _ = container .start() .inspect(|r| { trace!("Starting container {} -> {:?}", self.create_info.id, r); }) .map(|r| { r.with_context(|| { anyhow!( "Starting the container {} on '{}'", self.create_info.id, self.endpoint.name ) }) }) .await?; Ok({ StartedContainer { endpoint: self.endpoint, script: self.script, create_info: self.create_info, } }) } } pub struct StartedContainer<'a> { endpoint: &'a Endpoint, script: Script, create_info: shiplift::rep::ContainerCreateInfo, } impl<'a> StartedContainer<'a> { pub async fn execute_script( self, logsink: UnboundedSender, ) -> Result> { let exec_opts = ExecContainerOptions::builder() .cmd(vec!["/bin/bash", "/script"]) .attach_stderr(true) .attach_stdout(true) .build(); trace!("Exec options = {:?}", exec_opts); let container = self.endpoint.docker.containers().get(&self.create_info.id); trace!( "Moving logs to log sink for container {}", self.create_info.id ); let stream = container.exec(&exec_opts); let exited_successfully: Option<(bool, Option)> = buffer_stream_to_line_stream(stream) .map(|line| { trace!( "['{}':{}] Found log line: {:?}", self.endpoint.name, self.create_info.id, line ); line.with_context(|| { anyhow!( "Getting log from {}:{}", self.endpoint.name, self.create_info.id ) }) .map_err(Error::from) .and_then(|l| { crate::log::parser() .parse(l.as_bytes()) .with_context(|| { anyhow!( "Parsing log from {}:{}: {:?}", self.endpoint.name, self.create_info.id, l ) }) .map_err(Error::from) .and_then(|item| { let mut exited_successfully = None; { match item { LogItem::State(Ok(_)) => { exited_successfully = Some((true, None)) } LogItem::State(Err(ref msg)) => { exited_successfully = Some((false, Some(msg.clone()))) } _ => { // Nothing } } } trace!("Log item: {}", item.display()?); logsink .send(item) .with_context(|| anyhow!("Sending log to log sink")) .map_err(Error::from) .map(|_| exited_successfully) }) }) }) .collect::>>() .map(|r| { r.with_context(|| { anyhow!( "Fetching log from container {} on {}", self.create_info.id, self.endpoint.name ) }) }) .await .with_context(|| { anyhow!( "Copying script to container, running container and getting logs: {}", self.create_info.id ) })? .into_iter() .fold(None, |accu, elem| match (accu, elem) { (None, b) => b, (Some((false, msg)), _) => Some((false, msg)), (_, Some((false, msg))) => Some((false, msg)), (a, None) => a, (Some((true, _)), Some((true, _))) => Some((true, None)), }); Ok({ ExecutedContainer { endpoint: self.endpoint, create_info: self.create_info, script: self.script, exit_info: exited_successfully, } }) } } pub struct ExecutedContainer<'a> { endpoint: &'a Endpoint, create_info: shiplift::rep::ContainerCreateInfo, script: Script, exit_info: Option<(bool, Option)>, } impl<'a> ExecutedContainer<'a> { pub fn container_hash(&self) -> ContainerHash { ContainerHash::from(self.create_info.id.clone()) } pub fn script(&self) -> &Script { &self.script } pub async fn finalize(self, staging: Arc>) -> Result { trace!("Fetching /outputs from container {}", self.create_info.id); let container = self.endpoint.docker.containers().get(&self.create_info.id); let tar_stream = container .copy_from(&PathBuf::from("/outputs/")) .map(|item| { item.with_context(|| { anyhow!( "Copying item from container {} to host", self.create_info.id ) }) .map_err(Error::from) }); let mut writelock = staging.write().await; let artifacts = writelock .write_files_from_tar_stream(tar_stream) .await .with_context(|| anyhow!("Copying the TAR stream to the staging store"))?; let exit_info = match self.exit_info { Some((false, msg)) => { let err = anyhow!("Error during container run:\n\tMessage: '{msg}'\n\tConnect using\n\n\t\t`docker --host {uri} exec -it {container_id} /bin/bash`\n\n\tto debug.", container_id = self.create_info.id, uri = self.endpoint.uri(), msg = msg.as_deref().unwrap_or(""), ); // error because the container errored Err(err) } Some((true, _)) | None => { let container = self.endpoint.docker.containers().get(&self.create_info.id); container .stop(Some(std::time::Duration::new(1, 0))) .await .with_context(|| anyhow!("Stopping container {}", self.create_info.id))?; Ok(()) } }; Ok({ FinalizedContainer { artifacts, exit_info, } }) } } #[derive(Debug)] pub struct FinalizedContainer { artifacts: Vec, exit_info: Result<()>, } impl FinalizedContainer { pub fn unpack(self) -> (Vec, Result<()>) { (self.artifacts, self.exit_info) } }