summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-03-10 10:32:24 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-03-10 11:48:45 +0100
commit0ecac14b26d6eea26328aefff1c4e88d25cd643c (patch)
treeba35fcf8ccbdc36384794826dfb7b05918351580
parenta554772d25026a9cf223514d9422c70fc9b15f69 (diff)
parent9a79643ced98567ab7b0c742f0d161cb5dd43578 (diff)
Merge branch 'subcommand-endpoint'
Conflicts: src/cli.rs src/main.rs from merging the "metrics" subcommand implementation branch first. Conflicts were trivial, so I resolved them here in the merge commit. Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--Cargo.toml4
-rw-r--r--src/cli.rs207
-rw-r--r--src/commands/db.rs87
-rw-r--r--src/commands/endpoint.rs300
-rw-r--r--src/commands/endpoint_container.rs465
-rw-r--r--src/commands/mod.rs4
-rw-r--r--src/commands/util.rs68
-rw-r--r--src/endpoint/configured.rs108
-rw-r--r--src/endpoint/mod.rs3
-rw-r--r--src/endpoint/scheduler.rs15
-rw-r--r--src/endpoint/util.rs30
-rw-r--r--src/main.rs1
12 files changed, 1203 insertions, 89 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 2ca07fe..638160a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,6 +15,7 @@ anyhow = "1"
aquamarine = "0.1.6"
ascii_table = ">= 3.0.2"
atty = "0.2"
+bytesize = "1"
chrono = "0.4"
clap = "3.0.0-beta.2"
clap_generate = "3.0.0-beta.2"
@@ -30,6 +31,7 @@ futures = "0.3"
getset = "0.1"
git2 = "0.13"
handlebars = { version = "3", features = ["no_logging"] }
+humantime = "2.1"
indicatif = "0.15"
indoc = "1"
itertools = "0.10"
@@ -51,7 +53,7 @@ shiplift = "0.7"
syntect = "4.4"
tar = "0.4"
terminal_size = "0.1"
-tokio = { version = "1.0", features = ["macros", "fs", "process", "io-util"] }
+tokio = { version = "1.0", features = ["macros", "fs", "process", "io-util", "time"] }
tokio-stream = "0.1"
typed-builder = "0.9"
unindent = "0.1"
diff --git a/src/cli.rs b/src/cli.rs
index bc59171..22aae6d 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -837,6 +837,209 @@ pub fn cli<'a>() -> App<'a> {
.version(crate_version!())
.about("Print metrics about butido")
)
+
+ .subcommand(App::new("endpoint")
+ .version(crate_version!())
+ .about("Endpoint maintentance commands")
+ .arg(Arg::new("endpoint_name")
+ .required(false)
+ .multiple(false)
+ .index(1)
+ .value_name("ENDPOINT_NAME")
+ .about("Endpoint to talk to, or all if not given")
+ )
+
+ .subcommand(App::new("ping")
+ .version(crate_version!())
+ .about("Ping the endpoint(s)")
+ .arg(Arg::new("ping_n")
+ .required(false)
+ .multiple(false)
+ .long("times")
+ .short('n')
+ .value_name("N")
+ .default_value("10")
+ .about("How often to ping")
+ )
+ .arg(Arg::new("ping_sleep")
+ .required(false)
+ .multiple(false)
+ .long("sleep")
+ .value_name("N")
+ .default_value("1")
+ .about("How long to sleep between pings")
+ )
+ )
+ .subcommand(App::new("stats")
+ .version(crate_version!())
+ .about("Get stats for the endpoint(s)")
+ .arg(Arg::new("csv")
+ .required(false)
+ .multiple(false)
+ .long("csv")
+ .takes_value(false)
+ .about("Format output as CSV")
+ )
+ )
+ .subcommand(App::new("containers")
+ .version(crate_version!())
+ .about("Work with the containers of the endpoint(s)")
+ .subcommand(App::new("prune")
+ .version(crate_version!())
+ .about("Remove exited containers")
+ .arg(Arg::new("older_than")
+ .required(false)
+ .multiple(false)
+ .long("older-than")
+ .takes_value(true)
+ .value_name("DATE")
+ .about("List only containers that are older than DATE")
+ .validator(parse_date_from_string)
+ .conflicts_with("newer_than")
+ )
+
+ .arg(Arg::new("newer_than")
+ .required(false)
+ .multiple(false)
+ .long("newer-than")
+ .takes_value(true)
+ .value_name("DATE")
+ .about("List only containers that are newer than DATE")
+ .validator(parse_date_from_string)
+ .conflicts_with("older_than")
+ )
+ )
+ .subcommand(App::new("list")
+ .version(crate_version!())
+ .about("List the containers and stats about them")
+ .arg(Arg::new("csv")
+ .required(false)
+ .multiple(false)
+ .long("csv")
+ .takes_value(false)
+ .about("Format output as CSV")
+ )
+
+ .arg(Arg::new("list_stopped")
+ .required(false)
+ .multiple(false)
+ .long("list-stopped")
+ .takes_value(false)
+ .about("List stopped containers too")
+ )
+
+ .arg(Arg::new("filter_image")
+ .required(false)
+ .multiple(false)
+ .long("image")
+ .takes_value(true)
+ .value_name("IMAGE")
+ .about("List only containers of IMAGE")
+ )
+
+ .arg(Arg::new("older_than")
+ .required(false)
+ .multiple(false)
+ .long("older-than")
+ .takes_value(true)
+ .value_name("DATE")
+ .about("List only containers that are older than DATE")
+ .validator(parse_date_from_string)
+ .conflicts_with("newer_than")
+ )
+
+ .arg(Arg::new("newer_than")
+ .required(false)
+ .multiple(false)
+ .long("newer-than")
+ .takes_value(true)
+ .value_name("DATE")
+ .about("List only containers that are newer than DATE")
+ .validator(parse_date_from_string)
+ .conflicts_with("older_than")
+ )
+ )
+ )
+ .subcommand(App::new("container")
+ .version(crate_version!())
+ .about("Work with a specific container")
+ .arg(Arg::new("container_id")
+ .required(true)
+ .multiple(false)
+ .index(1)
+ .takes_value(true)
+ .value_name("CONTAINER_ID")
+ .about("Work with container CONTAINER_ID")
+ )
+ .subcommand(App::new("top")
+ .version(crate_version!())
+ .about("List the container processes")
+ .arg(Arg::new("csv")
+ .required(false)
+ .multiple(false)
+ .long("csv")
+ .takes_value(false)
+ .about("List top output as CSV")
+ )
+ )
+ .subcommand(App::new("kill")
+ .version(crate_version!())
+ .about("Kill the container")
+ .arg(Arg::new("signal")
+ .required(false)
+ .multiple(false)
+ .index(1)
+ .takes_value(true)
+ .value_name("SIGNAL")
+ .about("Kill container with this signal")
+ )
+ )
+ .subcommand(App::new("delete")
+ .version(crate_version!())
+ .about("Delete the container")
+ )
+ .subcommand(App::new("start")
+ .version(crate_version!())
+ .about("Start the container")
+ )
+ .subcommand(App::new("stop")
+ .version(crate_version!())
+ .about("Stop the container")
+ .arg(Arg::new("timeout")
+ .required(false)
+ .multiple(false)
+ .long("timeout")
+ .takes_value(true)
+ .value_name("DURATION")
+ .about("Timeout")
+ )
+ )
+ .subcommand(App::new("exec")
+ .version(crate_version!())
+ .about("Execute commands in the container")
+ .arg(Arg::new("commands")
+ .required(true)
+ .multiple(true)
+ .index(1)
+ .takes_value(true)
+ .value_name("CMD")
+ .about("Commands to execute in the container")
+ .long_about(indoc::indoc!(r#"
+ Execute a command in the container.
+
+ This does not handle TTY forwarding, so you cannot execute interactive commands in the container (e.g. htop).
+ For executing interactive things, you have to login to the container.
+ "#))
+ )
+ )
+
+ .subcommand(App::new("inspect")
+ .version(crate_version!())
+ .about("Display details about the container")
+ .long_about("Display details about the container. Do not assume the output format to be stable.")
+ )
+ )
+ )
}
fn script_arg_line_numbers<'a>() -> clap::Arg<'a> {
@@ -914,6 +1117,10 @@ fn dir_exists_validator(s: &str) -> Result<(), String> {
}
}
+fn parse_date_from_string(s: &str) -> std::result::Result<(), String> {
+ humantime::parse_rfc3339_weak(s).map_err(|e| e.to_string()).map(|_| ())
+}
+
#[cfg(test)]
mod tests {
use super::env_pass_validator;
diff --git a/src/commands/db.rs b/src/commands/db.rs
index 349fa79..0f11837 100644
--- a/src/commands/db.rs
+++ b/src/commands/db.rs
@@ -8,7 +8,6 @@
// SPDX-License-Identifier: EPL-2.0
//
-use std::fmt::Display;
use std::io::Write;
use std::path::PathBuf;
use std::process::Command;
@@ -141,7 +140,7 @@ fn artifacts(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
use crate::schema::artifacts::dsl;
let csv = matches.is_present("csv");
- let hdrs = mk_header(vec!["id", "path", "released", "job id"]);
+ let hdrs = crate::commands::util::mk_header(vec!["id", "path", "released", "job id"]);
let conn = crate::db::establish_connection(conn_cfg)?;
let data = matches
.value_of("job_uuid")
@@ -180,7 +179,7 @@ fn artifacts(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
if data.is_empty() {
info!("No artifacts in database");
} else {
- display_data(hdrs, data, csv)?;
+ crate::commands::util::display_data(hdrs, data, csv)?;
}
Ok(())
@@ -190,7 +189,7 @@ fn envvars(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
use crate::schema::envvars::dsl;
let csv = matches.is_present("csv");
- let hdrs = mk_header(vec!["id", "name", "value"]);
+ let hdrs = crate::commands::util::mk_header(vec!["id", "name", "value"]);
let conn = crate::db::establish_connection(conn_cfg)?;
let data = dsl::envvars
.load::<models::EnvVar>(&conn)?
@@ -201,7 +200,7 @@ fn envvars(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
if data.is_empty() {
info!("No environment variables in database");
} else {
- display_data(hdrs, data, csv)?;
+ crate::commands::util::display_data(hdrs, data, csv)?;
}
Ok(())
@@ -211,7 +210,7 @@ fn images(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
use crate::schema::images::dsl;
let csv = matches.is_present("csv");
- let hdrs = mk_header(vec!["id", "name"]);
+ let hdrs = crate::commands::util::mk_header(vec!["id", "name"]);
let conn = crate::db::establish_connection(conn_cfg)?;
let data = dsl::images
.load::<models::Image>(&conn)?
@@ -222,7 +221,7 @@ fn images(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
if data.is_empty() {
info!("No images in database");
} else {
- display_data(hdrs, data, csv)?;
+ crate::commands::util::display_data(hdrs, data, csv)?;
}
Ok(())
@@ -230,7 +229,7 @@ fn images(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
fn submits(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
let csv = matches.is_present("csv");
- let hdrs = mk_header(vec!["id", "time", "uuid"]);
+ let hdrs = crate::commands::util::mk_header(vec!["id", "time", "uuid"]);
let conn = crate::db::establish_connection(conn_cfg)?;
// Helper to map Submit -> Vec<String>
@@ -284,7 +283,7 @@ fn submits(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
if data.is_empty() {
info!("No submits in database");
} else {
- display_data(hdrs, data, csv)?;
+ crate::commands::util::display_data(hdrs, data, csv)?;
}
Ok(())
@@ -294,7 +293,7 @@ fn jobs(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
use crate::schema::jobs::dsl;
let csv = matches.is_present("csv");
- let hdrs = mk_header(vec![
+ let hdrs = crate::commands::util::mk_header(vec![
"id",
"submit uuid",
"job uuid",
@@ -410,7 +409,7 @@ fn jobs(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
if data.is_empty() {
info!("No submits in database");
} else {
- display_data(hdrs, data, csv)?;
+ crate::commands::util::display_data(hdrs, data, csv)?;
}
Ok(())
@@ -448,7 +447,7 @@ fn job(conn_cfg: DbConnectionConfig, config: &Configuration, matches: &ArgMatche
let success = parsed_log.is_successfull();
if csv {
- let hdrs = mk_header(vec![
+ let hdrs = crate::commands::util::mk_header(vec![
"UUID",
"success",
"Package Name",
@@ -471,7 +470,7 @@ fn job(conn_cfg: DbConnectionConfig, config: &Configuration, matches: &ArgMatche
data.4.name.to_string(),
data.0.container_hash,
]];
- display_data(hdrs, data, csv)
+ crate::commands::util::display_data(hdrs, data, csv)
} else {
let env_vars = if matches.is_present("show_env") {
Some({
@@ -595,7 +594,7 @@ fn job(conn_cfg: DbConnectionConfig, config: &Configuration, matches: &ArgMatche
fn releases(conn_cfg: DbConnectionConfig, config: &Configuration, matches: &ArgMatches) -> Result<()> {
let csv = matches.is_present("csv");
let conn = crate::db::establish_connection(conn_cfg)?;
- let header = mk_header(["Package", "Version", "Date", "Path"].to_vec());
+ let header = crate::commands::util::mk_header(["Package", "Version", "Date", "Path"].to_vec());
let data = schema::jobs::table
.inner_join(schema::packages::table)
.inner_join(schema::artifacts::table)
@@ -632,64 +631,6 @@ fn releases(conn_cfg: DbConnectionConfig, config: &Configuration, matches: &ArgM
})
.collect::<Vec<Vec<_>>>();
- display_data(header, data, csv)
-}
-
-
-fn mk_header(vec: Vec<&str>) -> Vec<ascii_table::Column> {
- vec.into_iter()
- .map(|name| ascii_table::Column {
- header: name.into(),
- align: ascii_table::Align::Left,
- ..Default::default()
- })
- .collect()
-}
-
-/// Display the passed data as nice ascii table,
-/// or, if stdout is a pipe, print it nicely parseable
-fn display_data<D: Display>(
- headers: Vec<ascii_table::Column>,
- data: Vec<Vec<D>>,
- csv: bool,
-) -> Result<()> {
- if csv {
- use csv::WriterBuilder;
- let mut wtr = WriterBuilder::new().from_writer(vec![]);
- for record in data.into_iter() {
- let r: Vec<String> = record.into_iter().map(|e| e.to_string()).collect();
-
- wtr.write_record(&r)?;
- }
-
- let out = std::io::stdout();
- let mut lock = out.lock();
-
- wtr.into_inner()
- .map_err(Error::from)
- .and_then(|t| String::from_utf8(t).map_err(Error::from))
- .and_then(|text| writeln!(lock, "{}", text).map_err(Error::from))
- } else if atty::is(atty::Stream::Stdout) {
- let mut ascii_table = ascii_table::AsciiTable {
- columns: Default::default(),
- max_width: terminal_size::terminal_size()
- .map(|tpl| tpl.0 .0 as usize) // an ugly interface indeed!
- .unwrap_or(80),
- };
-
- headers.into_iter().enumerate().for_each(|(i, c)| {
- ascii_table.columns.insert(i, c);
- });
-
- ascii_table.print(data);
- Ok(())
- } else {
- let out = std::io::stdout();
- let mut lock = out.lock();
- for list in data {
- writeln!(lock, "{}", list.iter().map(|d| d.to_string()).join(" "))?;
- }
- Ok(())
- }
+ crate::commands::util::display_data(header, data, csv)
}
diff --git a/src/commands/endpoint.rs b/src/commands/endpoint.rs
new file mode 100644
index 0000000..c1632e6
--- /dev/null
+++ b/src/commands/endpoint.rs
@@ -0,0 +1,300 @@
+//
+// Copyright (c) 2020-2021 science+computing ag and other contributors
+//
+// This program and the accompanying materials are made
+// available under the terms of the Eclipse Public License 2.0
+// which is available at https://www.eclipse.org/legal/epl-2.0/
+//
+// SPDX-License-Identifier: EPL-2.0
+//
+
+use std::str::FromStr;
+use std::sync::Arc;
+
+use anyhow::Error;
+use anyhow::Result;
+use anyhow::anyhow;
+use clap::ArgMatches;
+use log::{debug, info};
+use itertools::Itertools;
+use tokio_stream::StreamExt;
+
+use crate::config::Configuration;
+use crate::util::progress::ProgressBars;
+use crate::endpoint::Endpoint;
+
+pub async fn endpoint(matches: &ArgMatches, config: &Configuration, progress_generator: ProgressBars) -> Result<()> {
+ let endpoint_names = matches
+ .value_of("endpoint_name")
+ .map(String::from)
+ .map(|ep| vec![ep])
+ .unwrap_or_else(|| {
+ config.docker()
+ .endpoints()
+ .iter()
+ .map(|ep| ep.name())
+ .cloned()
+ .collect()
+ });
+
+ match matches.subcommand() {
+ Some(("ping", matches)) => ping(endpoint_names, matches, config, progress_generator).await,
+ Some(("stats", matches)) => stats(endpoint_names, matches, config, progress_generator).await,
+ Some(("container", matches)) => crate::commands::endpoint_container::container(endpoint_names, matches, config).await,
+ Some(("containers", matches)) => containers(endpoint_names, matches, config).await,
+ Some((other, _)) => Err(anyhow!("Unknown subcommand: {}", other)),
+ None => Err(anyhow!("No subcommand")),
+ }
+}
+
+async fn ping(endpoint_names: Vec<String>,
+ matches: &ArgMatches,
+ config: &Configuration,
+ progress_generator: ProgressBars
+) -> Result<()> {
+ let n_pings = matches.value_of("ping_n").map(u64::from_str).transpose()?.unwrap(); // safe by clap
+ let sleep = matches.value_of("ping_sleep").map(u64::from_str).transpose()?.unwrap(); // safe by clap
+ let endpoints = connect_to_endpoints(config, &endpoint_names).await?;
+ let multibar = Arc::new({
+ let mp = indicatif::MultiProgress::new();
+ if progress_generator.hide() {
+ mp.set_draw_target(indicatif::ProgressDrawTarget::hidden());
+ }
+ mp
+ });
+
+ let ping_process = endpoints
+ .iter()
+ .map(|endpoint| {
+ let bar = multibar.add(progress_generator.bar());
+ bar.set_length(n_pings);
+ bar.set_message(&format!("Pinging {}", endpoint.name()));
+
+ async move {
+ for i in 1..(n_pings + 1) {
+ debug!("Pinging {} for the {} time", endpoint.name(), i);
+ let r = endpoint.ping().await;
+ bar.inc(1);
+ if let Err(e) = r {
+ bar.finish_with_message(&format!("Pinging {} failed", endpoint.name()));
+ return Err(e)
+ }
+
+ tokio::time::sleep(tokio::time::Duration::from_secs(sleep)).await;
+ }
+
+ bar.finish_with_message(&format!("Pinging {} successful", endpoint.name()));
+ Ok(())
+ }
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<()>>();
+
+ let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
+ tokio::join!(ping_process, multibar_block).0
+}
+
+async fn stats(endpoint_names: Vec<String>,
+ matches: &ArgMatches,
+ config: &Configuration,
+ progress_generator: ProgressBars
+) -> Result<()> {
+ let csv = matches.is_present("csv");
+ let endpoints = connect_to_endpoints(config, &endpoint_names).await?;
+ let bar = progress_generator.bar();
+ bar.set_length(endpoint_names.len() as u64);
+ bar.set_message("Fetching stats");
+
+ let hdr = crate::commands::util::mk_header([
+ "Name",
+ "Containers",
+ "Images",
+ "Kernel",
+ "Memory",
+ "Memory limit",
+ "Cores",
+ "OS",
+ "System Time",
+ ].to_vec());
+
+ let data = endpoints
+ .into_iter()
+ .map(|endpoint| {
+ let bar = bar.clone();
+ async move {
+ let r = endpoint.stats().await;
+ bar.inc(1);
+ r
+ }
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<Vec<_>>>()
+ .await
+ .map_err(|e| {
+ bar.finish_with_message("Fetching stats errored");
+ e
+ })?
+ .into_iter()
+ .map(|stat| {
+ vec![
+ stat.name,
+ stat.containers.to_string(),
+ stat.images.to_string(),
+ stat.kernel_version,
+ bytesize::ByteSize::b(stat.mem_total).to_string(),
+ stat.memory_limit.to_string(),
+ stat.n_cpu.to_string(),
+ stat.operating_system.to_string(),
+ stat.system_time.unwrap_or_else(|| String::from("unknown")),
+ ]
+ })
+ .collect();
+
+ bar.finish_with_message("Fetching stats successful");
+ crate::commands::util::display_data(hdr, data, csv)
+}
+
+
+async fn containers(endpoint_names: Vec<String>,
+ matches: &ArgMatches,
+ config: &Configuration,
+) -> Result<()> {
+ match matches.subcommand() {
+ Some(("list", matches)) => containers_list(endpoint_names, matches, config).await,
+ Some(("prune", matches)) => containers_prune(endpoint_names, matches, config).await,
+ Some((other, _)) => Err(anyhow!("Unknown subcommand: {}", other)),
+ None => Err(anyhow!("No subcommand")),
+ }
+}
+
+async fn containers_list(endpoint_names: Vec<String>,
+ matches: &ArgMatches,
+ config: &Configuration,
+) -> Result<()> {
+ let list_stopped = matches.is_present("list_stopped");
+ let filter_image = matches.value_of("filter_image");
+ let older_than_filter = get_date_filter("older_than", matches)?;
+ let newer_than_filter = get_date_filter("newer_than", matches)?;
+ let csv = matches.is_present("csv");
+ let hdr = crate::commands::util::mk_header([
+ "Endpoint",
+ "Container id",
+ "Image",
+ "Created",
+ "Status",
+ ].to_vec());
+
+ let data = connect_to_endpoints(config, &endpoint_names)
+ .await?
+ .into_iter()
+ .map(|ep| async move {
+ ep.container_stats().await.map(|stats| (ep.name().clone(), stats))
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<Vec<(_, _)>>>()
+ .await?
+ .into_iter()
+ .map(|tpl| {
+ let endpoint_name = tpl.0;
+ tpl.1
+ .into_iter()
+ .filter(|stat| list_stopped || stat.state != "exited")
+ .filter(|stat| filter_image.map(|fim| fim == stat.image).unwrap_or(true))
+ .filter(|stat| older_than_filter.as_ref().map(|time| time > &stat.created).unwrap_or(true))
+ .filter(|stat| newer_than_filter.as_ref().map(|time| time < &stat.created).unwrap_or(true))
+ .map(|stat| {
+ vec![
+ endpoint_name.clone(),
+ stat.id,
+ stat.image,
+ stat.created.to_string(),
+ stat.status,
+ ]
+ })
+ .collect::<Vec<Vec<String>>>()
+ })
+ .flatten()
+ .collect::<Vec<Vec<String>>>();
+
+ crate::commands::util::display_data(hdr, data, csv)
+}
+
+async fn containers_prune(endpoint_names: Vec<String>,
+ matches: &ArgMatches,
+ config: &Configuration,
+) -> Result<()> {
+ let older_than_filter = get_date_filter("older_than", matches)?;
+ let newer_than_filter = get_date_filter("newer_than", matches)?;
+
+ let stats = connect_to_endpoints(config, &endpoint_names)
+ .await?
+ .into_iter()
+ .map(move |ep| async move {
+ let stats = ep.container_stats()
+ .await?
+ .into_iter()
+ .filter(|stat| stat.state == "exited")
+ .filter(|stat| older_than_filter.as_ref().map(|time| time > &stat.created).unwrap_or(true))
+ .filter(|stat| newer_than_filter.as_ref().map(|time| time < &stat.created).unwrap_or(true))
+ .map(|stat| (ep.clone(), stat))
+ .collect::<Vec<(_, _)>>();
+ Ok(stats)
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<Vec<_>>>()
+ .await?;
+
+ let prompt = format!("Really delete {} Containers?", stats.iter().flatten().count());
+ dialoguer::Confirm::new().with_prompt(prompt).interact()?;
+
+ stats.into_iter()
+ .map(Vec::into_iter)
+ .flatten()
+ .map(|(ep, stat)| async move {
+ ep.get_container_by_id(&stat.id)
+ .await?
+ .ok_or_else(|| anyhow!("Failed to find existing container {}", stat.id))?
+ .delete()
+ .await
+ .map_err(Error::from)
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<()>>()
+ .await
+}
+
+fn get_date_filter(name: &str, matches: &ArgMatches) -> Result<Option<chrono::DateTime::<chrono::Local>>> {
+ matches.value_of(name)
+ .map(humantime::parse_rfc3339_weak)
+ .transpose()?
+ .map(chrono::DateTime::<chrono::Local>::from)
+ .map(Ok)
+ .transpose()
+}
+
+/// Helper function to connect to all endpoints from the configuration, that appear (by name) in
+/// the `endpoint_names` list
+pub(super) async fn connect_to_endpoints(config: &Configuration, endpoint_names: &[String]) -> Result<Vec<Arc<Endpoint>>> {
+ let endpoint_configurations = config
+ .docker()
+ .endpoints()
+ .iter()
+ .filter(|ep| endpoint_names.contains(ep.name()))
+ .cloned()
+ .map(|ep_cfg| {
+ crate::endpoint::EndpointConfiguration::builder()
+ .endpoint(ep_cfg)
+ .required_images(config.docker().images().clone())
+ .required_docker_versions(config.docker().docker_versions().clone())
+ .required_docker_api_versions(config.docker().docker_api_versions().clone())
+ .build()
+ })
+ .collect::<Vec<_>>();
+
+ info!("Endpoint config build");
+ info!("Connecting to {n} endpoints: {eps}",
+ n = endpoint_configurations.len(),
+ eps = endpoint_configurations.iter().map(|epc| epc.endpoint().name()).join(", "));
+
+ crate::endpoint::util::setup_endpoints(endpoint_configurations).await
+}
diff --git a/src/commands/endpoint_container.rs b/src/commands/endpoint_container.rs
new file mode 100644
index 0000000..1861cc7
--- /dev/null
+++ b/src/commands/endpoint_container.rs
@@ -0,0 +1,465 @@
+//
+// Copyright (c) 2020-2021 science+computing ag and other contributors
+//
+// This program and the accompanying materials are made
+// available under the terms of the Eclipse Public License 2.0
+// which is available at https://www.eclipse.org/legal/epl-2.0/
+//
+// SPDX-License-Identifier: EPL-2.0
+//
+
+use std::str::FromStr;
+
+use anyhow::Error;
+use anyhow::Result;
+use anyhow::anyhow;
+use clap::ArgMatches;
+use tokio_stream::StreamExt;
+use shiplift::Container;
+
+use crate::config::Configuration;
+
+pub async fn container(endpoint_names: Vec<String>,
+ matches: &ArgMatches,
+ config: &Configuration,
+) -> Result<()> {
+ let container_id = matches.value_of("container_id").unwrap();
+ let endpoints = crate::commands::endpoint::connect_to_endpoints(config, &endpoint_names).await?;
+ let relevant_endpoints = endpoints.into_iter()
+ .map(|ep| async {
+ ep.has_container_with_id(container_id)
+ .await
+ .map(|b| (ep, b))
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<Vec<(_, bool)>>>()
+ .await?
+ .into_iter()
+ .filter_map(|tpl| {
+ if tpl.1 {
+ Some(tpl.0)
+ } else {
+ None
+ }
+ })
+ .collect::<Vec<_>>();
+
+ if relevant_endpoints.len() > 1 {
+ return Err(anyhow!("Found more than one container for id {}", container_id))
+ }
+
+ let relevant_endpoint = relevant_endpoints.get(0).ok_or_else(|| {
+ anyhow!("Found no container for id {}", container_id)
+ })?;
+
+ let container = relevant_endpoint.get_container_by_id(container_id)
+ .await?
+ .ok_or_else(|| anyhow!("Cannot find container {} on {}", container_id, relevant_endpoint.name()))?;
+
+ let confirm = |prompt: String| dialoguer::Confirm::new().with_prompt(prompt).interact();
+
+ match matches.subcommand() {
+ Some(("top", matches)) => top(matches, container).await,
+ Some(("kill", matches)) => {
+ confirm({
+ if let Some(sig) = matches.value_of("signal").as_ref() {
+ format!("Really kill {} with {}?", container_id, sig)
+ } else {
+ format!("Really kill {}?", container_id)
+ }
+ })?;
+
+ kill(matches, container).await
+ },
+ Some(("delete", _)) => {
+ confirm(format!("Really delete {}?", container_id))?;
+ delete(container).await
+ },
+ Some(("start", _)) => {
+ confirm(format!("Really start {}?", container_id))?;
+ start(container).await
+ },
+ Some(("stop", matches)) => {
+ confirm(format!("Really stop {}?", container_id))?;
+ stop(matches, container).await
+ },
+ Some(("exec", matches)) => {
+ confirm({
+ let commands = matches.values_of("commands").unwrap().collect::<Vec<&str>>();
+ format!("Really run '{}' in {}?", commands.join(" "), container_id)
+ })?;
+ exec(matches, container).await
+ },
+ Some(("inspect", _)) => inspect(container).await,
+ Some((other, _)) => Err(anyhow!("Unknown subcommand: {}", other)),
+ None => Err(anyhow!("No subcommand")),
+ }
+}
+
+async fn top(matches: &ArgMatches, container: Container<'_>) -> Result<()> {
+ let top = container.top(None).await?;
+ let hdr = crate::commands::util::mk_header(top.titles.iter().map(|s| s.as_ref()).collect());
+ crate::commands::util::display_data(hdr, top.processes, matches.is_present("csv"))
+}
+
+async fn kill(matches: &ArgMatches, container: Container<'_>) -> Result<()> {
+ let signal = matches.value_of("signal");
+ container.kill(signal).await.map_err(Error::from)
+}
+
+async fn delete(container: Contain