summaryrefslogtreecommitdiffstats
path: root/src/endpoint
diff options
context:
space:
mode:
Diffstat (limited to 'src/endpoint')
-rw-r--r--src/endpoint/configured.rs108
-rw-r--r--src/endpoint/mod.rs3
-rw-r--r--src/endpoint/scheduler.rs15
-rw-r--r--src/endpoint/util.rs30
4 files changed, 142 insertions, 14 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 0d3927b..7265dd6 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -241,6 +241,114 @@ impl Endpoint {
let run_jobs = self.running_jobs() as f64;
100.0 / max_jobs * run_jobs
}
+
+ /// Ping the endpoint (once)
+ pub async fn ping(&self) -> Result<String> {
+ self.docker.ping().await.map_err(Error::from)
+ }
+
+ pub async fn stats(&self) -> Result<EndpointStats> {
+ self.docker
+ .info()
+ .await
+ .map(EndpointStats::from)
+ .map_err(Error::from)
+ }
+
+ pub async fn container_stats(&self) -> Result<Vec<ContainerStat>> {
+ self.docker
+ .containers()
+ .list({
+ &shiplift::builder::ContainerListOptions::builder()
+ .all()
+ .build()
+ })
+ .await
+ .map_err(Error::from)
+ .map(|containers| {
+ containers
+ .into_iter()
+ .map(ContainerStat::from)
+ .collect()
+ })
+ }
+
+ pub async fn has_container_with_id(&self, id: &str) -> Result<bool> {
+ self.container_stats()
+ .await?
+ .iter()
+ .find(|st| st.id == id)
+ .map(Ok)
+ .transpose()
+ .map(|o| o.is_some())
+ }
+
+ pub async fn get_container_by_id(&self, id: &str) -> Result<Option<Container<'_>>> {
+ if self.has_container_with_id(id).await? {
+ Ok(Some(self.docker.containers().get(id)))
+ } else {
+ Ok(None)
+ }
+ }
+}
+
+/// Helper type to store endpoint statistics
+///
+/// Currently, this can only be generated from a shiplift::rep::Info, but it does not hold all
+/// values the shiplift::rep::Info type holds, because some of these are not relevant for us.
+///
+/// Later, this might hold endpoint stats from other endpoint implementations as well
+pub struct EndpointStats {
+ pub name: String,
+ pub containers: u64,
+ pub images: u64,
+ pub id: String,
+ pub kernel_version: String,
+ pub mem_total: u64,
+ pub memory_limit: bool,
+ pub n_cpu: u64,
+ pub operating_system: String,
+ pub system_time: Option<String>,
+}
+
+impl From<shiplift::rep::Info> for EndpointStats {
+ fn from(info: shiplift::rep::Info) -> Self {
+ EndpointStats {
+ name: info.name,
+ containers: info.containers,
+ images: info.images,
+ id: info.id,
+ kernel_version: info.kernel_version,
+ mem_total: info.mem_total,
+ memory_limit: info.memory_limit,
+ n_cpu: info.n_cpu,
+ operating_system: info.operating_system,
+ system_time: info.system_time,
+ }
+ }
+}
+
+/// Helper type to store stats about a container
+pub struct ContainerStat {
+ pub created: chrono::DateTime<chrono::Utc>,
+ pub id: String,
+ pub image: String,
+ pub image_id: String,
+ pub state: String,
+ pub status: String,
+}
+
+impl From<shiplift::rep::Container> for ContainerStat {
+ fn from(cont: shiplift::rep::Container) -> Self {
+ ContainerStat {
+ created: cont.created,
+ id: cont.id,
+ image: cont.image,
+ image_id: cont.image_id,
+ state: cont.state,
+ status: cont.status,
+ }
+ }
}
pub struct EndpointHandle(Arc<Endpoint>);
diff --git a/src/endpoint/mod.rs b/src/endpoint/mod.rs
index b87a303..4d24ff5 100644
--- a/src/endpoint/mod.rs
+++ b/src/endpoint/mod.rs
@@ -16,3 +16,6 @@ pub use scheduler::*;
mod configured;
pub use configured::*;
+
+pub mod util;
+
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 34a79d7..5f0882b 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -17,14 +17,12 @@ use anyhow::Error;
use anyhow::Result;
use colored::Colorize;
use diesel::PgConnection;
-use futures::FutureExt;
use indicatif::ProgressBar;
use itertools::Itertools;
use log::trace;
use tokio::io::AsyncWriteExt;
use tokio::sync::RwLock;
use tokio::sync::mpsc::UnboundedReceiver;
-use tokio_stream::StreamExt;
use uuid::Uuid;
use crate::db::models as dbmodels;
@@ -57,7 +55,7 @@ impl EndpointScheduler {
submit: crate::db::models::Submit,
log_dir: Option<PathBuf>,
) -> Result<Self> {
- let endpoints = Self::setup_endpoints(endpoints).await?;
+ let endpoints = crate::endpoint::util::setup_endpoints(endpoints).await?;
Ok(EndpointScheduler {
log_dir,
@@ -69,17 +67,6 @@ impl EndpointScheduler {
})
}
- async fn setup_endpoints(endpoints: Vec<EndpointConfiguration>) -> Result<Vec<Arc<Endpoint>>> {
- let unordered = futures::stream::FuturesUnordered::new();
-
- for cfg in endpoints.into_iter() {
- unordered
- .push(Endpoint::setup(cfg).map(|r_ep| r_ep.map(Arc::new)));
- }
-
- unordered.collect().await
- }
-
/// Schedule a Job
///
/// # Warning
diff --git a/src/endpoint/util.rs b/src/endpoint/util.rs
new file mode 100644
index 0000000..5f6bbc8
--- /dev/null
+++ b/src/endpoint/util.rs
@@ -0,0 +1,30 @@
+//
+// Copyright (c) 2020-2021 science+computing ag and other contributors
+//
+// This program and the accompanying materials are made
+// available under the terms of the Eclipse Public License 2.0
+// which is available at https://www.eclipse.org/legal/epl-2.0/
+//
+// SPDX-License-Identifier: EPL-2.0
+//
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use futures::FutureExt;
+use tokio_stream::StreamExt;
+
+use crate::endpoint::Endpoint;
+use crate::endpoint::EndpointConfiguration;
+
+pub async fn setup_endpoints(endpoints: Vec<EndpointConfiguration>) -> Result<Vec<Arc<Endpoint>>> {
+ let unordered = futures::stream::FuturesUnordered::new();
+
+ for cfg in endpoints.into_iter() {
+ unordered
+ .push(Endpoint::setup(cfg).map(|r_ep| r_ep.map(Arc::new)));
+ }
+
+ unordered.collect().await
+}
+