author     Matthias Beyer <mail@beyermatthias.de>  2021-03-04 12:58:50 +0100
committer  Matthias Beyer <mail@beyermatthias.de>  2021-03-04 12:58:50 +0100
commit     73028e90276cb94346994672c9d9a90f5059b342 (patch)
tree       38ca7c48ad5450089434653d2c0973f6972174bd
parent     746d46d57d39b2f32d8f02454463b03f64c16cdd (diff)
parent     682cf3f2b4567965cef73f31f1dd7b0ae6ccea28 (diff)
Merge branch 'sched-improvements'
-rw-r--r--  config.toml                 |  7
-rw-r--r--  src/endpoint/configured.rs  | 37
-rw-r--r--  src/endpoint/scheduler.rs   | 61
3 files changed, 60 insertions(+), 45 deletions(-)
diff --git a/config.toml b/config.toml
index 5f504ef..9536234 100644
--- a/config.toml
+++ b/config.toml
@@ -175,7 +175,12 @@ name = "testhostname"
uri = "http://0.0.0.0:8095" # the URI of the endpoint. Either http or socket path
endpoint_type = "http" # either "http" or "socket"
speed = 1 # currently ignored, but required to be present
-maxjobs = 1 # currently ignored, but required to be present
+
+# Maximum number of jobs running on this endpoint.
+# Set this to a reasonably high number to be able to run a lot of small jobs.
+# For example, if you're compiling with `make -j 1`, this should be at least
+# the number of CPU cores, maybe a bit more (e.g. ncpu * 1.1).
+maxjobs = 1
#
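The sizing rule from the comment above can be written out directly. A minimal sketch, assuming std::thread::available_parallelism() as the core-count source; suggested_maxjobs is a hypothetical helper for illustration only, the patch itself keeps maxjobs a hand-set config value:

    use std::thread;

    // Hypothetical helper, not part of this patch: derive a maxjobs value
    // following the rule of thumb above (a bit more than the core count).
    fn suggested_maxjobs() -> usize {
        let ncpu = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
        ((ncpu as f64) * 1.1).ceil() as usize // e.g. 8 cores -> 9
    }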
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 311e0a6..bbaaec6 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -56,6 +56,9 @@ pub struct Endpoint {
#[getset(get = "pub")]
uri: String,
+
+ #[builder(default)]
+ running_jobs: std::sync::atomic::AtomicUsize,
}
impl Debug for Endpoint {
@@ -228,17 +231,35 @@ impl Endpoint {
PreparedContainer::new(self, job, staging_store, release_stores).await
}
- pub async fn number_of_running_containers(&self) -> Result<usize> {
- self.docker
- .containers()
- .list(&Default::default())
- .await
- .with_context(|| anyhow!("Getting number of running containers on {}", self.name))
- .map_err(Error::from)
- .map(|list| list.len())
+ pub fn running_jobs(&self) -> usize {
+ self.running_jobs.load(std::sync::atomic::Ordering::Relaxed)
+ }
+}
+
+pub struct EndpointHandle(Arc<Endpoint>);
+
+impl EndpointHandle {
+ pub fn new(ep: Arc<Endpoint>) -> Self {
+ let _ = ep.running_jobs.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ EndpointHandle(ep)
}
}
+impl Drop for EndpointHandle {
+ fn drop(&mut self) {
+ let _ = self.0.running_jobs.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
+ }
+}
+
+impl std::ops::Deref for EndpointHandle {
+ type Target = Endpoint;
+
+ fn deref(&self) -> &Self::Target {
+ self.0.deref()
+ }
+}
+
+
#[derive(Getters)]
pub struct PreparedContainer<'a> {
endpoint: &'a Endpoint,
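The new EndpointHandle is an RAII counter: constructing it increments running_jobs and Drop decrements it, so the count cannot leak even when a job fails and the handle goes out of scope early. A self-contained sketch of the same pattern, with a simplified stand-in type (Counted is illustrative, not from this codebase):

    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Increment on construction, decrement on Drop. Relaxed ordering is
    // enough here because the counter is only a scheduling hint and does
    // not guard any other memory.
    struct Counted(Arc<AtomicUsize>);

    impl Counted {
        fn new(counter: Arc<AtomicUsize>) -> Self {
            counter.fetch_add(1, Ordering::Relaxed);
            Counted(counter)
        }
    }

    impl Drop for Counted {
        fn drop(&mut self) {
            self.0.fetch_sub(1, Ordering::Relaxed);
        }
    }

    fn main() {
        let jobs = Arc::new(AtomicUsize::new(0));
        let handle = Counted::new(jobs.clone());
        assert_eq!(jobs.load(Ordering::Relaxed), 1);
        drop(handle);
        assert_eq!(jobs.load(Ordering::Relaxed), 0);
    }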
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 826f905..4f49034 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -29,6 +29,7 @@ use uuid::Uuid;
use crate::db::models as dbmodels;
use crate::endpoint::Endpoint;
+use crate::endpoint::EndpointHandle;
use crate::endpoint::EndpointConfiguration;
use crate::filestore::ArtifactPath;
use crate::filestore::ReleaseStore;
@@ -39,7 +40,7 @@ use crate::log::LogItem;
pub struct EndpointScheduler {
log_dir: Option<PathBuf>,
- endpoints: Vec<Arc<RwLock<Endpoint>>>,
+ endpoints: Vec<Arc<Endpoint>>,
staging_store: Arc<RwLock<StagingStore>>,
release_stores: Vec<Arc<ReleaseStore>>,
@@ -68,14 +69,12 @@ impl EndpointScheduler {
})
}
- async fn setup_endpoints(
- endpoints: Vec<EndpointConfiguration>,
- ) -> Result<Vec<Arc<RwLock<Endpoint>>>> {
+ async fn setup_endpoints(endpoints: Vec<EndpointConfiguration>) -> Result<Vec<Arc<Endpoint>>> {
let unordered = futures::stream::FuturesUnordered::new();
for cfg in endpoints.into_iter() {
unordered
- .push(Endpoint::setup(cfg).map(|r_ep| r_ep.map(RwLock::new).map(Arc::new)));
+ .push(Endpoint::setup(cfg).map(|r_ep| r_ep.map(Arc::new)));
}
unordered.collect().await
@@ -86,11 +85,7 @@ impl EndpointScheduler {
/// # Warning
///
/// This function blocks as long as there is no free endpoint available!
- pub async fn schedule_job(
- &self,
- job: RunnableJob,
- bar: indicatif::ProgressBar,
- ) -> Result<JobHandle> {
+ pub async fn schedule_job(&self, job: RunnableJob, bar: indicatif::ProgressBar) -> Result<JobHandle> {
let endpoint = self.select_free_endpoint().await?;
Ok(JobHandle {
@@ -105,31 +100,24 @@ impl EndpointScheduler {
})
}
- async fn select_free_endpoint(&self) -> Result<Arc<RwLock<Endpoint>>> {
+ async fn select_free_endpoint(&self) -> Result<EndpointHandle> {
loop {
let ep = self
.endpoints
.iter()
- .cloned()
- .map(|ep| async move {
- ep.write()
- .await
- .number_of_running_containers()
- .await
- .map(|num_running| (num_running, ep.clone()))
+ .filter(|ep| { // skip endpoints that already run their maximum number of jobs
+ let r = ep.running_jobs() < ep.num_max_jobs();
+ trace!("Endpoint {} considered for scheduling job: {}", ep.name(), r);
+ r
})
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Result<Vec<_>>>()
- .await?
- .iter()
- .sorted_by(|tpla, tplb| tpla.0.cmp(&tplb.0))
- .map(|tpl| tpl.1.clone())
+ .sorted_by(|ep1, ep2| ep1.running_jobs().cmp(&ep2.running_jobs()))
.next();
if let Some(endpoint) = ep {
- return Ok(endpoint);
+ return Ok(EndpointHandle::new(endpoint.clone()));
} else {
trace!("No free endpoint found, retry...");
+ tokio::task::yield_now().await
}
}
}
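With the per-endpoint counter in place, selection no longer queries Docker at all: filter out endpoints at capacity, take the least loaded of the rest, and if none qualifies, yield to the runtime and retry. A condensed model of that choice, assuming an endpoint type exposing running_jobs() and num_max_jobs(); min_by_key stands in for the sorted_by(...).next() pair (equivalent up to tie-breaking):

    struct Ep { name: &'static str, running: usize, max: usize }

    impl Ep {
        fn running_jobs(&self) -> usize { self.running }
        fn num_max_jobs(&self) -> usize { self.max }
    }

    // Pick the least-loaded endpoint that still has a free job slot.
    fn pick(endpoints: &[Ep]) -> Option<&Ep> {
        endpoints
            .iter()
            .filter(|ep| ep.running_jobs() < ep.num_max_jobs()) // full endpoints drop out
            .min_by_key(|ep| ep.running_jobs())
    }

    fn main() {
        let eps = [
            Ep { name: "full", running: 2, max: 2 },
            Ep { name: "busy", running: 3, max: 8 },
            Ep { name: "idle", running: 1, max: 8 },
        ];
        assert_eq!(pick(&eps).map(|ep| ep.name), Some("idle"));
    }

When nothing qualifies, the loop retries after tokio::task::yield_now(), which keeps the wait cooperative rather than a hard busy-spin; a wakeup signal from EndpointHandle's Drop could avoid the polling entirely, but that is beyond this patch.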
@@ -137,7 +125,7 @@ impl EndpointScheduler {
pub struct JobHandle {
log_dir: Option<PathBuf>,
- endpoint: Arc<RwLock<Endpoint>>,
+ endpoint: EndpointHandle,
job: RunnableJob,
bar: ProgressBar,
db: Arc<PgConnection>,
@@ -155,14 +143,15 @@ impl std::fmt::Debug for JobHandle {
impl JobHandle {
pub async fn run(self) -> Result<Result<Vec<ArtifactPath>>> {
let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>();
- let ep = self.endpoint.read().await;
- let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?;
+ let endpoint_uri = self.endpoint.uri().clone();
+ let endpoint_name = self.endpoint.name().clone();
+ let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, self.endpoint.name())?;
let package = dbmodels::Package::create_or_fetch(&self.db, self.job.package())?;
let image = dbmodels::Image::create_or_fetch(&self.db, self.job.image())?;
let envs = self.create_env_in_db()?;
let job_id = *self.job.uuid();
- trace!("Running on Job {} on Endpoint {}", job_id, ep.name());
- let prepared_container = ep
+ trace!("Running on Job {} on Endpoint {}", job_id, self.endpoint.name());
+ let prepared_container = self.endpoint
.prepare_container(self.job, self.staging_store.clone(), self.release_stores.clone())
.await?;
let container_id = prepared_container.create_info().id.clone();
@@ -174,7 +163,7 @@ impl JobHandle {
&job_id,
&package.name,
&package.version,
- ep.uri(),
+ &endpoint_uri,
&container_id,
)
})?
@@ -192,7 +181,7 @@ impl JobHandle {
drop(self.bar);
let (run_container, logres) = tokio::join!(running_container, logres);
- let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'", ep.name()))?;
+ let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'", endpoint_name))?;
let run_container = run_container
.with_context(|| anyhow!("Running container {} failed", container_id))
.with_context(|| {
@@ -200,7 +189,7 @@ impl JobHandle {
&job_id,
&package.name,
&package.version,
- ep.uri(),
+ &endpoint_uri,
&container_id,
)
})?;
@@ -233,7 +222,7 @@ impl JobHandle {
&job.uuid,
&package.name,
&package.version,
- ep.uri(),
+ &endpoint_uri,
&container_id,
)
})?;
@@ -241,13 +230,13 @@ impl JobHandle {
trace!("Found result for job {}: {:?}", job_id, res);
let (paths, res) = res.unpack();
let res = res
- .with_context(|| anyhow!("Error during running job on '{}'", ep.name()))
+ .with_context(|| anyhow!("Error during running job on '{}'", endpoint_name))
.with_context(|| {
Self::create_job_run_error(
&job.uuid,
&package.name,
&package.version,
- ep.uri(),
+ &endpoint_uri,
&container_id,
)
})
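run() drives the container future and the log collector together with tokio::join!, so the log stream is fully drained even when the container side fails. A toy sketch of that pattern, assuming the tokio runtime with the macros feature; both futures are stand-ins, not the real container or log types:

    #[tokio::main]
    async fn main() {
        let container = async { Ok::<u32, String>(0) };        // stand-in for the running container
        let logs = async { vec!["log line 1", "log line 2"] }; // stand-in for the log receiver
        let (exit, collected) = tokio::join!(container, logs); // both complete before we continue
        assert_eq!(exit, Ok(0));
        assert_eq!(collected.len(), 2);
    }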