diff options
Diffstat (limited to 'src/endpoint')
-rw-r--r-- | src/endpoint/configured.rs | 108 | ||||
-rw-r--r-- | src/endpoint/mod.rs | 3 | ||||
-rw-r--r-- | src/endpoint/scheduler.rs | 15 | ||||
-rw-r--r-- | src/endpoint/util.rs | 30 |
4 files changed, 142 insertions, 14 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 0d3927b..7265dd6 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -241,6 +241,114 @@ impl Endpoint { let run_jobs = self.running_jobs() as f64; 100.0 / max_jobs * run_jobs } + + /// Ping the endpoint (once) + pub async fn ping(&self) -> Result<String> { + self.docker.ping().await.map_err(Error::from) + } + + pub async fn stats(&self) -> Result<EndpointStats> { + self.docker + .info() + .await + .map(EndpointStats::from) + .map_err(Error::from) + } + + pub async fn container_stats(&self) -> Result<Vec<ContainerStat>> { + self.docker + .containers() + .list({ + &shiplift::builder::ContainerListOptions::builder() + .all() + .build() + }) + .await + .map_err(Error::from) + .map(|containers| { + containers + .into_iter() + .map(ContainerStat::from) + .collect() + }) + } + + pub async fn has_container_with_id(&self, id: &str) -> Result<bool> { + self.container_stats() + .await? + .iter() + .find(|st| st.id == id) + .map(Ok) + .transpose() + .map(|o| o.is_some()) + } + + pub async fn get_container_by_id(&self, id: &str) -> Result<Option<Container<'_>>> { + if self.has_container_with_id(id).await? { + Ok(Some(self.docker.containers().get(id))) + } else { + Ok(None) + } + } +} + +/// Helper type to store endpoint statistics +/// +/// Currently, this can only be generated from a shiplift::rep::Info, but it does not hold all +/// values the shiplift::rep::Info type holds, because some of these are not relevant for us. +/// +/// Later, this might hold endpoint stats from other endpoint implementations as well +pub struct EndpointStats { + pub name: String, + pub containers: u64, + pub images: u64, + pub id: String, + pub kernel_version: String, + pub mem_total: u64, + pub memory_limit: bool, + pub n_cpu: u64, + pub operating_system: String, + pub system_time: Option<String>, +} + +impl From<shiplift::rep::Info> for EndpointStats { + fn from(info: shiplift::rep::Info) -> Self { + EndpointStats { + name: info.name, + containers: info.containers, + images: info.images, + id: info.id, + kernel_version: info.kernel_version, + mem_total: info.mem_total, + memory_limit: info.memory_limit, + n_cpu: info.n_cpu, + operating_system: info.operating_system, + system_time: info.system_time, + } + } +} + +/// Helper type to store stats about a container +pub struct ContainerStat { + pub created: chrono::DateTime<chrono::Utc>, + pub id: String, + pub image: String, + pub image_id: String, + pub state: String, + pub status: String, +} + +impl From<shiplift::rep::Container> for ContainerStat { + fn from(cont: shiplift::rep::Container) -> Self { + ContainerStat { + created: cont.created, + id: cont.id, + image: cont.image, + image_id: cont.image_id, + state: cont.state, + status: cont.status, + } + } } pub struct EndpointHandle(Arc<Endpoint>); diff --git a/src/endpoint/mod.rs b/src/endpoint/mod.rs index b87a303..4d24ff5 100644 --- a/src/endpoint/mod.rs +++ b/src/endpoint/mod.rs @@ -16,3 +16,6 @@ pub use scheduler::*; mod configured; pub use configured::*; + +pub mod util; + diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 34a79d7..5f0882b 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -17,14 +17,12 @@ use anyhow::Error; use anyhow::Result; use colored::Colorize; use diesel::PgConnection; -use futures::FutureExt; use indicatif::ProgressBar; use itertools::Itertools; use log::trace; use tokio::io::AsyncWriteExt; use tokio::sync::RwLock; use tokio::sync::mpsc::UnboundedReceiver; -use tokio_stream::StreamExt; use uuid::Uuid; use crate::db::models as dbmodels; @@ -57,7 +55,7 @@ impl EndpointScheduler { submit: crate::db::models::Submit, log_dir: Option<PathBuf>, ) -> Result<Self> { - let endpoints = Self::setup_endpoints(endpoints).await?; + let endpoints = crate::endpoint::util::setup_endpoints(endpoints).await?; Ok(EndpointScheduler { log_dir, @@ -69,17 +67,6 @@ impl EndpointScheduler { }) } - async fn setup_endpoints(endpoints: Vec<EndpointConfiguration>) -> Result<Vec<Arc<Endpoint>>> { - let unordered = futures::stream::FuturesUnordered::new(); - - for cfg in endpoints.into_iter() { - unordered - .push(Endpoint::setup(cfg).map(|r_ep| r_ep.map(Arc::new))); - } - - unordered.collect().await - } - /// Schedule a Job /// /// # Warning diff --git a/src/endpoint/util.rs b/src/endpoint/util.rs new file mode 100644 index 0000000..5f6bbc8 --- /dev/null +++ b/src/endpoint/util.rs @@ -0,0 +1,30 @@ +// +// Copyright (c) 2020-2021 science+computing ag and other contributors +// +// This program and the accompanying materials are made +// available under the terms of the Eclipse Public License 2.0 +// which is available at https://www.eclipse.org/legal/epl-2.0/ +// +// SPDX-License-Identifier: EPL-2.0 +// + +use std::sync::Arc; + +use anyhow::Result; +use futures::FutureExt; +use tokio_stream::StreamExt; + +use crate::endpoint::Endpoint; +use crate::endpoint::EndpointConfiguration; + +pub async fn setup_endpoints(endpoints: Vec<EndpointConfiguration>) -> Result<Vec<Arc<Endpoint>>> { + let unordered = futures::stream::FuturesUnordered::new(); + + for cfg in endpoints.into_iter() { + unordered + .push(Endpoint::setup(cfg).map(|r_ep| r_ep.map(Arc::new))); + } + + unordered.collect().await +} + |