diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-03-04 12:58:50 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-03-04 12:58:50 +0100 |
commit | 73028e90276cb94346994672c9d9a90f5059b342 (patch) | |
tree | 38ca7c48ad5450089434653d2c0973f6972174bd | |
parent | 746d46d57d39b2f32d8f02454463b03f64c16cdd (diff) | |
parent | 682cf3f2b4567965cef73f31f1dd7b0ae6ccea28 (diff) |
Merge branch 'sched-improvements'
-rw-r--r-- | config.toml | 7 | ||||
-rw-r--r-- | src/endpoint/configured.rs | 37 | ||||
-rw-r--r-- | src/endpoint/scheduler.rs | 61 |
3 files changed, 60 insertions(+), 45 deletions(-)
diff --git a/config.toml b/config.toml index 5f504ef..9536234 100644 --- a/config.toml +++ b/config.toml @@ -175,7 +175,12 @@ name = "testhostname" uri = "http://0.0.0.0:8095" # the URI of the endpoint. Either http or socket path endpoint_type = "http" # either "http" or "socket" speed = 1 # currently ignored, but required to be present -maxjobs = 1 # currently ignored, but required to be present + +# maximum number of jobs running on this endpoint. +# Set this to a reasonable high number to be able to run a lot of small jobs. +# For example, if you're compiling with `make -j 1`, this should at least be the +# number of CPU cores, maybe a bit more (eg. (ncpu * 1.1)) +maxjobs = 1 # diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 311e0a6..bbaaec6 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -56,6 +56,9 @@ pub struct Endpoint { #[getset(get = "pub")] uri: String, + + #[builder(default)] + running_jobs: std::sync::atomic::AtomicUsize, } impl Debug for Endpoint { @@ -228,17 +231,35 @@ impl Endpoint { PreparedContainer::new(self, job, staging_store, release_stores).await } - pub async fn number_of_running_containers(&self) -> Result<usize> { - self.docker - .containers() - .list(&Default::default()) - .await - .with_context(|| anyhow!("Getting number of running containers on {}", self.name)) - .map_err(Error::from) - .map(|list| list.len()) + pub fn running_jobs(&self) -> usize { + self.running_jobs.load(std::sync::atomic::Ordering::Relaxed) + } +} + +pub struct EndpointHandle(Arc<Endpoint>); + +impl EndpointHandle { + pub fn new(ep: Arc<Endpoint>) -> Self { + let _ = ep.running_jobs.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + EndpointHandle(ep) } } +impl Drop for EndpointHandle { + fn drop(&mut self) { + let _ = self.0.running_jobs.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + } +} + +impl std::ops::Deref for EndpointHandle { + type Target = Endpoint; + + fn deref(&self) -> &Self::Target { + 
self.0.deref() + } +} + + #[derive(Getters)] pub struct PreparedContainer<'a> { endpoint: &'a Endpoint, diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 826f905..4f49034 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -29,6 +29,7 @@ use uuid::Uuid; use crate::db::models as dbmodels; use crate::endpoint::Endpoint; +use crate::endpoint::EndpointHandle; use crate::endpoint::EndpointConfiguration; use crate::filestore::ArtifactPath; use crate::filestore::ReleaseStore; @@ -39,7 +40,7 @@ use crate::log::LogItem; pub struct EndpointScheduler { log_dir: Option<PathBuf>, - endpoints: Vec<Arc<RwLock<Endpoint>>>, + endpoints: Vec<Arc<Endpoint>>, staging_store: Arc<RwLock<StagingStore>>, release_stores: Vec<Arc<ReleaseStore>>, @@ -68,14 +69,12 @@ impl EndpointScheduler { }) } - async fn setup_endpoints( - endpoints: Vec<EndpointConfiguration>, - ) -> Result<Vec<Arc<RwLock<Endpoint>>>> { + async fn setup_endpoints(endpoints: Vec<EndpointConfiguration>) -> Result<Vec<Arc<Endpoint>>> { let unordered = futures::stream::FuturesUnordered::new(); for cfg in endpoints.into_iter() { unordered - .push(Endpoint::setup(cfg).map(|r_ep| r_ep.map(RwLock::new).map(Arc::new))); + .push(Endpoint::setup(cfg).map(|r_ep| r_ep.map(Arc::new))); } unordered.collect().await @@ -86,11 +85,7 @@ impl EndpointScheduler { /// # Warning /// /// This function blocks as long as there is no free endpoint available! 
- pub async fn schedule_job( - &self, - job: RunnableJob, - bar: indicatif::ProgressBar, - ) -> Result<JobHandle> { + pub async fn schedule_job(&self, job: RunnableJob, bar: indicatif::ProgressBar) -> Result<JobHandle> { let endpoint = self.select_free_endpoint().await?; Ok(JobHandle { @@ -105,31 +100,24 @@ impl EndpointScheduler { }) } - async fn select_free_endpoint(&self) -> Result<Arc<RwLock<Endpoint>>> { + async fn select_free_endpoint(&self) -> Result<EndpointHandle> { loop { let ep = self .endpoints .iter() - .cloned() - .map(|ep| async move { - ep.write() - .await - .number_of_running_containers() - .await - .map(|num_running| (num_running, ep.clone())) + .filter(|ep| { // filter out all running containers where the number of max jobs is reached + let r = ep.running_jobs() < ep.num_max_jobs(); + trace!("Endpoint {} considered for scheduling job: {}", ep.name(), r); + r }) - .collect::<futures::stream::FuturesUnordered<_>>() - .collect::<Result<Vec<_>>>() - .await? - .iter() - .sorted_by(|tpla, tplb| tpla.0.cmp(&tplb.0)) - .map(|tpl| tpl.1.clone()) + .sorted_by(|ep1, ep2| ep1.running_jobs().cmp(&ep2.running_jobs())) .next(); if let Some(endpoint) = ep { - return Ok(endpoint); + return Ok(EndpointHandle::new(endpoint.clone())); } else { trace!("No free endpoint found, retry..."); + tokio::task::yield_now().await } } } @@ -137,7 +125,7 @@ impl EndpointScheduler { pub struct JobHandle { log_dir: Option<PathBuf>, - endpoint: Arc<RwLock<Endpoint>>, + endpoint: EndpointHandle, job: RunnableJob, bar: ProgressBar, db: Arc<PgConnection>, @@ -155,14 +143,15 @@ impl std::fmt::Debug for JobHandle { impl JobHandle { pub async fn run(self) -> Result<Result<Vec<ArtifactPath>>> { let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>(); - let ep = self.endpoint.read().await; - let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?; + let endpoint_uri = self.endpoint.uri().clone(); + let endpoint_name = 
self.endpoint.name().clone(); + let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, self.endpoint.name())?; let package = dbmodels::Package::create_or_fetch(&self.db, self.job.package())?; let image = dbmodels::Image::create_or_fetch(&self.db, self.job.image())?; let envs = self.create_env_in_db()?; let job_id = *self.job.uuid(); - trace!("Running on Job {} on Endpoint {}", job_id, ep.name()); - let prepared_container = ep + trace!("Running on Job {} on Endpoint {}", job_id, self.endpoint.name()); + let prepared_container = self.endpoint .prepare_container(self.job, self.staging_store.clone(), self.release_stores.clone()) .await?; let container_id = prepared_container.create_info().id.clone(); @@ -174,7 +163,7 @@ impl JobHandle { &job_id, &package.name, &package.version, - ep.uri(), + &endpoint_uri, &container_id, ) })? @@ -192,7 +181,7 @@ impl JobHandle { drop(self.bar); let (run_container, logres) = tokio::join!(running_container, logres); - let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'", ep.name()))?; + let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'", endpoint_name))?; let run_container = run_container .with_context(|| anyhow!("Running container {} failed")) .with_context(|| { @@ -200,7 +189,7 @@ impl JobHandle { &job_id, &package.name, &package.version, - ep.uri(), + &endpoint_uri, &container_id, ) })?; @@ -233,7 +222,7 @@ impl JobHandle { &job.uuid, &package.name, &package.version, - ep.uri(), + &endpoint_uri, &container_id, ) })?; @@ -241,13 +230,13 @@ impl JobHandle { trace!("Found result for job {}: {:?}", job_id, res); let (paths, res) = res.unpack(); let res = res - .with_context(|| anyhow!("Error during running job on '{}'", ep.name())) + .with_context(|| anyhow!("Error during running job on '{}'", endpoint_name)) .with_context(|| { Self::create_job_run_error( &job.uuid, &package.name, &package.version, - ep.uri(), + &endpoint_uri, &container_id, ) }) |