Diffstat (limited to 'src/endpoint')
-rw-r--r--   src/endpoint/configured.rs   37
-rw-r--r--   src/endpoint/scheduler.rs    30
2 files changed, 40 insertions(+), 27 deletions(-)
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 311e0a6..bbaaec6 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -56,6 +56,9 @@ pub struct Endpoint {
#[getset(get = "pub")]
uri: String,
+
+ #[builder(default)]
+ running_jobs: std::sync::atomic::AtomicUsize,
}
impl Debug for Endpoint {
@@ -228,17 +231,35 @@ impl Endpoint {
PreparedContainer::new(self, job, staging_store, release_stores).await
}
- pub async fn number_of_running_containers(&self) -> Result<usize> {
- self.docker
- .containers()
- .list(&Default::default())
- .await
- .with_context(|| anyhow!("Getting number of running containers on {}", self.name))
- .map_err(Error::from)
- .map(|list| list.len())
+ pub fn running_jobs(&self) -> usize {
+ self.running_jobs.load(std::sync::atomic::Ordering::Relaxed)
+ }
+}
+
+pub struct EndpointHandle(Arc<Endpoint>);
+
+impl EndpointHandle {
+ pub fn new(ep: Arc<Endpoint>) -> Self {
+ let _ = ep.running_jobs.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ EndpointHandle(ep)
}
}
+impl Drop for EndpointHandle {
+ fn drop(&mut self) {
+ let _ = self.0.running_jobs.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
+ }
+}
+
+impl std::ops::Deref for EndpointHandle {
+ type Target = Endpoint;
+
+ fn deref(&self) -> &Self::Target {
+ self.0.deref()
+ }
+}
+
+
#[derive(Getters)]
pub struct PreparedContainer<'a> {
endpoint: &'a Endpoint,
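
Note for readers skimming the diff: the new EndpointHandle is a plain RAII counter guard. Constructing it bumps the endpoint's atomic running_jobs counter, and dropping it decrements the counter again, so the count stays correct on every exit path. Below is a minimal, standalone sketch of that pattern, reduced from the code above (the types are simplified stand-ins, not the project's full Endpoint):

use std::ops::Deref;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

// Simplified stand-in for the Endpoint type from the diff above.
struct Endpoint {
    running_jobs: AtomicUsize,
}

impl Endpoint {
    fn running_jobs(&self) -> usize {
        self.running_jobs.load(Ordering::Relaxed)
    }
}

// RAII guard: constructing it counts a job in, dropping it counts the job out,
// so the counter stays correct even on early returns or panics.
struct EndpointHandle(Arc<Endpoint>);

impl EndpointHandle {
    fn new(ep: Arc<Endpoint>) -> Self {
        ep.running_jobs.fetch_add(1, Ordering::Relaxed);
        EndpointHandle(ep)
    }
}

impl Drop for EndpointHandle {
    fn drop(&mut self) {
        self.0.running_jobs.fetch_sub(1, Ordering::Relaxed);
    }
}

// Deref lets callers keep using the handle exactly like an &Endpoint.
impl Deref for EndpointHandle {
    type Target = Endpoint;
    fn deref(&self) -> &Endpoint {
        self.0.deref()
    }
}

fn main() {
    let ep = Arc::new(Endpoint { running_jobs: AtomicUsize::new(0) });
    let handle = EndpointHandle::new(ep.clone());
    assert_eq!(ep.running_jobs(), 1);
    drop(handle); // guard dropped -> counter goes back down
    assert_eq!(ep.running_jobs(), 0);
}

Relaxed ordering suffices in this sketch because the counter is only read as a scheduling hint, which matches how the diff reads it in select_free_endpoint below; no other memory accesses are synchronized through it.
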
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 111e3c4..4f49034 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -29,6 +29,7 @@ use uuid::Uuid;
use crate::db::models as dbmodels;
use crate::endpoint::Endpoint;
+use crate::endpoint::EndpointHandle;
use crate::endpoint::EndpointConfiguration;
use crate::filestore::ArtifactPath;
use crate::filestore::ReleaseStore;
@@ -84,11 +85,7 @@ impl EndpointScheduler {
/// # Warning
///
/// This function blocks as long as there is no free endpoint available!
- pub async fn schedule_job(
- &self,
- job: RunnableJob,
- bar: indicatif::ProgressBar,
- ) -> Result<JobHandle> {
+ pub async fn schedule_job(&self, job: RunnableJob, bar: indicatif::ProgressBar) -> Result<JobHandle> {
let endpoint = self.select_free_endpoint().await?;
Ok(JobHandle {
@@ -103,29 +100,24 @@ impl EndpointScheduler {
})
}
- async fn select_free_endpoint(&self) -> Result<Arc<Endpoint>> {
+ async fn select_free_endpoint(&self) -> Result<EndpointHandle> {
loop {
let ep = self
.endpoints
.iter()
- .cloned()
- .map(|ep| async move {
- ep.number_of_running_containers()
- .await
- .map(|num_running| (num_running, ep.clone()))
+ .filter(|ep| { // filter out all endpoints that have already reached their maximum number of running jobs
+ let r = ep.running_jobs() < ep.num_max_jobs();
+ trace!("Endpoint {} considered for scheduling job: {}", ep.name(), r);
+ r
})
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Result<Vec<_>>>()
- .await?
- .iter()
- .sorted_by(|tpla, tplb| tpla.0.cmp(&tplb.0))
- .map(|tpl| tpl.1.clone())
+ .sorted_by(|ep1, ep2| ep1.running_jobs().cmp(&ep2.running_jobs()))
.next();
if let Some(endpoint) = ep {
- return Ok(endpoint);
+ return Ok(EndpointHandle::new(endpoint.clone()));
} else {
trace!("No free endpoint found, retry...");
+ tokio::task::yield_now().await
}
}
}
@@ -133,7 +125,7 @@ impl EndpointScheduler {
pub struct JobHandle {
log_dir: Option<PathBuf>,
- endpoint: Arc<Endpoint>,
+ endpoint: EndpointHandle,
job: RunnableJob,
bar: ProgressBar,
db: Arc<PgConnection>,
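
Taken together, the scheduler change replaces the Docker round trip (number_of_running_containers listed containers via the Docker API) with the in-process counter: filter out endpoints that are already at num_max_jobs, pick the least loaded of the rest, and yield back to the runtime when nothing is free. Below is a simplified sketch of that selection loop, assuming a tokio runtime and using min_by_key in place of the itertools sorted_by from the diff; the endpoint names and fields are made up for illustration:

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

// Simplified stand-in for the configured Endpoint; the real type exposes
// name(), num_max_jobs() and running_jobs() via getters.
struct Endpoint {
    name: String,
    max_jobs: usize,
    running: AtomicUsize,
}

impl Endpoint {
    fn running_jobs(&self) -> usize {
        self.running.load(Ordering::Relaxed)
    }
}

// Loop until some endpoint has a free slot, then return the least-loaded one.
async fn select_free_endpoint(endpoints: &[Arc<Endpoint>]) -> Arc<Endpoint> {
    loop {
        let candidate = endpoints
            .iter()
            .filter(|ep| ep.running_jobs() < ep.max_jobs) // skip fully loaded endpoints
            .min_by_key(|ep| ep.running_jobs())           // prefer the least busy one
            .cloned();

        if let Some(ep) = candidate {
            return ep;
        }
        // Nothing is free right now: yield so other tasks can finish a job
        // (and drop their EndpointHandle) before we check again.
        tokio::task::yield_now().await;
    }
}

#[tokio::main]
async fn main() {
    let endpoints = vec![
        Arc::new(Endpoint { name: "builder-1".into(), max_jobs: 2, running: AtomicUsize::new(2) }),
        Arc::new(Endpoint { name: "builder-2".into(), max_jobs: 4, running: AtomicUsize::new(1) }),
    ];
    let ep = select_free_endpoint(&endpoints).await;
    println!("selected endpoint: {}", ep.name); // builder-2, since builder-1 is at its limit
}

The yield_now keeps the loop cooperative: without it, a task polling for a free endpoint could starve the very tasks that would eventually drop an EndpointHandle and free a slot.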