summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock78
-rw-r--r--Cargo.toml2
-rw-r--r--configuration/debian/c8y_log_plugin/postinst8
-rw-r--r--configuration/init/systemd/c8y-log-plugin.service12
-rw-r--r--crates/core/c8y_api/src/http_proxy.rs18
-rw-r--r--crates/core/c8y_smartrest/src/smartrest_deserializer.rs29
-rw-r--r--crates/core/tedge_mapper/src/c8y/converter.rs44
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs12
-rw-r--r--plugins/c8y_log_plugin/Cargo.toml (renamed from plugins/log_request_plugin/Cargo.toml)27
-rw-r--r--plugins/c8y_log_plugin/src/config.rs101
-rw-r--r--plugins/c8y_log_plugin/src/error.rs8
-rw-r--r--plugins/c8y_log_plugin/src/logfile_request.rs171
-rw-r--r--plugins/c8y_log_plugin/src/main.rs232
-rw-r--r--plugins/log_request_plugin/src/main.rs66
-rw-r--r--plugins/log_request_plugin/src/smartrest.rs191
15 files changed, 627 insertions, 372 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 045cbbd1..7b1a5310 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -416,6 +416,33 @@ dependencies = [
]
[[package]]
+name = "c8y_log_plugin"
+version = "0.6.4"
+dependencies = [
+ "anyhow",
+ "assert_matches",
+ "c8y_api",
+ "c8y_smartrest",
+ "clap 3.1.18",
+ "csv",
+ "glob",
+ "inotify",
+ "mockall 0.11.0",
+ "mqtt_channel",
+ "serde",
+ "serde_json",
+ "serial_test",
+ "tedge_config",
+ "tedge_utils",
+ "tempfile",
+ "test-case",
+ "thiserror",
+ "tokio",
+ "toml",
+ "tracing",
+]
+
+[[package]]
name = "c8y_smartrest"
version = "0.6.4"
dependencies = [
@@ -1124,6 +1151,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4"
[[package]]
+name = "glob"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
+
+[[package]]
name = "h2"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1321,6 +1354,28 @@ dependencies = [
]
[[package]]
+name = "inotify"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "abf888f9575c290197b2c948dc9e9ff10bd1a39ad1ea8585f734585fa6b9d3f9"
+dependencies = [
+ "bitflags",
+ "futures-core",
+ "inotify-sys",
+ "libc",
+ "tokio",
+]
+
+[[package]]
+name = "inotify-sys"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
+dependencies = [
+ "libc",
+]
+
+[[package]]
name = "instant"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2902,29 +2957,6 @@ dependencies = [
]
[[package]]
-name = "tedge_logfile_request_plugin"
-version = "0.6.4"
-dependencies = [
- "anyhow",
- "async-trait",
- "c8y_api",
- "c8y_smartrest",
- "csv",
- "futures",
- "mockall 0.10.2",
- "mqtt_channel",
- "reqwest",
- "serde",
- "serde_json",
- "tedge_config",
- "tempfile",
- "thiserror",
- "tokio",
- "toml",
- "tracing",
-]
-
-[[package]]
name = "tedge_mapper"
version = "0.6.4"
dependencies = [
diff --git a/Cargo.toml b/Cargo.toml
index 9f2297fd..ee342054 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,10 +4,10 @@ members = [
"crates/core/*",
"crates/tests/*",
"plugins/c8y_configuration_plugin",
+ "plugins/c8y_log_plugin",
"plugins/tedge_apt_plugin",
"plugins/tedge_dummy_plugin",
"plugins/tedge_apama_plugin",
- "plugins/log_request_plugin",
]
resolver = "2"
diff --git a/configuration/debian/c8y_log_plugin/postinst b/configuration/debian/c8y_log_plugin/postinst
new file mode 100644
index 00000000..7f28420f
--- /dev/null
+++ b/configuration/debian/c8y_log_plugin/postinst
@@ -0,0 +1,8 @@
+#!/bin/sh
+
+set -e
+
+### Create supported operation files
+c8y_log_plugin --init
+
+#DEBHELPER#
diff --git a/configuration/init/systemd/c8y-log-plugin.service b/configuration/init/systemd/c8y-log-plugin.service
new file mode 100644
index 00000000..d1f758e9
--- /dev/null
+++ b/configuration/init/systemd/c8y-log-plugin.service
@@ -0,0 +1,12 @@
+[Unit]
+Description=Thin-edge logfile retriever for Cumulocity
+After=syslog.target network.target mosquitto.service
+
+[Service]
+User=root
+ExecStart=/usr/bin/c8y_log_plugin
+Restart=on-failure
+RestartPreventExitStatus=255
+
+[Install]
+WantedBy=multi-user.target
diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs
index 78d8280b..b9f78e32 100644
--- a/crates/core/c8y_api/src/http_proxy.rs
+++ b/crates/core/c8y_api/src/http_proxy.rs
@@ -42,6 +42,7 @@ pub trait C8YHttpProxy: Send + Sync {
async fn upload_log_binary(
&mut self,
+ log_type: &str,
log_content: &str,
) -> Result<String, SMCumulocityMapperError>;
@@ -269,20 +270,6 @@ impl JwtAuthHttpProxy {
Ok(internal_id)
}
- fn create_log_event(&self) -> C8yCreateEvent {
- let c8y_managed_object = C8yManagedObject {
- id: self.end_point.c8y_internal_id.clone(),
- };
-
- C8yCreateEvent::new(
- Some(c8y_managed_object),
- "c8y_Logfile".to_string(),
- OffsetDateTime::now_utc(),
- "software-management".to_string(),
- HashMap::new(),
- )
- }
-
fn create_event(
&self,
event_type: String,
@@ -388,11 +375,12 @@ impl C8YHttpProxy for JwtAuthHttpProxy {
async fn upload_log_binary(
&mut self,
+ log_type: &str,
log_content: &str,
) -> Result<String, SMCumulocityMapperError> {
let token = self.get_jwt_token().await?;
- let log_event = self.create_log_event();
+ let log_event = self.create_event(log_type.to_string(), None, None);
let event_response_id = self.send_event_internal(log_event).await?;
let binary_upload_event_url = self
.end_point
diff --git a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
index ff5c177c..0d2fb0c5 100644
--- a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
+++ b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
@@ -1,4 +1,4 @@
-use crate::error::{SMCumulocityMapperError, SmartRestDeserializerError};
+use crate::error::SmartRestDeserializerError;
use agent_interface::{SoftwareModule, SoftwareModuleUpdate, SoftwareUpdateRequest};
use csv::ReaderBuilder;
use download::DownloadInfo;
@@ -306,9 +306,7 @@ impl SmartRestJwtResponse {
/// path.push("/path/to/file/with/date/in/path-2021-10-27T10:29:58Z");
/// let path_bufdate_time = get_datetime_from_file_path(&path).unwrap();
/// ```
-pub fn get_datetime_from_file_path(
- log_path: &Path,
-) -> Result<OffsetDateTime, SMCumulocityMapperError> {
+pub fn get_datetime_from_file_path(log_path: &Path) -> Option<OffsetDateTime> {
if let Some(stem_string) = log_path.file_stem().and_then(|s| s.to_str()) {
// a typical file stem looks like this: software-list-2021-10-27T10:29:58Z.
// to extract the date, rsplit string on "-" and take (last) 3
@@ -317,16 +315,17 @@ pub fn get_datetime_from_file_path(
stem_string_vec.reverse();
// join on '-' to get the date string
let date_string = stem_string_vec.join("-");
- let dt = OffsetDateTime::parse(&date_string, &format_description::well_known::Rfc3339)?;
-
- return Ok(dt);
- }
- match log_path.to_str() {
- Some(path) => Err(SMCumulocityMapperError::InvalidDateInFileName(
- path.to_string(),
- )),
- None => Err(SMCumulocityMapperError::InvalidUtf8Path),
+ let dt = OffsetDateTime::parse(&date_string, &format_description::well_known::Rfc3339);
+ match dt {
+ Ok(dt) => {
+ return Some(dt);
+ }
+ Err(_) => {
+ return None;
+ }
+ }
}
+ None
}
#[cfg(test)]
@@ -650,7 +649,7 @@ mod tests {
// this should return an Ok Result.
let path_buf = PathBuf::from_str(file_path).unwrap();
let path_buf_datetime = get_datetime_from_file_path(&path_buf);
- assert!(path_buf_datetime.is_ok());
+ assert!(path_buf_datetime.is_some());
}
#[test_case("/path/to/software-list-2021-10-27-10:44:44Z.log")]
@@ -662,6 +661,6 @@ mod tests {
// this should return an err.
let path_buf = PathBuf::from_str(file_path).unwrap();
let path_buf_datetime = get_datetime_from_file_path(&path_buf);
- assert!(path_buf_datetime.is_err());
+ assert!(path_buf_datetime.is_none());
}
}
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs
index e830a7a4..dc407100 100644
--- a/crates/core/tedge_mapper/src/c8y/converter.rs
+++ b/crates/core/tedge_mapper/src/c8y/converter.rs
@@ -18,18 +18,17 @@ use c8y_smartrest::{
smartrest_serializer::{
CumulocitySupportedOperations, SmartRestGetPendingOperations, SmartRestSerializer,
SmartRestSetOperationToExecuting, SmartRestSetOperationToFailed,
- SmartRestSetOperationToSuccessful, SmartRestSetSupportedLogType,
- SmartRestSetSupportedOperations,
+ SmartRestSetOperationToSuccessful, SmartRestSetSupportedOperations,
},
};
use c8y_translator::json;
use logged_command::LoggedCommand;
use mqtt_channel::{Message, Topic, TopicFilter};
-use plugin_sm::operation_logs::{OperationLogs, OperationLogsError};
+use plugin_sm::operation_logs::OperationLogs;
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
- fs::{self, File},
+ fs::File,
io::Read,
path::{Path, PathBuf},
};
@@ -333,9 +332,6 @@ where
&self.device_type,
));
- let supported_log_types_message = self.wrap_error(create_supported_log_types_message(
- &self.operation_logs.log_dir,
- ));
let pending_operations_message = self.wrap_error(create_get_pending_operations_message());
let software_list_message = self.wrap_error(create_get_software_list_message());
@@ -343,7 +339,6 @@ where
inventory_fragments_message,
supported_operations_message,
device_data_message,
- supported_log_types_message,
pending_operations_message,
software_list_message,
])
@@ -556,39 +551,6 @@ fn create_get_pending_operations_message() -> Result<Message, ConversionError> {
Ok(Message::new(&topic, payload))
}
-fn get_supported_log_types(log_dir: &Path) -> Result<Vec<String>, ConversionError> {
- let mut result = HashSet::new();
-
- let paths = fs::read_dir(&log_dir).unwrap();
- for path in paths {
- let path = path?;
- if fs::metadata(path.path())?.is_file() {
- let file_name = path.file_name();
- let file_name = file_name
- .to_str()
- .ok_or(OperationLogsError::FileFormatError)?;
-
- // FIXME: this is a hotfix to map "software-list" and "software-update" to "software-management"
- // this should be fixed in https://github.com/thin-edge/thin-edge.io/issues/1077
- if file_name.starts_with("software-list") | file_name.starts_with("software-update") {
- result.insert("software-management".to_string());
- } else {
- let log_type = file_name.split('-').next();
- let log_type = log_type.ok_or(OperationLogsError::FileFormatError)?;
- result.insert(log_type.to_string());
- }
- }
- }
- Ok(Vec::from_iter(result))
-}
-
-fn create_supported_log_types_message(log_dir: &Path) -> Result<Message, ConversionError> {
- let supported_operation_types = get_supported_log_types(log_dir)?;
- let payload = SmartRestSetSupportedLogType::from(supported_operation_types).to_smartrest()?;
- let topic = C8yTopic::SmartRestResponse.to_topic()?;
- Ok(Message::new(&topic, payload))
-}
-
fn create_supported_operations_fragments_message() -> Result<Message, ConversionError> {
let ops = Operations::try_new(SUPPORTED_OPERATIONS_DIRECTORY, C8Y_CLOUD)?;
let ops = ops.get_operations_list();
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index c15ce8f9..117bdced 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -57,13 +57,8 @@ async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8
// Start SM Mapper
let sm_mapper = start_c8y_mapper(broker.port).await;
- // Expect both 118 and 500 messages has been received on `c8y/s/us`, if no msg received for the timeout the test fails.
- mqtt_tests::assert_received_all_expected(
- &mut messages,
- TEST_TIMEOUT_MS,
- &["software-management", "500\n"],
- )
- .await;
+ // Expect 500 messages has been received on `c8y/s/us`, if no msg received for the timeout the test fails.
+ mqtt_tests::assert_received_all_expected(&mut messages, TEST_TIMEOUT_MS, &["500\n"]).await;
sm_mapper.unwrap().abort();
}
@@ -300,7 +295,7 @@ async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result
mqtt_tests::assert_received_all_expected(
&mut responses,
TEST_TIMEOUT_MS,
- &["software-management", "500\n", "503,c8y_SoftwareUpdate,\n"],
+ &["500\n", "503,c8y_SoftwareUpdate,\n"],
)
.await;
@@ -896,6 +891,7 @@ impl C8YHttpProxy for FakeC8YHttpProxy {
async fn upload_log_binary(
&mut self,
+ _log_type: &str,
_log_content: &str,
) -> Result<String, SMCumulocityMapperError> {
Ok("fake/upload/url".into())
diff --git a/plugins/log_request_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml
index 4cb07317..d8158abe 100644
--- a/plugins/log_request_plugin/Cargo.toml
+++ b/plugins/c8y_log_plugin/Cargo.toml
@@ -1,38 +1,41 @@
[package]
-name = "tedge_logfile_request_plugin"
+name = "c8y_log_plugin"
version = "0.6.4"
authors = ["thin-edge.io team <info@thin-edge.io>"]
edition = "2021"
rust-version = "1.58.1"
license = "Apache-2.0"
-description = "Thin.edge.io operation plugin for Cumulocity log request"
+description = "Thin-edge device log file retriever for Cumulocity"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
[package.metadata.deb]
-pre-depends = "tedge_mapper"
+maintainer-scripts = "../../configuration/debian/c8y_log_plugin"
assets = [
- ["../../configuration/contrib/operations/c8y/c8y_LogfileRequest", "/etc/tedge/operations/c8y/", "644"],
- ["target/release/tedge_logfile_request_plugin", "/usr/bin/tedge_logfile_request_plugin", "755"],
+ ["../../configuration/init/systemd/c8y-log-plugin.service", "/lib/systemd/system/c8y-log-plugin.service", "644"],
+ ["target/release/c8y_log_plugin", "/usr/bin/c8y_log_plugin", "755"],
]
[dependencies]
anyhow = "1.0"
-async-trait = "0.1"
c8y_api = { path = "../../crates/core/c8y_api" }
c8y_smartrest = { path = "../../crates/core/c8y_smartrest" }
+clap = { version = "3.0", features = ["cargo", "derive"] }
csv = "1.1"
-futures = "0.3"
-mockall = "0.10"
-reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
+glob = "0.3"
+inotify = "0.10"
+mqtt_channel = { path = "../../crates/common/mqtt_channel" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
+tedge_config = { path = "../../crates/common/tedge_config" }
+tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] }
thiserror = "1.0"
tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] }
toml = "0.5"
tracing = { version = "0.1", features = ["attributes", "log"] }
-tedge_config = { path = "../../crates/common/tedge_config" }
-mqtt_channel = { path = "../../crates/common/mqtt_channel" }
[dev-dependencies]
+assert_matches = "1.5"
+mockall = "0.11"
tempfile = "3.3"
+test-case = "2.0"
+serial_test = "0.6"
diff --git a/plugins/c8y_log_plugin/src/config.rs b/plugins/c8y_log_plugin/src/config.rs
new file mode 100644
index 00000000..580fba95
--- /dev/null
+++ b/plugins/c8y_log_plugin/src/config.rs
@@ -0,0 +1,101 @@
+use c8y_smartrest::topic::C8yTopic;
+use mqtt_channel::Message;
+use serde::Deserialize;
+use std::{borrow::Borrow, path::Path};
+use std::{collections::HashSet, fs};
+use tracing::warn;
+
+#[derive(Deserialize, Debug, Eq, PartialEq, Default)]
+#[serde(deny_unknown_fields)]
+pub struct LogPluginConfig {
+ pub files: Vec<FileEntry>,
+}
+
+#[derive(Deserialize, Debug, Eq, Default, Clone)]
+#[serde(deny_unknown_fields)]
+pub struct FileEntry {
+ pub(crate) path: String,
+ #[serde(rename = "type")]
+ pub config_type: String,
+}
+
+impl PartialEq for FileEntry {
+ fn eq(&self, other: &Self) -> bool {
+ self.config_type == other.config_type
+ }
+}
+
+impl Borrow<String> for FileEntry {
+ fn borrow(&self) -> &String {
+ &self.config_type
+ }
+}
+
+impl LogPluginConfig {
+ pub fn new(config_file_path: &Path) -> Self {
+ let config = Self::read_config(config_file_path);
+ config
+ }
+
+ pub fn read_config(path: &Path) -> Self {
+ let path_str = path.display().to_string();
+ match fs::read_to_string(path) {
+ Ok(contents) => match toml::from_str(contents.as_str()) {
+ Ok(config) => config,
+ _ => {
+ warn!("The config file {} is malformed.", path_str);
+ Self::default()
+ }
+ },
+ Err(_) => {
+ warn!(
+ "The config file {} does not exist or is not readable.",
+ path_str
+ );
+ Self::default()
+ }
+ }
+ }
+
+ pub fn to_supported_config_types_message(&self) -> Result<Message, anyhow::Error> {
+ let topic = C8yTopic::SmartRestResponse.to_topic()?;
+ Ok(Message::new(&topic, self.to_smartrest_payload()))
+ }
+
+ pub fn get_all_file_types(&self) -> Vec<String> {
+ self.files
+ .iter()
+ .map(|x| x.config_type.to_string())
+ .collect::<HashSet<_>>()
+ .iter()
+ .map(|x| x.to_string())
+ .collect::<Vec<_>>()
+ }
+
+ // 118,typeA,typeB,...
+ fn to_smartrest_payload(&self) -> String {
+ let mut config_types = self.get_all_file_types();
+ let () = config_types.sort();
+ let supported_config_types = config_types.join(",");
+ format!("118,{supported_config_types}")
+ }
+}
+
+#[test]
+fn test_no_duplicated_file_types() {
+ let files = vec![
+ FileEntry {
+ path: "a/path".to_string(),
+ config_type: "type_one".to_string(),
+ },
+ FileEntry {
+ path: "some/path".to_string(),
+ config_type: "type_one".to_string(),
+ },
+ ];
+ let logs_config = LogPluginConfig { files: files };
+ assert_eq!(
+ logs_config.get_all_file_types(),
+ vec!["type_one".to_string()]
+ );
+}
diff --git a/plugins/c8y_log_plugin/src/error.rs b/plugins/c8y_log_plugin/src/error.rs
new file mode 100644
index 00000000..045d4bf3
--- /dev/null
+++ b/plugins/c8y_log_plugin/src/error.rs
@@ -0,0 +1,8 @@
+#[derive(thiserror::Error, Debug)]
+pub enum LogRetrievalError {
+ #[error(transparent)]
+ FromTEdgeConfig(#[from] tedge_config::TEdgeConfigError),
+
+ #[error(transparent)]
+ FromConfigSetting(#[from] tedge_config::ConfigSettingError),
+}
diff --git a/plugins/c8y_log_plugin/src/logfile_request.rs b/plugins/c8y_log_plugin/src/logfile_request.rs
new file mode 100644
index 00000000..35867948
--- /dev/null
+++ b/plugins/c8y_log_plugin/src/logfile_request.rs
@@ -0,0 +1,171 @@
+use std::path::Path;
+
+use glob::glob;
+
+use crate::config::LogPluginConfig;
+use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
+use c8y_smartrest::{
+ smartrest_deserializer::{get_datetime_from_file_path, SmartRestLogRequest},
+ smartrest_serializer::{
+ CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting,
+ SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful,
+ TryIntoOperationStatusMessage,
+ },
+};
+use mqtt_channel::{Connection, SinkExt};
+
+pub struct LogfileRequest {}
+
+impl TryIntoOperationStatusMessage for LogfileRequest {
+ /// returns a c8y message specifying to set log status to executing.
+ ///
+ /// example message: '501,c8y_LogfileRequest'
+ fn status_executing() -> Result<
+ c8y_smartrest::smartrest_serializer::SmartRest,
+ c8y_smartrest::error::SmartRestSerializerError,
+ > {
+ SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest)
+ .to_smartrest()
+ }
+
+ fn status_successful(
+ parameter: Option<String>,
+ ) -> Result<
+ c8y_smartrest::smartrest_serializer::SmartRest,
+ c8y_smartrest::error::SmartRestSerializerError,
+ > {
+ SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yLogFileRequest)
+ .with_response_parameter(&parameter.unwrap())
+ .to_smartrest()
+ }
+
+ fn status_failed(
+ failure_reason: String,
+ ) -> Result<
+ c8y_smartrest::smartrest_serializer::SmartRest,
+ c8y_smartrest::error::SmartRestSerializerError,
+ > {
+ SmartRestSetOperationToFailed::new(
+ CumulocitySupportedOperations::C8yLogFileRequest,
+ failure_reason,
+ )
+ .to_smartrest()
+ }
+}
+/// Reads tedge logs according to `SmartRestLogRequest`.
+///
+/// If needed, logs are concatenated.
+///
+/// Logs are sorted alphanumerically from oldest to newest.
+///
+/// # Examples
+///
+/// ```
+/// let smartrest_obj = SmartRestLogRequest::from_smartrest(
+/// "522,DeviceSerial,syslog,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000",
+/// )
+/// .unwrap();
+///
+/// let log = read_tedge_system_logs(&smartrest_obj, "/var/log/tedge").unwrap();
+/// ```
+pub fn read_tedge_logs(
+ smartrest_obj: &SmartRestLogRequest,
+ plugin_config_path: &Path,
+) -> Result<String, anyhow::Error> {
+ let plugin_config = LogPluginConfig::new(&plugin_config_path);
+ let mut output = String::new();
+
+ let mut files_to_send = Vec::new();
+ for files in &plugin_config.files {
+ let maybe_file_path = files.path.as_str(); // because it can be a glob pattern
+ let file_type = files.config_type.as_str();
+
+ if !file_type.eq(&smartrest_obj.log_type) {
+ continue;
+ }
+
+ // NOTE: According to the glob documentation paths are yielded in alphabetical order hence re-ordering is no longer required see:
+ // https://github.com/thin-edge/thin-edge.io/blob/0320741b109f50d1b0f7cda44e33dc31ba04902d/plugins/log_request_plugin/src/smartrest.rs#L24
+ for entry in glob(maybe_file_path)? {
+ let file_path = entry?;
+ if let Some(dt_from_file) = get_datetime_from_file_path(&file_path) {
+ if !(dt_from_file < smartrest_obj.date_from || dt_from_file > smartrest_obj.date_to)
+ {
+ files_to_send.push(file_path);
+ }
+ } else {
+ files_to_send.push(file_path);
+ }
+ }
+ }
+
+ // loop sorted vector and push store log file to `output`
+ let mut line_counter: usize = 0;
+ for entry in files_to_send {
+ dbg!("files to read:", &entry);
+ let file_content = std::fs::read_to_string(&entry)?;
+ if file_content.is_empty() {
+ continue;
+ }
+
+ // adding file header only if line_counter permits more lines to be added
+ match &entry.file_stem().and_then(|f| f.to_str()) {
+ Some(file_name) if line_counter < smartrest_obj.lines => {
+ output.push_str(&format!("filename: {}\n", file_name));
+ }
+ _ => {}
+ }
+
+ // split at new line delimiter ("\n")
+ let mut lines = file_content.lines().rev();
+ while line_counter < smartrest_obj.lines {
+ if let Some(haystack) = lines.next() {
+ if let Some(needle) = &smartrest_obj.needle {
+ if haystack.contains(needle) {
+ output.push_str(&format!("{}\n", haystack));
+ line_counter += 1;
+ }
+ } else {
+ output.push_str(&format!("{}\n", haystack));
+ line_counter += 1;
+ }
+ } else {
+ // there are no lines.next()
+ break;
+ }
+ }
+ }
+ Ok(output)
+}
+
+pub async fn handle_logfile_request_operation(
+ smartrest_request: &SmartRestLogRequest,
+ plugin_config_path: &Path,
+ mqtt_client: &mut Connection,
+ http_client: &mut JwtAuthHttpProxy,
+) -> Result<(), anyhow::Error> {
+ // executing
+ let executing = LogfileRequest::executing()?;
+ let () = mqtt_client.published.send(executing).await?;
+
+ let log_content = read_tedge_logs(&smartrest_request, &plugin_config_path)?;
+
+ let upload_event_url = http_client
+ .upload_log_binary(&smartrest_request.log_type, &log_content)
+ .await?;
+
+ let successful = LogfileRequest::successful(Some(upload_event_url))?;
+ let () = mqtt_client.published.send(successful).await?;
+
+ Ok(())
+}
+
+pub async fn handle_dynamic_log_type_update(
+ mqtt_client: &mut Connection,
+ config_dir: &Path,
+) -> Result<(), anyhow::Error> {
+ let plugin_config = LogPluginConfig::new(config_dir);
+ let msg = plugin_config.to_supported_config_types_message()?;
+ let () = mqtt_client.published.send(msg).await?;
+ Ok(())
+}
diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs
new file mode 100644
index 00000000..a799a9b2
--- /dev/null
+++ b/plugins/c8y_log_plugin/src/main.rs
@@ -0,0 +1,232 @@
+mod config;
+mod error;
+mod logfile_request;
+
+use anyhow::Result;
+use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
+use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRequestGeneric};
+use c8y_smartrest::topic::C8yTopic;
+use clap::Parser;
+
+use inotify::{EventMask, EventStream};
+use inotify::{Inotify, WatchMask};
+use mqtt_channel::{Connection, StreamExt};
+use std::path::{Path, PathBuf};
+use tedge_config::{
+ ConfigRepository, ConfigSettingAccessor, LogPathSetting, MqttPortSetting, TEdgeConfig,
+ DEFAULT_TEDGE_CONFIG_PATH,
+};
+use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group};
+use tracing::{error, info};
+
+use crate::logfile_request::{handle_dynamic_log_type_update, handle_logfile_request_operation};
+
+const DEFAULT_PLUGIN_CONFIG_FILE: &str = "c8y/c8y-log-plugin.toml";
+const AFTER_HELP_TEXT: &str = r#"On start, `c8y_log_plugin` notifies the cloud tenant of the log types listed in the `CONFIG_FILE`, sending this list with a `118` on `c8y/s/us`.
+`c8y_log_plugin` subscribes then to `c8y/s/ds` listening for logfile operation requests (`522`) notifying the Cumulocity tenant of their progress (messages `501`, `502` and `503`).
+
+The thin-edge `CONFIG_DIR` is used to store:
+ * c8y-log-plugin.toml - the configuration file that specifies which logs to be retrived"#;
+
+#[derive(Debug, clap::Parser, Clone)]
+#[clap(
+name = clap::crate_name!(),
+version = clap::crate_version!(),
+about = clap::crate_description!(),
+after_help = AFTER_HELP_TEXT
+)]
+pub struct LogfileRequestPluginOpt {
+ /// Turn-on the debug log level.
+ ///
+ /// If off only reports ERROR, WARN, and INFO
+ /// If on also reports DEBUG and TRACE
+ #[clap(long)]
+ pub debug: bool,
+
+ /// Create supported operation files
+ #[clap(short, long)]
+ pub init: bool,
+
+ #[clap(long = "config-dir", default_value = DEFAULT_TEDGE_CONFIG_PATH)]
+ pub config_dir: PathBuf,
+}
+
+async fn create_mqtt_client(
+ tedge_config: &TEdgeConfig,
+) -> Result<mqtt_channel::Connection, anyhow::Error> {
+ let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
+ let mqtt_config = mqtt_channel::Config::default()
+ .with_port(mqtt_port)
+ .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked(
+ C8yTopic::SmartRestRequest.as_str(),
+ ));
+
+ let mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?;
+ Ok(mqtt_client)
+}
+
+pub async fn create_http_client(
+ tedge_config: &TEdgeConfig,
+) -> Result<JwtAuthHttpProxy, anyhow::Error> {
+ let mut http_proxy = JwtAuthHttpProxy::try_new(tedge_config).await?;
+ let () = http_proxy.init().await?;
+ Ok(http_proxy)
+}
+
+fn create_inofity_file_watch_stream(
+ config_file: &Path,
+) -> Result<EventStream<[u8; 1024]>, anyhow::Error> {
+ let buffer = [0; 1024];
+ let mut inotify =