summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/c8y_log_plugin/Cargo.toml (renamed from plugins/log_request_plugin/Cargo.toml)27
-rw-r--r--plugins/c8y_log_plugin/src/config.rs101
-rw-r--r--plugins/c8y_log_plugin/src/error.rs8
-rw-r--r--plugins/c8y_log_plugin/src/logfile_request.rs171
-rw-r--r--plugins/c8y_log_plugin/src/main.rs232
-rw-r--r--plugins/log_request_plugin/src/main.rs66
-rw-r--r--plugins/log_request_plugin/src/smartrest.rs191
7 files changed, 527 insertions, 269 deletions
diff --git a/plugins/log_request_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml
index 4cb07317..d8158abe 100644
--- a/plugins/log_request_plugin/Cargo.toml
+++ b/plugins/c8y_log_plugin/Cargo.toml
@@ -1,38 +1,41 @@
[package]
-name = "tedge_logfile_request_plugin"
+name = "c8y_log_plugin"
version = "0.6.4"
authors = ["thin-edge.io team <info@thin-edge.io>"]
edition = "2021"
rust-version = "1.58.1"
license = "Apache-2.0"
-description = "Thin.edge.io operation plugin for Cumulocity log request"
+description = "Thin-edge device log file retriever for Cumulocity"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
[package.metadata.deb]
-pre-depends = "tedge_mapper"
+maintainer-scripts = "../../configuration/debian/c8y_log_plugin"
assets = [
- ["../../configuration/contrib/operations/c8y/c8y_LogfileRequest", "/etc/tedge/operations/c8y/", "644"],
- ["target/release/tedge_logfile_request_plugin", "/usr/bin/tedge_logfile_request_plugin", "755"],
+ ["../../configuration/init/systemd/c8y-log-plugin.service", "/lib/systemd/system/c8y-log-plugin.service", "644"],
+ ["target/release/c8y_log_plugin", "/usr/bin/c8y_log_plugin", "755"],
]
[dependencies]
anyhow = "1.0"
-async-trait = "0.1"
c8y_api = { path = "../../crates/core/c8y_api" }
c8y_smartrest = { path = "../../crates/core/c8y_smartrest" }
+clap = { version = "3.0", features = ["cargo", "derive"] }
csv = "1.1"
-futures = "0.3"
-mockall = "0.10"
-reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
+glob = "0.3"
+inotify = "0.10"
+mqtt_channel = { path = "../../crates/common/mqtt_channel" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
+tedge_config = { path = "../../crates/common/tedge_config" }
+tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] }
thiserror = "1.0"
tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] }
toml = "0.5"
tracing = { version = "0.1", features = ["attributes", "log"] }
-tedge_config = { path = "../../crates/common/tedge_config" }
-mqtt_channel = { path = "../../crates/common/mqtt_channel" }
[dev-dependencies]
+assert_matches = "1.5"
+mockall = "0.11"
tempfile = "3.3"
+test-case = "2.0"
+serial_test = "0.6"
diff --git a/plugins/c8y_log_plugin/src/config.rs b/plugins/c8y_log_plugin/src/config.rs
new file mode 100644
index 00000000..580fba95
--- /dev/null
+++ b/plugins/c8y_log_plugin/src/config.rs
@@ -0,0 +1,101 @@
+use c8y_smartrest::topic::C8yTopic;
+use mqtt_channel::Message;
+use serde::Deserialize;
+use std::{borrow::Borrow, path::Path};
+use std::{collections::HashSet, fs};
+use tracing::warn;
+
+#[derive(Deserialize, Debug, Eq, PartialEq, Default)]
+#[serde(deny_unknown_fields)]
+pub struct LogPluginConfig {
+ pub files: Vec<FileEntry>,
+}
+
+#[derive(Deserialize, Debug, Eq, Default, Clone)]
+#[serde(deny_unknown_fields)]
+pub struct FileEntry {
+ pub(crate) path: String,
+ #[serde(rename = "type")]
+ pub config_type: String,
+}
+
+impl PartialEq for FileEntry {
+ fn eq(&self, other: &Self) -> bool {
+ self.config_type == other.config_type
+ }
+}
+
+impl Borrow<String> for FileEntry {
+ fn borrow(&self) -> &String {
+ &self.config_type
+ }
+}
+
+impl LogPluginConfig {
+ pub fn new(config_file_path: &Path) -> Self {
+ let config = Self::read_config(config_file_path);
+ config
+ }
+
+ pub fn read_config(path: &Path) -> Self {
+ let path_str = path.display().to_string();
+ match fs::read_to_string(path) {
+ Ok(contents) => match toml::from_str(contents.as_str()) {
+ Ok(config) => config,
+ _ => {
+ warn!("The config file {} is malformed.", path_str);
+ Self::default()
+ }
+ },
+ Err(_) => {
+ warn!(
+ "The config file {} does not exist or is not readable.",
+ path_str
+ );
+ Self::default()
+ }
+ }
+ }
+
+ pub fn to_supported_config_types_message(&self) -> Result<Message, anyhow::Error> {
+ let topic = C8yTopic::SmartRestResponse.to_topic()?;
+ Ok(Message::new(&topic, self.to_smartrest_payload()))
+ }
+
+ pub fn get_all_file_types(&self) -> Vec<String> {
+ self.files
+ .iter()
+ .map(|x| x.config_type.to_string())
+ .collect::<HashSet<_>>()
+ .iter()
+ .map(|x| x.to_string())
+ .collect::<Vec<_>>()
+ }
+
+ // 118,typeA,typeB,...
+ fn to_smartrest_payload(&self) -> String {
+ let mut config_types = self.get_all_file_types();
+ let () = config_types.sort();
+ let supported_config_types = config_types.join(",");
+ format!("118,{supported_config_types}")
+ }
+}
+
+#[test]
+fn test_no_duplicated_file_types() {
+ let files = vec![
+ FileEntry {
+ path: "a/path".to_string(),
+ config_type: "type_one".to_string(),
+ },
+ FileEntry {
+ path: "some/path".to_string(),
+ config_type: "type_one".to_string(),
+ },
+ ];
+ let logs_config = LogPluginConfig { files: files };
+ assert_eq!(
+ logs_config.get_all_file_types(),
+ vec!["type_one".to_string()]
+ );
+}
diff --git a/plugins/c8y_log_plugin/src/error.rs b/plugins/c8y_log_plugin/src/error.rs
new file mode 100644
index 00000000..045d4bf3
--- /dev/null
+++ b/plugins/c8y_log_plugin/src/error.rs
@@ -0,0 +1,8 @@
+#[derive(thiserror::Error, Debug)]
+pub enum LogRetrievalError {
+ #[error(transparent)]
+ FromTEdgeConfig(#[from] tedge_config::TEdgeConfigError),
+
+ #[error(transparent)]
+ FromConfigSetting(#[from] tedge_config::ConfigSettingError),
+}
diff --git a/plugins/c8y_log_plugin/src/logfile_request.rs b/plugins/c8y_log_plugin/src/logfile_request.rs
new file mode 100644
index 00000000..35867948
--- /dev/null
+++ b/plugins/c8y_log_plugin/src/logfile_request.rs
@@ -0,0 +1,171 @@
+use std::path::Path;
+
+use glob::glob;
+
+use crate::config::LogPluginConfig;
+use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
+use c8y_smartrest::{
+ smartrest_deserializer::{get_datetime_from_file_path, SmartRestLogRequest},
+ smartrest_serializer::{
+ CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting,
+ SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful,
+ TryIntoOperationStatusMessage,
+ },
+};
+use mqtt_channel::{Connection, SinkExt};
+
+pub struct LogfileRequest {}
+
+impl TryIntoOperationStatusMessage for LogfileRequest {
+ /// returns a c8y message specifying to set log status to executing.
+ ///
+ /// example message: '501,c8y_LogfileRequest'
+ fn status_executing() -> Result<
+ c8y_smartrest::smartrest_serializer::SmartRest,
+ c8y_smartrest::error::SmartRestSerializerError,
+ > {
+ SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest)
+ .to_smartrest()
+ }
+
+ fn status_successful(
+ parameter: Option<String>,
+ ) -> Result<
+ c8y_smartrest::smartrest_serializer::SmartRest,
+ c8y_smartrest::error::SmartRestSerializerError,
+ > {
+ SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yLogFileRequest)
+ .with_response_parameter(&parameter.unwrap())
+ .to_smartrest()
+ }
+
+ fn status_failed(
+ failure_reason: String,
+ ) -> Result<
+ c8y_smartrest::smartrest_serializer::SmartRest,
+ c8y_smartrest::error::SmartRestSerializerError,
+ > {
+ SmartRestSetOperationToFailed::new(
+ CumulocitySupportedOperations::C8yLogFileRequest,
+ failure_reason,
+ )
+ .to_smartrest()
+ }
+}
+/// Reads tedge logs according to `SmartRestLogRequest`.
+///
+/// If needed, logs are concatenated.
+///
+/// Logs are sorted alphanumerically from oldest to newest.
+///
+/// # Examples
+///
+/// ```
+/// let smartrest_obj = SmartRestLogRequest::from_smartrest(
+/// "522,DeviceSerial,syslog,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000",
+/// )
+/// .unwrap();
+///
+/// let log = read_tedge_system_logs(&smartrest_obj, "/var/log/tedge").unwrap();
+/// ```
+pub fn read_tedge_logs(
+ smartrest_obj: &SmartRestLogRequest,
+ plugin_config_path: &Path,
+) -> Result<String, anyhow::Error> {
+ let plugin_config = LogPluginConfig::new(&plugin_config_path);
+ let mut output = String::new();
+
+ let mut files_to_send = Vec::new();
+ for files in &plugin_config.files {
+ let maybe_file_path = files.path.as_str(); // because it can be a glob pattern
+ let file_type = files.config_type.as_str();
+
+ if !file_type.eq(&smartrest_obj.log_type) {
+ continue;
+ }
+
+ // NOTE: According to the glob documentation paths are yielded in alphabetical order hence re-ordering is no longer required see:
+ // https://github.com/thin-edge/thin-edge.io/blob/0320741b109f50d1b0f7cda44e33dc31ba04902d/plugins/log_request_plugin/src/smartrest.rs#L24
+ for entry in glob(maybe_file_path)? {
+ let file_path = entry?;
+ if let Some(dt_from_file) = get_datetime_from_file_path(&file_path) {
+ if !(dt_from_file < smartrest_obj.date_from || dt_from_file > smartrest_obj.date_to)
+ {
+ files_to_send.push(file_path);
+ }
+ } else {
+ files_to_send.push(file_path);
+ }
+ }
+ }
+
+ // loop sorted vector and push store log file to `output`
+ let mut line_counter: usize = 0;
+ for entry in files_to_send {
+ dbg!("files to read:", &entry);
+ let file_content = std::fs::read_to_string(&entry)?;
+ if file_content.is_empty() {
+ continue;
+ }
+
+ // adding file header only if line_counter permits more lines to be added
+ match &entry.file_stem().and_then(|f| f.to_str()) {
+ Some(file_name) if line_counter < smartrest_obj.lines => {
+ output.push_str(&format!("filename: {}\n", file_name));
+ }
+ _ => {}
+ }
+
+ // split at new line delimiter ("\n")
+ let mut lines = file_content.lines().rev();
+ while line_counter < smartrest_obj.lines {
+ if let Some(haystack) = lines.next() {
+ if let Some(needle) = &smartrest_obj.needle {
+ if haystack.contains(needle) {
+ output.push_str(&format!("{}\n", haystack));
+ line_counter += 1;
+ }
+ } else {
+ output.push_str(&format!("{}\n", haystack));
+ line_counter += 1;
+ }
+ } else {
+ // there are no lines.next()
+ break;
+ }
+ }
+ }
+ Ok(output)
+}
+
+pub async fn handle_logfile_request_operation(
+ smartrest_request: &SmartRestLogRequest,
+ plugin_config_path: &Path,
+ mqtt_client: &mut Connection,
+ http_client: &mut JwtAuthHttpProxy,
+) -> Result<(), anyhow::Error> {
+ // executing
+ let executing = LogfileRequest::executing()?;
+ let () = mqtt_client.published.send(executing).await?;
+
+ let log_content = read_tedge_logs(&smartrest_request, &plugin_config_path)?;
+
+ let upload_event_url = http_client
+ .upload_log_binary(&smartrest_request.log_type, &log_content)
+ .await?;
+
+ let successful = LogfileRequest::successful(Some(upload_event_url))?;
+ let () = mqtt_client.published.send(successful).await?;
+
+ Ok(())
+}
+
+pub async fn handle_dynamic_log_type_update(
+ mqtt_client: &mut Connection,
+ config_dir: &Path,
+) -> Result<(), anyhow::Error> {
+ let plugin_config = LogPluginConfig::new(config_dir);
+ let msg = plugin_config.to_supported_config_types_message()?;
+ let () = mqtt_client.published.send(msg).await?;
+ Ok(())
+}
diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs
new file mode 100644
index 00000000..a799a9b2
--- /dev/null
+++ b/plugins/c8y_log_plugin/src/main.rs
@@ -0,0 +1,232 @@
+mod config;
+mod error;
+mod logfile_request;
+
+use anyhow::Result;
+use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
+use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRequestGeneric};
+use c8y_smartrest::topic::C8yTopic;
+use clap::Parser;
+
+use inotify::{EventMask, EventStream};
+use inotify::{Inotify, WatchMask};
+use mqtt_channel::{Connection, StreamExt};
+use std::path::{Path, PathBuf};
+use tedge_config::{
+ ConfigRepository, ConfigSettingAccessor, LogPathSetting, MqttPortSetting, TEdgeConfig,
+ DEFAULT_TEDGE_CONFIG_PATH,
+};
+use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group};
+use tracing::{error, info};
+
+use crate::logfile_request::{handle_dynamic_log_type_update, handle_logfile_request_operation};
+
+const DEFAULT_PLUGIN_CONFIG_FILE: &str = "c8y/c8y-log-plugin.toml";
+const AFTER_HELP_TEXT: &str = r#"On start, `c8y_log_plugin` notifies the cloud tenant of the log types listed in the `CONFIG_FILE`, sending this list with a `118` on `c8y/s/us`.
+`c8y_log_plugin` subscribes then to `c8y/s/ds` listening for logfile operation requests (`522`) notifying the Cumulocity tenant of their progress (messages `501`, `502` and `503`).
+
+The thin-edge `CONFIG_DIR` is used to store:
+ * c8y-log-plugin.toml - the configuration file that specifies which logs to be retrived"#;
+
+#[derive(Debug, clap::Parser, Clone)]
+#[clap(
+name = clap::crate_name!(),
+version = clap::crate_version!(),
+about = clap::crate_description!(),
+after_help = AFTER_HELP_TEXT
+)]
+pub struct LogfileRequestPluginOpt {
+ /// Turn-on the debug log level.
+ ///
+ /// If off only reports ERROR, WARN, and INFO
+ /// If on also reports DEBUG and TRACE
+ #[clap(long)]
+ pub debug: bool,
+
+ /// Create supported operation files
+ #[clap(short, long)]
+ pub init: bool,
+
+ #[clap(long = "config-dir", default_value = DEFAULT_TEDGE_CONFIG_PATH)]
+ pub config_dir: PathBuf,
+}
+
+async fn create_mqtt_client(
+ tedge_config: &TEdgeConfig,
+) -> Result<mqtt_channel::Connection, anyhow::Error> {
+ let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
+ let mqtt_config = mqtt_channel::Config::default()
+ .with_port(mqtt_port)
+ .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked(
+ C8yTopic::SmartRestRequest.as_str(),
+ ));
+
+ let mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?;
+ Ok(mqtt_client)
+}
+
+pub async fn create_http_client(
+ tedge_config: &TEdgeConfig,
+) -> Result<JwtAuthHttpProxy, anyhow::Error> {
+ let mut http_proxy = JwtAuthHttpProxy::try_new(tedge_config).await?;
+ let () = http_proxy.init().await?;
+ Ok(http_proxy)
+}
+
+fn create_inofity_file_watch_stream(
+ config_file: &Path,
+) -> Result<EventStream<[u8; 1024]>, anyhow::Error> {
+ let buffer = [0; 1024];
+ let mut inotify = Inotify::init().expect("Error while initializing inotify instance");
+
+ inotify
+ .add_watch(&config_file, WatchMask::CLOSE_WRITE)
+ .expect("Failed to add file watch");
+
+ Ok(inotify.event_stream(buffer)?)
+}
+
+async fn run(
+ config_file: &Path,
+ mqtt_client: &mut Connection,
+ http_client: &mut JwtAuthHttpProxy,
+) -> Result<(), anyhow::Error> {
+ let () = handle_dynamic_log_type_update(mqtt_client, config_file).await?;
+
+ let mut inotify_stream = create_inofity_file_watch_stream(config_file)?;
+
+ loop {
+ tokio::select! {
+ message = mqtt_client.received.next() => {
+ if let Some(message) = message {
+ if let Ok(payload) = message.payload_str() {
+ let result = match payload.split(',').next().unwrap_or_default() {
+ "522" => {
+ // retrieve smartrest object from payload
+ let smartrest_obj = SmartRestLogRequest::from_smartrest(&payload)?;
+ handle_logfile_request_operation(
+ &smartrest_obj,
+ &config_file,
+ mqtt_client,
+ http_client,
+ )
+ .await
+ }
+ _ => {
+ // Ignore operation messages not meant for this plugin
+ Ok(())
+ }
+ };
+
+ if let Err(err) = result {
+ error!("Handling of operation: '{}' failed with {}", payload, err);
+ }
+ }
+ }
+ else {
+ // message is None and the connection has been closed
+ return Ok(());
+ }
+ }
+ Some(event_or_error) = inotify_stream.next() => {
+ if let Ok(event) = event_or_error {
+ match event.mask {
+ EventMask::CLOSE_WRITE => {
+ let () = handle_dynamic_log_type_update(mqtt_client, config_file).await?;
+ }
+ _ => {}
+ }
+ }
+
+ }
+ }
+ }
+}
+
+#[tokio::main]
+async fn main() -> Result<(), anyhow::Error> {
+ let config_plugin_opt = LogfileRequestPluginOpt::parse();
+ let config_file = PathBuf::from(&format!(
+ "{}/{DEFAULT_PLUGIN_CONFIG_FILE}",
+ &config_plugin_opt
+ .config_dir
+ .to_str()
+ .unwrap_or(DEFAULT_TEDGE_CONFIG_PATH)
+ ));
+
+ tedge_utils::logging::initialise_tracing_subscriber(config_plugin_opt.debug);
+
+ // Load tedge config from the provided location
+ let tedge_config_location =
+ tedge_config::TEdgeConfigLocation::from_custom_root(&config_plugin_opt.config_dir);
+ let config_repository = tedge_config::TEdgeConfigRepository::new(tedge_config_location.clone());
+ let tedge_config = config_repository.load()?;
+
+ let logs_dir = tedge_config.query(LogPathSetting)?;
+ let logs_dir = PathBuf::from(logs_dir.to_string());
+
+ if config_plugin_opt.init {
+ let () = init(&config_plugin_opt.config_dir, &logs_dir)?;
+ return Ok(());
+ }
+
+ // Create required clients
+ let mut mqtt_client = create_mqtt_client(&tedge_config).await?;
+ let mut http_client = create_http_client(&tedge_config).await?;
+
+ let () = run(&config_file, &mut mqtt_client, &mut http_client).await?;
+ Ok(())
+}
+
+fn init(config_dir: &PathBuf, logs_dir: &PathBuf) -> Result<(), anyhow::Error> {
+ info!("Creating supported operation files");
+ let config_dir = config_dir.as_path().display().to_string();
+ let logs_dir = logs_dir.as_path().display().to_string();
+ let () = create_init_logs_directories_and_files(config_dir.as_str(), logs_dir.as_str())?;
+ Ok(())
+}
+
+/// for the log plugin to work the following directories and files are needed:
+///
+/// Directories:
+/// - LOGS_DIR/tedge/agent
+/// - config_dir/operations/c8y
+/// - CONFIG_DIR/c8y
+///
+/// Files:
+/// - config_dir/operations/c8y/c8y_LogfileRequest
+/// - CONFIG_DIR/c8y/log/c8y-log-plugin.toml
+fn create_init_logs_directories_and_files(
+ config_dir: &str,
+ logs_dir: &str,
+) -> Result<(), anyhow::Error> {
+ // creating logs_dir
+ create_directory_with_user_group(&format!("{logs_dir}/tedge"), "tedge", "tedge", 0o755)?;
+ create_directory_with_user_group(&format!("{logs_dir}/tedge/agent"), "tedge", "tedge", 0o755)?;
+ // creating /operations/c8y directories
+ create_directory_with_user_group(&format!("{config_dir}/operations"), "tedge", "tedge", 0o755)?;
+ create_directory_with_user_group(
+ &format!("{config_dir}/operations/c8y"),
+ "tedge",
+ "tedge",
+ 0o755,
+ )?;
+ // creating c8y_LogfileRequest operation file
+ create_file_with_user_group(
+ &format!("{config_dir}/operations/c8y/c8y_LogfileRequest"),
+ "tedge",
+ "tedge",
+ 0o755,
+ )?;
+ // creating c8y directory
+ create_directory_with_user_group(&format!("{config_dir}/c8y"), "tedge", "tedge", 0o755)?;
+ // creating c8y-log-plugin.toml
+ // NOTE: file needs 775 permission or inotify can not watch for changes inside the file
+ create_file_with_user_group(
+ &format!("{config_dir}/{DEFAULT_PLUGIN_CONFIG_FILE}"),
+ "tedge",
+ "tedge",
+ 0o775,
+ )?;
+ Ok(())
+}
diff --git a/plugins/log_request_plugin/src/main.rs b/plugins/log_request_plugin/src/main.rs
deleted file mode 100644
index d448124b..00000000
--- a/plugins/log_request_plugin/src/main.rs
+++ /dev/null
@@ -1,66 +0,0 @@
-mod smartrest;
-
-use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
-use c8y_smartrest::{smartrest_deserializer::SmartRestLogRequest, topic::C8yTopic};
-use tedge_config::{get_tedge_config, ConfigSettingAccessor, MqttPortSetting};
-
-use c8y_smartrest::smartrest_deserializer::SmartRestRequestGeneric;
-use futures::SinkExt;
-
-use smartrest::{
- get_log_file_request_done_message, get_log_file_request_executing, read_tedge_logs,
-};
-
-const AGENT_LOG_DIR: &str = "/var/log/tedge/agent";
-
-/// creates an mqtt client
-pub async fn create_mqtt_client() -> Result<mqtt_channel::Connection, anyhow::Error> {
- let tedge_config = get_tedge_config()?;
- let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
- let mqtt_config = mqtt_channel::Config::default()
- .with_port(mqtt_port)
- .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked(
- C8yTopic::SmartRestResponse.as_str(),
- ));
-
- let mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?;
- Ok(mqtt_client)
-}
-
-/// creates an http client
-pub async fn create_http_client() -> Result<JwtAuthHttpProxy, anyhow::Error> {
- let config = get_tedge_config()?;
- let mut http_proxy = JwtAuthHttpProxy::try_new(&config).await?;
- let () = http_proxy.init().await?;
- Ok(http_proxy)
-}
-
-#[tokio::main]
-async fn main() -> Result<(), anyhow::Error> {
- // reading payload from command line arguments
- let payload = std::env::args().nth(1).expect("no payload given");
-
- // creating required clients
- let mut mqtt_client = create_mqtt_client().await?;
- let mut http_client = create_http_client().await?;
-
- // retrieve smartrest object from payload
- let smartrest_obj = SmartRestLogRequest::from_smartrest(&payload)?;
-
- // 1. set log file request to executing
- let msg = get_log_file_request_executing().await?;
- let () = mqtt_client.published.send(msg).await?;
- // 2. read logs
- let log_content = read_tedge_logs(&smartrest_obj, AGENT_LOG_DIR)?;
-
- // 3. upload log file
- let upload_event_url = http_client.upload_log_binary(&log_content).await?;
-
- // 4. set log file request to done
- let msg = get_log_file_request_done_message(&upload_event_url).await?;
- let () = mqtt_client.published.send(msg).await?;
-
- mqtt_client.close().await;
-
- Ok(())
-}
diff --git a/plugins/log_request_plugin/src/smartrest.rs b/plugins/log_request_plugin/src/smartrest.rs
deleted file mode 100644
index b9155030..00000000
--- a/plugins/log_request_plugin/src/smartrest.rs
+++ /dev/null
@@ -1,191 +0,0 @@
-use c8y_smartrest::{
- smartrest_deserializer::{get_datetime_from_file_path, SmartRestLogRequest},
- smartrest_serializer::{
- CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting,
- SmartRestSetOperationToSuccessful,
- },
- topic::C8yTopic,
-};
-use mqtt_channel::Message;
-
-/// returns a c8y message specifying to set log status to executing.
-///
-/// example message: '501,c8y_LogfileRequest'
-pub async fn get_log_file_request_executing() -> Result<Message, anyhow::Error> {
- let topic = C8yTopic::SmartRestResponse.to_topic()?;
- let smartrest_set_operation_status =
- SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest)
- .to_smartrest()?;
- Ok(Message::new(&topic, smartrest_set_operation_status))
-}
-
-/// returns a c8y message specifying to set log status to successful.
-///
-/// example message: '503,c8y_LogfileRequest,https://{c8y.url}/etc...'
-pub async fn get_log_file_request_done_message(
- binary_upload_event_url: &str,
-) -> Result<Message, anyhow::Error> {
- let topic = C8yTopic::SmartRestResponse.to_topic()?;
- let smartrest_set_operation_status =
- SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yLogFileRequest)
- .with_response_parameter(binary_upload_event_url)
- .to_smartrest()?;
-
- Ok(Message::new(&topic, smartrest_set_operation_status))
-}
-
-/// Reads tedge logs according to `SmartRestLogRequest`.
-///
-/// If needed, logs are concatenated.
-///
-/// Logs are sorted alphanumerically from oldest to newest.
-///
-/// # Examples
-///
-/// ```
-/// let smartrest_obj = SmartRestLogRequest::from_smartrest(
-/// "522,DeviceSerial,syslog,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000",
-/// )
-/// .unwrap();
-///
-/// let log = read_tedge_system_logs(&smartrest_obj, "/var/log/tedge").unwrap();
-/// ```
-pub fn read_tedge_logs(
- smartrest_obj: &SmartRestLogRequest,
- logs_dir: &str,
-) -> Result<String, anyhow::Error> {
- let mut output = String::new();
-
- // NOTE: As per documentation of std::fs::read_dir:
- // "The order in which this iterator returns entries is platform and filesystem dependent."
- // Therefore, files are sorted by date.
- let mut read_vector: Vec<_> = std::fs::read_dir(logs_dir)?
- .filter_map(|r| r.ok())
- .filter(|dir_entry| {
- get_datetime_from_file_path(&dir_entry.path())
- .map(|dt| !(dt < smartrest_obj.date_from || dt > smartrest_obj.date_to))
- .unwrap_or(false)
- })
- .filter(|dir_entry| {
- let file_name = &dir_entry.file_name();
- let mut file_name = file_name.to_str().unwrap();
-
- // FIXME: this is a hotfix to map "software-list" and "software-update" to "software-management"
- // this should be fixed in https://github.com/thin-edge/thin-edge.io/issues/1077
- if file_name.starts_with("software-list") | file_name.starts_with("software-update") {
- file_name = "software-management";
- }
-
- file_name.starts_with(&smartrest_obj.log_type)
- })
- .collect();
-
- read_vector.sort_by_key(|dir| dir.path());
-
- // loop sorted vector and push store log file to `output`
- let mut line_counter: usize = 0;
- for entry in read_vector {
- let file_path = entry.path();
- let file_content = std::fs::read_to_string(&file_path)?;
- if file_content.is_empty() {
- continue;
- }
-
- // adding file header only if line_counter permits more lines to be added
- match &file_path.file_stem().and_then(|f| f.to_str()) {
- Some(file_name) if line_counter < smartrest_obj.lines => {
- output.push_str(&format!("filename: {}\n", file_name));
- }
- _ => {}
- }
-
- // split at new line delimiter ("\n")
- let mut lines = file_content.lines();
- while line_counter < smartrest_obj.lines {
- if let Some(haystack) = lines.next() {
- if let Some(needle) = &smartrest_obj.needle {
- if haystack.contains(needle) {
- output.push_str(&format!("{}\n", haystack));
- line_counter += 1;
- }
- } else {
- output.push_str(&format!("{}\n", haystack));
- line_counter += 1;
- }
- } else {
- // there are no lines.next()
- break;
- }
- }
- }
- Ok(output)
-}
-
-#[cfg(test)]
-mod tests {
- use super::read_tedge_logs;
- use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRequestGeneric};
- use std::fs::File;
- use std::io::Write;
-
- fn parse_file_names_from_log_content(log_content: &str) -> [&str; 5] {
- let mut files: Vec<&str> = vec![];
- for line in log_content.lines() {
- if line.contains("filename: ") {
- let filename: &str = line.split("filename: ").last().unwrap();
- files.push(filename);
- }
- }
- match files.try_into() {
- Ok(arr) => arr,
- Err(_) => panic!("Could not convert to Array &str, size 5"),
- }
- }
-
- #[test]
- /// testing read_tedge_logs
- ///
- /// this test creates 5 fake log files in a temporary directory.
- /// files are dated 2021-01-0XT01:00Z, where X = a different day.
- ///
- /// this tests will assert that files are read alphanumerically from oldest to newest
- fn test_read_logs() {
- // order in which files are created
- const LOG_FILE_NAMES: [&str; 5] = [
- "software-list-2021-01-03T01:00:00Z.log",
- "software-list-2021-01-02T01:00:00Z.log",
- "software-list-2021-01-01T01:00:00Z.log",
- "software-update-2021-01-03T01:00:00Z.log",
- "software-update-2021-01-02T01:00:00Z.log",
- ];
-
- // expected (sorted) output
- const EXPECTED_OUTPUT: [&str; 5] = [
- "software-list-2021-01-01T01:00:00Z",
- "software-list-2021-01-02T01:00:00Z",
- "software-list-2021-01-03T01:00:00Z",
- "software-update-2021-01-02T01:00:00Z",
- "software-update-2021-01-03T01:00:00Z",
- ];
-
- let smartrest_obj = SmartRestLogRequest::from_smartrest(
- "522,DeviceSerial,software-management,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000",
- )
- .unwrap();
-
- let temp_dir = tempfile::tempdir().unwrap();
- // creating the files
- for (idx, file) in LOG_FILE_NAMES.iter().enumerate() {
- let file_path = &temp_dir.path().join(file);
- let mut file = File::create(file_path).unwrap();
- writeln!(file, "file num {}", idx).unwrap();
- }
-
- // reading the logs and extracting the file names from the log output.
- let output = read_tedge_logs(&smartrest_obj, temp_dir.path().to_str().unwrap()).unwrap();
- let parsed_values = parse_file_names_from_log_content(&output);
-
- // asserting the order = `EXPECTED_OUTPUT`
- assert!(parsed_values.eq(&EXPECTED_OUTPUT));
- }
-}