diff options
-rw-r--r-- | src/commands/db.rs | 13 | ||||
-rw-r--r-- | src/commands/env_of.rs | 2 | ||||
-rw-r--r-- | src/commands/find_pkg.rs | 1 | ||||
-rw-r--r-- | src/commands/source.rs | 1 | ||||
-rw-r--r-- | src/config/not_validated.rs | 2 | ||||
-rw-r--r-- | src/db/models/job_env.rs | 1 | ||||
-rw-r--r-- | src/endpoint/configured.rs | 14 | ||||
-rw-r--r-- | src/endpoint/error.rs | 1 | ||||
-rw-r--r-- | src/endpoint/scheduler.rs | 2 | ||||
-rw-r--r-- | src/filestore/merged.rs | 1 | ||||
-rw-r--r-- | src/filestore/staging.rs | 1 | ||||
-rw-r--r-- | src/job/set.rs | 5 | ||||
-rw-r--r-- | src/log/filesink.rs | 61 | ||||
-rw-r--r-- | src/log/mod.rs | 3 | ||||
-rw-r--r-- | src/log/parser.rs | 1 | ||||
-rw-r--r-- | src/orchestrator/orchestrator.rs | 15 | ||||
-rw-r--r-- | src/package/source.rs | 2 | ||||
-rw-r--r-- | src/package/tree.rs | 2 | ||||
-rw-r--r-- | src/source/mod.rs | 1 | ||||
-rw-r--r-- | src/util/progress.rs | 18 |
20 files changed, 10 insertions, 137 deletions
diff --git a/src/commands/db.rs b/src/commands/db.rs index f2a2d3d..00530fd 100644 --- a/src/commands/db.rs +++ b/src/commands/db.rs @@ -1,4 +1,5 @@ use std::fmt::Display; +use std::io::Write; use std::path::PathBuf; use std::process::Command; @@ -7,9 +8,9 @@ use anyhow::Error; use anyhow::Result; use anyhow::anyhow; use clap::ArgMatches; +use colored::Colorize; use diesel::BelongingToDsl; use diesel::ExpressionMethods; -use diesel::JoinOnDsl; use diesel::QueryDsl; use diesel::RunQueryDsl; use itertools::Itertools; @@ -22,6 +23,7 @@ use crate::config::Configuration; use crate::db::DbConnectionConfig; use crate::db::models; use crate::log::LogItem; +use crate::schema; pub fn db<'a>(db_connection_config: DbConnectionConfig, config: &Configuration<'a>, matches: &ArgMatches) -> Result<()> { match matches.subcommand() { @@ -227,9 +229,6 @@ fn jobs(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> { .map(uuid::Uuid::parse_str) .transpose()? .map(|submit_uuid| { - use crate::schema; - use diesel::BelongingToDsl; - let submit = models::Submit::with_id(&conn, &submit_uuid)?; models::Job::belonging_to(&submit) @@ -273,10 +272,6 @@ fn jobs(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> { } fn job<'a>(conn_cfg: DbConnectionConfig, config: &Configuration<'a>, matches: &ArgMatches) -> Result<()> { - use std::io::Write; - use colored::Colorize; - use crate::schema; - use crate::schema::jobs::dsl; let highlighting_disabled = matches.is_present("script_disable_highlighting"); let configured_theme = config.script_highlight_theme(); @@ -420,8 +415,6 @@ fn mk_header(vec: Vec<&str>) -> Vec<ascii_table::Column> { /// Display the passed data as nice ascii table, /// or, if stdout is a pipe, print it nicely parseable fn display_data<D: Display>(headers: Vec<ascii_table::Column>, data: Vec<Vec<D>>, csv: bool) -> Result<()> { - use std::io::Write; - if csv { use csv::WriterBuilder; let mut wtr = WriterBuilder::new().from_writer(vec![]); diff --git a/src/commands/env_of.rs b/src/commands/env_of.rs index 9f2e667..4a56db1 100644 --- a/src/commands/env_of.rs +++ b/src/commands/env_of.rs @@ -1,5 +1,3 @@ -use anyhow::Error; -use anyhow::Context; use anyhow::Result; use clap::ArgMatches; diff --git a/src/commands/find_pkg.rs b/src/commands/find_pkg.rs index 32f451d..b6c26a5 100644 --- a/src/commands/find_pkg.rs +++ b/src/commands/find_pkg.rs @@ -1,5 +1,4 @@ use anyhow::Error; -use anyhow::Context; use anyhow::Result; use clap::ArgMatches; diff --git a/src/commands/source.rs b/src/commands/source.rs index 81779b4..46c548d 100644 --- a/src/commands/source.rs +++ b/src/commands/source.rs @@ -7,7 +7,6 @@ use anyhow::anyhow; use clap::ArgMatches; use tokio::stream::StreamExt; use tokio::io::AsyncWriteExt; -use futures::TryStreamExt; use crate::config::*; use crate::package::PackageName; diff --git a/src/config/not_validated.rs b/src/config/not_validated.rs index 200b1e4..c91a0cb 100644 --- a/src/config/not_validated.rs +++ b/src/config/not_validated.rs @@ -79,7 +79,7 @@ impl<'reg> NotValidatedConfiguration { "InspiredGitHub", "Solarized (dark)", "Solarized (light)", - ].into_iter().any(|allowed_theme| configured_theme == *allowed_theme); + ].iter().any(|allowed_theme| configured_theme == *allowed_theme); if !allowed_theme_present { return Err(anyhow!("Theme not known: {}", configured_theme)) diff --git a/src/db/models/job_env.rs b/src/db/models/job_env.rs index 3ef1ae3..f2c5fb1 100644 --- a/src/db/models/job_env.rs +++ b/src/db/models/job_env.rs @@ -2,7 +2,6 @@ use anyhow::Result; use diesel::PgConnection; use diesel::prelude::*; -use crate::schema::job_envs::*; use crate::schema::job_envs; use crate::db::models::Job; use crate::db::models::EnvVar; diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 9e9171b..d09bd37 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -8,22 +8,25 @@ use anyhow::Context; use anyhow::Error; use anyhow::Result; use anyhow::anyhow; +use futures::FutureExt; use getset::{Getters, CopyGetters}; use shiplift::Docker; use shiplift::ExecContainerOptions; +use tokio::stream::StreamExt; use tokio::sync::RwLock; use tokio::sync::mpsc::UnboundedSender; use typed_builder::TypedBuilder; +use crate::endpoint::ContainerError; use crate::endpoint::EndpointConfiguration; use crate::filestore::StagingStore; use crate::job::JobResource; use crate::job::RunnableJob; use crate::log::LogItem; +use crate::log::buffer_stream_to_line_stream; use crate::package::Script; use crate::util::docker::ContainerHash; use crate::util::docker::ImageName; -use crate::endpoint::ContainerError; #[derive(Getters, CopyGetters, TypedBuilder)] pub struct Endpoint { @@ -34,9 +37,6 @@ pub struct Endpoint { docker: Docker, #[getset(get_copy = "pub")] - speed: usize, - - #[getset(get_copy = "pub")] num_max_jobs: usize, #[getset(get = "pub")] @@ -84,7 +84,6 @@ impl Endpoint { .name(ep.name().clone()) .uri(ep.uri().clone()) .docker(docker) - .speed(ep.speed()) .num_max_jobs(ep.maxjobs()) .build() }) @@ -95,7 +94,6 @@ impl Endpoint { Endpoint::builder() .name(ep.name().clone()) .uri(ep.uri().clone()) - .speed(ep.speed()) .num_max_jobs(ep.maxjobs()) .docker(shiplift::Docker::unix(ep.uri())) .build() @@ -177,9 +175,6 @@ impl Endpoint { } pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>, additional_env: Vec<(String, String)>) -> RResult<(Vec<PathBuf>, ContainerHash, Script), ContainerError> { - use crate::log::buffer_stream_to_line_stream; - use tokio::stream::StreamExt; - use futures::FutureExt; let (container_id, _warnings) = { let envs = job.environment() @@ -292,7 +287,6 @@ impl Endpoint { .inspect(|r| { trace!("Starting container {} -> {:?}", container_id, r); }) .map(|r| r.with_context(|| anyhow!("Starting the container {} on '{}'", container_id, self.name))) .then(|_| { - use futures::FutureExt; trace!("Moving logs to log sink for container {}", container_id); buffer_stream_to_line_stream(container.exec(&exec_opts)) .map(|line| { diff --git a/src/endpoint/error.rs b/src/endpoint/error.rs index e04ee51..dc4468a 100644 --- a/src/endpoint/error.rs +++ b/src/endpoint/error.rs @@ -1,7 +1,6 @@ use thiserror::Error as ThisError; use crate::util::docker::ContainerHash; -use crate::package::Script; #[derive(ThisError, Debug)] pub enum ContainerError { diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index cbc0bd9..ede7cc6 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -3,7 +3,6 @@ use std::result::Result as RResult; use std::sync::Arc; use anyhow::Context; -use anyhow::Error; use anyhow::Result; use anyhow::anyhow; use diesel::PgConnection; @@ -14,7 +13,6 @@ use tokio::stream::StreamExt; use tokio::io::AsyncWriteExt; use tokio::sync::RwLock; use tokio::sync::mpsc::UnboundedReceiver; -use tokio::sync::mpsc::UnboundedSender; use uuid::Uuid; use crate::endpoint::Endpoint; diff --git a/src/filestore/merged.rs b/src/filestore/merged.rs index 38bd302..22b29ab 100644 --- a/src/filestore/merged.rs +++ b/src/filestore/merged.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use tokio::sync::RwLock; use anyhow::Result; -use anyhow::anyhow; use crate::filestore::Artifact; use crate::filestore::ReleaseStore; diff --git a/src/filestore/staging.rs b/src/filestore/staging.rs index 36da173..91e71d2 100644 --- a/src/filestore/staging.rs +++ b/src/filestore/staging.rs @@ -8,7 +8,6 @@ use anyhow::Result; use anyhow::anyhow; use futures::stream::Stream; use indicatif::ProgressBar; -use resiter::Filter; use resiter::Map; use result_inspect::ResultInspect; use tar; diff --git a/src/job/set.rs b/src/job/set.rs index 4410e20..745b38d 100644 --- a/src/job/set.rs +++ b/src/job/set.rs @@ -1,5 +1,4 @@ use anyhow::Result; -use futures::future::Future; use tokio::stream::StreamExt; use crate::filestore::MergedStores; @@ -34,10 +33,6 @@ impl JobSet { .await } - pub fn len(&self) -> usize { - self.set.len() - } - } /// Get the tree as sets of jobs, the deepest level of the tree first diff --git a/src/log/filesink.rs b/src/log/filesink.rs deleted file mode 100644 index 39de00b..0000000 --- a/src/log/filesink.rs +++ /dev/null @@ -1,61 +0,0 @@ -use std::fs::File; -use std::fs::OpenOptions; -use std::io::Write; -use std::path::Path; -use std::path::PathBuf; - -use anyhow::Error; -use anyhow::Result; - -use crate::job::Job; -use crate::log::LogItem; -use crate::log::LogSink; - -pub struct FileSink { - file: File, -} - -impl FileSink { - fn new(path: &Path) -> Result<Self> { - OpenOptions::new() - .create(true) - .append(true) - .write(false) - .open(path) - .map(|file| FileSink { file }) - .map_err(Error::from) - } -} - -impl LogSink for FileSink { - fn log_item(&mut self, item: &LogItem) -> Result<()> { - let s = item.display()?; - writeln!(self.file, "{}", s)?; - Ok(()) - } -} - -pub struct FileLogSinkFactory { - root: PathBuf -} - -impl FileLogSinkFactory { - pub fn new(root: PathBuf) -> Self { - FileLogSinkFactory { root } - } - - pub fn new_file_sink(&self, job: &Job) -> Result<FileSink> { - let now = chrono::offset::Local::now() - .naive_local() - .format("%Y-%m-%dT%H:%M:%S"); - - trace!("Got current time: {}", now); - let filename = format!("{}-{}", now, job.package().name()); - - trace!("Building path from {} and {}", self.root.display(), filename); - let p = self.root.join(filename); - - FileSink::new(&p) - } -} - diff --git a/src/log/mod.rs b/src/log/mod.rs index bf24ad9..e3d8325 100644 --- a/src/log/mod.rs +++ b/src/log/mod.rs @@ -7,8 +7,5 @@ pub use item::*; mod sink; pub use sink::*; -mod filesink; -pub use filesink::*; - mod util; diff --git a/src/log/parser.rs b/src/log/parser.rs index 6163bc8..fc7af20 100644 --- a/src/log/parser.rs +++ b/src/log/parser.rs @@ -8,7 +8,6 @@ use futures::Stream; use futures::StreamExt; use futures::TryStreamExt; use pom::parser::Parser as PomParser; -use resiter::Filter; use shiplift::tty::TtyChunk; use crate::log::LogItem; diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index c45b5b3..9ad81b2 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -8,11 +8,8 @@ use anyhow::Error; use anyhow::Result; use anyhow::anyhow; use diesel::PgConnection; -use indicatif::ProgressBar; use tokio::sync::RwLock; -use tokio::sync::mpsc::UnboundedReceiver; use typed_builder::TypedBuilder; -use uuid::Uuid; use crate::db::models::Submit; use crate::endpoint::ContainerError; @@ -22,19 +19,15 @@ use crate::filestore::MergedStores; use crate::filestore::ReleaseStore; use crate::filestore::StagingStore; use crate::job::JobSet; -use crate::log::FileLogSinkFactory; -use crate::log::LogItem; use crate::source::SourceCache; use crate::util::progress::ProgressBars; pub struct Orchestrator { - progress_generator: ProgressBars, scheduler: EndpointScheduler, staging_store: Arc<RwLock<StagingStore>>, release_store: Arc<RwLock<ReleaseStore>>, source_cache: SourceCache, jobsets: Vec<JobSet>, - database: Arc<PgConnection>, } #[derive(TypedBuilder)] @@ -54,16 +47,14 @@ pub struct OrchestratorSetup { impl OrchestratorSetup { pub async fn setup(self) -> Result<Orchestrator> { let db = Arc::new(self.database); - let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db.clone(), self.progress_generator.clone(), self.submit.clone(), self.log_dir, self.additional_env).await?; + let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db, self.progress_generator, self.submit.clone(), self.log_dir, self.additional_env).await?; Ok(Orchestrator { - progress_generator: self.progress_generator, scheduler: scheduler, staging_store: self.staging_store, release_store: self.release_store, source_cache: self.source_cache, jobsets: self.jobsets, - database: db, }) } } @@ -74,10 +65,8 @@ impl Orchestrator { use tokio::stream::StreamExt; let mut report_result = vec![]; - let number_of_jobsets = self.jobsets.len(); - let database = self.database; - for (i, jobset) in self.jobsets.into_iter().enumerate() { + for jobset in self.jobsets.into_iter() { let merged_store = MergedStores::new(self.release_store.clone(), self.staging_store.clone()); let multibar = Arc::new(indicatif::MultiProgress::new()); diff --git a/src/package/source.rs b/src/package/source.rs index 137a9dd..9b5ed4a 100644 --- a/src/package/source.rs +++ b/src/package/source.rs @@ -1,5 +1,3 @@ -use std::path::Path; - use anyhow::Result; use getset::Getters; use serde::Deserialize; diff --git a/src/package/tree.rs b/src/package/tree.rs index 1fad393..b489ef3 100644 --- a/src/package/tree.rs +++ b/src/package/tree.rs @@ -1,5 +1,3 @@ -use std::collections::BTreeMap; - use anyhow::Result; use anyhow::anyhow; use indicatif::ProgressBar; diff --git a/src/source/mod.rs b/src/source/mod.rs index 162c787..455785c 100644 --- a/src/source/mod.rs +++ b/src/source/mod.rs @@ -8,7 +8,6 @@ use crate::package::Package; use crate::package::PackageName; use crate::package::PackageVersion; use crate::package::Source; -use crate::util::progress::ProgressBars; #[derive(Clone, Debug)] pub struct SourceCache { diff --git a/src/util/progress.rs b/src/util/progress.rs index 86d6d7a..727c704 100644 --- a/src/util/progress.rs +++ b/src/util/progress.rs @@ -36,12 +36,6 @@ impl ProgressBars { self.bar("Crawling dependencies", &self.bar_template) } - pub fn jobset_bar(&self, jobset_num: usize, number_of_jobsets: usize, jobs_in_jobset: usize) -> ProgressBar { - let b = self.bar(&format!("Jobset {}/{} ({} Jobs)", jobset_num, number_of_jobsets, jobs_in_jobset), &self.bar_template); - b.set_length(jobs_in_jobset as u64); - b - } - pub fn job_bar(&self, id: &Uuid) -> ProgressBar { let b = self.bar(&format!("Job: {}", id), &self.bar_template); b.set_length(100); @@ -63,17 +57,5 @@ impl ProgressBars { } } - fn spinner(&self, msg: &str, template: &str) -> ProgressBar { - if self.hide { - ProgressBar::hidden() - } else { - let b = ProgressBar::new_spinner(); - b.set_style(ProgressStyle::default_spinner().template(template)); - b.set_message(msg); - b - } - } - } - |