diff options
-rw-r--r-- | src/endpoint/configured.rs | 469 | ||||
-rw-r--r-- | src/endpoint/scheduler.rs | 24 |
2 files changed, 297 insertions, 196 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 2c72b9b..8ef961a 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -10,7 +10,8 @@ use anyhow::Result; use anyhow::anyhow; use futures::FutureExt; use getset::{Getters, CopyGetters}; -use log::{warn, trace}; +use log::trace; +use shiplift::Container; use shiplift::Docker; use shiplift::ExecContainerOptions; use tokio::stream::StreamExt; @@ -175,59 +176,195 @@ impl Endpoint { .map(|_| ()) } - /// Run a job - /// - /// The return type of this function is a bit complex, so here's an explanation: - /// - /// * The outer Result is for indicating whether the general process of running the container - /// and all related tasks worked. - /// - /// The tuple holds the result of the container run itself (the first item), the hash of the - /// container that was run and the script that was run. - /// - /// The script is for reporting and should be written to the database by the caller. - /// The ContainerHash as well. - /// - /// The result inside the tuple is an Err if the container script returned an error. - /// It is Ok containing the created artifact pathes if the script exited successfully. - /// - pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> Result<(Result<Vec<ArtifactPath>>, ContainerHash, Script)> { - let (container_id, _warnings) = { - let envs = job.environment() - .into_iter() - .map(|(k, v)| format!("{}={}", k.as_ref(), v)) - .chain({ - job.package_environment() - .into_iter() - .map(|(k, v)| format!("{}={}", k, v)) - }) - .collect::<Vec<_>>(); - trace!("Job resources: Environment variables = {:?}", envs); - - let builder_opts = shiplift::ContainerOptions::builder(job.image().as_ref()) - .env(envs.iter().map(AsRef::as_ref).collect()) - .cmd(vec!["/bin/bash"]) // we start the container with /bin/bash, but exec() the script in it later - .attach_stdin(true) // we have to attach, otherwise bash exits - .build(); - trace!("Builder options = {:?}", builder_opts); - - let create_info = self.docker - .containers() - .create(&builder_opts) - .await - .with_context(|| anyhow!("Creating container on '{}'", self.name))?; - trace!("Create info = {:?}", create_info); - - if let Some(warnings) = create_info.warnings.as_ref() { - for warning in warnings { - warn!("{}", warning); - } + + pub async fn prepare_container<'a>(&'a self, job: RunnableJob, staging: Arc<RwLock<StagingStore>>) -> Result<PreparedContainer<'a>> { + PreparedContainer::new(self, job, staging).await + } + + pub async fn number_of_running_containers(&self) -> Result<usize> { + self.docker + .containers() + .list(&Default::default()) + .await + .with_context(|| anyhow!("Getting number of running containers on {}", self.name)) + .map_err(Error::from) + .map(|list| list.len()) + } + +} + +pub struct PreparedContainer<'a> { + endpoint: &'a Endpoint, + script: Script, + create_info: shiplift::rep::ContainerCreateInfo, +} + +impl<'a> PreparedContainer<'a> { + async fn new(endpoint: &'a Endpoint, job: RunnableJob, staging: Arc<RwLock<StagingStore>>) -> Result<PreparedContainer<'a>> { + let script = job.script().clone(); + let create_info = Self::build_container(endpoint, &job).await?; + let container = endpoint.docker.containers().get(&create_info.id); + + let (cpysrc, cpyart, cpyscr) = tokio::join!( + Self::copy_source_to_container(&container, &job), + Self::copy_artifacts_to_container(&container, &job, staging), + Self::copy_script_to_container(&container, &script) + ); + + let _ = cpysrc + .with_context(|| anyhow!("Copying the sources to container {} on '{}'", create_info.id, endpoint.name))?; + + let _ = cpyart + .with_context(|| anyhow!("Copying the artifacts to container {} on '{}'", create_info.id, endpoint.name))?; + + let _ = cpyscr + .with_context(|| anyhow!("Copying the script to container {} on '{}'", create_info.id, endpoint.name))?; + + Ok({ + PreparedContainer { + endpoint, + script, + create_info } + }) + } - (create_info.id, create_info.warnings) - }; + async fn build_container(endpoint: &Endpoint, job: &RunnableJob) -> Result<shiplift::rep::ContainerCreateInfo> { + let envs = job.environment() + .into_iter() + .map(|(k, v)| format!("{}={}", k.as_ref(), v)) + .chain({ + job.package_environment() + .into_iter() + .map(|(k, v)| format!("{}={}", k, v)) + }) + .collect::<Vec<_>>(); + trace!("Job resources: Environment variables = {:?}", envs); + + let builder_opts = shiplift::ContainerOptions::builder(job.image().as_ref()) + .env(envs.iter().map(AsRef::as_ref).collect()) + .cmd(vec!["/bin/bash"]) // we start the container with /bin/bash, but exec() the script in it later + .attach_stdin(true) // we have to attach, otherwise bash exits + .build(); + trace!("Builder options = {:?}", builder_opts); + + let create_info = endpoint + .docker + .containers() + .create(&builder_opts) + .await + .with_context(|| anyhow!("Creating container on '{}'", endpoint.name))?; + trace!("Create info = {:?}", create_info); + Ok(create_info) + } + + async fn copy_source_to_container<'ca>(container: &Container<'ca>, job: &RunnableJob) -> Result<()> { + use tokio::io::AsyncReadExt; + + job.package_sources() + .into_iter() + .map(|entry| async { + let source_path = entry.path(); + let destination = PathBuf::from("/inputs").join({ + source_path.file_name() + .ok_or_else(|| anyhow!("Not a file: {}", source_path.display())) + .with_context(|| anyhow!("Copying package source from {} to container {}", source_path.display(), container.id()))? + }); + trace!("Source path = {:?}", source_path); + trace!("Source dest = {:?}", destination); + let mut buf = vec![]; + tokio::fs::OpenOptions::new() + .create(false) + .create_new(false) + .append(false) + .write(false) + .read(true) + .open(&source_path) + .await + .with_context(|| anyhow!("Getting source file: {}", source_path.display()))? + .read_to_end(&mut buf) + .await + .with_context(|| anyhow!("Reading file {}", source_path.display()))?; + + drop(entry); + let _ = container.copy_file_into(destination, &buf).await?; + Ok(()) + }) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Result<()>>() + .await + .with_context(|| anyhow!("Copying sources to container {}", container.id())) + .map_err(Error::from) + } + + async fn copy_artifacts_to_container<'ca>(container: &Container<'ca>, job: &RunnableJob, staging: Arc<RwLock<StagingStore>>) -> Result<()> { + job.resources() + .into_iter() + .filter_map(JobResource::artifact) + .cloned() + .map(|art| async { + let artifact_file_name = art.path().file_name() + .ok_or_else(|| anyhow!("BUG: artifact {} is not a file", art.path().display())) + .with_context(|| anyhow!("Collecting artifacts for copying to container {}", container.id()))?; + let destination = PathBuf::from("/inputs/").join(artifact_file_name); + trace!("Copying {} to container: {}:{}", art.path().display(), container.id(), destination.display()); + let buf = staging + .read() + .await + .root_path() + .join(art.path())? + .read() + .await + .with_context(|| anyhow!("Reading artifact {}, so it can be copied to container", art.path().display()))?; + + let r = container.copy_file_into(&destination, &buf) + .await + .with_context(|| anyhow!("Copying artifact {} to container {} at {}", art.path().display(), container.id(), destination.display())) + .map_err(Error::from); + drop(art); // ensure `art` is moved into closure + r + }) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Result<Vec<_>>>() + .await + .with_context(|| anyhow!("Copying artifacts to container {}", container.id())) + .map_err(Error::from) + .map(|_| ()) + } + async fn copy_script_to_container<'ca>(container: &Container<'ca>, script: &Script) -> Result<()> { let script_path = PathBuf::from("/script"); + container.copy_file_into(script_path, script.as_ref().as_bytes()) + .await + .with_context(|| anyhow!("Copying the script into container {}", container.id())) + .map_err(Error::from) + } + + pub async fn start(self) -> Result<StartedContainer<'a>> { + let container = self.endpoint.docker.containers().get(&self.create_info.id); + let _ = container.start() + .inspect(|r| { trace!("Starting container {} -> {:?}", self.create_info.id, r); }) + .map(|r| r.with_context(|| anyhow!("Starting the container {} on '{}'", self.create_info.id, self.endpoint.name))) + .await?; + + Ok({ + StartedContainer { + endpoint: self.endpoint, + script: self.script, + create_info: self.create_info, + } + }) + } +} + +pub struct StartedContainer<'a> { + endpoint: &'a Endpoint, + script: Script, + create_info: shiplift::rep::ContainerCreateInfo, +} + +impl<'a> StartedContainer<'a> { + pub async fn execute_script(self, logsink: UnboundedSender<LogItem>) -> Result<ExecutedContainer<'a>> { let exec_opts = ExecContainerOptions::builder() .cmd(vec!["/bin/bash", "/script"]) .attach_stderr(true) @@ -235,123 +372,46 @@ impl Endpoint { .build(); trace!("Exec options = {:?}", exec_opts); - let container = self.docker.containers().get(&container_id); - trace!("Container id = {:?}", container_id); - { // copy source to container - use tokio::io::AsyncReadExt; - - job.package_sources() - .into_iter() - .map(|entry| async { - let source_path = entry.path(); - let destination = PathBuf::from("/inputs").join({ - source_path.file_name() - .ok_or_else(|| anyhow!("Not a file: {}", source_path.display())) - .with_context(|| anyhow!("Copying package source from {} to container {}", source_path.display(), self.name))? - }); - trace!("Source path = {:?}", source_path); - trace!("Source dest = {:?}", destination); - let mut buf = vec![]; - tokio::fs::OpenOptions::new() - .create(false) - .create_new(false) - .append(false) - .write(false) - .read(true) - .open(&source_path) - .await - .with_context(|| anyhow!("Getting source file: {}", source_path.display()))? - .read_to_end(&mut buf) - .await - .with_context(|| anyhow!("Reading file {}", source_path.display()))?; - - drop(entry); - let _ = container.copy_file_into(destination, &buf).await?; - Ok(()) - }) - .collect::<futures::stream::FuturesUnordered<_>>() - .collect::<Result<()>>() - .await - .with_context(|| anyhow!("Copying sources to container {}", container_id))?; - } - { // Copy all Path artifacts to the container - job.resources() - .into_iter() - .filter_map(JobResource::artifact) - .cloned() - .map(|art| async { - let artifact_file_name = art.path().file_name() - .ok_or_else(|| anyhow!("BUG: artifact {} is not a file", art.path().display())) - .with_context(|| anyhow!("Collecting artifacts for copying to container {}", container_id))?; - let destination = PathBuf::from("/inputs/").join(artifact_file_name); - trace!("Copying {} to container: {}:{}", art.path().display(), container_id, destination.display()); - let buf = staging - .read() - .await - .root_path() - .join(art.path())? - .read() - .await - .with_context(|| anyhow!("Reading artifact {}, so it can be copied to container", art.path().display()))?; - - let r = container.copy_file_into(&destination, &buf) - .await - .with_context(|| anyhow!("Copying artifact {} to container {} at {}", art.path().display(), container_id, destination.display())) - .map_err(Error::from); - drop(art); // ensure `art` is moved into closure - r - }) - .collect::<futures::stream::FuturesUnordered<_>>() - .collect::<Result<Vec<_>>>() - .await - .with_context(|| anyhow!("Copying artifacts to container {}", container_id))?; - } + let container = self.endpoint.docker.containers().get(&self.create_info.id); - let exited_successfully: Option<(bool, Option<String>)> = container - .copy_file_into(script_path, job.script().as_ref().as_bytes()) - .inspect(|r| { trace!("Copying script to container {} -> {:?}", container_id, r); }) - .map(|r| r.with_context(|| anyhow!("Copying the script into the container {} on '{}'", container_id, self.name))) - .then(|_| container.start()) - .inspect(|r| { trace!("Starting container {} -> {:?}", container_id, r); }) - .map(|r| r.with_context(|| anyhow!("Starting the container {} on '{}'", container_id, self.name))) - .then(|_| { - trace!("Moving logs to log sink for container {}", container_id); - buffer_stream_to_line_stream(container.exec(&exec_opts)) - .map(|line| { - trace!("['{}':{}] Found log line: {:?}", self.name, container_id, line); - line.with_context(|| anyhow!("Getting log from {}:{}", self.name, container_id)) + trace!("Moving logs to log sink for container {}", self.create_info.id); + let stream = container.exec(&exec_opts); + + let exited_successfully: Option<(bool, Option<String>)> = buffer_stream_to_line_stream(stream) + .map(|line| { + trace!("['{}':{}] Found log line: {:?}", self.endpoint.name, self.create_info.id, line); + line.with_context(|| anyhow!("Getting log from {}:{}", self.endpoint.name, self.create_info.id)) + .map_err(Error::from) + .and_then(|l| { + crate::log::parser() + .parse(l.as_bytes()) + .with_context(|| anyhow!("Parsing log from {}:{}: {:?}", self.endpoint.name, self.create_info.id, l)) .map_err(Error::from) - .and_then(|l| { - crate::log::parser() - .parse(l.as_bytes()) - .with_context(|| anyhow!("Parsing log from {}:{}: {:?}", self.name, container_id, l)) - .map_err(Error::from) - .and_then(|item| { - - let mut exited_successfully = None; - { - match item { - LogItem::State(Ok(_)) => exited_successfully = Some((true, None)), - LogItem::State(Err(ref msg)) => exited_successfully = Some((false, Some(msg.clone()))), - _ => { - // Nothing - } - } + .and_then(|item| { + + let mut exited_successfully = None; + { + match item { + LogItem::State(Ok(_)) => exited_successfully = Some((true, None)), + LogItem::State(Err(ref msg)) => exited_successfully = Some((false, Some(msg.clone()))), + _ => { + // Nothing } + } + } - trace!("Log item: {}", item.display()?); - logsink.send(item) - .with_context(|| anyhow!("Sending log to log sink")) - .map_err(Error::from) - .map(|_| exited_successfully) - }) + trace!("Log item: {}", item.display()?); + logsink.send(item) + .with_context(|| anyhow!("Sending log to log sink")) + .map_err(Error::from) + .map(|_| exited_successfully) }) }) - .collect::<Result<Vec<_>>>() }) - .map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", container_id, self.name))) + .collect::<Result<Vec<_>>>() + .map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", self.create_info.id, self.endpoint.name))) .await - .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}", container_id))? + .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}", self.create_info.id))? .into_iter() .fold(None, |accu, elem| match (accu, elem) { (None , b) => b, @@ -361,30 +421,56 @@ impl Endpoint { (Some((true, _)) , Some((true, _))) => Some((true, None)), }); - trace!("Fetching /outputs from container {}", container_id); + Ok({ + ExecutedContainer { + endpoint: self.endpoint, + create_info: self.create_info, + script: self.script, + exit_info: exited_successfully, + } + }) + } + +} + +pub struct ExecutedContainer<'a> { + endpoint: &'a Endpoint, + create_info: shiplift::rep::ContainerCreateInfo, + script: Script, + exit_info: Option<(bool, Option<String>)>, +} + +impl<'a> ExecutedContainer<'a> { + pub fn container_hash(&self) -> ContainerHash { + ContainerHash::from(self.create_info.id.clone()) + } + + pub fn script(&self) -> &Script { + &self.script + } + + pub async fn finalize(self, staging: Arc<RwLock<StagingStore>>) -> Result<FinalizedContainer> { + trace!("Fetching /outputs from container {}", self.create_info.id); + let container = self.endpoint.docker.containers().get(&self.create_info.id); let tar_stream = container .copy_from(&PathBuf::from("/outputs/")) .map(|item| { - item.with_context(|| anyhow!("Copying item from container {} to host", container_id)) + item.with_context(|| anyhow!("Copying item from container {} to host", self.create_info.id)) .map_err(Error::from) }); - let r = { - let mut writelock = staging.write().await; + let mut writelock = staging.write().await; - writelock - .write_files_from_tar_stream(tar_stream) - .await - .with_context(|| anyhow!("Copying the TAR stream to the staging store"))? - }; + let artifacts = writelock + .write_files_from_tar_stream(tar_stream) + .await + .with_context(|| anyhow!("Copying the TAR stream to the staging store"))?; - let script: Script = job.script().clone(); - match exited_successfully { + let exit_info = match self.exit_info { Some((false, msg)) => { - let conthash = ContainerHash::from(container_id); let err = anyhow!("Error during container run:\n\tMessage: '{msg}'\n\tConnect using\n\n\t\t`docker --host {uri} exec -it {container_id} /bin/bash`\n\n\tto debug.", - container_id = &conthash, - uri = self.uri(), + container_id = self.create_info.id, + uri = self.endpoint.uri(), msg = msg.as_ref().map(String::deref).unwrap_or(""), ); @@ -392,28 +478,37 @@ impl Endpoint { let conterr = Err(Error::from(err)); // Ok because the general process worked. - Ok((conterr, conthash, script)) + conterr }, Some((true, _)) | None => { + let container = self.endpoint.docker.containers().get(&self.create_info.id); container.stop(Some(std::time::Duration::new(1, 0))) .await - .with_context(|| anyhow!("Stopping container {}", container_id))?; - - Ok((Ok(r), ContainerHash::from(container_id), script)) + .with_context(|| anyhow!("Stopping container {}", self.create_info.id))?; + Ok(()) }, - } - } + }; - pub async fn number_of_running_containers(&self) -> Result<usize> { - self.docker - .containers() - .list(&Default::default()) - .await - .with_context(|| anyhow!("Getting number of running containers on {}", self.name)) - .map_err(Error::from) - .map(|list| list.len()) + Ok({ + FinalizedContainer { + artifacts, + exit_info, + } + }) } +} + +#[derive(Debug)] +pub struct FinalizedContainer { + artifacts: Vec<ArtifactPath>, + exit_info: Result<()>, +} +impl FinalizedContainer { + pub fn unpack(self) -> (Vec<ArtifactPath>, Result<()>) { + (self.artifacts, self.exit_info) + } } + diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 74155e7..25165fa 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -138,8 +138,11 @@ impl JobHandle { let envs = self.create_env_in_db()?; let job_id = self.job.uuid().clone(); trace!("Running on Job {} on Endpoint {}", job_id, ep.name()); - let res = ep - .run_job(self.job, log_sender, self.staging_store); + let running_container = ep.prepare_container(self.job, self.staging_store.clone()) + .await? + .start() + .await? + .execute_script(log_sender); let logres = LogReceiver { package_name: &package.name, @@ -150,19 +153,22 @@ impl JobHandle { bar: &self.bar, }.join(); - let (res, logres) = tokio::join!(res, logres); - - trace!("Found result for job {}: {:?}", job_id, res); + let (run_container, logres) = tokio::join!(running_container, logres); let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'", ep.name()))?; - let (paths, container_hash, script) = res.with_context(|| anyhow!("Error during running job on '{}'", ep.name()))?; - - let job = dbmodels::Job::create(&self.db, &job_id, &self.submit, &endpoint, &package, &image, &container_hash, &script, &log)?; + let run_container = run_container.with_context(|| anyhow!("Running container {} failed"))?; + let job = dbmodels::Job::create(&self.db, &job_id, &self.submit, &endpoint, &package, &image, &run_container.container_hash(), run_container.script(), &log)?; trace!("DB: Job entry for job {} created: {}", job.uuid, job.id); for env in envs { let _ = dbmodels::JobEnv::create(&self.db, &job, &env)?; } - let paths = paths?; + + let res : crate::endpoint::FinalizedContainer = run_container + .finalize(self.staging_store.clone()) + .await?; + trace!("Found result for job {}: {:?}", job_id, res); + let (paths, res) = res.unpack(); + let _ = res.with_context(|| anyhow!("Error during running job on '{}'", ep.name()))?; // Have to do it the ugly way here because of borrowing semantics let mut r = vec![]; |