diff options
-rw-r--r-- | src/endpoint/scheduler.rs | 47 |
1 files changed, 43 insertions, 4 deletions
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index dc1e405..014d68a 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -88,6 +88,8 @@ impl EndpointScheduler { } async fn select_free_endpoint(&self) -> Result<EndpointHandle> { + use futures::stream::StreamExt; + loop { let ep = self .endpoints @@ -97,13 +99,50 @@ impl EndpointScheduler { trace!("Endpoint {} considered for scheduling job: {}", ep.name(), r); r }) - .sorted_by(|ep1, ep2| { - ep1.utilization().partial_cmp(&ep2.utilization()).unwrap_or(std::cmp::Ordering::Equal) + .map(|ep| { + let ep = ep.clone(); + async { + let num = ep.number_of_running_containers().await?; + trace!("Number of running containers on {} = {}", ep.name(), num); + Ok((ep, num)) + } + }) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Vec<_>>() + .await // Vec<Result<_>> + .into_iter() + .collect::<Result<Vec<_>>>()? // -> Vec<_> + .into_iter() + .sorted_by(|(ep1, ep1_running), (ep2, ep2_running)| { + match ep1_running.partial_cmp(ep2_running).unwrap_or(std::cmp::Ordering::Equal) { + std::cmp::Ordering::Equal => { + trace!("Number of running containers on {} and {} equal ({}), using utilization", ep1.name(), ep2.name(), ep2_running); + let ep1_util = ep1.utilization(); + let ep2_util = ep1.utilization(); + + trace!("{} utilization: {}", ep1.name(), ep1_util); + trace!("{} utilization: {}", ep2.name(), ep2_util); + + ep1_util.partial_cmp(&ep2_util).unwrap_or(std::cmp::Ordering::Equal) + }, + + std::cmp::Ordering::Less => { + trace!("On {} run less ({}) containers than on {} ({})", ep1.name(), ep1_running, ep2.name(), ep2_running); + std::cmp::Ordering::Less + }, + + std::cmp::Ordering::Greater => { + trace!("On {} run more ({}) containers than on {} ({})", ep1.name(), ep1_running, ep2.name(), ep2_running); + std::cmp::Ordering::Greater + } + } }) - .next(); + .next() + .map(|(ep, _)| ep); if let Some(endpoint) = ep { - return Ok(EndpointHandle::new(endpoint.clone())); + trace!("Selected = {}", endpoint.name()); + return Ok(EndpointHandle::new(endpoint)); } else { trace!("No free endpoint found, retry..."); tokio::task::yield_now().await |