summaryrefslogtreecommitdiffstats
path: root/src/endpoint
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-03-01 19:33:46 +0100
committerMatthias Beyer <matthias.beyer@atos.net>2021-03-02 13:55:35 +0100
commit9cbbff2be1f5938c83c097c6cb1094c8b2bfc49e (patch)
tree8f8239a6b5a88db62d348d68afb6371f3eb10749 /src/endpoint
parent5c9f1d46bb64429b0ef2c828e0808ef729b97b86 (diff)
Implement scheduling with max jobs per endpoint
This patch implements support for max jobs per endpoint. The number of running jobs on one endpoint are tracked with a wrapper around the Endpoint object, which increases the job counter on allocation and decreases it on deallocation. This way, the scheduler can know how many jobs are running on one endpoint and select the next endpoint accordingly. The loading/comparing is not perfect, so it might happen that more jobs run on one endpoint than configured, but this is the first step into the right direction. Also, the selection happens on a tokio job which runs in a loop{}. Because this almost blocks the whole executor thread, we use `tokio::task::yield_now()` as soon as there is no free endpoint anymore, to yield the execution to another future to free resources for doing actual work, not scheduling. Signed-off-by: Matthias Beyer <matthias.beyer@atos.net> Tested-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/endpoint')
-rw-r--r--src/endpoint/configured.rs37
-rw-r--r--src/endpoint/scheduler.rs30
2 files changed, 40 insertions, 27 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 311e0a6..bbaaec6 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -56,6 +56,9 @@ pub struct Endpoint {
#[getset(get = "pub")]
uri: String,
+
+ #[builder(default)]
+ running_jobs: std::sync::atomic::AtomicUsize,
}
impl Debug for Endpoint {
@@ -228,17 +231,35 @@ impl Endpoint {
PreparedContainer::new(self, job, staging_store, release_stores).await
}
- pub async fn number_of_running_containers(&self) -> Result<usize> {
- self.docker
- .containers()
- .list(&Default::default())
- .await
- .with_context(|| anyhow!("Getting number of running containers on {}", self.name))
- .map_err(Error::from)
- .map(|list| list.len())
+ pub fn running_jobs(&self) -> usize {
+ self.running_jobs.load(std::sync::atomic::Ordering::Relaxed)
+ }
+}
+
+pub struct EndpointHandle(Arc<Endpoint>);
+
+impl EndpointHandle {
+ pub fn new(ep: Arc<Endpoint>) -> Self {
+ let _ = ep.running_jobs.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ EndpointHandle(ep)
}
}
+impl Drop for EndpointHandle {
+ fn drop(&mut self) {
+ let _ = self.0.running_jobs.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
+ }
+}
+
+impl std::ops::Deref for EndpointHandle {
+ type Target = Endpoint;
+
+ fn deref(&self) -> &Self::Target {
+ self.0.deref()
+ }
+}
+
+
#[derive(Getters)]
pub struct PreparedContainer<'a> {
endpoint: &'a Endpoint,
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 111e3c4..4f49034 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -29,6 +29,7 @@ use uuid::Uuid;
use crate::db::models as dbmodels;
use crate::endpoint::Endpoint;
+use crate::endpoint::EndpointHandle;
use crate::endpoint::EndpointConfiguration;
use crate::filestore::ArtifactPath;
use crate::filestore::ReleaseStore;
@@ -84,11 +85,7 @@ impl EndpointScheduler {
/// # Warning
///
/// This function blocks as long as there is no free endpoint available!
- pub async fn schedule_job(
- &self,
- job: RunnableJob,
- bar: indicatif::ProgressBar,
- ) -> Result<JobHandle> {
+ pub async fn schedule_job(&self, job: RunnableJob, bar: indicatif::ProgressBar) -> Result<JobHandle> {
let endpoint = self.select_free_endpoint().await?;
Ok(JobHandle {
@@ -103,29 +100,24 @@ impl EndpointScheduler {
})
}
- async fn select_free_endpoint(&self) -> Result<Arc<Endpoint>> {
+ async fn select_free_endpoint(&self) -> Result<EndpointHandle> {
loop {
let ep = self
.endpoints
.iter()
- .cloned()
- .map(|ep| async move {
- ep.number_of_running_containers()
- .await
- .map(|num_running| (num_running, ep.clone()))
+ .filter(|ep| { // filter out all running containers where the number of max jobs is reached
+ let r = ep.running_jobs() < ep.num_max_jobs();
+ trace!("Endpoint {} considered for scheduling job: {}", ep.name(), r);
+ r
})
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Result<Vec<_>>>()
- .await?
- .iter()
- .sorted_by(|tpla, tplb| tpla.0.cmp(&tplb.0))
- .map(|tpl| tpl.1.clone())
+ .sorted_by(|ep1, ep2| ep1.running_jobs().cmp(&ep2.running_jobs()))
.next();
if let Some(endpoint) = ep {
- return Ok(endpoint);
+ return Ok(EndpointHandle::new(endpoint.clone()));
} else {
trace!("No free endpoint found, retry...");
+ tokio::task::yield_now().await
}
}
}
@@ -133,7 +125,7 @@ impl EndpointScheduler {
pub struct JobHandle {
log_dir: Option<PathBuf>,
- endpoint: Arc<Endpoint>,
+ endpoint: EndpointHandle,
job: RunnableJob,
bar: ProgressBar,
db: Arc<PgConnection>,