diff options
author | Matthias Beyer <matthias.beyer@atos.net> | 2021-03-01 19:27:49 +0100 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@atos.net> | 2021-03-01 19:27:49 +0100 |
commit | 5c9f1d46bb64429b0ef2c828e0808ef729b97b86 (patch) | |
tree | ef50ffa2e82fbba54810224163f45037ee232100 /src/endpoint | |
parent | 746d46d57d39b2f32d8f02454463b03f64c16cdd (diff) |
Remove RwLock around Endpoint
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/endpoint')
-rw-r--r-- | src/endpoint/scheduler.rs | 37 |
1 files changed, 17 insertions, 20 deletions
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 826f905..111e3c4 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -39,7 +39,7 @@ use crate::log::LogItem; pub struct EndpointScheduler { log_dir: Option<PathBuf>, - endpoints: Vec<Arc<RwLock<Endpoint>>>, + endpoints: Vec<Arc<Endpoint>>, staging_store: Arc<RwLock<StagingStore>>, release_stores: Vec<Arc<ReleaseStore>>, @@ -68,14 +68,12 @@ impl EndpointScheduler { }) } - async fn setup_endpoints( - endpoints: Vec<EndpointConfiguration>, - ) -> Result<Vec<Arc<RwLock<Endpoint>>>> { + async fn setup_endpoints(endpoints: Vec<EndpointConfiguration>) -> Result<Vec<Arc<Endpoint>>> { let unordered = futures::stream::FuturesUnordered::new(); for cfg in endpoints.into_iter() { unordered - .push(Endpoint::setup(cfg).map(|r_ep| r_ep.map(RwLock::new).map(Arc::new))); + .push(Endpoint::setup(cfg).map(|r_ep| r_ep.map(Arc::new))); } unordered.collect().await @@ -105,16 +103,14 @@ impl EndpointScheduler { }) } - async fn select_free_endpoint(&self) -> Result<Arc<RwLock<Endpoint>>> { + async fn select_free_endpoint(&self) -> Result<Arc<Endpoint>> { loop { let ep = self .endpoints .iter() .cloned() .map(|ep| async move { - ep.write() - .await - .number_of_running_containers() + ep.number_of_running_containers() .await .map(|num_running| (num_running, ep.clone())) }) @@ -137,7 +133,7 @@ impl EndpointScheduler { pub struct JobHandle { log_dir: Option<PathBuf>, - endpoint: Arc<RwLock<Endpoint>>, + endpoint: Arc<Endpoint>, job: RunnableJob, bar: ProgressBar, db: Arc<PgConnection>, @@ -155,14 +151,15 @@ impl std::fmt::Debug for JobHandle { impl JobHandle { pub async fn run(self) -> Result<Result<Vec<ArtifactPath>>> { let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>(); - let ep = self.endpoint.read().await; - let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?; + let endpoint_uri = self.endpoint.uri().clone(); + let endpoint_name = self.endpoint.name().clone(); + let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, self.endpoint.name())?; let package = dbmodels::Package::create_or_fetch(&self.db, self.job.package())?; let image = dbmodels::Image::create_or_fetch(&self.db, self.job.image())?; let envs = self.create_env_in_db()?; let job_id = *self.job.uuid(); - trace!("Running on Job {} on Endpoint {}", job_id, ep.name()); - let prepared_container = ep + trace!("Running on Job {} on Endpoint {}", job_id, self.endpoint.name()); + let prepared_container = self.endpoint .prepare_container(self.job, self.staging_store.clone(), self.release_stores.clone()) .await?; let container_id = prepared_container.create_info().id.clone(); @@ -174,7 +171,7 @@ impl JobHandle { &job_id, &package.name, &package.version, - ep.uri(), + &endpoint_uri, &container_id, ) })? @@ -192,7 +189,7 @@ impl JobHandle { drop(self.bar); let (run_container, logres) = tokio::join!(running_container, logres); - let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'", ep.name()))?; + let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'", endpoint_name))?; let run_container = run_container .with_context(|| anyhow!("Running container {} failed")) .with_context(|| { @@ -200,7 +197,7 @@ impl JobHandle { &job_id, &package.name, &package.version, - ep.uri(), + &endpoint_uri, &container_id, ) })?; @@ -233,7 +230,7 @@ impl JobHandle { &job.uuid, &package.name, &package.version, - ep.uri(), + &endpoint_uri, &container_id, ) })?; @@ -241,13 +238,13 @@ impl JobHandle { trace!("Found result for job {}: {:?}", job_id, res); let (paths, res) = res.unpack(); let res = res - .with_context(|| anyhow!("Error during running job on '{}'", ep.name())) + .with_context(|| anyhow!("Error during running job on '{}'", endpoint_name)) .with_context(|| { Self::create_job_run_error( &job.uuid, &package.name, &package.version, - ep.uri(), + &endpoint_uri, &container_id, ) }) |