summaryrefslogtreecommitdiffstats
path: root/src/endpoint
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-12-15 09:39:54 +0100
committerMatthias Beyer <mail@beyermatthias.de>2020-12-15 10:14:26 +0100
commitf4592ab5d21f4e4dcd23f0153699b27b687bfc65 (patch)
treee19fb185708ae441e0c07aadabb7e07395383288 /src/endpoint
parent352390565280165af60a6597247d4ed4008482f6 (diff)
Refactor job running on endpoint
This commit refactors the job running in the endpoint implementation to be a multi-stage (multi-function-call) process. Before this patch, preparing, starting and running the container as well as shutdown of the container was one huge function. This was sub-optimal, because we had to accummulate information during the run and be extra-careful when returning from that function. Ultimatively, we did not succeed in doing a good job here, which resulted in data-loss if a script exited to early because the database-code was just not hit yet. Consider the following packaging script: exit 1 this does nothing. This does, in fact, not even notify butido that it errored. This is not a scientific scenario, because a valid script might exit in the first command (unintentionally) and thus result in the same as the above example. The issue here is, that this causes butido to continue operation as normal -> It tries to copy the artifacts from the container. This, of course, then fails and butido stops processing. Because of that, the job information is never written to the database, because butido exits before that can happen. With this patch, the whole chain of container-orchestration commands gets split up into multiple, chainable functions. There is a function to prepare the container, which returns information that can then be used to execute the container, which returns information that can be used to finalize the container. You get the idea. Because this is multi-staged now, information can be retrieved and processed _in between these steps_. This results in us being able to write information to the database as soon as possible, which is what we do now. Of course this is only the refactoring patch and butido runs with this. More fixes and minor tweaks in the whole processing chain might be required to make the whole process even smoother. But this can be the stepping stone for such improvements. Tested-by: Matthias Beyer <mail@beyermatthias.de> Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/endpoint')
-rw-r--r--src/endpoint/configured.rs469
-rw-r--r--src/endpoint/scheduler.rs24
2 files changed, 297 insertions, 196 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 2c72b9b..8ef961a 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -10,7 +10,8 @@ use anyhow::Result;
use anyhow::anyhow;
use futures::FutureExt;
use getset::{Getters, CopyGetters};
-use log::{warn, trace};
+use log::trace;
+use shiplift::Container;
use shiplift::Docker;
use shiplift::ExecContainerOptions;
use tokio::stream::StreamExt;
@@ -175,59 +176,195 @@ impl Endpoint {
.map(|_| ())
}
- /// Run a job
- ///
- /// The return type of this function is a bit complex, so here's an explanation:
- ///
- /// * The outer Result is for indicating whether the general process of running the container
- /// and all related tasks worked.
- ///
- /// The tuple holds the result of the container run itself (the first item), the hash of the
- /// container that was run and the script that was run.
- ///
- /// The script is for reporting and should be written to the database by the caller.
- /// The ContainerHash as well.
- ///
- /// The result inside the tuple is an Err if the container script returned an error.
- /// It is Ok containing the created artifact pathes if the script exited successfully.
- ///
- pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> Result<(Result<Vec<ArtifactPath>>, ContainerHash, Script)> {
- let (container_id, _warnings) = {
- let envs = job.environment()
- .into_iter()
- .map(|(k, v)| format!("{}={}", k.as_ref(), v))
- .chain({
- job.package_environment()
- .into_iter()
- .map(|(k, v)| format!("{}={}", k, v))
- })
- .collect::<Vec<_>>();
- trace!("Job resources: Environment variables = {:?}", envs);
-
- let builder_opts = shiplift::ContainerOptions::builder(job.image().as_ref())
- .env(envs.iter().map(AsRef::as_ref).collect())
- .cmd(vec!["/bin/bash"]) // we start the container with /bin/bash, but exec() the script in it later
- .attach_stdin(true) // we have to attach, otherwise bash exits
- .build();
- trace!("Builder options = {:?}", builder_opts);
-
- let create_info = self.docker
- .containers()
- .create(&builder_opts)
- .await
- .with_context(|| anyhow!("Creating container on '{}'", self.name))?;
- trace!("Create info = {:?}", create_info);
-
- if let Some(warnings) = create_info.warnings.as_ref() {
- for warning in warnings {
- warn!("{}", warning);
- }
+
+ pub async fn prepare_container<'a>(&'a self, job: RunnableJob, staging: Arc<RwLock<StagingStore>>) -> Result<PreparedContainer<'a>> {
+ PreparedContainer::new(self, job, staging).await
+ }
+
+ pub async fn number_of_running_containers(&self) -> Result<usize> {
+ self.docker
+ .containers()
+ .list(&Default::default())
+ .await
+ .with_context(|| anyhow!("Getting number of running containers on {}", self.name))
+ .map_err(Error::from)
+ .map(|list| list.len())
+ }
+
+}
+
+pub struct PreparedContainer<'a> {
+ endpoint: &'a Endpoint,
+ script: Script,
+ create_info: shiplift::rep::ContainerCreateInfo,
+}
+
+impl<'a> PreparedContainer<'a> {
+ async fn new(endpoint: &'a Endpoint, job: RunnableJob, staging: Arc<RwLock<StagingStore>>) -> Result<PreparedContainer<'a>> {
+ let script = job.script().clone();
+ let create_info = Self::build_container(endpoint, &job).await?;
+ let container = endpoint.docker.containers().get(&create_info.id);
+
+ let (cpysrc, cpyart, cpyscr) = tokio::join!(
+ Self::copy_source_to_container(&container, &job),
+ Self::copy_artifacts_to_container(&container, &job, staging),
+ Self::copy_script_to_container(&container, &script)
+ );
+
+ let _ = cpysrc
+ .with_context(|| anyhow!("Copying the sources to container {} on '{}'", create_info.id, endpoint.name))?;
+
+ let _ = cpyart
+ .with_context(|| anyhow!("Copying the artifacts to container {} on '{}'", create_info.id, endpoint.name))?;
+
+ let _ = cpyscr
+ .with_context(|| anyhow!("Copying the script to container {} on '{}'", create_info.id, endpoint.name))?;
+
+ Ok({
+ PreparedContainer {
+ endpoint,
+ script,
+ create_info
}
+ })
+ }
- (create_info.id, create_info.warnings)
- };
+ async fn build_container(endpoint: &Endpoint, job: &RunnableJob) -> Result<shiplift::rep::ContainerCreateInfo> {
+ let envs = job.environment()
+ .into_iter()
+ .map(|(k, v)| format!("{}={}", k.as_ref(), v))
+ .chain({
+ job.package_environment()
+ .into_iter()
+ .map(|(k, v)| format!("{}={}", k, v))
+ })
+ .collect::<Vec<_>>();
+ trace!("Job resources: Environment variables = {:?}", envs);
+
+ let builder_opts = shiplift::ContainerOptions::builder(job.image().as_ref())
+ .env(envs.iter().map(AsRef::as_ref).collect())
+ .cmd(vec!["/bin/bash"]) // we start the container with /bin/bash, but exec() the script in it later
+ .attach_stdin(true) // we have to attach, otherwise bash exits
+ .build();
+ trace!("Builder options = {:?}", builder_opts);
+
+ let create_info = endpoint
+ .docker
+ .containers()
+ .create(&builder_opts)
+ .await
+ .with_context(|| anyhow!("Creating container on '{}'", endpoint.name))?;
+ trace!("Create info = {:?}", create_info);
+ Ok(create_info)
+ }
+
+ async fn copy_source_to_container<'ca>(container: &Container<'ca>, job: &RunnableJob) -> Result<()> {
+ use tokio::io::AsyncReadExt;
+
+ job.package_sources()
+ .into_iter()
+ .map(|entry| async {
+ let source_path = entry.path();
+ let destination = PathBuf::from("/inputs").join({
+ source_path.file_name()
+ .ok_or_else(|| anyhow!("Not a file: {}", source_path.display()))
+ .with_context(|| anyhow!("Copying package source from {} to container {}", source_path.display(), container.id()))?
+ });
+ trace!("Source path = {:?}", source_path);
+ trace!("Source dest = {:?}", destination);
+ let mut buf = vec![];
+ tokio::fs::OpenOptions::new()
+ .create(false)
+ .create_new(false)
+ .append(false)
+ .write(false)
+ .read(true)
+ .open(&source_path)
+ .await
+ .with_context(|| anyhow!("Getting source file: {}", source_path.display()))?
+ .read_to_end(&mut buf)
+ .await
+ .with_context(|| anyhow!("Reading file {}", source_path.display()))?;
+
+ drop(entry);
+ let _ = container.copy_file_into(destination, &buf).await?;
+ Ok(())
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<()>>()
+ .await
+ .with_context(|| anyhow!("Copying sources to container {}", container.id()))
+ .map_err(Error::from)
+ }
+
+ async fn copy_artifacts_to_container<'ca>(container: &Container<'ca>, job: &RunnableJob, staging: Arc<RwLock<StagingStore>>) -> Result<()> {
+ job.resources()
+ .into_iter()
+ .filter_map(JobResource::artifact)
+ .cloned()
+ .map(|art| async {
+ let artifact_file_name = art.path().file_name()
+ .ok_or_else(|| anyhow!("BUG: artifact {} is not a file", art.path().display()))
+ .with_context(|| anyhow!("Collecting artifacts for copying to container {}", container.id()))?;
+ let destination = PathBuf::from("/inputs/").join(artifact_file_name);
+ trace!("Copying {} to container: {}:{}", art.path().display(), container.id(), destination.display());
+ let buf = staging
+ .read()
+ .await
+ .root_path()
+ .join(art.path())?
+ .read()
+ .await
+ .with_context(|| anyhow!("Reading artifact {}, so it can be copied to container", art.path().display()))?;
+
+ let r = container.copy_file_into(&destination, &buf)
+ .await
+ .with_context(|| anyhow!("Copying artifact {} to container {} at {}", art.path().display(), container.id(), destination.display()))
+ .map_err(Error::from);
+ drop(art); // ensure `art` is moved into closure
+ r
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<Vec<_>>>()
+ .await
+ .with_context(|| anyhow!("Copying artifacts to container {}", container.id()))
+ .map_err(Error::from)
+ .map(|_| ())
+ }
+ async fn copy_script_to_container<'ca>(container: &Container<'ca>, script: &Script) -> Result<()> {
let script_path = PathBuf::from("/script");
+ container.copy_file_into(script_path, script.as_ref().as_bytes())
+ .await
+ .with_context(|| anyhow!("Copying the script into container {}", container.id()))
+ .map_err(Error::from)
+ }
+
+ pub async fn start(self) -> Result<StartedContainer<'a>> {
+ let container = self.endpoint.docker.containers().get(&self.create_info.id);
+ let _ = container.start()
+ .inspect(|r| { trace!("Starting container {} -> {:?}", self.create_info.id, r); })
+ .map(|r| r.with_context(|| anyhow!("Starting the container {} on '{}'", self.create_info.id, self.endpoint.name)))
+ .await?;
+
+ Ok({
+ StartedContainer {
+ endpoint: self.endpoint,
+ script: self.script,
+ create_info: self.create_info,
+ }
+ })
+ }
+}
+
+pub struct StartedContainer<'a> {
+ endpoint: &'a Endpoint,
+ script: Script,
+ create_info: shiplift::rep::ContainerCreateInfo,
+}
+
+impl<'a> StartedContainer<'a> {
+ pub async fn execute_script(self, logsink: UnboundedSender<LogItem>) -> Result<ExecutedContainer<'a>> {
let exec_opts = ExecContainerOptions::builder()
.cmd(vec!["/bin/bash", "/script"])
.attach_stderr(true)
@@ -235,123 +372,46 @@ impl Endpoint {
.build();
trace!("Exec options = {:?}", exec_opts);
- let container = self.docker.containers().get(&container_id);
- trace!("Container id = {:?}", container_id);
- { // copy source to container
- use tokio::io::AsyncReadExt;
-
- job.package_sources()
- .into_iter()
- .map(|entry| async {
- let source_path = entry.path();
- let destination = PathBuf::from("/inputs").join({
- source_path.file_name()
- .ok_or_else(|| anyhow!("Not a file: {}", source_path.display()))
- .with_context(|| anyhow!("Copying package source from {} to container {}", source_path.display(), self.name))?
- });
- trace!("Source path = {:?}", source_path);
- trace!("Source dest = {:?}", destination);
- let mut buf = vec![];
- tokio::fs::OpenOptions::new()
- .create(false)
- .create_new(false)
- .append(false)
- .write(false)
- .read(true)
- .open(&source_path)
- .await
- .with_context(|| anyhow!("Getting source file: {}", source_path.display()))?
- .read_to_end(&mut buf)
- .await
- .with_context(|| anyhow!("Reading file {}", source_path.display()))?;
-
- drop(entry);
- let _ = container.copy_file_into(destination, &buf).await?;
- Ok(())
- })
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Result<()>>()
- .await
- .with_context(|| anyhow!("Copying sources to container {}", container_id))?;
- }
- { // Copy all Path artifacts to the container
- job.resources()
- .into_iter()
- .filter_map(JobResource::artifact)
- .cloned()
- .map(|art| async {
- let artifact_file_name = art.path().file_name()
- .ok_or_else(|| anyhow!("BUG: artifact {} is not a file", art.path().display()))
- .with_context(|| anyhow!("Collecting artifacts for copying to container {}", container_id))?;
- let destination = PathBuf::from("/inputs/").join(artifact_file_name);
- trace!("Copying {} to container: {}:{}", art.path().display(), container_id, destination.display());
- let buf = staging
- .read()
- .await
- .root_path()
- .join(art.path())?
- .read()
- .await
- .with_context(|| anyhow!("Reading artifact {}, so it can be copied to container", art.path().display()))?;
-
- let r = container.copy_file_into(&destination, &buf)
- .await
- .with_context(|| anyhow!("Copying artifact {} to container {} at {}", art.path().display(), container_id, destination.display()))
- .map_err(Error::from);
- drop(art); // ensure `art` is moved into closure
- r
- })
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Result<Vec<_>>>()
- .await
- .with_context(|| anyhow!("Copying artifacts to container {}", container_id))?;
- }
+ let container = self.endpoint.docker.containers().get(&self.create_info.id);
- let exited_successfully: Option<(bool, Option<String>)> = container
- .copy_file_into(script_path, job.script().as_ref().as_bytes())
- .inspect(|r| { trace!("Copying script to container {} -> {:?}", container_id, r); })
- .map(|r| r.with_context(|| anyhow!("Copying the script into the container {} on '{}'", container_id, self.name)))
- .then(|_| container.start())
- .inspect(|r| { trace!("Starting container {} -> {:?}", container_id, r); })
- .map(|r| r.with_context(|| anyhow!("Starting the container {} on '{}'", container_id, self.name)))
- .then(|_| {
- trace!("Moving logs to log sink for container {}", container_id);
- buffer_stream_to_line_stream(container.exec(&exec_opts))
- .map(|line| {
- trace!("['{}':{}] Found log line: {:?}", self.name, container_id, line);
- line.with_context(|| anyhow!("Getting log from {}:{}", self.name, container_id))
+ trace!("Moving logs to log sink for container {}", self.create_info.id);
+ let stream = container.exec(&exec_opts);
+
+ let exited_successfully: Option<(bool, Option<String>)> = buffer_stream_to_line_stream(stream)
+ .map(|line| {
+ trace!("['{}':{}] Found log line: {:?}", self.endpoint.name, self.create_info.id, line);
+ line.with_context(|| anyhow!("Getting log from {}:{}", self.endpoint.name, self.create_info.id))
+ .map_err(Error::from)
+ .and_then(|l| {
+ crate::log::parser()
+ .parse(l.as_bytes())
+ .with_context(|| anyhow!("Parsing log from {}:{}: {:?}", self.endpoint.name, self.create_info.id, l))
.map_err(Error::from)
- .and_then(|l| {
- crate::log::parser()
- .parse(l.as_bytes())
- .with_context(|| anyhow!("Parsing log from {}:{}: {:?}", self.name, container_id, l))
- .map_err(Error::from)
- .and_then(|item| {
-
- let mut exited_successfully = None;
- {
- match item {
- LogItem::State(Ok(_)) => exited_successfully = Some((true, None)),
- LogItem::State(Err(ref msg)) => exited_successfully = Some((false, Some(msg.clone()))),
- _ => {
- // Nothing
- }
- }
+ .and_then(|item| {
+
+ let mut exited_successfully = None;
+ {
+ match item {
+ LogItem::State(Ok(_)) => exited_successfully = Some((true, None)),
+ LogItem::State(Err(ref msg)) => exited_successfully = Some((false, Some(msg.clone()))),
+ _ => {
+ // Nothing
}
+ }
+ }
- trace!("Log item: {}", item.display()?);
- logsink.send(item)
- .with_context(|| anyhow!("Sending log to log sink"))
- .map_err(Error::from)
- .map(|_| exited_successfully)
- })
+ trace!("Log item: {}", item.display()?);
+ logsink.send(item)
+ .with_context(|| anyhow!("Sending log to log sink"))
+ .map_err(Error::from)
+ .map(|_| exited_successfully)
})
})
- .collect::<Result<Vec<_>>>()
})
- .map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", container_id, self.name)))
+ .collect::<Result<Vec<_>>>()
+ .map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", self.create_info.id, self.endpoint.name)))
.await
- .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}", container_id))?
+ .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}", self.create_info.id))?
.into_iter()
.fold(None, |accu, elem| match (accu, elem) {
(None , b) => b,
@@ -361,30 +421,56 @@ impl Endpoint {
(Some((true, _)) , Some((true, _))) => Some((true, None)),
});
- trace!("Fetching /outputs from container {}", container_id);
+ Ok({
+ ExecutedContainer {
+ endpoint: self.endpoint,
+ create_info: self.create_info,
+ script: self.script,
+ exit_info: exited_successfully,
+ }
+ })
+ }
+
+}
+
+pub struct ExecutedContainer<'a> {
+ endpoint: &'a Endpoint,
+ create_info: shiplift::rep::ContainerCreateInfo,
+ script: Script,
+ exit_info: Option<(bool, Option<String>)>,
+}
+
+impl<'a> ExecutedContainer<'a> {
+ pub fn container_hash(&self) -> ContainerHash {
+ ContainerHash::from(self.create_info.id.clone())
+ }
+
+ pub fn script(&self) -> &Script {
+ &self.script
+ }
+
+ pub async fn finalize(self, staging: Arc<RwLock<StagingStore>>) -> Result<FinalizedContainer> {
+ trace!("Fetching /outputs from container {}", self.create_info.id);
+ let container = self.endpoint.docker.containers().get(&self.create_info.id);
let tar_stream = container
.copy_from(&PathBuf::from("/outputs/"))
.map(|item| {
- item.with_context(|| anyhow!("Copying item from container {} to host", container_id))
+ item.with_context(|| anyhow!("Copying item from container {} to host", self.create_info.id))
.map_err(Error::from)
});
- let r = {
- let mut writelock = staging.write().await;
+ let mut writelock = staging.write().await;
- writelock
- .write_files_from_tar_stream(tar_stream)
- .await
- .with_context(|| anyhow!("Copying the TAR stream to the staging store"))?
- };
+ let artifacts = writelock
+ .write_files_from_tar_stream(tar_stream)
+ .await
+ .with_context(|| anyhow!("Copying the TAR stream to the staging store"))?;
- let script: Script = job.script().clone();
- match exited_successfully {
+ let exit_info = match self.exit_info {
Some((false, msg)) => {
- let conthash = ContainerHash::from(container_id);
let err = anyhow!("Error during container run:\n\tMessage: '{msg}'\n\tConnect using\n\n\t\t`docker --host {uri} exec -it {container_id} /bin/bash`\n\n\tto debug.",
- container_id = &conthash,
- uri = self.uri(),
+ container_id = self.create_info.id,
+ uri = self.endpoint.uri(),
msg = msg.as_ref().map(String::deref).unwrap_or(""),
);
@@ -392,28 +478,37 @@ impl Endpoint {
let conterr = Err(Error::from(err));
// Ok because the general process worked.
- Ok((conterr, conthash, script))
+ conterr
},
Some((true, _)) | None => {
+ let container = self.endpoint.docker.containers().get(&self.create_info.id);
container.stop(Some(std::time::Duration::new(1, 0)))
.await
- .with_context(|| anyhow!("Stopping container {}", container_id))?;
-
- Ok((Ok(r), ContainerHash::from(container_id), script))
+ .with_context(|| anyhow!("Stopping container {}", self.create_info.id))?;
+ Ok(())
},
- }
- }
+ };
- pub async fn number_of_running_containers(&self) -> Result<usize> {
- self.docker
- .containers()
- .list(&Default::default())
- .await
- .with_context(|| anyhow!("Getting number of running containers on {}", self.name))
- .map_err(Error::from)
- .map(|list| list.len())
+ Ok({
+ FinalizedContainer {
+ artifacts,
+ exit_info,
+ }
+ })
}
+}
+
+#[derive(Debug)]
+pub struct FinalizedContainer {
+ artifacts: Vec<ArtifactPath>,
+ exit_info: Result<()>,
+}
+impl FinalizedContainer {
+ pub fn unpack(self) -> (Vec<ArtifactPath>, Result<()>) {
+ (self.artifacts, self.exit_info)
+ }
}
+
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 74155e7..25165fa 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -138,8 +138,11 @@ impl JobHandle {
let envs = self.create_env_in_db()?;
let job_id = self.job.uuid().clone();
trace!("Running on Job {} on Endpoint {}", job_id, ep.name());
- let res = ep
- .run_job(self.job, log_sender, self.staging_store);
+ let running_container = ep.prepare_container(self.job, self.staging_store.clone())
+ .await?
+ .start()
+ .await?
+ .execute_script(log_sender);
let logres = LogReceiver {
package_name: &package.name,
@@ -150,19 +153,22 @@ impl JobHandle {
bar: &self.bar,
}.join();
- let (res, logres) = tokio::join!(res, logres);
-
- trace!("Found result for job {}: {:?}", job_id, res);
+ let (run_container, logres) = tokio::join!(running_container, logres);
let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'", ep.name()))?;
- let (paths, container_hash, script) = res.with_context(|| anyhow!("Error during running job on '{}'", ep.name()))?;
-
- let job = dbmodels::Job::create(&self.db, &job_id, &self.submit, &endpoint, &package, &image, &container_hash, &script, &log)?;
+ let run_container = run_container.with_context(|| anyhow!("Running container {} failed"))?;
+ let job = dbmodels::Job::create(&self.db, &job_id, &self.submit, &endpoint, &package, &image, &run_container.container_hash(), run_container.script(), &log)?;
trace!("DB: Job entry for job {} created: {}", job.uuid, job.id);
for env in envs {
let _ = dbmodels::JobEnv::create(&self.db, &job, &env)?;
}
- let paths = paths?;
+
+ let res : crate::endpoint::FinalizedContainer = run_container
+ .finalize(self.staging_store.clone())
+ .await?;
+ trace!("Found result for job {}: {:?}", job_id, res);
+ let (paths, res) = res.unpack();
+ let _ = res.with_context(|| anyhow!("Error during running job on '{}'", ep.name()))?;
// Have to do it the ugly way here because of borrowing semantics
let mut r = vec![];