-rw-r--r--  src/endpoint/configured.rs | 25
-rw-r--r--  src/endpoint/scheduler.rs  | 47
2 files changed, 66 insertions, 6 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index e2dda2c..e82901e 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -246,6 +246,7 @@ impl Endpoint {
     pub fn utilization(&self) -> f64 {
         let max_jobs = self.num_max_jobs() as f64;
         let run_jobs = self.running_jobs() as f64;
+        trace!("utilization of {}: 100.0 / {} * {}", self.name(), max_jobs, run_jobs);
         100.0 / max_jobs * run_jobs
     }
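
For reference, utilization() reports the share of occupied job slots as a
percentage, which is exactly what the new trace line prints. A minimal
standalone sketch of the same arithmetic (the free function and the sample
numbers are hypothetical):

    fn utilization(num_max_jobs: usize, running_jobs: usize) -> f64 {
        // Same formula as above: percentage of occupied job slots.
        100.0 / (num_max_jobs as f64) * (running_jobs as f64)
    }

    fn main() {
        // An endpoint with 10 slots and 4 running jobs is 40% utilized.
        assert_eq!(utilization(10, 4), 40.0);
    }
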
@@ -280,6 +281,24 @@ impl Endpoint {
         })
     }
 
+    pub async fn number_of_running_containers(&self) -> Result<usize> {
+        self.docker
+            .containers()
+            .list({
+                &shiplift::builder::ContainerListOptions::builder()
+                    .all()
+                    .build()
+            })
+            .await
+            .map_err(Error::from)
+            .map(|list| {
+                list.into_iter()
+                    .inspect(|stat| trace!("stat = {:?}", stat))
+                    .filter(|stat| stat.state == "running")
+                    .count()
+            })
+    }
+
     pub async fn has_container_with_id(&self, id: &str) -> Result<bool> {
         self.container_stats()
             .await?
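
Because ContainerListOptions::builder().all() asks the daemon for stopped
containers too, the result is filtered on state == "running" before counting.
A self-contained sketch of the same query outside the Endpoint type (assumes
shiplift's API as used above and a reachable local Docker daemon):

    use shiplift::Docker;

    #[tokio::main]
    async fn main() -> Result<(), shiplift::Error> {
        let docker = Docker::new();
        let opts = shiplift::builder::ContainerListOptions::builder()
            .all() // include stopped containers; filtered below
            .build();
        let running = docker
            .containers()
            .list(&opts)
            .await?
            .into_iter()
            .filter(|stat| stat.state == "running")
            .count();
        println!("running containers: {}", running);
        Ok(())
    }
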
@@ -401,14 +420,16 @@ pub struct EndpointHandle(Arc<Endpoint>);
 
 impl EndpointHandle {
     pub fn new(ep: Arc<Endpoint>) -> Self {
-        let _ = ep.running_jobs.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+        let res = ep.running_jobs.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+        trace!("Endpoint {} has one job more: {}", ep.name(), res + 1);
         EndpointHandle(ep)
     }
 }
 
 impl Drop for EndpointHandle {
     fn drop(&mut self) {
-        let _ = self.0.running_jobs.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
+        let res = self.0.running_jobs.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
+        trace!("Endpoint {} has one job less: {}", self.0.name(), res - 1);
     }
 }
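
EndpointHandle acts as a RAII guard for the per-endpoint job counter: the
count goes up in new() and back down in Drop. The traces log res + 1 and
res - 1 because fetch_add and fetch_sub return the value the atomic held
before the operation. A minimal sketch of that pattern in isolation
(hypothetical, trimmed to the counter logic):

    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    struct Endpoint { running_jobs: AtomicUsize }
    struct EndpointHandle(Arc<Endpoint>);

    impl EndpointHandle {
        fn new(ep: Arc<Endpoint>) -> Self {
            // fetch_add returns the previous value, hence `res + 1` above.
            let res = ep.running_jobs.fetch_add(1, Ordering::Relaxed);
            println!("one job more: {}", res + 1);
            EndpointHandle(ep)
        }
    }

    impl Drop for EndpointHandle {
        fn drop(&mut self) {
            // fetch_sub also returns the previous value, hence `res - 1` above.
            let res = self.0.running_jobs.fetch_sub(1, Ordering::Relaxed);
            println!("one job less: {}", res - 1);
        }
    }

    fn main() {
        let ep = Arc::new(Endpoint { running_jobs: AtomicUsize::new(0) });
        let handle = EndpointHandle::new(Arc::clone(&ep)); // "one job more: 1"
        drop(handle);                                      // "one job less: 0"
    }
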
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index dc1e405..014d68a 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -88,6 +88,8 @@ impl EndpointScheduler {
     }
 
     async fn select_free_endpoint(&self) -> Result<EndpointHandle> {
+        use futures::stream::StreamExt;
+
         loop {
             let ep = self
                 .endpoints
@@ -97,13 +99,50 @@
                     trace!("Endpoint {} considered for scheduling job: {}", ep.name(), r);
                     r
                 })
-                .sorted_by(|ep1, ep2| {
-                    ep1.utilization().partial_cmp(&ep2.utilization()).unwrap_or(std::cmp::Ordering::Equal)
+                .map(|ep| {
+                    let ep = ep.clone();
+                    async {
+                        let num = ep.number_of_running_containers().await?;
+                        trace!("Number of running containers on {} = {}", ep.name(), num);
+                        Ok((ep, num))
+                    }
+                })
+                .collect::<futures::stream::FuturesUnordered<_>>()
+                .collect::<Vec<_>>()
+                .await // Vec<Result<_>>
+                .into_iter()
+                .collect::<Result<Vec<_>>>()? // -> Vec<_>
+                .into_iter()
+                .sorted_by(|(ep1, ep1_running), (ep2, ep2_running)| {
+                    match ep1_running.partial_cmp(ep2_running).unwrap_or(std::cmp::Ordering::Equal) {
+                        std::cmp::Ordering::Equal => {
+                            trace!("Number of running containers on {} and {} equal ({}), using utilization", ep1.name(), ep2.name(), ep2_running);
+                            let ep1_util = ep1.utilization();
+                            let ep2_util = ep2.utilization();
+
+                            trace!("{} utilization: {}", ep1.name(), ep1_util);
+                            trace!("{} utilization: {}", ep2.name(), ep2_util);
+
+                            ep1_util.partial_cmp(&ep2_util).unwrap_or(std::cmp::Ordering::Equal)
+                        },
+
+                        std::cmp::Ordering::Less => {
+ trace!("On {} run less ({}) containers than on {} ({})", ep1.name(), ep1_running, ep2.name(), ep2_running);
+                            std::cmp::Ordering::Less
+                        },
+
+                        std::cmp::Ordering::Greater => {
+ trace!("On {} run more ({}) containers than on {} ({})", ep1.name(), ep1_running, ep2.name(), ep2_running);
+                            std::cmp::Ordering::Greater
+                        }
+                    }
                 })
-                .next();
+                .next()
+                .map(|(ep, _)| ep);
 
             if let Some(endpoint) = ep {
-                return Ok(EndpointHandle::new(endpoint.clone()));
+                trace!("Selected = {}", endpoint.name());
+                return Ok(EndpointHandle::new(endpoint));
             } else {
                 trace!("No free endpoint found, retry...");
                 tokio::task::yield_now().await
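
The net effect of the new selection logic: endpoints are ordered first by
their live container count and, on ties, by utilization; .next() then picks
the minimum. Querying Docker for the live count (instead of relying only on
the internal running_jobs counter) presumably also accounts for containers
this scheduler did not start itself. A condensed, synchronous sketch of the
comparator using itertools (sample data hypothetical):

    use itertools::Itertools; // provides sorted_by on iterators

    fn main() {
        // (name, running containers, utilization in percent)
        let endpoints = vec![("a", 2, 50.0), ("b", 1, 80.0), ("c", 1, 10.0)];

        let selected = endpoints
            .into_iter()
            .sorted_by(|(_, r1, u1), (_, r2, u2)| {
                // Primary key: fewer running containers wins.
                // Tie-break: lower utilization wins.
                match r1.cmp(r2) {
                    std::cmp::Ordering::Equal => {
                        u1.partial_cmp(u2).unwrap_or(std::cmp::Ordering::Equal)
                    }
                    other => other,
                }
            })
            .next();

        // "c" runs as few containers as "b" but is less utilized.
        assert_eq!(selected.map(|(name, _, _)| name), Some("c"));
    }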