diff options
2 files changed, 297 insertions, 196 deletions
diff --git a/src/endpoint/ b/src/endpoint/
index 2c72b9b..8ef961a 100644
--- a/src/endpoint/
+++ b/src/endpoint/
@@ -10,7 +10,8 @@ use anyhow::Result;
use anyhow::anyhow;
use futures::FutureExt;
use getset::{Getters, CopyGetters};
-use log::{warn, trace};
+use log::trace;
+use shiplift::Container;
use shiplift::Docker;
use shiplift::ExecContainerOptions;
use tokio::stream::StreamExt;
@@ -175,59 +176,195 @@ impl Endpoint {
.map(|_| ())
- /// Run a job
- ///
- /// The return type of this function is a bit complex, so here's an explanation:
- ///
- /// * The outer Result is for indicating whether the general process of running the container
- /// and all related tasks worked.
- ///
- /// The tuple holds the result of the container run itself (the first item), the hash of the
- /// container that was run and the script that was run.
- ///
- /// The script is for reporting and should be written to the database by the caller.
- /// The ContainerHash as well.
- ///
- /// The result inside the tuple is an Err if the container script returned an error.
- /// It is Ok containing the created artifact pathes if the script exited successfully.
- ///
- pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> Result<(Result<Vec<ArtifactPath>>, ContainerHash, Script)> {
- let (container_id, _warnings) = {
- let envs = job.environment()
- .into_iter()
- .map(|(k, v)| format!("{}={}", k.as_ref(), v))
- .chain({
- job.package_environment()
- .into_iter()
- .map(|(k, v)| format!("{}={}", k, v))
- })
- .collect::<Vec<_>>();
- trace!("Job resources: Environment variables = {:?}", envs);
- let builder_opts = shiplift::ContainerOptions::builder(job.image().as_ref())
- .env(envs.iter().map(AsRef::as_ref).collect())
- .cmd(vec!["/bin/bash"]) // we start the container with /bin/bash, but exec() the script in it later
- .attach_stdin(true) // we have to attach, otherwise bash exits
- .build();
- trace!("Builder options = {:?}", builder_opts);
- let create_info = self.docker
- .containers()
- .create(&builder_opts)
- .await
- .with_context(|| anyhow!("Creating container on '{}'",;
- trace!("Create info = {:?}", create_info);
- if let Some(warnings) = create_info.warnings.as_ref() {
- for warning in warnings {
- warn!("{}", warning);
- }
+ pub async fn prepare_container<'a>(&'a self, job: RunnableJob, staging: Arc<RwLock<StagingStore>>) -> Result<PreparedContainer<'a>> {
+ PreparedContainer::new(self, job, staging).await
+ }
+ pub async fn number_of_running_containers(&self) -> Result<usize> {
+ self.docker
+ .containers()
+ .list(&Default::default())
+ .await
+ .with_context(|| anyhow!("Getting number of running containers on {}",
+ .map_err(Error::from)
+ .map(|list| list.len())
+ }
+pub struct PreparedContainer<'a> {
+ endpoint: &'a Endpoint,
+ script: Script,
+ create_info: shiplift::rep::ContainerCreateInfo,
+impl<'a> PreparedContainer<'a> {
+ async fn new(endpoint: &'a Endpoint, job: RunnableJob, staging: Arc<RwLock<StagingStore>>) -> Result<PreparedContainer<'a>> {
+ let script = job.script().clone();
+ let create_info = Self::build_container(endpoint, &job).await?;
+ let container = endpoint.docker.containers().get(&;
+ let (cpysrc, cpyart, cpyscr) = tokio::join!(
+ Self::copy_source_to_container(&container, &job),
+ Self::copy_artifacts_to_container(&container, &job, staging),
+ Self::copy_script_to_container(&container, &script)
+ );
+ let _ = cpysrc
+ .with_context(|| anyhow!("Copying the sources to container {} on '{}'",,;
+ let _ = cpyart
+ .with_context(|| anyhow!("Copying the artifacts to container {} on '{}'",,;
+ let _ = cpyscr
+ .with_context(|| anyhow!("Copying the script to container {} on '{}'",,;
+ Ok({
+ PreparedContainer {
+ endpoint,
+ script,
+ create_info
+ })
+ }
- (, create_info.warnings)
- };
+ async fn build_container(endpoint: &Endpoint, job: &RunnableJob) -> Result<shiplift::rep::ContainerCreateInfo> {
+ let envs = job.environment()
+ .into_iter()
+ .map(|(k, v)| format!("{}={}", k.as_ref(), v))
+ .chain({
+ job.package_environment()
+ .into_iter()
+ .map(|(k, v)| format!("{}={}", k, v))
+ })
+ .collect::<Vec<_>>();
+ trace!("Job resources: Environment variables = {:?}", envs);
+ let builder_opts = shiplift::ContainerOptions::builder(job.image().as_ref())
+ .env(envs.iter().map(AsRef::as_ref).collect())
+ .cmd(vec!["/bin/bash"]) // we start the container with /bin/bash, but exec() the script in it later
+ .attach_stdin(true) // we have to attach, otherwise bash exits
+ .build();
+ trace!("Builder options = {:?}", builder_opts);
+ let create_info = endpoint
+ .docker
+ .containers()
+ .create(&builder_opts)
+ .await
+ .with_context(|| anyhow!("Creating container on '{}'",;
+ trace!("Create info = {:?}", create_info);
+ Ok(create_info)
+ }
+ async fn copy_source_to_container<'ca>(container: &Container<'ca>, job: &RunnableJob) -> Result<()> {
+ use tokio::io::AsyncReadExt;
+ job.package_sources()
+ .into_iter()
+ .map(|entry| async {
+ let source_path = entry.path();
+ let destination = PathBuf::from("/inputs").join({
+ source_path.file_name()
+ .ok_or_else(|| anyhow!("Not a file: {}", source_path.display()))
+ .with_context(|| anyhow!("Copying package source from {} to container {}", source_path.display(),
+ });
+ trace!("Source path = {:?}", source_path);
+ trace!("Source dest = {:?}", destination);
+ let mut buf = vec![];
+ tokio::fs::OpenOptions::new()
+ .create(false)
+ .create_new(false)
+ .append(false)
+ .write(false)
+ .read(true)
+ .open(&source_path)
+ .await
+ .with_context(|| anyhow!("Getting source file: {}", source_path.display()))?
+ .read_to_end(&mut buf)
+ .await
+ .with_context(|| anyhow!("Reading file {}", source_path.display()))?;
+ drop(entry);
+ let _ = container.copy_file_into(destination, &buf).await?;
+ Ok(())
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<()>>()
+ .await
+ .with_context(|| anyhow!("Copying sources to container {}",
+ .map_err(Error::from)
+ }
+ async fn copy_artifacts_to_container<'ca>(container: &Container<'ca>, job: &RunnableJob, staging: Arc<RwLock<StagingStore>>) -> Result<()> {
+ job.resources()
+ .into_iter()
+ .filter_map(JobResource::artifact)
+ .cloned()
+ .map(|art| async {
+ let artifact_file_name = art.path().file_name()
+ .ok_or_else(|| anyhow!("BUG: artifact {} is not a file", art.path().display()))
+ .with_context(|| anyhow!("Collecting artifacts for copying to container {}",;
+ let destination = PathBuf::from("/inputs/").join(artifact_file_name);
+ trace!("Copying {} to container: {}:{}", art.path().display(),, destination.display());
+ let buf = staging
+ .read()
+ .await
+ .root_path()
+ .join(art.path())?
+ .read()
+ .await
+ .with_context(|| anyhow!("Reading artifact {}, so it can be copied to container", art.path().display()))?;
+ let r = container.copy_file_into(&destination, &buf)
+ .await
+ .with_context(|| anyhow!("Copying artifact {} to container {} at {}", art.path().display(),, destination.display()))
+ .map_err(Error::from);
+ drop(art); // ensure `art` is moved into closure
+ r
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<Vec<_>>>()
+ .await
+ .with_context(|| anyhow!("Copying artifacts to container {}",
+ .map_err(Error::from)
+ .map(|_| ())
+ }
+ async fn copy_script_to_container<'ca>(container: &Container<'ca>, script: &Script) -> Result<()> {
let script_path = PathBuf::from("/script");
+ container.copy_file_into(script_path, script.as_ref().as_bytes())
+ .await
+ .with_context(|| anyhow!("Copying the script into container {}",
+ .map_err(Error::from)
+ }
+ pub async fn start(self) -> Result<StartedContainer<'a>> {
+ let container = self.endpoint.docker.containers().get(&;
+ let _ = container.start()
+ .inspect(|r| { trace!("Starting container {} -> {:?}",, r); })
+ .map(|r| r.with_context(|| anyhow!("Starting the container {} on '{}'",,
+ .await?;
+ Ok({
+ StartedContainer {
+ endpoint: self.endpoint,
+ script: self.script,
+ create_info: self.create_info,
+ }
+ })
+ }
+pub struct StartedContainer<'a> {
+ endpoint: &'a Endpoint,
+ script: Script,
+ create_info: shiplift::rep::ContainerCreateInfo,
+impl<'a> StartedContainer<'a> {
+ pub async fn execute_script(self, logsink: UnboundedSender<LogItem>) -> Result<ExecutedContainer<'a>> {
let exec_opts = ExecContainerOptions::builder()
.cmd(vec!["/bin/bash", "/script"])
@@ -235,123 +372,46 @@ impl Endpoint {
trace!("Exec options = {:?}", exec_opts);
- let container = self.docker.containers().get(&container_id);
- trace!("Container id = {:?}", container_id);
- { // copy source to container
- use tokio::io::AsyncReadExt;
- job.package_sources()
- .into_iter()
- .map(|entry| async {
- let source_path = entry.path();
- let destination = PathBuf::from("/inputs").join({
- source_path.file_name()
- .ok_or_else(|| anyhow!("Not a file: {}", source_path.display()))
- .with_context(|| anyhow!("Copying package source from {} to container {}", source_path.display(),
- });
- trace!("Source path = {:?}", source_path);
- trace!("Source dest = {:?}", destination);
- let mut buf = vec![];
- tokio::fs::OpenOptions::new()
- .create(false)
- .create_new(false)
- .append(false)
- .write(false)
- .read(true)
- .open(&source_path)
- .await
- .with_context(|| anyhow!("Getting source file: {}", source_path.display()))?
- .read_to_end(&mut buf)
- .await
- .with_context(|| anyhow!("Reading file {}", source_path.display()))?;
- drop(entry);
- let _ = container.copy_file_into(destination, &buf).await?;
- Ok(())
- })
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Result<()>>()
- .await
- .with_context(|| anyhow!("Copying sources to container {}", container_id))?;
- }
- { // Copy all Path artifacts to the container
- job.resources()
- .into_iter()
- .filter_map(JobResource::artifact)
- .cloned()
- .map(|art| async {
- let artifact_file_name = art.path().file_name()
- .ok_or_else(|| anyhow!("BUG: artifact {} is not a file", art.path().display()))
- .with_context(|| anyhow!("Collecting artifacts for copying to container {}", container_id))?;
- let destination = PathBuf::from("/inputs/").join(artifact_file_name);
- trace!("Copying {} to container: {}:{}", art.path().display(), container_id, destination.display());
- let buf = staging
- .read()
- .await
- .root_path()
- .join(art.path())?
- .read()
- .await
- .with_context(|| anyhow!("Reading artifact {}, so it can be copied to container", art.path().display()))?;
- let r = container.copy_file_into(&destination, &buf)
- .await
- .with_context(|| anyhow!("Copying artifact {} to container {} at {}", art.path().display(), container_id, destination.display()))
- .map_err(Error::from);
- drop(art); // ensure `art` is moved into closure
- r
- })
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Result<Vec<_>>>()
- .await
- .with_context(|| anyhow!("Copying artifacts to container {}", container_id))?;
- }
+ let container = self.endpoint.docker.containers().get(&;
- let exited_successfully: Option<(bool, Option<String>)> = container
- .copy_file_into(script_path, job.script().as_ref().as_bytes())
- .inspect(|r| { trace!("Copying script to container {} -> {:?}", container_id, r); })
- .map(|r| r.with_context(|| anyhow!("Copying the script into the container {} on '{}'", container_id,
- .then(|_| container.start())
- .inspect(|r| { trace!("Starting container {} -> {:?}", container_id, r); })
- .map(|r| r.with_context(|| anyhow!("Starting the container {} on '{}'", container_id,
- .then(|_| {
- trace!("Moving logs to log sink for container {}", container_id);
- buffer_stream_to_line_stream(container.exec(&exec_opts))
- .map(|line| {
- trace!("['{}':{}] Found log line: {:?}",, container_id, line);
- line.with_context(|| anyhow!("Getting log from {}:{}",, container_id))
+ trace!("Moving logs to log sink for container {}",;
+ let stream = container.exec(&exec_opts);
+ let exited_successfully: Option<(bool, Option<String>)> = buffer_stream_to_line_stream(stream)
+ .map(|line| {
+ trace!("['{}':{}] Found log line: {:?}",,, line);
+ line.with_context(|| anyhow!("Getting log from {}:{}",,
+ .map_err(Error::from)
+ .and_then(|l| {
+ crate::log::parser()
+ .parse(l.as_bytes())
+ .with_context(|| anyhow!("Parsing log from {}:{}: {:?}",,, l))
- .and_then(|l| {
- crate::log::parser()
- .parse(l.as_bytes())
- .with_context(|| anyhow!("Parsing log from {}:{}: {:?}",, container_id, l))
- .map_err(Error::from)
- .and_then(|item| {
- let mut exited_successfully = None;
- {
- match item {
- LogItem::State(Ok(_)) => exited_successfully = Some((true, None)),
- LogItem::State(Err(ref msg)) => exited_successfully = Some((false, Some(msg.clone()))),
- _ => {
- // Nothing
- }
- }
+ .and_then(|item| {
+ let mut exited_successfully = None;
+ {
+ match item {
+ LogItem::State(Ok(_)) => exited_successfully = Some((true, None)),
+ LogItem::State(Err(ref msg)) => exited_successfully = Some((false, Some(msg.clone()))),
+ _ => {
+ // Nothing
+ }
+ }
- trace!("Log item: {}", item.display()?);
- logsink.send(item)
- .with_context(|| anyhow!("Sending log to log sink"))
- .map_err(Error::from)
- .map(|_| exited_successfully)
- })
+ trace!("Log item: {}", item.display()?);
+ logsink.send(item)
+ .with_context(|| anyhow!("Sending log to log sink"))
+ .map_err(Error::from)
+ .map(|_| exited_successfully)
- .collect::<Result<Vec<_>>>()
- .map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", container_id,
+ .collect::<Result<Vec<_>>>()
+ .map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}",,
- .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}", container_id))?
+ .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}",
.fold(None, |accu, elem| match (accu, elem) {
(None , b) => b,
@@ -361,30 +421,56 @@ impl Endpoint {
(Some((true, _)) , Some((true, _))) => Some((true, None)),
- trace!("Fetching /outputs from container {}", container_id);
+ Ok({
+ ExecutedContainer {
+ endpoint: self.endpoint,
+ create_info: self.create_info,
+ script: self.script,
+ exit_info: exited_successfully,
+ }
+ })
+ }
+pub struct ExecutedContainer<'a> {
+ endpoint: &'a Endpoint,
+ create_info: shiplift::rep::ContainerCreateInfo,
+ script: Script,
+ exit_info: Option<(bool, Option<String>)>,
+impl<'a> ExecutedContainer<'a> {
+ pub fn container_hash(&self) -> ContainerHash {
+ ContainerHash::from(
+ }
+ pub fn script(&self) -> &Script {
+ &self.script
+ }
+ pub async fn finalize(self, staging: Arc<RwLock<StagingStore>>) -> Result<FinalizedContainer> {
+ trace!("Fetching /outputs from container {}",;
+ let container = self.endpoint.docker.containers().get(&;
let tar_stream = container
.map(|item| {
- item.with_context(|| anyhow!("Copying item from container {} to host", container_id))
+ item.with_context(|| anyhow!("Copying item from container {} to host",
- let r = {
- let mut writelock = staging.write().await;
+ let mut writelock = staging.write().await;
- writelock
- .write_files_from_tar_stream(tar_stream)
- .await
- .with_context(|| anyhow!("Copying the TAR stream to the staging store"))?
- };
+ let artifacts = writelock
+ .write_files_from_tar_stream(tar_stream)
+ .await
+ .with_context(|| anyhow!("Copying the TAR stream to the staging store"))?;
- let script: Script = job.script().clone();
- match exited_successfully {
+ let exit_info = match self.exit_info {
Some((false, msg)) => {
- let conthash = ContainerHash::from(container_id);
let err = anyhow!("Error during container run:\n\tMessage: '{msg}'\n\tConnect using\n\n\t\t`docker --host {uri} exec -it {container_id} /bin/bash`\n\n\tto debug.",
- container_id = &conthash,
- uri = self.uri(),
+ container_id =,
+ uri = self.endpoint.uri(),
msg = msg.as_ref().map(String::deref).unwrap_or(""),
@@ -392,28 +478,37 @@ impl Endpoint {
let conterr = Err(Error::from(err));
// Ok because the general process worked.
- Ok((conterr, conthash, script))
+ conterr
Some((true, _)) | None => {
+ let container = self.endpoint.docker.containers().get(&;
container.stop(Some(std::time::Duration::new(1, 0)))
- .with_context(|| anyhow!("Stopping container {}", container_id))?;
- Ok((Ok(r), ContainerHash::from(container_id), script))
+ .with_context(|| anyhow!("Stopping container {}",;
+ Ok(())
- }
- }
+ };
- pub async fn number_of_running_containers(&self) -> Result<usize> {
- self.docker
- .containers()
- .list(&Default::default())
- .await
- .with_context(|| anyhow!("Getting number of running containers on {}",
- .map_err(Error::from)
- .map(|list| list.len())
+ Ok({
+ FinalizedContainer {
+ artifacts,
+ exit_info,
+ }
+ })
+pub struct FinalizedContainer {
+ artifacts: Vec<ArtifactPath>,
+ exit_info: Result<()>,
+impl FinalizedContainer {
+ pub fn unpack(self) -> (Vec<ArtifactPath>, Result<()>) {
+ (self.artifacts, self.exit_info)
+ }
diff --git a/src/endpoint/ b/src/endpoint/
index 74155e7..25165fa 100644
--- a/src/endpoint/
+++ b/src/endpoint/
@@ -138,8 +138,11 @@ impl JobHandle {
let envs = self.create_env_in_db()?;
let job_id = self.job.uuid().clone();
trace!("Running on Job {} on Endpoint {}", job_id,;
- let res = ep
- .run_job(self.job, log_sender, self.staging_store);
+ let running_container = ep.prepare_container(self.job, self.staging_store.clone())
+ .await?
+ .start()
+ .await?
+ .execute_script(log_sender);
let logres = LogReceiver {
package_name: &,
@@ -150,19 +153,22 @@ impl JobHandle {
bar: &,
- let (res, logres) = tokio::join!(res, logres);
- trace!("Found result for job {}: {:?}", job_id, res);
+ let (run_container, logres) = tokio::join!(running_container, logres);
let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'",;
- let (paths, container_hash, script) = res.with_context(|| anyhow!("Error during running job on '{}'",;
- let job = dbmodels::Job::create(&self.db, &job_id, &self.submit, &endpoint, &package, &image, &container_hash, &script, &log)?;
+ let run_container = run_container.with_context(|| anyhow!("Running container {} failed"))?;
+ let job = dbmodels::Job::create(&self.db, &job_id, &self.submit, &endpoint, &package, &image, &run_container.container_hash(), run_container.script(), &log)?;
trace!("DB: Job entry for job {} created: {}", job.uuid,;
for env in envs {
let _ = dbmodels::JobEnv::create(&self.db, &job, &env)?;
- let paths = paths?;
+ let res : crate::endpoint::FinalizedContainer = run_container
+ .finalize(self.staging_store.clone())
+ .await?;
+ trace!("Found result for job {}: {:?}", job_id, res);
+ let (paths, res) = res.unpack();
+ let _ = res.with_context(|| anyhow!("Error during running job on '{}'",;
// Have to do it the ugly way here because of borrowing semantics
let mut r = vec![];