diff options
Diffstat (limited to 'src/endpoint/scheduler.rs')
-rw-r--r-- | src/endpoint/scheduler.rs | 30 |
1 files changed, 11 insertions, 19 deletions
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 111e3c4..4f49034 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -29,6 +29,7 @@ use uuid::Uuid; use crate::db::models as dbmodels; use crate::endpoint::Endpoint; +use crate::endpoint::EndpointHandle; use crate::endpoint::EndpointConfiguration; use crate::filestore::ArtifactPath; use crate::filestore::ReleaseStore; @@ -84,11 +85,7 @@ impl EndpointScheduler { /// # Warning /// /// This function blocks as long as there is no free endpoint available! - pub async fn schedule_job( - &self, - job: RunnableJob, - bar: indicatif::ProgressBar, - ) -> Result<JobHandle> { + pub async fn schedule_job(&self, job: RunnableJob, bar: indicatif::ProgressBar) -> Result<JobHandle> { let endpoint = self.select_free_endpoint().await?; Ok(JobHandle { @@ -103,29 +100,24 @@ impl EndpointScheduler { }) } - async fn select_free_endpoint(&self) -> Result<Arc<Endpoint>> { + async fn select_free_endpoint(&self) -> Result<EndpointHandle> { loop { let ep = self .endpoints .iter() - .cloned() - .map(|ep| async move { - ep.number_of_running_containers() - .await - .map(|num_running| (num_running, ep.clone())) + .filter(|ep| { // filter out all running containers where the number of max jobs is reached + let r = ep.running_jobs() < ep.num_max_jobs(); + trace!("Endpoint {} considered for scheduling job: {}", ep.name(), r); + r }) - .collect::<futures::stream::FuturesUnordered<_>>() - .collect::<Result<Vec<_>>>() - .await? - .iter() - .sorted_by(|tpla, tplb| tpla.0.cmp(&tplb.0)) - .map(|tpl| tpl.1.clone()) + .sorted_by(|ep1, ep2| ep1.running_jobs().cmp(&ep2.running_jobs())) .next(); if let Some(endpoint) = ep { - return Ok(endpoint); + return Ok(EndpointHandle::new(endpoint.clone())); } else { trace!("No free endpoint found, retry..."); + tokio::task::yield_now().await } } } @@ -133,7 +125,7 @@ impl EndpointScheduler { pub struct JobHandle { log_dir: Option<PathBuf>, - endpoint: Arc<Endpoint>, + endpoint: EndpointHandle, job: RunnableJob, bar: ProgressBar, db: Arc<PgConnection>, |