summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/commands/db.rs13
-rw-r--r--src/commands/env_of.rs2
-rw-r--r--src/commands/find_pkg.rs1
-rw-r--r--src/commands/source.rs1
-rw-r--r--src/config/not_validated.rs2
-rw-r--r--src/db/models/job_env.rs1
-rw-r--r--src/endpoint/configured.rs14
-rw-r--r--src/endpoint/error.rs1
-rw-r--r--src/endpoint/scheduler.rs2
-rw-r--r--src/filestore/merged.rs1
-rw-r--r--src/filestore/staging.rs1
-rw-r--r--src/job/set.rs5
-rw-r--r--src/log/filesink.rs61
-rw-r--r--src/log/mod.rs3
-rw-r--r--src/log/parser.rs1
-rw-r--r--src/orchestrator/orchestrator.rs15
-rw-r--r--src/package/source.rs2
-rw-r--r--src/package/tree.rs2
-rw-r--r--src/source/mod.rs1
-rw-r--r--src/util/progress.rs18
20 files changed, 10 insertions, 137 deletions
diff --git a/src/commands/db.rs b/src/commands/db.rs
index f2a2d3d..00530fd 100644
--- a/src/commands/db.rs
+++ b/src/commands/db.rs
@@ -1,4 +1,5 @@
use std::fmt::Display;
+use std::io::Write;
use std::path::PathBuf;
use std::process::Command;
@@ -7,9 +8,9 @@ use anyhow::Error;
use anyhow::Result;
use anyhow::anyhow;
use clap::ArgMatches;
+use colored::Colorize;
use diesel::BelongingToDsl;
use diesel::ExpressionMethods;
-use diesel::JoinOnDsl;
use diesel::QueryDsl;
use diesel::RunQueryDsl;
use itertools::Itertools;
@@ -22,6 +23,7 @@ use crate::config::Configuration;
use crate::db::DbConnectionConfig;
use crate::db::models;
use crate::log::LogItem;
+use crate::schema;
pub fn db<'a>(db_connection_config: DbConnectionConfig, config: &Configuration<'a>, matches: &ArgMatches) -> Result<()> {
match matches.subcommand() {
@@ -227,9 +229,6 @@ fn jobs(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
.map(uuid::Uuid::parse_str)
.transpose()?
.map(|submit_uuid| {
- use crate::schema;
- use diesel::BelongingToDsl;
-
let submit = models::Submit::with_id(&conn, &submit_uuid)?;
models::Job::belonging_to(&submit)
@@ -273,10 +272,6 @@ fn jobs(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
}
fn job<'a>(conn_cfg: DbConnectionConfig, config: &Configuration<'a>, matches: &ArgMatches) -> Result<()> {
- use std::io::Write;
- use colored::Colorize;
- use crate::schema;
- use crate::schema::jobs::dsl;
let highlighting_disabled = matches.is_present("script_disable_highlighting");
let configured_theme = config.script_highlight_theme();
@@ -420,8 +415,6 @@ fn mk_header(vec: Vec<&str>) -> Vec<ascii_table::Column> {
/// Display the passed data as nice ascii table,
/// or, if stdout is a pipe, print it nicely parseable
fn display_data<D: Display>(headers: Vec<ascii_table::Column>, data: Vec<Vec<D>>, csv: bool) -> Result<()> {
- use std::io::Write;
-
if csv {
use csv::WriterBuilder;
let mut wtr = WriterBuilder::new().from_writer(vec![]);
diff --git a/src/commands/env_of.rs b/src/commands/env_of.rs
index 9f2e667..4a56db1 100644
--- a/src/commands/env_of.rs
+++ b/src/commands/env_of.rs
@@ -1,5 +1,3 @@
-use anyhow::Error;
-use anyhow::Context;
use anyhow::Result;
use clap::ArgMatches;
diff --git a/src/commands/find_pkg.rs b/src/commands/find_pkg.rs
index 32f451d..b6c26a5 100644
--- a/src/commands/find_pkg.rs
+++ b/src/commands/find_pkg.rs
@@ -1,5 +1,4 @@
use anyhow::Error;
-use anyhow::Context;
use anyhow::Result;
use clap::ArgMatches;
diff --git a/src/commands/source.rs b/src/commands/source.rs
index 81779b4..46c548d 100644
--- a/src/commands/source.rs
+++ b/src/commands/source.rs
@@ -7,7 +7,6 @@ use anyhow::anyhow;
use clap::ArgMatches;
use tokio::stream::StreamExt;
use tokio::io::AsyncWriteExt;
-use futures::TryStreamExt;
use crate::config::*;
use crate::package::PackageName;
diff --git a/src/config/not_validated.rs b/src/config/not_validated.rs
index 200b1e4..c91a0cb 100644
--- a/src/config/not_validated.rs
+++ b/src/config/not_validated.rs
@@ -79,7 +79,7 @@ impl<'reg> NotValidatedConfiguration {
"InspiredGitHub",
"Solarized (dark)",
"Solarized (light)",
- ].into_iter().any(|allowed_theme| configured_theme == *allowed_theme);
+ ].iter().any(|allowed_theme| configured_theme == *allowed_theme);
if !allowed_theme_present {
return Err(anyhow!("Theme not known: {}", configured_theme))
diff --git a/src/db/models/job_env.rs b/src/db/models/job_env.rs
index 3ef1ae3..f2c5fb1 100644
--- a/src/db/models/job_env.rs
+++ b/src/db/models/job_env.rs
@@ -2,7 +2,6 @@ use anyhow::Result;
use diesel::PgConnection;
use diesel::prelude::*;
-use crate::schema::job_envs::*;
use crate::schema::job_envs;
use crate::db::models::Job;
use crate::db::models::EnvVar;
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 9e9171b..d09bd37 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -8,22 +8,25 @@ use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use anyhow::anyhow;
+use futures::FutureExt;
use getset::{Getters, CopyGetters};
use shiplift::Docker;
use shiplift::ExecContainerOptions;
+use tokio::stream::StreamExt;
use tokio::sync::RwLock;
use tokio::sync::mpsc::UnboundedSender;
use typed_builder::TypedBuilder;
+use crate::endpoint::ContainerError;
use crate::endpoint::EndpointConfiguration;
use crate::filestore::StagingStore;
use crate::job::JobResource;
use crate::job::RunnableJob;
use crate::log::LogItem;
+use crate::log::buffer_stream_to_line_stream;
use crate::package::Script;
use crate::util::docker::ContainerHash;
use crate::util::docker::ImageName;
-use crate::endpoint::ContainerError;
#[derive(Getters, CopyGetters, TypedBuilder)]
pub struct Endpoint {
@@ -34,9 +37,6 @@ pub struct Endpoint {
docker: Docker,
#[getset(get_copy = "pub")]
- speed: usize,
-
- #[getset(get_copy = "pub")]
num_max_jobs: usize,
#[getset(get = "pub")]
@@ -84,7 +84,6 @@ impl Endpoint {
.name(ep.name().clone())
.uri(ep.uri().clone())
.docker(docker)
- .speed(ep.speed())
.num_max_jobs(ep.maxjobs())
.build()
})
@@ -95,7 +94,6 @@ impl Endpoint {
Endpoint::builder()
.name(ep.name().clone())
.uri(ep.uri().clone())
- .speed(ep.speed())
.num_max_jobs(ep.maxjobs())
.docker(shiplift::Docker::unix(ep.uri()))
.build()
@@ -177,9 +175,6 @@ impl Endpoint {
}
pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>, additional_env: Vec<(String, String)>) -> RResult<(Vec<PathBuf>, ContainerHash, Script), ContainerError> {
- use crate::log::buffer_stream_to_line_stream;
- use tokio::stream::StreamExt;
- use futures::FutureExt;
let (container_id, _warnings) = {
let envs = job.environment()
@@ -292,7 +287,6 @@ impl Endpoint {
.inspect(|r| { trace!("Starting container {} -> {:?}", container_id, r); })
.map(|r| r.with_context(|| anyhow!("Starting the container {} on '{}'", container_id, self.name)))
.then(|_| {
- use futures::FutureExt;
trace!("Moving logs to log sink for container {}", container_id);
buffer_stream_to_line_stream(container.exec(&exec_opts))
.map(|line| {
diff --git a/src/endpoint/error.rs b/src/endpoint/error.rs
index e04ee51..dc4468a 100644
--- a/src/endpoint/error.rs
+++ b/src/endpoint/error.rs
@@ -1,7 +1,6 @@
use thiserror::Error as ThisError;
use crate::util::docker::ContainerHash;
-use crate::package::Script;
#[derive(ThisError, Debug)]
pub enum ContainerError {
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index cbc0bd9..ede7cc6 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -3,7 +3,6 @@ use std::result::Result as RResult;
use std::sync::Arc;
use anyhow::Context;
-use anyhow::Error;
use anyhow::Result;
use anyhow::anyhow;
use diesel::PgConnection;
@@ -14,7 +13,6 @@ use tokio::stream::StreamExt;
use tokio::io::AsyncWriteExt;
use tokio::sync::RwLock;
use tokio::sync::mpsc::UnboundedReceiver;
-use tokio::sync::mpsc::UnboundedSender;
use uuid::Uuid;
use crate::endpoint::Endpoint;
diff --git a/src/filestore/merged.rs b/src/filestore/merged.rs
index 38bd302..22b29ab 100644
--- a/src/filestore/merged.rs
+++ b/src/filestore/merged.rs
@@ -2,7 +2,6 @@ use std::sync::Arc;
use tokio::sync::RwLock;
use anyhow::Result;
-use anyhow::anyhow;
use crate::filestore::Artifact;
use crate::filestore::ReleaseStore;
diff --git a/src/filestore/staging.rs b/src/filestore/staging.rs
index 36da173..91e71d2 100644
--- a/src/filestore/staging.rs
+++ b/src/filestore/staging.rs
@@ -8,7 +8,6 @@ use anyhow::Result;
use anyhow::anyhow;
use futures::stream::Stream;
use indicatif::ProgressBar;
-use resiter::Filter;
use resiter::Map;
use result_inspect::ResultInspect;
use tar;
diff --git a/src/job/set.rs b/src/job/set.rs
index 4410e20..745b38d 100644
--- a/src/job/set.rs
+++ b/src/job/set.rs
@@ -1,5 +1,4 @@
use anyhow::Result;
-use futures::future::Future;
use tokio::stream::StreamExt;
use crate::filestore::MergedStores;
@@ -34,10 +33,6 @@ impl JobSet {
.await
}
- pub fn len(&self) -> usize {
- self.set.len()
- }
-
}
/// Get the tree as sets of jobs, the deepest level of the tree first
diff --git a/src/log/filesink.rs b/src/log/filesink.rs
deleted file mode 100644
index 39de00b..0000000
--- a/src/log/filesink.rs
+++ /dev/null
@@ -1,61 +0,0 @@
-use std::fs::File;
-use std::fs::OpenOptions;
-use std::io::Write;
-use std::path::Path;
-use std::path::PathBuf;
-
-use anyhow::Error;
-use anyhow::Result;
-
-use crate::job::Job;
-use crate::log::LogItem;
-use crate::log::LogSink;
-
-pub struct FileSink {
- file: File,
-}
-
-impl FileSink {
- fn new(path: &Path) -> Result<Self> {
- OpenOptions::new()
- .create(true)
- .append(true)
- .write(false)
- .open(path)
- .map(|file| FileSink { file })
- .map_err(Error::from)
- }
-}
-
-impl LogSink for FileSink {
- fn log_item(&mut self, item: &LogItem) -> Result<()> {
- let s = item.display()?;
- writeln!(self.file, "{}", s)?;
- Ok(())
- }
-}
-
-pub struct FileLogSinkFactory {
- root: PathBuf
-}
-
-impl FileLogSinkFactory {
- pub fn new(root: PathBuf) -> Self {
- FileLogSinkFactory { root }
- }
-
- pub fn new_file_sink(&self, job: &Job) -> Result<FileSink> {
- let now = chrono::offset::Local::now()
- .naive_local()
- .format("%Y-%m-%dT%H:%M:%S");
-
- trace!("Got current time: {}", now);
- let filename = format!("{}-{}", now, job.package().name());
-
- trace!("Building path from {} and {}", self.root.display(), filename);
- let p = self.root.join(filename);
-
- FileSink::new(&p)
- }
-}
-
diff --git a/src/log/mod.rs b/src/log/mod.rs
index bf24ad9..e3d8325 100644
--- a/src/log/mod.rs
+++ b/src/log/mod.rs
@@ -7,8 +7,5 @@ pub use item::*;
mod sink;
pub use sink::*;
-mod filesink;
-pub use filesink::*;
-
mod util;
diff --git a/src/log/parser.rs b/src/log/parser.rs
index 6163bc8..fc7af20 100644
--- a/src/log/parser.rs
+++ b/src/log/parser.rs
@@ -8,7 +8,6 @@ use futures::Stream;
use futures::StreamExt;
use futures::TryStreamExt;
use pom::parser::Parser as PomParser;
-use resiter::Filter;
use shiplift::tty::TtyChunk;
use crate::log::LogItem;
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index c45b5b3..9ad81b2 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -8,11 +8,8 @@ use anyhow::Error;
use anyhow::Result;
use anyhow::anyhow;
use diesel::PgConnection;
-use indicatif::ProgressBar;
use tokio::sync::RwLock;
-use tokio::sync::mpsc::UnboundedReceiver;
use typed_builder::TypedBuilder;
-use uuid::Uuid;
use crate::db::models::Submit;
use crate::endpoint::ContainerError;
@@ -22,19 +19,15 @@ use crate::filestore::MergedStores;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
use crate::job::JobSet;
-use crate::log::FileLogSinkFactory;
-use crate::log::LogItem;
use crate::source::SourceCache;
use crate::util::progress::ProgressBars;
pub struct Orchestrator {
- progress_generator: ProgressBars,
scheduler: EndpointScheduler,
staging_store: Arc<RwLock<StagingStore>>,
release_store: Arc<RwLock<ReleaseStore>>,
source_cache: SourceCache,
jobsets: Vec<JobSet>,
- database: Arc<PgConnection>,
}
#[derive(TypedBuilder)]
@@ -54,16 +47,14 @@ pub struct OrchestratorSetup {
impl OrchestratorSetup {
pub async fn setup(self) -> Result<Orchestrator> {
let db = Arc::new(self.database);
- let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db.clone(), self.progress_generator.clone(), self.submit.clone(), self.log_dir, self.additional_env).await?;
+ let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db, self.progress_generator, self.submit.clone(), self.log_dir, self.additional_env).await?;
Ok(Orchestrator {
- progress_generator: self.progress_generator,
scheduler: scheduler,
staging_store: self.staging_store,
release_store: self.release_store,
source_cache: self.source_cache,
jobsets: self.jobsets,
- database: db,
})
}
}
@@ -74,10 +65,8 @@ impl Orchestrator {
use tokio::stream::StreamExt;
let mut report_result = vec![];
- let number_of_jobsets = self.jobsets.len();
- let database = self.database;
- for (i, jobset) in self.jobsets.into_iter().enumerate() {
+ for jobset in self.jobsets.into_iter() {
let merged_store = MergedStores::new(self.release_store.clone(), self.staging_store.clone());
let multibar = Arc::new(indicatif::MultiProgress::new());
diff --git a/src/package/source.rs b/src/package/source.rs
index 137a9dd..9b5ed4a 100644
--- a/src/package/source.rs
+++ b/src/package/source.rs
@@ -1,5 +1,3 @@
-use std::path::Path;
-
use anyhow::Result;
use getset::Getters;
use serde::Deserialize;
diff --git a/src/package/tree.rs b/src/package/tree.rs
index 1fad393..b489ef3 100644
--- a/src/package/tree.rs
+++ b/src/package/tree.rs
@@ -1,5 +1,3 @@
-use std::collections::BTreeMap;
-
use anyhow::Result;
use anyhow::anyhow;
use indicatif::ProgressBar;
diff --git a/src/source/mod.rs b/src/source/mod.rs
index 162c787..455785c 100644
--- a/src/source/mod.rs
+++ b/src/source/mod.rs
@@ -8,7 +8,6 @@ use crate::package::Package;
use crate::package::PackageName;
use crate::package::PackageVersion;
use crate::package::Source;
-use crate::util::progress::ProgressBars;
#[derive(Clone, Debug)]
pub struct SourceCache {
diff --git a/src/util/progress.rs b/src/util/progress.rs
index 86d6d7a..727c704 100644
--- a/src/util/progress.rs
+++ b/src/util/progress.rs
@@ -36,12 +36,6 @@ impl ProgressBars {
self.bar("Crawling dependencies", &self.bar_template)
}
- pub fn jobset_bar(&self, jobset_num: usize, number_of_jobsets: usize, jobs_in_jobset: usize) -> ProgressBar {
- let b = self.bar(&format!("Jobset {}/{} ({} Jobs)", jobset_num, number_of_jobsets, jobs_in_jobset), &self.bar_template);
- b.set_length(jobs_in_jobset as u64);
- b
- }
-
pub fn job_bar(&self, id: &Uuid) -> ProgressBar {
let b = self.bar(&format!("Job: {}", id), &self.bar_template);
b.set_length(100);
@@ -63,17 +57,5 @@ impl ProgressBars {
}
}
- fn spinner(&self, msg: &str, template: &str) -> ProgressBar {
- if self.hide {
- ProgressBar::hidden()
- } else {
- let b = ProgressBar::new_spinner();
- b.set_style(ProgressStyle::default_spinner().template(template));
- b.set_message(msg);
- b
- }
- }
-
}
-