 src/cli.rs                       |  8
 src/commands/build.rs            |  2
 src/config/not_validated.rs      |  3
 src/endpoint/scheduler.rs        | 32
 src/orchestrator/orchestrator.rs |  6
 5 files changed, 45 insertions(+), 6 deletions(-)
diff --git a/src/cli.rs b/src/cli.rs
index d41e415..00beb5b 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -238,6 +238,14 @@ pub fn cli<'a>() -> App<'a> {
.long("image")
.help("Name of the docker image to use")
)
+
+ .arg(Arg::with_name("write-log-file")
+ .required(false)
+ .multiple(false)
+ .long("write-log")
+ .short('L')
+ .help("Write the log not only to database, but also in a plain-text-file")
+ )
)
.subcommand(App::new("what-depends")
diff --git a/src/commands/build.rs b/src/commands/build.rs
index 08d7804..6fc2312 100644
--- a/src/commands/build.rs
+++ b/src/commands/build.rs
@@ -193,7 +193,7 @@ pub async fn build<'a>(matches: &ArgMatches,
.database(database_connection)
.source_cache(source_cache)
.submit(submit)
- .file_log_sink_factory(None)
+ .log_dir(if matches.is_present("write-log-file") { Some(config.log_dir().clone()) } else { None })
.jobsets(jobsets)
.build()
.setup()
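
Note: the builder call above only forwards the configured log directory when the new flag was passed on the command line; otherwise the scheduler sees None and no file is ever opened. A minimal sketch of that selection, with the boolean standing in for the real matches.is_present("write-log-file") lookup and the function name purely illustrative:

use std::path::{Path, PathBuf};

// Sketch only: `write_log_file` stands in for the ArgMatches lookup done in
// build.rs; `configured` is the directory taken from the configuration file.
fn selected_log_dir(write_log_file: bool, configured: &Path) -> Option<PathBuf> {
    if write_log_file {
        Some(configured.to_path_buf())
    } else {
        None
    }
}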
diff --git a/src/config/not_validated.rs b/src/config/not_validated.rs
index 87bb816..b3f9f8a 100644
--- a/src/config/not_validated.rs
+++ b/src/config/not_validated.rs
@@ -16,6 +16,9 @@ pub struct NotValidatedConfiguration {
#[getset(get = "pub")]
repository: PathBuf,
+ #[getset(get = "pub")]
+ log_dir: PathBuf,
+
#[serde(default = "default_progress_format")]
#[getset(get = "pub")]
progress_format: String,
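
Note: the new field rides on the same serde/getset derives as the existing ones, so the generated accessor should behave like the hand-written equivalent below (struct trimmed to one field for illustration); it is this &PathBuf getter that config.log_dir().clone() in build.rs relies on:

use std::path::PathBuf;

// Hand-written approximation of what #[getset(get = "pub")] derives for the
// new field; everything except the field name is illustrative.
struct ConfigSketch {
    log_dir: PathBuf,
}

impl ConfigSketch {
    pub fn log_dir(&self) -> &PathBuf {
        &self.log_dir
    }
}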
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 90fb991..cc20444 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -10,6 +10,7 @@ use futures::FutureExt;
use indicatif::ProgressBar;
use itertools::Itertools;
use tokio::stream::StreamExt;
+use tokio::io::AsyncWriteExt;
use tokio::sync::RwLock;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
@@ -23,6 +24,7 @@ use crate::log::LogItem;
use crate::util::progress::ProgressBars;
pub struct EndpointScheduler {
+ log_dir: Option<PathBuf>,
endpoints: Vec<Arc<RwLock<Endpoint>>>,
staging_store: Arc<RwLock<StagingStore>>,
@@ -33,10 +35,11 @@ pub struct EndpointScheduler {
impl EndpointScheduler {
- pub async fn setup(endpoints: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, db: Arc<PgConnection>, progressbars: ProgressBars, submit: crate::db::models::Submit) -> Result<Self> {
+ pub async fn setup(endpoints: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, db: Arc<PgConnection>, progressbars: ProgressBars, submit: crate::db::models::Submit, log_dir: Option<PathBuf>) -> Result<Self> {
let endpoints = Self::setup_endpoints(endpoints).await?;
Ok(EndpointScheduler {
+ log_dir,
endpoints,
staging_store,
db,
@@ -70,6 +73,7 @@ impl EndpointScheduler {
let endpoint = self.select_free_endpoint().await?;
Ok(JobHandle {
+ log_dir: self.log_dir.clone(),
bar: multibar.add(self.progressbars.job_bar(job.uuid())),
endpoint,
job,
@@ -105,6 +109,7 @@ impl EndpointScheduler {
}
pub struct JobHandle {
+ log_dir: Option<PathBuf>,
endpoint: Arc<RwLock<Endpoint>>,
job: RunnableJob,
bar: ProgressBar,
@@ -137,6 +142,7 @@ impl JobHandle {
.run_job(self.job, log_sender, self.staging_store);
let logres = LogReceiver {
+ log_dir: self.log_dir.as_ref(),
job_id,
log_receiver,
bar: &self.bar,
@@ -156,6 +162,7 @@ impl JobHandle {
}
struct LogReceiver<'a> {
+ log_dir: Option<&'a PathBuf>,
job_id: Uuid,
log_receiver: UnboundedReceiver<LogItem>,
bar: &'a ProgressBar,
@@ -166,10 +173,30 @@ impl<'a> LogReceiver<'a> {
async fn join(mut self) -> Result<String> {
use resiter::Map;
+ let mut logfile = if let Some(log_dir) = self.log_dir.as_ref() {
+ Some({
+ let path = log_dir.join(format!("{}.log", self.job_id));
+ tokio::fs::OpenOptions::new()
+ .create(true)
+ .create_new(true)
+ .write(true)
+ .open(path)
+ .await
+ .map(tokio::io::BufWriter::new)?
+ })
+ } else {
+ None
+ };
+
let mut success = None;
let mut accu = vec![];
while let Some(logitem) = self.log_receiver.recv().await {
+ if let Some(lf) = logfile.as_mut() {
+ lf.write_all(logitem.display()?.to_string().as_bytes()).await?;
+ lf.write_all("\n".as_bytes()).await?;
+ }
+
match logitem {
LogItem::Line(ref l) => {
// ignore
@@ -205,6 +232,9 @@ impl<'a> LogReceiver<'a> {
}
drop(self.bar);
+ if let Some(mut lf) = logfile {
+ lf.flush().await?;
+ }
Ok({
accu.into_iter()
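
Note: taken together, the LogReceiver changes above implement one pattern: one plain-text file per job id, opened with create_new(true) so an existing log is never overwritten, wrapped in a BufWriter, written line by line while the channel drains, and flushed once at the end. A self-contained sketch of that pattern under those assumptions (job id and line source are placeholders, not the real LogItem plumbing):

use std::path::Path;
use tokio::io::AsyncWriteExt; // provides write_all() and flush()

// Sketch only: mirrors the per-job log sink above without the LogItem type.
async fn write_job_log<I>(log_dir: &Path, job_id: &str, lines: I) -> std::io::Result<()>
where
    I: IntoIterator<Item = String>,
{
    let path = log_dir.join(format!("{}.log", job_id));
    let file = tokio::fs::OpenOptions::new()
        .create(true)
        .create_new(true) // refuse to clobber a previous log for this job
        .write(true)
        .open(path)
        .await?;
    let mut out = tokio::io::BufWriter::new(file);

    for line in lines {
        out.write_all(line.as_bytes()).await?;
        out.write_all(b"\n").await?;
    }

    // BufWriter only guarantees the data reaches the file after a flush.
    out.flush().await
}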
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 010a573..e42582e 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -32,7 +32,6 @@ pub struct Orchestrator {
source_cache: SourceCache,
jobsets: Vec<JobSet>,
database: Arc<PgConnection>,
- file_log_sink_factory: Option<FileLogSinkFactory>,
}
#[derive(TypedBuilder)]
@@ -45,13 +44,13 @@ pub struct OrchestratorSetup {
jobsets: Vec<JobSet>,
database: PgConnection,
submit: Submit,
- file_log_sink_factory: Option<FileLogSinkFactory>,
+ log_dir: Option<PathBuf>,
}
impl OrchestratorSetup {
pub async fn setup(self) -> Result<Orchestrator> {
let db = Arc::new(self.database);
- let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db.clone(), self.progress_generator.clone(), self.submit.clone()).await?;
+ let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db.clone(), self.progress_generator.clone(), self.submit.clone(), self.log_dir).await?;
Ok(Orchestrator {
progress_generator: self.progress_generator,
@@ -61,7 +60,6 @@ impl OrchestratorSetup {
source_cache: self.source_cache,
jobsets: self.jobsets,
database: db,
- file_log_sink_factory: self.file_log_sink_factory,
})
}
}
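
Note: the optional log directory introduced here is only threaded through the layers: OrchestratorSetup moves it into the EndpointScheduler, the scheduler clones it into every JobHandle, and the handle lends a reference to the LogReceiver, which is the only place a file gets opened. A rough sketch of that hand-off (field names follow the diff; the *Sketch types and methods are illustrative):

use std::path::PathBuf;

struct OrchestratorSetupSketch { log_dir: Option<PathBuf> }
struct EndpointSchedulerSketch { log_dir: Option<PathBuf> }
struct JobHandleSketch { log_dir: Option<PathBuf> }
struct LogReceiverSketch<'a> { log_dir: Option<&'a PathBuf> }

impl OrchestratorSetupSketch {
    // setup() hands ownership down once.
    fn into_scheduler(self) -> EndpointSchedulerSketch {
        EndpointSchedulerSketch { log_dir: self.log_dir }
    }
}

impl EndpointSchedulerSketch {
    // every scheduled job gets its own clone.
    fn job_handle(&self) -> JobHandleSketch {
        JobHandleSketch { log_dir: self.log_dir.clone() }
    }
}

impl JobHandleSketch {
    // the receiver only borrows it for the duration of the job.
    fn log_receiver(&self) -> LogReceiverSketch<'_> {
        LogReceiverSketch { log_dir: self.log_dir.as_ref() }
    }
}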