summaryrefslogtreecommitdiffstats
path: root/src/endpoint/scheduler.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/endpoint/scheduler.rs')
-rw-r--r--src/endpoint/scheduler.rs30
1 files changed, 11 insertions, 19 deletions
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 111e3c4..4f49034 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -29,6 +29,7 @@ use uuid::Uuid;
use crate::db::models as dbmodels;
use crate::endpoint::Endpoint;
+use crate::endpoint::EndpointHandle;
use crate::endpoint::EndpointConfiguration;
use crate::filestore::ArtifactPath;
use crate::filestore::ReleaseStore;
@@ -84,11 +85,7 @@ impl EndpointScheduler {
/// # Warning
///
/// This function blocks as long as there is no free endpoint available!
- pub async fn schedule_job(
- &self,
- job: RunnableJob,
- bar: indicatif::ProgressBar,
- ) -> Result<JobHandle> {
+ pub async fn schedule_job(&self, job: RunnableJob, bar: indicatif::ProgressBar) -> Result<JobHandle> {
let endpoint = self.select_free_endpoint().await?;
Ok(JobHandle {
@@ -103,29 +100,24 @@ impl EndpointScheduler {
})
}
- async fn select_free_endpoint(&self) -> Result<Arc<Endpoint>> {
+ async fn select_free_endpoint(&self) -> Result<EndpointHandle> {
loop {
let ep = self
.endpoints
.iter()
- .cloned()
- .map(|ep| async move {
- ep.number_of_running_containers()
- .await
- .map(|num_running| (num_running, ep.clone()))
+ .filter(|ep| { // filter out all running containers where the number of max jobs is reached
+ let r = ep.running_jobs() < ep.num_max_jobs();
+ trace!("Endpoint {} considered for scheduling job: {}", ep.name(), r);
+ r
})
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Result<Vec<_>>>()
- .await?
- .iter()
- .sorted_by(|tpla, tplb| tpla.0.cmp(&tplb.0))
- .map(|tpl| tpl.1.clone())
+ .sorted_by(|ep1, ep2| ep1.running_jobs().cmp(&ep2.running_jobs()))
.next();
if let Some(endpoint) = ep {
- return Ok(endpoint);
+ return Ok(EndpointHandle::new(endpoint.clone()));
} else {
trace!("No free endpoint found, retry...");
+ tokio::task::yield_now().await
}
}
}
@@ -133,7 +125,7 @@ impl EndpointScheduler {
pub struct JobHandle {
log_dir: Option<PathBuf>,
- endpoint: Arc<Endpoint>,
+ endpoint: EndpointHandle,
job: RunnableJob,
bar: ProgressBar,
db: Arc<PgConnection>,