summaryrefslogtreecommitdiffstats
path: root/plugins/c8y_log_plugin
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/c8y_log_plugin')
-rw-r--r--plugins/c8y_log_plugin/Cargo.toml41
-rw-r--r--plugins/c8y_log_plugin/src/config.rs101
-rw-r--r--plugins/c8y_log_plugin/src/error.rs8
-rw-r--r--plugins/c8y_log_plugin/src/logfile_request.rs171
-rw-r--r--plugins/c8y_log_plugin/src/main.rs232
5 files changed, 553 insertions, 0 deletions
diff --git a/plugins/c8y_log_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml
new file mode 100644
index 00000000..d8158abe
--- /dev/null
+++ b/plugins/c8y_log_plugin/Cargo.toml
@@ -0,0 +1,41 @@
+[package]
+name = "c8y_log_plugin"
+version = "0.6.4"
+authors = ["thin-edge.io team <info@thin-edge.io>"]
+edition = "2021"
+rust-version = "1.58.1"
+license = "Apache-2.0"
+description = "Thin-edge device log file retriever for Cumulocity"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+[package.metadata.deb]
+maintainer-scripts = "../../configuration/debian/c8y_log_plugin"
+assets = [
+ ["../../configuration/init/systemd/c8y-log-plugin.service", "/lib/systemd/system/c8y-log-plugin.service", "644"],
+ ["target/release/c8y_log_plugin", "/usr/bin/c8y_log_plugin", "755"],
+]
+
+[dependencies]
+anyhow = "1.0"
+c8y_api = { path = "../../crates/core/c8y_api" }
+c8y_smartrest = { path = "../../crates/core/c8y_smartrest" }
+clap = { version = "3.0", features = ["cargo", "derive"] }
+csv = "1.1"
+glob = "0.3"
+inotify = "0.10"
+mqtt_channel = { path = "../../crates/common/mqtt_channel" }
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+tedge_config = { path = "../../crates/common/tedge_config" }
+tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] }
+thiserror = "1.0"
+tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] }
+toml = "0.5"
+tracing = { version = "0.1", features = ["attributes", "log"] }
+
+[dev-dependencies]
+assert_matches = "1.5"
+mockall = "0.11"
+tempfile = "3.3"
+test-case = "2.0"
+serial_test = "0.6"
diff --git a/plugins/c8y_log_plugin/src/config.rs b/plugins/c8y_log_plugin/src/config.rs
new file mode 100644
index 00000000..580fba95
--- /dev/null
+++ b/plugins/c8y_log_plugin/src/config.rs
@@ -0,0 +1,101 @@
+use c8y_smartrest::topic::C8yTopic;
+use mqtt_channel::Message;
+use serde::Deserialize;
+use std::{borrow::Borrow, path::Path};
+use std::{collections::HashSet, fs};
+use tracing::warn;
+
+#[derive(Deserialize, Debug, Eq, PartialEq, Default)]
+#[serde(deny_unknown_fields)]
+pub struct LogPluginConfig {
+ pub files: Vec<FileEntry>,
+}
+
+#[derive(Deserialize, Debug, Eq, Default, Clone)]
+#[serde(deny_unknown_fields)]
+pub struct FileEntry {
+ pub(crate) path: String,
+ #[serde(rename = "type")]
+ pub config_type: String,
+}
+
+impl PartialEq for FileEntry {
+ fn eq(&self, other: &Self) -> bool {
+ self.config_type == other.config_type
+ }
+}
+
+impl Borrow<String> for FileEntry {
+ fn borrow(&self) -> &String {
+ &self.config_type
+ }
+}
+
+impl LogPluginConfig {
+ pub fn new(config_file_path: &Path) -> Self {
+ let config = Self::read_config(config_file_path);
+ config
+ }
+
+ pub fn read_config(path: &Path) -> Self {
+ let path_str = path.display().to_string();
+ match fs::read_to_string(path) {
+ Ok(contents) => match toml::from_str(contents.as_str()) {
+ Ok(config) => config,
+ _ => {
+ warn!("The config file {} is malformed.", path_str);
+ Self::default()
+ }
+ },
+ Err(_) => {
+ warn!(
+ "The config file {} does not exist or is not readable.",
+ path_str
+ );
+ Self::default()
+ }
+ }
+ }
+
+ pub fn to_supported_config_types_message(&self) -> Result<Message, anyhow::Error> {
+ let topic = C8yTopic::SmartRestResponse.to_topic()?;
+ Ok(Message::new(&topic, self.to_smartrest_payload()))
+ }
+
+ pub fn get_all_file_types(&self) -> Vec<String> {
+ self.files
+ .iter()
+ .map(|x| x.config_type.to_string())
+ .collect::<HashSet<_>>()
+ .iter()
+ .map(|x| x.to_string())
+ .collect::<Vec<_>>()
+ }
+
+ // 118,typeA,typeB,...
+ fn to_smartrest_payload(&self) -> String {
+ let mut config_types = self.get_all_file_types();
+ let () = config_types.sort();
+ let supported_config_types = config_types.join(",");
+ format!("118,{supported_config_types}")
+ }
+}
+
+#[test]
+fn test_no_duplicated_file_types() {
+ let files = vec![
+ FileEntry {
+ path: "a/path".to_string(),
+ config_type: "type_one".to_string(),
+ },
+ FileEntry {
+ path: "some/path".to_string(),
+ config_type: "type_one".to_string(),
+ },
+ ];
+ let logs_config = LogPluginConfig { files: files };
+ assert_eq!(
+ logs_config.get_all_file_types(),
+ vec!["type_one".to_string()]
+ );
+}
diff --git a/plugins/c8y_log_plugin/src/error.rs b/plugins/c8y_log_plugin/src/error.rs
new file mode 100644
index 00000000..045d4bf3
--- /dev/null
+++ b/plugins/c8y_log_plugin/src/error.rs
@@ -0,0 +1,8 @@
+#[derive(thiserror::Error, Debug)]
+pub enum LogRetrievalError {
+ #[error(transparent)]
+ FromTEdgeConfig(#[from] tedge_config::TEdgeConfigError),
+
+ #[error(transparent)]
+ FromConfigSetting(#[from] tedge_config::ConfigSettingError),
+}
diff --git a/plugins/c8y_log_plugin/src/logfile_request.rs b/plugins/c8y_log_plugin/src/logfile_request.rs
new file mode 100644
index 00000000..35867948
--- /dev/null
+++ b/plugins/c8y_log_plugin/src/logfile_request.rs
@@ -0,0 +1,171 @@
+use std::path::Path;
+
+use glob::glob;
+
+use crate::config::LogPluginConfig;
+use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
+use c8y_smartrest::{
+ smartrest_deserializer::{get_datetime_from_file_path, SmartRestLogRequest},
+ smartrest_serializer::{
+ CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting,
+ SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful,
+ TryIntoOperationStatusMessage,
+ },
+};
+use mqtt_channel::{Connection, SinkExt};
+
+pub struct LogfileRequest {}
+
+impl TryIntoOperationStatusMessage for LogfileRequest {
+ /// returns a c8y message specifying to set log status to executing.
+ ///
+ /// example message: '501,c8y_LogfileRequest'
+ fn status_executing() -> Result<
+ c8y_smartrest::smartrest_serializer::SmartRest,
+ c8y_smartrest::error::SmartRestSerializerError,
+ > {
+ SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest)
+ .to_smartrest()
+ }
+
+ fn status_successful(
+ parameter: Option<String>,
+ ) -> Result<
+ c8y_smartrest::smartrest_serializer::SmartRest,
+ c8y_smartrest::error::SmartRestSerializerError,
+ > {
+ SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yLogFileRequest)
+ .with_response_parameter(&parameter.unwrap())
+ .to_smartrest()
+ }
+
+ fn status_failed(
+ failure_reason: String,
+ ) -> Result<
+ c8y_smartrest::smartrest_serializer::SmartRest,
+ c8y_smartrest::error::SmartRestSerializerError,
+ > {
+ SmartRestSetOperationToFailed::new(
+ CumulocitySupportedOperations::C8yLogFileRequest,
+ failure_reason,
+ )
+ .to_smartrest()
+ }
+}
+/// Reads tedge logs according to `SmartRestLogRequest`.
+///
+/// If needed, logs are concatenated.
+///
+/// Logs are sorted alphanumerically from oldest to newest.
+///
+/// # Examples
+///
+/// ```
+/// let smartrest_obj = SmartRestLogRequest::from_smartrest(
+/// "522,DeviceSerial,syslog,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000",
+/// )
+/// .unwrap();
+///
+/// let log = read_tedge_system_logs(&smartrest_obj, "/var/log/tedge").unwrap();
+/// ```
+pub fn read_tedge_logs(
+ smartrest_obj: &SmartRestLogRequest,
+ plugin_config_path: &Path,
+) -> Result<String, anyhow::Error> {
+ let plugin_config = LogPluginConfig::new(&plugin_config_path);
+ let mut output = String::new();
+
+ let mut files_to_send = Vec::new();
+ for files in &plugin_config.files {
+ let maybe_file_path = files.path.as_str(); // because it can be a glob pattern
+ let file_type = files.config_type.as_str();
+
+ if !file_type.eq(&smartrest_obj.log_type) {
+ continue;
+ }
+
+ // NOTE: According to the glob documentation paths are yielded in alphabetical order hence re-ordering is no longer required see:
+ // https://github.com/thin-edge/thin-edge.io/blob/0320741b109f50d1b0f7cda44e33dc31ba04902d/plugins/log_request_plugin/src/smartrest.rs#L24
+ for entry in glob(maybe_file_path)? {
+ let file_path = entry?;
+ if let Some(dt_from_file) = get_datetime_from_file_path(&file_path) {
+ if !(dt_from_file < smartrest_obj.date_from || dt_from_file > smartrest_obj.date_to)
+ {
+ files_to_send.push(file_path);
+ }
+ } else {
+ files_to_send.push(file_path);
+ }
+ }
+ }
+
+ // loop sorted vector and push store log file to `output`
+ let mut line_counter: usize = 0;
+ for entry in files_to_send {
+ dbg!("files to read:", &entry);
+ let file_content = std::fs::read_to_string(&entry)?;
+ if file_content.is_empty() {
+ continue;
+ }
+
+ // adding file header only if line_counter permits more lines to be added
+ match &entry.file_stem().and_then(|f| f.to_str()) {
+ Some(file_name) if line_counter < smartrest_obj.lines => {
+ output.push_str(&format!("filename: {}\n", file_name));
+ }
+ _ => {}
+ }
+
+ // split at new line delimiter ("\n")
+ let mut lines = file_content.lines().rev();
+ while line_counter < smartrest_obj.lines {
+ if let Some(haystack) = lines.next() {
+ if let Some(needle) = &smartrest_obj.needle {
+ if haystack.contains(needle) {
+ output.push_str(&format!("{}\n", haystack));
+ line_counter += 1;
+ }
+ } else {
+ output.push_str(&format!("{}\n", haystack));
+ line_counter += 1;
+ }
+ } else {
+ // there are no lines.next()
+ break;
+ }
+ }
+ }
+ Ok(output)
+}
+
+pub async fn handle_logfile_request_operation(
+ smartrest_request: &SmartRestLogRequest,
+ plugin_config_path: &Path,
+ mqtt_client: &mut Connection,
+ http_client: &mut JwtAuthHttpProxy,
+) -> Result<(), anyhow::Error> {
+ // executing
+ let executing = LogfileRequest::executing()?;
+ let () = mqtt_client.published.send(executing).await?;
+
+ let log_content = read_tedge_logs(&smartrest_request, &plugin_config_path)?;
+
+ let upload_event_url = http_client
+ .upload_log_binary(&smartrest_request.log_type, &log_content)
+ .await?;
+
+ let successful = LogfileRequest::successful(Some(upload_event_url))?;
+ let () = mqtt_client.published.send(successful).await?;
+
+ Ok(())
+}
+
+pub async fn handle_dynamic_log_type_update(
+ mqtt_client: &mut Connection,
+ config_dir: &Path,
+) -> Result<(), anyhow::Error> {
+ let plugin_config = LogPluginConfig::new(config_dir);
+ let msg = plugin_config.to_supported_config_types_message()?;
+ let () = mqtt_client.published.send(msg).await?;
+ Ok(())
+}
diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs
new file mode 100644
index 00000000..a799a9b2
--- /dev/null
+++ b/plugins/c8y_log_plugin/src/main.rs
@@ -0,0 +1,232 @@
+mod config;
+mod error;
+mod logfile_request;
+
+use anyhow::Result;
+use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
+use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRequestGeneric};
+use c8y_smartrest::topic::C8yTopic;
+use clap::Parser;
+
+use inotify::{EventMask, EventStream};
+use inotify::{Inotify, WatchMask};
+use mqtt_channel::{Connection, StreamExt};
+use std::path::{Path, PathBuf};
+use tedge_config::{
+ ConfigRepository, ConfigSettingAccessor, LogPathSetting, MqttPortSetting, TEdgeConfig,
+ DEFAULT_TEDGE_CONFIG_PATH,
+};
+use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group};
+use tracing::{error, info};
+
+use crate::logfile_request::{handle_dynamic_log_type_update, handle_logfile_request_operation};
+
+const DEFAULT_PLUGIN_CONFIG_FILE: &str = "c8y/c8y-log-plugin.toml";
+const AFTER_HELP_TEXT: &str = r#"On start, `c8y_log_plugin` notifies the cloud tenant of the log types listed in the `CONFIG_FILE`, sending this list with a `118` on `c8y/s/us`.
+`c8y_log_plugin` subscribes then to `c8y/s/ds` listening for logfile operation requests (`522`) notifying the Cumulocity tenant of their progress (messages `501`, `502` and `503`).
+
+The thin-edge `CONFIG_DIR` is used to store:
+ * c8y-log-plugin.toml - the configuration file that specifies which logs to be retrived"#;
+
+#[derive(Debug, clap::Parser, Clone)]
+#[clap(
+name = clap::crate_name!(),
+version = clap::crate_version!(),
+about = clap::crate_description!(),
+after_help = AFTER_HELP_TEXT
+)]
+pub struct LogfileRequestPluginOpt {
+ /// Turn-on the debug log level.
+ ///
+ /// If off only reports ERROR, WARN, and INFO
+ /// If on also reports DEBUG and TRACE
+ #[clap(long)]
+ pub debug: bool,
+
+ /// Create supported operation files
+ #[clap(short, long)]
+ pub init: bool,
+
+ #[clap(long = "config-dir", default_value = DEFAULT_TEDGE_CONFIG_PATH)]
+ pub config_dir: PathBuf,
+}
+
+async fn create_mqtt_client(
+ tedge_config: &TEdgeConfig,
+) -> Result<mqtt_channel::Connection, anyhow::Error> {
+ let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
+ let mqtt_config = mqtt_channel::Config::default()
+ .with_port(mqtt_port)
+ .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked(
+ C8yTopic::SmartRestRequest.as_str(),
+ ));
+
+ let mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?;
+ Ok(mqtt_client)
+}
+
+pub async fn create_http_client(
+ tedge_config: &TEdgeConfig,
+) -> Result<JwtAuthHttpProxy, anyhow::Error> {
+ let mut http_proxy = JwtAuthHttpProxy::try_new(tedge_config).await?;
+ let () = http_proxy.init().await?;
+ Ok(http_proxy)
+}
+
+fn create_inofity_file_watch_stream(
+ config_file: &Path,
+) -> Result<EventStream<[u8; 1024]>, anyhow::Error> {
+ let buffer = [0; 1024];
+ let mut inotify = Inotify::init().expect("Error while initializing inotify instance");
+
+ inotify
+ .add_watch(&config_file, WatchMask::CLOSE_WRITE)
+ .expect("Failed to add file watch");
+
+ Ok(inotify.event_stream(buffer)?)
+}
+
+async fn run(
+ config_file: &Path,
+ mqtt_client: &mut Connection,
+ http_client: &mut JwtAuthHttpProxy,
+) -> Result<(), anyhow::Error> {
+ let () = handle_dynamic_log_type_update(mqtt_client, config_file).await?;
+
+ let mut inotify_stream = create_inofity_file_watch_stream(config_file)?;
+
+ loop {
+ tokio::select! {
+ message = mqtt_client.received.next() => {
+ if let Some(message) = message {
+ if let Ok(payload) = message.payload_str() {
+ let result = match payload.split(',').next().unwrap_or_default() {
+ "522" => {
+ // retrieve smartrest object from payload
+ let smartrest_obj = SmartRestLogRequest::from_smartrest(&payload)?;
+ handle_logfile_request_operation(
+ &smartrest_obj,
+ &config_file,
+ mqtt_client,
+ http_client,
+ )
+ .await
+ }
+ _ => {
+ // Ignore operation messages not meant for this plugin
+ Ok(())
+ }
+ };
+
+ if let Err(err) = result {
+ error!("Handling of operation: '{}' failed with {}", payload, err);
+ }
+ }
+ }
+ else {
+ // message is None and the connection has been closed
+ return Ok(());
+ }
+ }
+ Some(event_or_error) = inotify_stream.next() => {
+ if let Ok(event) = event_or_error {
+ match event.mask {
+ EventMask::CLOSE_WRITE => {
+ let () = handle_dynamic_log_type_update(mqtt_client, config_file).await?;
+ }
+ _ => {}
+ }
+ }
+
+ }
+ }
+ }
+}
+
+#[tokio::main]
+async fn main() -> Result<(), anyhow::Error> {
+ let config_plugin_opt = LogfileRequestPluginOpt::parse();
+ let config_file = PathBuf::from(&format!(
+ "{}/{DEFAULT_PLUGIN_CONFIG_FILE}",
+ &config_plugin_opt
+ .config_dir
+ .to_str()
+ .unwrap_or(DEFAULT_TEDGE_CONFIG_PATH)
+ ));
+
+ tedge_utils::logging::initialise_tracing_subscriber(config_plugin_opt.debug);
+
+ // Load tedge config from the provided location
+ let tedge_config_location =
+ tedge_config::TEdgeConfigLocation::from_custom_root(&config_plugin_opt.config_dir);
+ let config_repository = tedge_config::TEdgeConfigRepository::new(tedge_config_location.clone());
+ let tedge_config = config_repository.load()?;
+
+ let logs_dir = tedge_config.query(LogPathSetting)?;
+ let logs_dir = PathBuf::from(logs_dir.to_string());
+
+ if config_plugin_opt.init {
+ let () = init(&config_plugin_opt.config_dir, &logs_dir)?;
+ return Ok(());
+ }
+
+ // Create required clients
+ let mut mqtt_client = create_mqtt_client(&tedge_config).await?;
+ let mut http_client = create_http_client(&tedge_config).await?;
+
+ let () = run(&config_file, &mut mqtt_client, &mut http_client).await?;
+ Ok(())
+}
+
+fn init(config_dir: &PathBuf, logs_dir: &PathBuf) -> Result<(), anyhow::Error> {
+ info!("Creating supported operation files");
+ let config_dir = config_dir.as_path().display().to_string();
+ let logs_dir = logs_dir.as_path().display().to_string();
+ let () = create_init_logs_directories_and_files(config_dir.as_str(), logs_dir.as_str())?;
+ Ok(())
+}
+
+/// for the log plugin to work the following directories and files are needed:
+///
+/// Directories:
+/// - LOGS_DIR/tedge/agent
+/// - config_dir/operations/c8y
+/// - CONFIG_DIR/c8y
+///
+/// Files:
+/// - config_dir/operations/c8y/c8y_LogfileRequest
+/// - CONFIG_DIR/c8y/log/c8y-log-plugin.toml
+fn create_init_logs_directories_and_files(
+ config_dir: &str,
+ logs_dir: &str,
+) -> Result<(), anyhow::Error> {
+ // creating logs_dir
+ create_directory_with_user_group(&format!("{logs_dir}/tedge"), "tedge", "tedge", 0o755)?;
+ create_directory_with_user_group(&format!("{logs_dir}/tedge/agent"), "tedge", "tedge", 0o755)?;
+ // creating /operations/c8y directories
+ create_directory_with_user_group(&format!("{config_dir}/operations"), "tedge", "tedge", 0o755)?;
+ create_directory_with_user_group(
+ &format!("{config_dir}/operations/c8y"),
+ "tedge",
+ "tedge",
+ 0o755,
+ )?;
+ // creating c8y_LogfileRequest operation file
+ create_file_with_user_group(
+ &format!("{config_dir}/operations/c8y/c8y_LogfileRequest"),
+ "tedge",
+ "tedge",
+ 0o755,
+ )?;
+ // creating c8y directory
+ create_directory_with_user_group(&format!("{config_dir}/c8y"), "tedge", "tedge", 0o755)?;
+ // creating c8y-log-plugin.toml
+ // NOTE: file needs 775 permission or inotify can not watch for changes inside the file
+ create_file_with_user_group(
+ &format!("{config_dir}/{DEFAULT_PLUGIN_CONFIG_FILE}"),
+ "tedge",
+ "tedge",
+ 0o775,
+ )?;
+ Ok(())
+}