diff options
Diffstat (limited to 'src/endpoint')
-rw-r--r-- | src/endpoint/configured.rs | 37
-rw-r--r-- | src/endpoint/scheduler.rs | 30
2 files changed, 40 insertions, 27 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 311e0a6..bbaaec6 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -56,6 +56,9 @@ pub struct Endpoint { #[getset(get = "pub")] uri: String, + + #[builder(default)] + running_jobs: std::sync::atomic::AtomicUsize, } impl Debug for Endpoint { @@ -228,17 +231,35 @@ impl Endpoint { PreparedContainer::new(self, job, staging_store, release_stores).await } - pub async fn number_of_running_containers(&self) -> Result<usize> { - self.docker - .containers() - .list(&Default::default()) - .await - .with_context(|| anyhow!("Getting number of running containers on {}", self.name)) - .map_err(Error::from) - .map(|list| list.len()) + pub fn running_jobs(&self) -> usize { + self.running_jobs.load(std::sync::atomic::Ordering::Relaxed) + } +} + +pub struct EndpointHandle(Arc<Endpoint>); + +impl EndpointHandle { + pub fn new(ep: Arc<Endpoint>) -> Self { + let _ = ep.running_jobs.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + EndpointHandle(ep) } } +impl Drop for EndpointHandle { + fn drop(&mut self) { + let _ = self.0.running_jobs.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + } +} + +impl std::ops::Deref for EndpointHandle { + type Target = Endpoint; + + fn deref(&self) -> &Self::Target { + self.0.deref() + } +} + + #[derive(Getters)] pub struct PreparedContainer<'a> { endpoint: &'a Endpoint, diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 111e3c4..4f49034 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -29,6 +29,7 @@ use uuid::Uuid; use crate::db::models as dbmodels; use crate::endpoint::Endpoint; +use crate::endpoint::EndpointHandle; use crate::endpoint::EndpointConfiguration; use crate::filestore::ArtifactPath; use crate::filestore::ReleaseStore; @@ -84,11 +85,7 @@ impl EndpointScheduler { /// # Warning /// /// This function blocks as long as there is no free endpoint available! 
- pub async fn schedule_job( - &self, - job: RunnableJob, - bar: indicatif::ProgressBar, - ) -> Result<JobHandle> { + pub async fn schedule_job(&self, job: RunnableJob, bar: indicatif::ProgressBar) -> Result<JobHandle> { let endpoint = self.select_free_endpoint().await?; Ok(JobHandle { @@ -103,29 +100,24 @@ impl EndpointScheduler { }) } - async fn select_free_endpoint(&self) -> Result<Arc<Endpoint>> { + async fn select_free_endpoint(&self) -> Result<EndpointHandle> { loop { let ep = self .endpoints .iter() - .cloned() - .map(|ep| async move { - ep.number_of_running_containers() - .await - .map(|num_running| (num_running, ep.clone())) + .filter(|ep| { // filter out all running containers where the number of max jobs is reached + let r = ep.running_jobs() < ep.num_max_jobs(); + trace!("Endpoint {} considered for scheduling job: {}", ep.name(), r); + r }) - .collect::<futures::stream::FuturesUnordered<_>>() - .collect::<Result<Vec<_>>>() - .await? - .iter() - .sorted_by(|tpla, tplb| tpla.0.cmp(&tplb.0)) - .map(|tpl| tpl.1.clone()) + .sorted_by(|ep1, ep2| ep1.running_jobs().cmp(&ep2.running_jobs())) .next(); if let Some(endpoint) = ep { - return Ok(endpoint); + return Ok(EndpointHandle::new(endpoint.clone())); } else { trace!("No free endpoint found, retry..."); + tokio::task::yield_now().await } } } @@ -133,7 +125,7 @@ impl EndpointScheduler { pub struct JobHandle { log_dir: Option<PathBuf>, - endpoint: Arc<Endpoint>, + endpoint: EndpointHandle, job: RunnableJob, bar: ProgressBar, db: Arc<PgConnection>, |