summaryrefslogtreecommitdiffstats
path: root/src/endpoint/configured.rs
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-03-01 19:33:46 +0100
committerMatthias Beyer <matthias.beyer@atos.net>2021-03-02 13:55:35 +0100
commit9cbbff2be1f5938c83c097c6cb1094c8b2bfc49e (patch)
tree8f8239a6b5a88db62d348d68afb6371f3eb10749 /src/endpoint/configured.rs
parent5c9f1d46bb64429b0ef2c828e0808ef729b97b86 (diff)
Implement scheduling with max jobs per endpoint
This patch implements support for max jobs per endpoint. The number of running jobs on one endpoint are tracked with a wrapper around the Endpoint object, which increases the job counter on allocation and decreases it on deallocation. This way, the scheduler can know how many jobs are running on one endpoint and select the next endpoint accordingly. The loading/comparing is not perfect, so it might happen that more jobs run on one endpoint than configured, but this is the first step into the right direction. Also, the selection happens on a tokio job which runs in a loop{}. Because this almost blocks the whole executor thread, we use `tokio::task::yield_now()` as soon as there is no free endpoint anymore, to yield the execution to another future to free resources for doing actual work, not scheduling. Signed-off-by: Matthias Beyer <matthias.beyer@atos.net> Tested-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/endpoint/configured.rs')
-rw-r--r--src/endpoint/configured.rs37
1 files changed, 29 insertions, 8 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 311e0a6..bbaaec6 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -56,6 +56,9 @@ pub struct Endpoint {
#[getset(get = "pub")]
uri: String,
+
+ #[builder(default)]
+ running_jobs: std::sync::atomic::AtomicUsize,
}
impl Debug for Endpoint {
@@ -228,17 +231,35 @@ impl Endpoint {
PreparedContainer::new(self, job, staging_store, release_stores).await
}
- pub async fn number_of_running_containers(&self) -> Result<usize> {
- self.docker
- .containers()
- .list(&Default::default())
- .await
- .with_context(|| anyhow!("Getting number of running containers on {}", self.name))
- .map_err(Error::from)
- .map(|list| list.len())
+ pub fn running_jobs(&self) -> usize {
+ self.running_jobs.load(std::sync::atomic::Ordering::Relaxed)
+ }
+}
+
+pub struct EndpointHandle(Arc<Endpoint>);
+
+impl EndpointHandle {
+ pub fn new(ep: Arc<Endpoint>) -> Self {
+ let _ = ep.running_jobs.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ EndpointHandle(ep)
}
}
+impl Drop for EndpointHandle {
+ fn drop(&mut self) {
+ let _ = self.0.running_jobs.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
+ }
+}
+
+impl std::ops::Deref for EndpointHandle {
+ type Target = Endpoint;
+
+ fn deref(&self) -> &Self::Target {
+ self.0.deref()
+ }
+}
+
+
#[derive(Getters)]
pub struct PreparedContainer<'a> {
endpoint: &'a Endpoint,