diff options
-rw-r--r-- | src/cli.rs | 8 | ||||
-rw-r--r-- | src/commands/build.rs | 2 | ||||
-rw-r--r-- | src/config/not_validated.rs | 3 | ||||
-rw-r--r-- | src/endpoint/scheduler.rs | 32 | ||||
-rw-r--r-- | src/orchestrator/orchestrator.rs | 6 |
5 files changed, 45 insertions, 6 deletions
@@ -238,6 +238,14 @@ pub fn cli<'a>() -> App<'a> { .long("image") .help("Name of the docker image to use") ) + + .arg(Arg::with_name("write-log-file") + .required(false) + .multiple(false) + .long("write-log") + .short('L') + .help("Write the log not only to database, but also in a plain-text-file") + ) ) .subcommand(App::new("what-depends") diff --git a/src/commands/build.rs b/src/commands/build.rs index 08d7804..6fc2312 100644 --- a/src/commands/build.rs +++ b/src/commands/build.rs @@ -193,7 +193,7 @@ pub async fn build<'a>(matches: &ArgMatches, .database(database_connection) .source_cache(source_cache) .submit(submit) - .file_log_sink_factory(None) + .log_dir(if matches.is_present("write-log-file") { Some(config.log_dir().clone()) } else { None }) .jobsets(jobsets) .build() .setup() diff --git a/src/config/not_validated.rs b/src/config/not_validated.rs index 87bb816..b3f9f8a 100644 --- a/src/config/not_validated.rs +++ b/src/config/not_validated.rs @@ -16,6 +16,9 @@ pub struct NotValidatedConfiguration { #[getset(get = "pub")] repository: PathBuf, + #[getset(get = "pub")] + log_dir: PathBuf, + #[serde(default = "default_progress_format")] #[getset(get = "pub")] progress_format: String, diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 90fb991..cc20444 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -10,6 +10,7 @@ use futures::FutureExt; use indicatif::ProgressBar; use itertools::Itertools; use tokio::stream::StreamExt; +use tokio::io::AsyncWriteExt; use tokio::sync::RwLock; use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::mpsc::UnboundedSender; @@ -23,6 +24,7 @@ use crate::log::LogItem; use crate::util::progress::ProgressBars; pub struct EndpointScheduler { + log_dir: Option<PathBuf>, endpoints: Vec<Arc<RwLock<Endpoint>>>, staging_store: Arc<RwLock<StagingStore>>, @@ -33,10 +35,11 @@ pub struct EndpointScheduler { impl EndpointScheduler { - pub async fn setup(endpoints: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, db: Arc<PgConnection>, progressbars: ProgressBars, submit: crate::db::models::Submit) -> Result<Self> { + pub async fn setup(endpoints: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, db: Arc<PgConnection>, progressbars: ProgressBars, submit: crate::db::models::Submit, log_dir: Option<PathBuf>) -> Result<Self> { let endpoints = Self::setup_endpoints(endpoints).await?; Ok(EndpointScheduler { + log_dir, endpoints, staging_store, db, @@ -70,6 +73,7 @@ impl EndpointScheduler { let endpoint = self.select_free_endpoint().await?; Ok(JobHandle { + log_dir: self.log_dir.clone(), bar: multibar.add(self.progressbars.job_bar(job.uuid())), endpoint, job, @@ -105,6 +109,7 @@ impl EndpointScheduler { } pub struct JobHandle { + log_dir: Option<PathBuf>, endpoint: Arc<RwLock<Endpoint>>, job: RunnableJob, bar: ProgressBar, @@ -137,6 +142,7 @@ impl JobHandle { .run_job(self.job, log_sender, self.staging_store); let logres = LogReceiver { + log_dir: self.log_dir.as_ref(), job_id, log_receiver, bar: &self.bar, @@ -156,6 +162,7 @@ impl JobHandle { } struct LogReceiver<'a> { + log_dir: Option<&'a PathBuf>, job_id: Uuid, log_receiver: UnboundedReceiver<LogItem>, bar: &'a ProgressBar, @@ -166,10 +173,30 @@ impl<'a> LogReceiver<'a> { async fn join(mut self) -> Result<String> { use resiter::Map; + let mut logfile = if let Some(log_dir) = self.log_dir.as_ref() { + Some({ + let path = log_dir.join(self.job_id.to_string()).join(".log"); + tokio::fs::OpenOptions::new() + .create(true) + .create_new(true) + .write(true) + .open(path) + .await + .map(tokio::io::BufWriter::new)? + }) + } else { + None + }; + let mut success = None; let mut accu = vec![]; while let Some(logitem) = self.log_receiver.recv().await { + if let Some(lf) = logfile.as_mut() { + lf.write_all(logitem.display()?.to_string().as_bytes()).await?; + lf.write_all("\n".as_bytes()).await?; + } + match logitem { LogItem::Line(ref l) => { // ignore @@ -205,6 +232,9 @@ impl<'a> LogReceiver<'a> { } drop(self.bar); + if let Some(mut lf) = logfile { + let _ = lf.flush().await?; + } Ok({ accu.into_iter() diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 010a573..e42582e 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -32,7 +32,6 @@ pub struct Orchestrator { source_cache: SourceCache, jobsets: Vec<JobSet>, database: Arc<PgConnection>, - file_log_sink_factory: Option<FileLogSinkFactory>, } #[derive(TypedBuilder)] @@ -45,13 +44,13 @@ pub struct OrchestratorSetup { jobsets: Vec<JobSet>, database: PgConnection, submit: Submit, - file_log_sink_factory: Option<FileLogSinkFactory>, + log_dir: Option<PathBuf>, } impl OrchestratorSetup { pub async fn setup(self) -> Result<Orchestrator> { let db = Arc::new(self.database); - let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db.clone(), self.progress_generator.clone(), self.submit.clone()).await?; + let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db.clone(), self.progress_generator.clone(), self.submit.clone(), self.log_dir).await?; Ok(Orchestrator { progress_generator: self.progress_generator, @@ -61,7 +60,6 @@ impl OrchestratorSetup { source_cache: self.source_cache, jobsets: self.jobsets, database: db, - file_log_sink_factory: self.file_log_sink_factory, }) } } |