summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/endpoint/scheduler.rs47
1 files changed, 43 insertions, 4 deletions
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index dc1e405..014d68a 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -88,6 +88,8 @@ impl EndpointScheduler {
}
async fn select_free_endpoint(&self) -> Result<EndpointHandle> {
+ use futures::stream::StreamExt;
+
loop {
let ep = self
.endpoints
@@ -97,13 +99,50 @@ impl EndpointScheduler {
trace!("Endpoint {} considered for scheduling job: {}", ep.name(), r);
r
})
- .sorted_by(|ep1, ep2| {
- ep1.utilization().partial_cmp(&ep2.utilization()).unwrap_or(std::cmp::Ordering::Equal)
+ .map(|ep| {
+ let ep = ep.clone();
+ async {
+ let num = ep.number_of_running_containers().await?;
+ trace!("Number of running containers on {} = {}", ep.name(), num);
+ Ok((ep, num))
+ }
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Vec<_>>()
+ .await // Vec<Result<_>>
+ .into_iter()
+ .collect::<Result<Vec<_>>>()? // -> Vec<_>
+ .into_iter()
+ .sorted_by(|(ep1, ep1_running), (ep2, ep2_running)| {
+ match ep1_running.partial_cmp(ep2_running).unwrap_or(std::cmp::Ordering::Equal) {
+ std::cmp::Ordering::Equal => {
+ trace!("Number of running containers on {} and {} equal ({}), using utilization", ep1.name(), ep2.name(), ep2_running);
+ let ep1_util = ep1.utilization();
+ let ep2_util = ep1.utilization();
+
+ trace!("{} utilization: {}", ep1.name(), ep1_util);
+ trace!("{} utilization: {}", ep2.name(), ep2_util);
+
+ ep1_util.partial_cmp(&ep2_util).unwrap_or(std::cmp::Ordering::Equal)
+ },
+
+ std::cmp::Ordering::Less => {
+ trace!("On {} run less ({}) containers than on {} ({})", ep1.name(), ep1_running, ep2.name(), ep2_running);
+ std::cmp::Ordering::Less
+ },
+
+ std::cmp::Ordering::Greater => {
+ trace!("On {} run more ({}) containers than on {} ({})", ep1.name(), ep1_running, ep2.name(), ep2_running);
+ std::cmp::Ordering::Greater
+ }
+ }
})
- .next();
+ .next()
+ .map(|(ep, _)| ep);
if let Some(endpoint) = ep {
- return Ok(EndpointHandle::new(endpoint.clone()));
+ trace!("Selected = {}", endpoint.name());
+ return Ok(EndpointHandle::new(endpoint));
} else {
trace!("No free endpoint found, retry...");
tokio::task::yield_now().await