summaryrefslogtreecommitdiffstats
path: root/src/commands/endpoint.rs
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-04-28 14:06:30 +0200
committerMatthias Beyer <mail@beyermatthias.de>2021-05-11 10:15:50 +0200
commitaa3b529f3f23275ff14a222401af80d63e293697 (patch)
tree00deddcd6f603026cc34304aad4c8e7552e5ec17 /src/commands/endpoint.rs
parent2c0c0eb515aa4d3d3de1ab47351733ba59c983f1 (diff)
Add subcommand to get top output of containers
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/commands/endpoint.rs')
-rw-r--r--src/commands/endpoint.rs102
1 files changed, 101 insertions, 1 deletions
diff --git a/src/commands/endpoint.rs b/src/commands/endpoint.rs
index f7f57b1..79f1d98 100644
--- a/src/commands/endpoint.rs
+++ b/src/commands/endpoint.rs
@@ -8,14 +8,17 @@
// SPDX-License-Identifier: EPL-2.0
//
+use std::collections::HashMap;
+use std::ops::Deref;
use std::str::FromStr;
use std::sync::Arc;
use anyhow::Error;
+use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use clap::ArgMatches;
-use log::{debug, info};
+use log::{debug, info, trace};
use itertools::Itertools;
use tokio_stream::StreamExt;
@@ -164,6 +167,7 @@ async fn containers(endpoint_names: Vec<EndpointName>,
match matches.subcommand() {
Some(("list", matches)) => containers_list(endpoint_names, matches, config).await,
Some(("prune", matches)) => containers_prune(endpoint_names, matches, config).await,
+ Some(("top", matches)) => containers_top(endpoint_names, matches, config).await,
Some((other, _)) => Err(anyhow!("Unknown subcommand: {}", other)),
None => Err(anyhow!("No subcommand")),
}
@@ -267,6 +271,102 @@ async fn containers_prune(endpoint_names: Vec<EndpointName>,
.await
}
+async fn containers_top(endpoint_names: Vec<EndpointName>,
+ matches: &ArgMatches,
+ config: &Configuration,
+) -> Result<()> {
+ let limit = matches.value_of("limit").map(usize::from_str).transpose()?;
+ let older_than_filter = get_date_filter("older_than", matches)?;
+ let newer_than_filter = get_date_filter("newer_than", matches)?;
+ let csv = matches.is_present("csv");
+
+ let data = connect_to_endpoints(config, &endpoint_names)
+ .await?
+ .into_iter()
+ .inspect(|ep| trace!("Fetching stats for endpoint: {}", ep.name()))
+ .map(move |ep| async move {
+ let stats = ep.container_stats()
+ .await?
+ .into_iter()
+ .inspect(|stat| trace!("Fetching stats for container: {}", stat.id))
+ .filter(|stat| stat.state == "running")
+ .filter(|stat| older_than_filter.as_ref().map(|time| time > &stat.created).unwrap_or(true))
+ .filter(|stat| newer_than_filter.as_ref().map(|time| time < &stat.created).unwrap_or(true))
+ .map(|stat| (ep.clone(), stat))
+ .collect::<Vec<(_, _)>>();
+ Ok(stats)
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<Vec<_>>>()
+ .await?
+ .into_iter()
+ .map(Vec::into_iter)
+ .flatten()
+ .inspect(|(_ep, stat)| trace!("Fetching container: {}", stat.id))
+ .map(|(ep, stat)| async move {
+ ep.get_container_by_id(&stat.id)
+ .await?
+ .ok_or_else(|| anyhow!("Failed to find existing container {}", stat.id))?
+ .top(None)
+ .await
+ .with_context(|| anyhow!("Fetching 'top' for {}", stat.id))
+ .map_err(Error::from)
+ .map(|top| (stat.id, top))
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<Vec<_>>>()
+ .await?
+ .into_iter()
+ .inspect(|(cid, _top)| trace!("Processing top of container: {}", cid))
+ .map(|(container_id, top)| {
+ let processes = if let Some(limit) = limit {
+ top.processes.into_iter().take(limit).collect()
+ } else {
+ top.processes
+ };
+
+ let hm = top.titles
+ .into_iter()
+ .zip(processes.into_iter())
+ .collect::<HashMap<String, Vec<String>>>();
+ (container_id, hm)
+ })
+ .collect::<HashMap<String, HashMap<String, Vec<String>>>>();
+
+ let hdr = crate::commands::util::mk_header({
+ std::iter::once("Container ID")
+ .chain({
+ data.values()
+ .map(|hm| hm.keys())
+ .flatten()
+ .map(|s| s.deref())
+ })
+ .collect::<Vec<&str>>()
+ .into_iter()
+ .unique()
+ .collect()
+ });
+
+ let data = data.into_iter()
+ .map(|(container_id, top_hm)| {
+ top_hm.values()
+ .map(|t| std::iter::once(container_id.clone()).chain(t.iter().map(String::clone)).collect())
+ .collect::<Vec<Vec<String>>>()
+ })
+ .flatten()
+
+ // ugly hack to bring order to the galaxy
+ .sorted_by(|v1, v2| if let (Some(f1), Some(f2)) = (v1.iter().next(), v2.iter().next()) {
+ f1.cmp(f2)
+ } else {
+ std::cmp::Ordering::Less
+ })
+ .collect::<Vec<Vec<String>>>();
+
+ crate::commands::util::display_data(hdr, data, csv)
+}
+
+
fn get_date_filter(name: &str, matches: &ArgMatches) -> Result<Option<chrono::DateTime::<chrono::Local>>> {
matches.value_of(name)
.map(humantime::parse_rfc3339_weak)