summaryrefslogtreecommitdiffstats
path: root/src/endpoint
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-03-01 19:27:49 +0100
committerMatthias Beyer <matthias.beyer@atos.net>2021-03-01 19:27:49 +0100
commit5c9f1d46bb64429b0ef2c828e0808ef729b97b86 (patch)
treeef50ffa2e82fbba54810224163f45037ee232100 /src/endpoint
parent746d46d57d39b2f32d8f02454463b03f64c16cdd (diff)
Remove RwLock around Endpoint
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/endpoint')
-rw-r--r--src/endpoint/scheduler.rs37
1 files changed, 17 insertions, 20 deletions
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 826f905..111e3c4 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -39,7 +39,7 @@ use crate::log::LogItem;
pub struct EndpointScheduler {
log_dir: Option<PathBuf>,
- endpoints: Vec<Arc<RwLock<Endpoint>>>,
+ endpoints: Vec<Arc<Endpoint>>,
staging_store: Arc<RwLock<StagingStore>>,
release_stores: Vec<Arc<ReleaseStore>>,
@@ -68,14 +68,12 @@ impl EndpointScheduler {
})
}
- async fn setup_endpoints(
- endpoints: Vec<EndpointConfiguration>,
- ) -> Result<Vec<Arc<RwLock<Endpoint>>>> {
+ async fn setup_endpoints(endpoints: Vec<EndpointConfiguration>) -> Result<Vec<Arc<Endpoint>>> {
let unordered = futures::stream::FuturesUnordered::new();
for cfg in endpoints.into_iter() {
unordered
- .push(Endpoint::setup(cfg).map(|r_ep| r_ep.map(RwLock::new).map(Arc::new)));
+ .push(Endpoint::setup(cfg).map(|r_ep| r_ep.map(Arc::new)));
}
unordered.collect().await
@@ -105,16 +103,14 @@ impl EndpointScheduler {
})
}
- async fn select_free_endpoint(&self) -> Result<Arc<RwLock<Endpoint>>> {
+ async fn select_free_endpoint(&self) -> Result<Arc<Endpoint>> {
loop {
let ep = self
.endpoints
.iter()
.cloned()
.map(|ep| async move {
- ep.write()
- .await
- .number_of_running_containers()
+ ep.number_of_running_containers()
.await
.map(|num_running| (num_running, ep.clone()))
})
@@ -137,7 +133,7 @@ impl EndpointScheduler {
pub struct JobHandle {
log_dir: Option<PathBuf>,
- endpoint: Arc<RwLock<Endpoint>>,
+ endpoint: Arc<Endpoint>,
job: RunnableJob,
bar: ProgressBar,
db: Arc<PgConnection>,
@@ -155,14 +151,15 @@ impl std::fmt::Debug for JobHandle {
impl JobHandle {
pub async fn run(self) -> Result<Result<Vec<ArtifactPath>>> {
let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>();
- let ep = self.endpoint.read().await;
- let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?;
+ let endpoint_uri = self.endpoint.uri().clone();
+ let endpoint_name = self.endpoint.name().clone();
+ let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, self.endpoint.name())?;
let package = dbmodels::Package::create_or_fetch(&self.db, self.job.package())?;
let image = dbmodels::Image::create_or_fetch(&self.db, self.job.image())?;
let envs = self.create_env_in_db()?;
let job_id = *self.job.uuid();
- trace!("Running on Job {} on Endpoint {}", job_id, ep.name());
- let prepared_container = ep
+ trace!("Running on Job {} on Endpoint {}", job_id, self.endpoint.name());
+ let prepared_container = self.endpoint
.prepare_container(self.job, self.staging_store.clone(), self.release_stores.clone())
.await?;
let container_id = prepared_container.create_info().id.clone();
@@ -174,7 +171,7 @@ impl JobHandle {
&job_id,
&package.name,
&package.version,
- ep.uri(),
+ &endpoint_uri,
&container_id,
)
})?
@@ -192,7 +189,7 @@ impl JobHandle {
drop(self.bar);
let (run_container, logres) = tokio::join!(running_container, logres);
- let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'", ep.name()))?;
+ let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'", endpoint_name))?;
let run_container = run_container
.with_context(|| anyhow!("Running container {} failed"))
.with_context(|| {
@@ -200,7 +197,7 @@ impl JobHandle {
&job_id,
&package.name,
&package.version,
- ep.uri(),
+ &endpoint_uri,
&container_id,
)
})?;
@@ -233,7 +230,7 @@ impl JobHandle {
&job.uuid,
&package.name,
&package.version,
- ep.uri(),
+ &endpoint_uri,
&container_id,
)
})?;
@@ -241,13 +238,13 @@ impl JobHandle {
trace!("Found result for job {}: {:?}", job_id, res);
let (paths, res) = res.unpack();
let res = res
- .with_context(|| anyhow!("Error during running job on '{}'", ep.name()))
+ .with_context(|| anyhow!("Error during running job on '{}'", endpoint_name))
.with_context(|| {
Self::create_job_run_error(
&job.uuid,
&package.name,
&package.version,
- ep.uri(),
+ &endpoint_uri,
&container_id,
)
})