summaryrefslogtreecommitdiffstats
path: root/src/endpoint
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-03-05 13:20:12 +0100
committerMatthias Beyer <matthias.beyer@atos.net>2021-03-05 13:20:13 +0100
commitb818f01f5408c37d1a889b1ad61529321257f56a (patch)
treedb23499d1ee5cad428b60049edb1c87e583c6076 /src/endpoint
parent60a3fa633a33e315c1439a9f2436fcdb48da62ae (diff)
Move endpoint connection setup to utility module
Because we can re-use this function in our commandline endpoint maintenance commands implementation. Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/endpoint')
-rw-r--r--src/endpoint/mod.rs3
-rw-r--r--src/endpoint/scheduler.rs15
-rw-r--r--src/endpoint/util.rs30
3 files changed, 34 insertions, 14 deletions
diff --git a/src/endpoint/mod.rs b/src/endpoint/mod.rs
index b87a303..4d24ff5 100644
--- a/src/endpoint/mod.rs
+++ b/src/endpoint/mod.rs
@@ -16,3 +16,6 @@ pub use scheduler::*;
mod configured;
pub use configured::*;
+
+pub mod util;
+
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 34a79d7..5f0882b 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -17,14 +17,12 @@ use anyhow::Error;
use anyhow::Result;
use colored::Colorize;
use diesel::PgConnection;
-use futures::FutureExt;
use indicatif::ProgressBar;
use itertools::Itertools;
use log::trace;
use tokio::io::AsyncWriteExt;
use tokio::sync::RwLock;
use tokio::sync::mpsc::UnboundedReceiver;
-use tokio_stream::StreamExt;
use uuid::Uuid;
use crate::db::models as dbmodels;
@@ -57,7 +55,7 @@ impl EndpointScheduler {
submit: crate::db::models::Submit,
log_dir: Option<PathBuf>,
) -> Result<Self> {
- let endpoints = Self::setup_endpoints(endpoints).await?;
+ let endpoints = crate::endpoint::util::setup_endpoints(endpoints).await?;
Ok(EndpointScheduler {
log_dir,
@@ -69,17 +67,6 @@ impl EndpointScheduler {
})
}
- async fn setup_endpoints(endpoints: Vec<EndpointConfiguration>) -> Result<Vec<Arc<Endpoint>>> {
- let unordered = futures::stream::FuturesUnordered::new();
-
- for cfg in endpoints.into_iter() {
- unordered
- .push(Endpoint::setup(cfg).map(|r_ep| r_ep.map(Arc::new)));
- }
-
- unordered.collect().await
- }
-
/// Schedule a Job
///
/// # Warning
diff --git a/src/endpoint/util.rs b/src/endpoint/util.rs
new file mode 100644
index 0000000..5f6bbc8
--- /dev/null
+++ b/src/endpoint/util.rs
@@ -0,0 +1,30 @@
+//
+// Copyright (c) 2020-2021 science+computing ag and other contributors
+//
+// This program and the accompanying materials are made
+// available under the terms of the Eclipse Public License 2.0
+// which is available at https://www.eclipse.org/legal/epl-2.0/
+//
+// SPDX-License-Identifier: EPL-2.0
+//
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use futures::FutureExt;
+use tokio_stream::StreamExt;
+
+use crate::endpoint::Endpoint;
+use crate::endpoint::EndpointConfiguration;
+
+pub async fn setup_endpoints(endpoints: Vec<EndpointConfiguration>) -> Result<Vec<Arc<Endpoint>>> {
+ let unordered = futures::stream::FuturesUnordered::new();
+
+ for cfg in endpoints.into_iter() {
+ unordered
+ .push(Endpoint::setup(cfg).map(|r_ep| r_ep.map(Arc::new)));
+ }
+
+ unordered.collect().await
+}
+