diff options
author | Matthias Beyer <matthias.beyer@atos.net> | 2021-04-28 14:06:30 +0200 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-05-11 10:15:50 +0200 |
commit | aa3b529f3f23275ff14a222401af80d63e293697 (patch) | |
tree | 00deddcd6f603026cc34304aad4c8e7552e5ec17 /src/commands/endpoint.rs | |
parent | 2c0c0eb515aa4d3d3de1ab47351733ba59c983f1 (diff) |
Add subcommand to get top output of containers
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/commands/endpoint.rs')
-rw-r--r-- | src/commands/endpoint.rs | 102 |
1 files changed, 101 insertions, 1 deletions
diff --git a/src/commands/endpoint.rs b/src/commands/endpoint.rs index f7f57b1..79f1d98 100644 --- a/src/commands/endpoint.rs +++ b/src/commands/endpoint.rs @@ -8,14 +8,17 @@ // SPDX-License-Identifier: EPL-2.0 // +use std::collections::HashMap; +use std::ops::Deref; use std::str::FromStr; use std::sync::Arc; use anyhow::Error; +use anyhow::Context; use anyhow::Result; use anyhow::anyhow; use clap::ArgMatches; -use log::{debug, info}; +use log::{debug, info, trace}; use itertools::Itertools; use tokio_stream::StreamExt; @@ -164,6 +167,7 @@ async fn containers(endpoint_names: Vec<EndpointName>, match matches.subcommand() { Some(("list", matches)) => containers_list(endpoint_names, matches, config).await, Some(("prune", matches)) => containers_prune(endpoint_names, matches, config).await, + Some(("top", matches)) => containers_top(endpoint_names, matches, config).await, Some((other, _)) => Err(anyhow!("Unknown subcommand: {}", other)), None => Err(anyhow!("No subcommand")), } @@ -267,6 +271,102 @@ async fn containers_prune(endpoint_names: Vec<EndpointName>, .await } +async fn containers_top(endpoint_names: Vec<EndpointName>, + matches: &ArgMatches, + config: &Configuration, +) -> Result<()> { + let limit = matches.value_of("limit").map(usize::from_str).transpose()?; + let older_than_filter = get_date_filter("older_than", matches)?; + let newer_than_filter = get_date_filter("newer_than", matches)?; + let csv = matches.is_present("csv"); + + let data = connect_to_endpoints(config, &endpoint_names) + .await? + .into_iter() + .inspect(|ep| trace!("Fetching stats for endpoint: {}", ep.name())) + .map(move |ep| async move { + let stats = ep.container_stats() + .await? + .into_iter() + .inspect(|stat| trace!("Fetching stats for container: {}", stat.id)) + .filter(|stat| stat.state == "running") + .filter(|stat| older_than_filter.as_ref().map(|time| time > &stat.created).unwrap_or(true)) + .filter(|stat| newer_than_filter.as_ref().map(|time| time < &stat.created).unwrap_or(true)) + .map(|stat| (ep.clone(), stat)) + .collect::<Vec<(_, _)>>(); + Ok(stats) + }) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Result<Vec<_>>>() + .await? + .into_iter() + .map(Vec::into_iter) + .flatten() + .inspect(|(_ep, stat)| trace!("Fetching container: {}", stat.id)) + .map(|(ep, stat)| async move { + ep.get_container_by_id(&stat.id) + .await? + .ok_or_else(|| anyhow!("Failed to find existing container {}", stat.id))? + .top(None) + .await + .with_context(|| anyhow!("Fetching 'top' for {}", stat.id)) + .map_err(Error::from) + .map(|top| (stat.id, top)) + }) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Result<Vec<_>>>() + .await? + .into_iter() + .inspect(|(cid, _top)| trace!("Processing top of container: {}", cid)) + .map(|(container_id, top)| { + let processes = if let Some(limit) = limit { + top.processes.into_iter().take(limit).collect() + } else { + top.processes + }; + + let hm = top.titles + .into_iter() + .zip(processes.into_iter()) + .collect::<HashMap<String, Vec<String>>>(); + (container_id, hm) + }) + .collect::<HashMap<String, HashMap<String, Vec<String>>>>(); + + let hdr = crate::commands::util::mk_header({ + std::iter::once("Container ID") + .chain({ + data.values() + .map(|hm| hm.keys()) + .flatten() + .map(|s| s.deref()) + }) + .collect::<Vec<&str>>() + .into_iter() + .unique() + .collect() + }); + + let data = data.into_iter() + .map(|(container_id, top_hm)| { + top_hm.values() + .map(|t| std::iter::once(container_id.clone()).chain(t.iter().map(String::clone)).collect()) + .collect::<Vec<Vec<String>>>() + }) + .flatten() + + // ugly hack to bring order to the galaxy + .sorted_by(|v1, v2| if let (Some(f1), Some(f2)) = (v1.iter().next(), v2.iter().next()) { + f1.cmp(f2) + } else { + std::cmp::Ordering::Less + }) + .collect::<Vec<Vec<String>>>(); + + crate::commands::util::display_data(hdr, data, csv) +} + + fn get_date_filter(name: &str, matches: &ArgMatches) -> Result<Option<chrono::DateTime::<chrono::Local>>> { matches.value_of(name) .map(humantime::parse_rfc3339_weak) |