summaryrefslogtreecommitdiffstats
path: root/src/endpoint/configured.rs
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-11-06 13:22:42 +0100
committerMatthias Beyer <mail@beyermatthias.de>2020-11-06 13:27:12 +0100
commit9e81c7c913d6363315c36229ad97df3891d21b06 (patch)
treed47e3fb6641dee25b7a24f6f4820993719f46236 /src/endpoint/configured.rs
parent228e2a2836f12227dfe2b1725eff123d774670f9 (diff)
Chain futures properly
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/endpoint/configured.rs')
-rw-r--r--src/endpoint/configured.rs32
1 files changed, 21 insertions, 11 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 9a57682..9d5c5da 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -159,6 +159,7 @@ impl Endpoint {
pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> Result<Vec<PathBuf>> {
use crate::log::buffer_stream_to_line_stream;
use tokio::stream::StreamExt;
+ use futures::FutureExt;
let (container_id, _warnings) = {
let envs: Vec<String> = job.resources()
@@ -192,19 +193,28 @@ impl Endpoint {
.build();
let container = self.docker.containers().get(&container_id);
- container.copy_file_into(script_path, script).await?;
- let stream = container.exec(&exec_opts);
- let _ = buffer_stream_to_line_stream(stream)
- .map(|line| {
- line.map_err(Error::from)
- .and_then(|l| {
- crate::log::parser()
- .parse(l.as_bytes())
- .map_err(Error::from)
- .and_then(|item| logsink.send(item).map_err(Error::from))
+ container
+ .copy_file_into(script_path, script)
+ .map(|r| r.with_context(|| anyhow!("Copying the script into the container {} on '{}'", container_id, self.name)))
+ .then(|_| container.start())
+ .map(|r| r.with_context(|| anyhow!("Starting the container {} on '{}'", container_id, self.name)))
+ .then(|_| async {
+ let stream = container.exec(&exec_opts);
+ buffer_stream_to_line_stream(stream)
+ .map(|line| {
+ trace!("['{}':{}] Found log line: {:?}", self.name, container_id, line);
+ line.map_err(Error::from)
+ .and_then(|l| {
+ crate::log::parser()
+ .parse(l.as_bytes())
+ .map_err(Error::from)
+ .and_then(|item| logsink.send(item).map_err(Error::from))
+ })
})
+ .collect::<Result<Vec<_>>>()
+ .await
})
- .collect::<Result<Vec<_>>>()
+ .map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", container_id, self.name)))
.await?;
let tar_stream = container.copy_from(&PathBuf::from("/outputs/"))