diff options
author | Matthias Beyer <matthias.beyer@atos.net> | 2021-03-05 13:20:12 +0100 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@atos.net> | 2021-03-05 13:20:13 +0100 |
commit | b818f01f5408c37d1a889b1ad61529321257f56a (patch) | |
tree | db23499d1ee5cad428b60049edb1c87e583c6076 /src/endpoint | |
parent | 60a3fa633a33e315c1439a9f2436fcdb48da62ae (diff) |
Move endpoint connection setup to utility module
Because we can re-use this function in our commandline endpoint maintenance
commands implementation.
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/endpoint')
-rw-r--r-- | src/endpoint/mod.rs | 3 | ||||
-rw-r--r-- | src/endpoint/scheduler.rs | 15 | ||||
-rw-r--r-- | src/endpoint/util.rs | 30 |
3 files changed, 34 insertions, 14 deletions
diff --git a/src/endpoint/mod.rs b/src/endpoint/mod.rs index b87a303..4d24ff5 100644 --- a/src/endpoint/mod.rs +++ b/src/endpoint/mod.rs @@ -16,3 +16,6 @@ pub use scheduler::*; mod configured; pub use configured::*; + +pub mod util; + diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 34a79d7..5f0882b 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -17,14 +17,12 @@ use anyhow::Error; use anyhow::Result; use colored::Colorize; use diesel::PgConnection; -use futures::FutureExt; use indicatif::ProgressBar; use itertools::Itertools; use log::trace; use tokio::io::AsyncWriteExt; use tokio::sync::RwLock; use tokio::sync::mpsc::UnboundedReceiver; -use tokio_stream::StreamExt; use uuid::Uuid; use crate::db::models as dbmodels; @@ -57,7 +55,7 @@ impl EndpointScheduler { submit: crate::db::models::Submit, log_dir: Option<PathBuf>, ) -> Result<Self> { - let endpoints = Self::setup_endpoints(endpoints).await?; + let endpoints = crate::endpoint::util::setup_endpoints(endpoints).await?; Ok(EndpointScheduler { log_dir, @@ -69,17 +67,6 @@ impl EndpointScheduler { }) } - async fn setup_endpoints(endpoints: Vec<EndpointConfiguration>) -> Result<Vec<Arc<Endpoint>>> { - let unordered = futures::stream::FuturesUnordered::new(); - - for cfg in endpoints.into_iter() { - unordered - .push(Endpoint::setup(cfg).map(|r_ep| r_ep.map(Arc::new))); - } - - unordered.collect().await - } - /// Schedule a Job /// /// # Warning diff --git a/src/endpoint/util.rs b/src/endpoint/util.rs new file mode 100644 index 0000000..5f6bbc8 --- /dev/null +++ b/src/endpoint/util.rs @@ -0,0 +1,30 @@ +// +// Copyright (c) 2020-2021 science+computing ag and other contributors +// +// This program and the accompanying materials are made +// available under the terms of the Eclipse Public License 2.0 +// which is available at https://www.eclipse.org/legal/epl-2.0/ +// +// SPDX-License-Identifier: EPL-2.0 +// + +use std::sync::Arc; + +use anyhow::Result; +use futures::FutureExt; +use tokio_stream::StreamExt; + +use crate::endpoint::Endpoint; +use crate::endpoint::EndpointConfiguration; + +pub async fn setup_endpoints(endpoints: Vec<EndpointConfiguration>) -> Result<Vec<Arc<Endpoint>>> { + let unordered = futures::stream::FuturesUnordered::new(); + + for cfg in endpoints.into_iter() { + unordered + .push(Endpoint::setup(cfg).map(|r_ep| r_ep.map(Arc::new))); + } + + unordered.collect().await +} + |