diff options
author | Matthias Beyer <matthias.beyer@atos.net> | 2021-09-16 12:38:43 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@atos.net> | 2021-09-16 12:38:43 +0200 |
commit | 63566cccba0c2c42af2e38d7ec0a922a60cf128d (patch) | |
tree | f89a2b3f67a37573cd656c6d6846e730165ae9cd | |
parent | aed91a7cfcd764094bd26e3b8972b16ecb7a9189 (diff) | |
parent | 591fbd5afe536bea549009ee0e241e50c9375783 (diff) |
Merge branch 'schedule-to-less-running-containers'
-rw-r--r-- | src/endpoint/configured.rs | 25 | ||||
-rw-r--r-- | src/endpoint/scheduler.rs | 47 |
2 files changed, 66 insertions, 6 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index e2dda2c..e82901e 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -246,6 +246,7 @@ impl Endpoint { pub fn utilization(&self) -> f64 { let max_jobs = self.num_max_jobs() as f64; let run_jobs = self.running_jobs() as f64; + trace!("utilization of {}: 100.0 / {} * {}", self.name(), max_jobs, run_jobs); 100.0 / max_jobs * run_jobs } @@ -280,6 +281,24 @@ impl Endpoint { }) } + pub async fn number_of_running_containers(&self) -> Result<usize> { + self.docker + .containers() + .list({ + &shiplift::builder::ContainerListOptions::builder() + .all() + .build() + }) + .await + .map_err(Error::from) + .map(|list| { + list.into_iter() + .inspect(|stat| trace!("stat = {:?}", stat)) + .filter(|stat| stat.state == "running") + .count() + }) + } + pub async fn has_container_with_id(&self, id: &str) -> Result<bool> { self.container_stats() .await? @@ -401,14 +420,16 @@ pub struct EndpointHandle(Arc<Endpoint>); impl EndpointHandle { pub fn new(ep: Arc<Endpoint>) -> Self { - let _ = ep.running_jobs.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let res = ep.running_jobs.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + trace!("Endpoint {} has one job more: {}", ep.name(), res + 1); EndpointHandle(ep) } } impl Drop for EndpointHandle { fn drop(&mut self) { - let _ = self.0.running_jobs.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + let res = self.0.running_jobs.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + trace!("Endpoint {} has one job less: {}", self.0.name(), res - 1); } } diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index dc1e405..014d68a 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -88,6 +88,8 @@ impl EndpointScheduler { } async fn select_free_endpoint(&self) -> Result<EndpointHandle> { + use futures::stream::StreamExt; + loop { let ep = self .endpoints @@ -97,13 +99,50 @@ impl EndpointScheduler { 
trace!("Endpoint {} considered for scheduling job: {}", ep.name(), r); r }) - .sorted_by(|ep1, ep2| { - ep1.utilization().partial_cmp(&ep2.utilization()).unwrap_or(std::cmp::Ordering::Equal) + .map(|ep| { + let ep = ep.clone(); + async { + let num = ep.number_of_running_containers().await?; + trace!("Number of running containers on {} = {}", ep.name(), num); + Ok((ep, num)) + } + }) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Vec<_>>() + .await // Vec<Result<_>> + .into_iter() + .collect::<Result<Vec<_>>>()? // -> Vec<_> + .into_iter() + .sorted_by(|(ep1, ep1_running), (ep2, ep2_running)| { + match ep1_running.partial_cmp(ep2_running).unwrap_or(std::cmp::Ordering::Equal) { + std::cmp::Ordering::Equal => { + trace!("Number of running containers on {} and {} equal ({}), using utilization", ep1.name(), ep2.name(), ep2_running); + let ep1_util = ep1.utilization(); + let ep2_util = ep2.utilization(); + + trace!("{} utilization: {}", ep1.name(), ep1_util); + trace!("{} utilization: {}", ep2.name(), ep2_util); + + ep1_util.partial_cmp(&ep2_util).unwrap_or(std::cmp::Ordering::Equal) + }, + + std::cmp::Ordering::Less => { + trace!("On {} run less ({}) containers than on {} ({})", ep1.name(), ep1_running, ep2.name(), ep2_running); + std::cmp::Ordering::Less + }, + + std::cmp::Ordering::Greater => { + trace!("On {} run more ({}) containers than on {} ({})", ep1.name(), ep1_running, ep2.name(), ep2_running); + std::cmp::Ordering::Greater + } + } }) - .next(); + .next() + .map(|(ep, _)| ep); if let Some(endpoint) = ep { - return Ok(EndpointHandle::new(endpoint.clone())); + trace!("Selected = {}", endpoint.name()); + return Ok(EndpointHandle::new(endpoint)); } else { trace!("No free endpoint found, retry..."); tokio::task::yield_now().await |