diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2020-11-06 13:22:42 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2020-11-06 13:27:12 +0100 |
commit | 9e81c7c913d6363315c36229ad97df3891d21b06 (patch) | |
tree | d47e3fb6641dee25b7a24f6f4820993719f46236 /src/endpoint/configured.rs | |
parent | 228e2a2836f12227dfe2b1725eff123d774670f9 (diff) |
Chain futures properly
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/endpoint/configured.rs')
-rw-r--r-- | src/endpoint/configured.rs | 32 |
1 files changed, 21 insertions, 11 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 9a57682..9d5c5da 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -159,6 +159,7 @@ impl Endpoint { pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> Result<Vec<PathBuf>> { use crate::log::buffer_stream_to_line_stream; use tokio::stream::StreamExt; + use futures::FutureExt; let (container_id, _warnings) = { let envs: Vec<String> = job.resources() @@ -192,19 +193,28 @@ impl Endpoint { .build(); let container = self.docker.containers().get(&container_id); - container.copy_file_into(script_path, script).await?; - let stream = container.exec(&exec_opts); - let _ = buffer_stream_to_line_stream(stream) - .map(|line| { - line.map_err(Error::from) - .and_then(|l| { - crate::log::parser() - .parse(l.as_bytes()) - .map_err(Error::from) - .and_then(|item| logsink.send(item).map_err(Error::from)) + container + .copy_file_into(script_path, script) + .map(|r| r.with_context(|| anyhow!("Copying the script into the container {} on '{}'", container_id, self.name))) + .then(|_| container.start()) + .map(|r| r.with_context(|| anyhow!("Starting the container {} on '{}'", container_id, self.name))) + .then(|_| async { + let stream = container.exec(&exec_opts); + buffer_stream_to_line_stream(stream) + .map(|line| { + trace!("['{}':{}] Found log line: {:?}", self.name, container_id, line); + line.map_err(Error::from) + .and_then(|l| { + crate::log::parser() + .parse(l.as_bytes()) + .map_err(Error::from) + .and_then(|item| logsink.send(item).map_err(Error::from)) + }) }) + .collect::<Result<Vec<_>>>() + .await }) - .collect::<Result<Vec<_>>>() + .map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", container_id, self.name))) .await?; let tar_stream = container.copy_from(&PathBuf::from("/outputs/")) |