diff options
-rw-r--r-- | Cargo.lock | 30 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | crates/core/c8y_api/Cargo.toml | 5 | ||||
-rw-r--r-- | crates/core/c8y_api/src/error.rs | 1 | ||||
-rw-r--r-- | crates/core/c8y_api/src/http_proxy.rs | 36 | ||||
-rw-r--r-- | crates/core/c8y_api/src/lib.rs | 10 | ||||
-rw-r--r-- | crates/core/c8y_smartrest/src/topic.rs | 34 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs | 3 | ||||
-rw-r--r-- | plugins/log_request_plugin/Cargo.toml | 4 | ||||
-rw-r--r-- | plugins/log_request_plugin/src/main.rs | 40 | ||||
-rw-r--r-- | plugins/log_request_plugin/src/smartrest.rs | 106 |
11 files changed, 172 insertions, 98 deletions
@@ -335,23 +335,18 @@ version = "0.1.0" dependencies = [ "agent_interface", "async-trait", - "batcher", "c8y_smartrest", - "c8y_translator", "chrono", "clock", "csv", "download", - "flockfile", "futures", "mockall", "mqtt_channel", "reqwest", "serde", "serde_json", - "structopt", "tedge_config", - "tedge_users", "tedge_utils", "tempfile", "test-case", @@ -464,6 +459,7 @@ dependencies = [ "libc", "num-integer", "num-traits", + "serde", "time 0.1.43", "winapi", ] @@ -1315,6 +1311,30 @@ dependencies = [ ] [[package]] +name = "log_request_plugin" +version = "0.5.2" +dependencies = [ + "anyhow", + "async-trait", + "c8y_api", + "c8y_smartrest", + "chrono", + "csv", + "futures", + "mockall", + "mqtt_channel", + "reqwest", + "serde", + "serde_json", + "tedge_config", + "tempfile", + "thiserror", + "tokio", + "toml", + "tracing", +] + +[[package]] name = "matchers" version = "0.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -7,6 +7,7 @@ members = [ "plugins/tedge_apt_plugin", "plugins/tedge_dummy_plugin", "plugins/tedge_apama_plugin", + "plugins/log_request_plugin", ] [profile.release] diff --git a/crates/core/c8y_api/Cargo.toml b/crates/core/c8y_api/Cargo.toml index 1e3cb0f5..ea116f84 100644 --- a/crates/core/c8y_api/Cargo.toml +++ b/crates/core/c8y_api/Cargo.toml @@ -8,23 +8,18 @@ edition = "2021" [dependencies] agent_interface = { path = "../agent_interface"} async-trait = "0.1" -batcher = { path = "../../common/batcher" } c8y_smartrest = { path = "../c8y_smartrest" } -c8y_translator = { path = "../c8y_translator" } chrono = "0.4" clock = { path = "../../common/clock" } csv = "1.1" download = { path = "../../common/download" } -flockfile = { path = "../../common/flockfile" } futures = "0.3" mockall = "0.10" mqtt_channel = { path = "../../common/mqtt_channel" } reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -structopt = "0.3" tedge_config = { path = "../../common/tedge_config" } -tedge_users = { path = "../../common/tedge_users" } tedge_utils = { path = "../../common/tedge_utils", features = ["logging"] } thin_edge_json = { path = "../thin_edge_json" } thiserror = "1.0" diff --git a/crates/core/c8y_api/src/error.rs b/crates/core/c8y_api/src/error.rs deleted file mode 100644 index 8b137891..00000000 --- a/crates/core/c8y_api/src/error.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs index 2a56946a..0cd25f20 100644 --- a/crates/core/c8y_api/src/http_proxy.rs +++ b/crates/core/c8y_api/src/http_proxy.rs @@ -2,15 +2,14 @@ use crate::json_c8y::{ C8yCreateEvent, C8yManagedObject, C8yUpdateSoftwareListResponse, InternalIdResponse, }; use async_trait::async_trait; -use c8y_smartrest::{ - error::SMCumulocityMapperError, smartrest_deserializer::SmartRestJwtResponse, topic::C8yTopic, -}; +use c8y_smartrest::{error::SMCumulocityMapperError, smartrest_deserializer::SmartRestJwtResponse}; +use chrono::{DateTime, Local}; use mqtt_channel::{Connection, PubChannel, StreamExt, Topic, TopicFilter}; use reqwest::Url; use std::time::Duration; use tedge_config::{ - get_tedge_config, C8yUrlSetting, ConfigSettingAccessor, ConfigSettingAccessorStringExt, - DeviceIdSetting, MqttPortSetting, TEdgeConfig, + C8yUrlSetting, ConfigSettingAccessor, ConfigSettingAccessorStringExt, DeviceIdSetting, + MqttPortSetting, TEdgeConfig, }; use time::{format_description, OffsetDateTime}; @@ -19,33 +18,6 @@ use tracing::{error, info, instrument}; const RETRY_TIMEOUT_SECS: u64 = 60; -/// creates an mqtt client with a given `session_name` -pub async fn create_mqtt_client( - session_name: &str, -) -> Result<mqtt_channel::Connection, SMCumulocityMapperError> { - let tedge_config = get_tedge_config()?; - let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); - let mqtt_config = mqtt_channel::Config::default() - .with_port(mqtt_port) - .with_session_name(session_name) - .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked( - C8yTopic::SmartRestResponse.as_str(), - )); - - let mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?; - Ok(mqtt_client) -} - -/// creates an http client with a given `session_name` -pub async fn create_http_client( - session_name: &str, -) -> Result<JwtAuthHttpProxy, SMCumulocityMapperError> { - let config = get_tedge_config()?; - let mut http_proxy = JwtAuthHttpProxy::try_new(&config, &session_name).await?; - let () = http_proxy.init().await?; - Ok(http_proxy) -} - /// An HttpProxy handles http requests to C8y on behalf of the device. #[async_trait] pub trait C8YHttpProxy { diff --git a/crates/core/c8y_api/src/lib.rs b/crates/core/c8y_api/src/lib.rs index 2a0286e0..7eb6352c 100644 --- a/crates/core/c8y_api/src/lib.rs +++ b/crates/core/c8y_api/src/lib.rs @@ -1,12 +1,2 @@ -pub mod error; pub mod http_proxy; pub mod json_c8y; - -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - let result = 2 + 2; - assert_eq!(result, 4); - } -} diff --git a/crates/core/c8y_smartrest/src/topic.rs b/crates/core/c8y_smartrest/src/topic.rs index 491c7a4b..2c6c96b0 100644 --- a/crates/core/c8y_smartrest/src/topic.rs +++ b/crates/core/c8y_smartrest/src/topic.rs @@ -1,13 +1,7 @@ use agent_interface::topic::ResponseTopic; use agent_interface::TopicError; +use mqtt_channel::MqttError; use mqtt_channel::Topic; -use mqtt_channel::{Message, MqttError}; - -use crate::error::SMCumulocityMapperError; -use crate::smartrest_serializer::{ - CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting, - SmartRestSetOperationToSuccessful, -}; #[derive(Debug, Clone, PartialEq)] pub enum C8yTopic { @@ -100,32 +94,6 @@ impl TryFrom<Topic> for MapperSubscribeTopic { } } -/// returns a c8y message specifying to set log status to executing. -/// -/// example message: '501,c8y_LogfileRequest' -pub async fn get_log_file_request_executing() -> Result<Message, SMCumulocityMapperError> { - let topic = C8yTopic::SmartRestResponse.to_topic()?; - let smartrest_set_operation_status = - SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest) - .to_smartrest()?; - Ok(Message::new(&topic, smartrest_set_operation_status)) -} - -/// returns a c8y message specifying to set log status to successful. -/// -/// example message: '503,c8y_LogfileRequest,https://{c8y.url}/etc...' -pub async fn get_log_file_request_done_message( - binary_upload_event_url: &str, -) -> Result<Message, SMCumulocityMapperError> { - let topic = C8yTopic::SmartRestResponse.to_topic()?; - let smartrest_set_operation_status = - SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yLogFileRequest) - .with_response_parameter(binary_upload_event_url) - .to_smartrest()?; - - Ok(Message::new(&topic, smartrest_set_operation_status)) -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs index 703e3cad..0ade06a2 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs @@ -79,8 +79,7 @@ impl TEdgeComponent for CumulocitySoftwareManagementMapper { #[instrument(skip(self, tedge_config), name = "sm-c8y-mapper")] async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; - let session_name = "SM-Tedge-Mapper"; - let http_proxy = JwtAuthHttpProxy::try_new(&tedge_config, &session_name).await?; + let http_proxy = JwtAuthHttpProxy::try_new(&tedge_config, SM_MAPPER).await?; let mut sm_mapper = CumulocitySoftwareManagement::try_new(&tedge_config, http_proxy, operations).await?; diff --git a/plugins/log_request_plugin/Cargo.toml b/plugins/log_request_plugin/Cargo.toml index ee26d4e0..456d3dce 100644 --- a/plugins/log_request_plugin/Cargo.toml +++ b/plugins/log_request_plugin/Cargo.toml @@ -5,7 +5,7 @@ authors = ["thin-edge.io team <info@thin-edge.io>"] edition = "2021" rust-version = "1.58" license = "Apache-2.0" -# TODO: add description? +description = "Thin.edge.io operation plugin for Cumulocity log request" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -21,12 +21,10 @@ mockall = "0.10" reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -structopt = "0.3" thiserror = "1.0" tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] } toml = "0.5" tracing = { version = "0.1", features = ["attributes", "log"] } -tedge_mapper = { path = "../../crates/core/tedge_mapper" } tedge_config = { path = "../../crates/common/tedge_config" } mqtt_channel = { path = "../../crates/common/mqtt_channel" } diff --git a/plugins/log_request_plugin/src/main.rs b/plugins/log_request_plugin/src/main.rs index f5109589..274090ce 100644 --- a/plugins/log_request_plugin/src/main.rs +++ b/plugins/log_request_plugin/src/main.rs @@ -1,12 +1,14 @@ mod smartrest; -use c8y_api::http_proxy::{create_http_client, create_mqtt_client, C8YHttpProxy}; -use c8y_smartrest::smartrest_deserializer::SmartRestLogRequest; -use c8y_smartrest::topic::{get_log_file_request_done_message, get_log_file_request_executing}; +use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; +use c8y_smartrest::{smartrest_deserializer::SmartRestLogRequest, topic::C8yTopic}; +use tedge_config::{get_tedge_config, ConfigSettingAccessor, MqttPortSetting}; use futures::SinkExt; -use smartrest::read_tedge_logs; +use smartrest::{ + get_log_file_request_done_message, get_log_file_request_executing, read_tedge_logs, +}; use std::time::Duration; use tokio::time::sleep; @@ -15,14 +17,39 @@ const AGENT_LOG_DIR: &str = "/var/log/tedge/agent"; const MQTT_SESSION_NAME: &str = "log plugin mqtt session"; const HTTP_SESSION_NAME: &str = "log plugin http session"; +/// creates an mqtt client with a given `session_name` +pub async fn create_mqtt_client( + session_name: &str, +) -> Result<mqtt_channel::Connection, anyhow::Error> { + let tedge_config = get_tedge_config()?; + let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); + let mqtt_config = mqtt_channel::Config::default() + .with_port(mqtt_port) + .with_session_name(session_name) + .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked( + C8yTopic::SmartRestResponse.as_str(), + )); + + let mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?; + Ok(mqtt_client) +} + +/// creates an http client with a given `session_name` +pub async fn create_http_client(session_name: &str) -> Result<JwtAuthHttpProxy, anyhow::Error> { + let config = get_tedge_config()?; + let mut http_proxy = JwtAuthHttpProxy::try_new(&config, session_name).await?; + let () = http_proxy.init().await?; + Ok(http_proxy) +} + #[tokio::main] async fn main() -> Result<(), anyhow::Error> { // reading payload from command line arguments let payload = std::env::args().nth(1).expect("no payload given"); // creating required clients - let mut mqtt_client = create_mqtt_client(&MQTT_SESSION_NAME).await?; - let mut http_client = create_http_client(&HTTP_SESSION_NAME).await?; + let mut mqtt_client = create_mqtt_client(MQTT_SESSION_NAME).await?; + let mut http_client = create_http_client(HTTP_SESSION_NAME).await?; // retrieve smartrest object from payload let smartrest_obj = SmartRestLogRequest::from_smartrest(&payload)?; @@ -51,6 +78,7 @@ async fn main() -> Result<(), anyhow::Error> { /// NOTE: this is a quick-fix to enable step 4. in main to be executed. /// if `do_one_second_pause()` is not called, step 4. in main /// does not get triggered. +// see #846 async fn do_one_second_pause() { sleep(Duration::from_secs(1)).await; } diff --git a/plugins/log_request_plugin/src/smartrest.rs b/plugins/log_request_plugin/src/smartrest.rs index b9debc19..187daa44 100644 --- a/plugins/log_request_plugin/src/smartrest.rs +++ b/plugins/log_request_plugin/src/smartrest.rs @@ -1,4 +1,39 @@ -use c8y_smartrest::smartrest_deserializer::{get_datetime_from_file_path, SmartRestLogRequest}; +use c8y_smartrest::{ + smartrest_deserializer::{get_datetime_from_file_path, SmartRestLogRequest}, + smartrest_serializer::{ + CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting, + SmartRestSetOperationToSuccessful, + }, + topic::C8yTopic, +}; +use mqtt_channel::Message; + +/// returns a c8y message specifying to set log status to executing. +/// +/// example message: '501,c8y_LogfileRequest' +pub async fn get_log_file_request_executing() -> Result<Message, anyhow::Error> { + let topic = C8yTopic::SmartRestResponse.to_topic()?; + let smartrest_set_operation_status = + SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest) + .to_smartrest()?; + Ok(Message::new(&topic, smartrest_set_operation_status)) +} + +/// returns a c8y message specifying to set log status to successful. +/// +/// example message: '503,c8y_LogfileRequest,https://{c8y.url}/etc...' +pub async fn get_log_file_request_done_message( + binary_upload_event_url: &str, +) -> Result<Message, anyhow::Error> { + let topic = C8yTopic::SmartRestResponse.to_topic()?; + let smartrest_set_operation_status = + SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yLogFileRequest) + .with_response_parameter(binary_upload_event_url) + .to_smartrest()?; + + Ok(Message::new(&topic, smartrest_set_operation_status)) +} + /// Reads tedge logs according to `SmartRestLogRequest`. /// /// If needed, logs are concatenated. @@ -81,3 +116,72 @@ pub fn read_tedge_logs( } Ok(output) } + +#[cfg(test)] +mod tests { + use crate::smartrest::read_tedge_logs; + use c8y_smartrest::smartrest_deserializer::SmartRestLogRequest; + use std::fs::File; + use std::io::Write; + + fn parse_file_names_from_log_content(log_content: &str) -> [&str; 5] { + let mut files: Vec<&str> = vec![]; + for line in log_content.lines() { + if line.contains("filename: ") { + let filename: &str = line.split("filename: ").last().unwrap(); + files.push(filename); + } + } + match files.try_into() { + Ok(arr) => arr, + Err(_) => panic!("Could not convert to Array &str, size 5"), + } + } + + #[test] + /// testing read_tedge_logs + /// + /// this test creates 5 fake log files in a temporary directory. + /// files are dated 2021-01-0XT01:00Z, where X = a different day. + /// + /// this tests will assert that files are read alphanumerically from oldest to newest + fn test_read_logs() { + // order in which files are created + const LOG_FILE_NAMES: [&str; 5] = [ + "software-list-2021-01-03T01:00:00Z.log", + "software-list-2021-01-02T01:00:00Z.log", + "software-list-2021-01-01T01:00:00Z.log", + "software-update-2021-01-03T01:00:00Z.log", + "software-update-2021-01-02T01:00:00Z.log", + ]; + + // expected (sorted) output + const EXPECTED_OUTPUT: [&str; 5] = [ + "software-list-2021-01-01T01:00:00Z", + "software-list-2021-01-02T01:00:00Z", + "software-list-2021-01-03T01:00:00Z", + "software-update-2021-01-02T01:00:00Z", + "software-update-2021-01-03T01:00:00Z", + ]; + + let smartrest_obj = SmartRestLogRequest::from_smartrest( + "522,DeviceSerial,syslog,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000", + ) + .unwrap(); + + let temp_dir = tempfile::tempdir().unwrap(); + // creating the files + for (idx, file) in LOG_FILE_NAMES.iter().enumerate() { + let file_path = &temp_dir.path().join(file); + let mut file = File::create(file_path).unwrap(); + writeln!(file, "file num {}", idx).unwrap(); + } + + // reading the logs and extracting the file names from the log output. + let output = read_tedge_logs(&smartrest_obj, temp_dir.path().to_str().unwrap()).unwrap(); + let parsed_values = parse_file_names_from_log_content(&output); + + // asserting the order = `EXPECTED_OUTPUT` + assert!(parsed_values.eq(&EXPECTED_OUTPUT)); + } +} |