summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorinitard <solo@softwareag.com>2022-02-10 13:32:18 +0000
committerinitard <solo@softwareag.com>2022-02-15 11:43:49 +0000
commitd07c04771f017760667c0cae2ec76113c8ee23c4 (patch)
tree3a23c1be14e8f19adbe89e36a2302b3f74d9d323
parent0320741b109f50d1b0f7cda44e33dc31ba04902d (diff)
moving implemenation specific code to plugin (#790)
- Moved mqtt client and http client to pluing main.rs - Removed empty test file and default cargo lib test - Moved log specific messages from topic.rs to plugin - Fixed mapper session name Signed-off-by: initard <solo@softwareag.com>
-rw-r--r--Cargo.lock30
-rw-r--r--Cargo.toml1
-rw-r--r--crates/core/c8y_api/Cargo.toml5
-rw-r--r--crates/core/c8y_api/src/error.rs1
-rw-r--r--crates/core/c8y_api/src/http_proxy.rs36
-rw-r--r--crates/core/c8y_api/src/lib.rs10
-rw-r--r--crates/core/c8y_smartrest/src/topic.rs34
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs3
-rw-r--r--plugins/log_request_plugin/Cargo.toml4
-rw-r--r--plugins/log_request_plugin/src/main.rs40
-rw-r--r--plugins/log_request_plugin/src/smartrest.rs106
11 files changed, 172 insertions, 98 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 1654ca24..a20b2fd9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -335,23 +335,18 @@ version = "0.1.0"
dependencies = [
"agent_interface",
"async-trait",
- "batcher",
"c8y_smartrest",
- "c8y_translator",
"chrono",
"clock",
"csv",
"download",
- "flockfile",
"futures",
"mockall",
"mqtt_channel",
"reqwest",
"serde",
"serde_json",
- "structopt",
"tedge_config",
- "tedge_users",
"tedge_utils",
"tempfile",
"test-case",
@@ -464,6 +459,7 @@ dependencies = [
"libc",
"num-integer",
"num-traits",
+ "serde",
"time 0.1.43",
"winapi",
]
@@ -1315,6 +1311,30 @@ dependencies = [
]
[[package]]
+name = "log_request_plugin"
+version = "0.5.2"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "c8y_api",
+ "c8y_smartrest",
+ "chrono",
+ "csv",
+ "futures",
+ "mockall",
+ "mqtt_channel",
+ "reqwest",
+ "serde",
+ "serde_json",
+ "tedge_config",
+ "tempfile",
+ "thiserror",
+ "tokio",
+ "toml",
+ "tracing",
+]
+
+[[package]]
name = "matchers"
version = "0.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index 9cba68c9..658590e0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,6 +7,7 @@ members = [
"plugins/tedge_apt_plugin",
"plugins/tedge_dummy_plugin",
"plugins/tedge_apama_plugin",
+ "plugins/log_request_plugin",
]
[profile.release]
diff --git a/crates/core/c8y_api/Cargo.toml b/crates/core/c8y_api/Cargo.toml
index 1e3cb0f5..ea116f84 100644
--- a/crates/core/c8y_api/Cargo.toml
+++ b/crates/core/c8y_api/Cargo.toml
@@ -8,23 +8,18 @@ edition = "2021"
[dependencies]
agent_interface = { path = "../agent_interface"}
async-trait = "0.1"
-batcher = { path = "../../common/batcher" }
c8y_smartrest = { path = "../c8y_smartrest" }
-c8y_translator = { path = "../c8y_translator" }
chrono = "0.4"
clock = { path = "../../common/clock" }
csv = "1.1"
download = { path = "../../common/download" }
-flockfile = { path = "../../common/flockfile" }
futures = "0.3"
mockall = "0.10"
mqtt_channel = { path = "../../common/mqtt_channel" }
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
-structopt = "0.3"
tedge_config = { path = "../../common/tedge_config" }
-tedge_users = { path = "../../common/tedge_users" }
tedge_utils = { path = "../../common/tedge_utils", features = ["logging"] }
thin_edge_json = { path = "../thin_edge_json" }
thiserror = "1.0"
diff --git a/crates/core/c8y_api/src/error.rs b/crates/core/c8y_api/src/error.rs
deleted file mode 100644
index 8b137891..00000000
--- a/crates/core/c8y_api/src/error.rs
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs
index 2a56946a..0cd25f20 100644
--- a/crates/core/c8y_api/src/http_proxy.rs
+++ b/crates/core/c8y_api/src/http_proxy.rs
@@ -2,15 +2,14 @@ use crate::json_c8y::{
C8yCreateEvent, C8yManagedObject, C8yUpdateSoftwareListResponse, InternalIdResponse,
};
use async_trait::async_trait;
-use c8y_smartrest::{
- error::SMCumulocityMapperError, smartrest_deserializer::SmartRestJwtResponse, topic::C8yTopic,
-};
+use c8y_smartrest::{error::SMCumulocityMapperError, smartrest_deserializer::SmartRestJwtResponse};
+use chrono::{DateTime, Local};
use mqtt_channel::{Connection, PubChannel, StreamExt, Topic, TopicFilter};
use reqwest::Url;
use std::time::Duration;
use tedge_config::{
- get_tedge_config, C8yUrlSetting, ConfigSettingAccessor, ConfigSettingAccessorStringExt,
- DeviceIdSetting, MqttPortSetting, TEdgeConfig,
+ C8yUrlSetting, ConfigSettingAccessor, ConfigSettingAccessorStringExt, DeviceIdSetting,
+ MqttPortSetting, TEdgeConfig,
};
use time::{format_description, OffsetDateTime};
@@ -19,33 +18,6 @@ use tracing::{error, info, instrument};
const RETRY_TIMEOUT_SECS: u64 = 60;
-/// creates an mqtt client with a given `session_name`
-pub async fn create_mqtt_client(
- session_name: &str,
-) -> Result<mqtt_channel::Connection, SMCumulocityMapperError> {
- let tedge_config = get_tedge_config()?;
- let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
- let mqtt_config = mqtt_channel::Config::default()
- .with_port(mqtt_port)
- .with_session_name(session_name)
- .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked(
- C8yTopic::SmartRestResponse.as_str(),
- ));
-
- let mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?;
- Ok(mqtt_client)
-}
-
-/// creates an http client with a given `session_name`
-pub async fn create_http_client(
- session_name: &str,
-) -> Result<JwtAuthHttpProxy, SMCumulocityMapperError> {
- let config = get_tedge_config()?;
- let mut http_proxy = JwtAuthHttpProxy::try_new(&config, &session_name).await?;
- let () = http_proxy.init().await?;
- Ok(http_proxy)
-}
-
/// An HttpProxy handles http requests to C8y on behalf of the device.
#[async_trait]
pub trait C8YHttpProxy {
diff --git a/crates/core/c8y_api/src/lib.rs b/crates/core/c8y_api/src/lib.rs
index 2a0286e0..7eb6352c 100644
--- a/crates/core/c8y_api/src/lib.rs
+++ b/crates/core/c8y_api/src/lib.rs
@@ -1,12 +1,2 @@
-pub mod error;
pub mod http_proxy;
pub mod json_c8y;
-
-#[cfg(test)]
-mod tests {
- #[test]
- fn it_works() {
- let result = 2 + 2;
- assert_eq!(result, 4);
- }
-}
diff --git a/crates/core/c8y_smartrest/src/topic.rs b/crates/core/c8y_smartrest/src/topic.rs
index 491c7a4b..2c6c96b0 100644
--- a/crates/core/c8y_smartrest/src/topic.rs
+++ b/crates/core/c8y_smartrest/src/topic.rs
@@ -1,13 +1,7 @@
use agent_interface::topic::ResponseTopic;
use agent_interface::TopicError;
+use mqtt_channel::MqttError;
use mqtt_channel::Topic;
-use mqtt_channel::{Message, MqttError};
-
-use crate::error::SMCumulocityMapperError;
-use crate::smartrest_serializer::{
- CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting,
- SmartRestSetOperationToSuccessful,
-};
#[derive(Debug, Clone, PartialEq)]
pub enum C8yTopic {
@@ -100,32 +94,6 @@ impl TryFrom<Topic> for MapperSubscribeTopic {
}
}
-/// returns a c8y message specifying to set log status to executing.
-///
-/// example message: '501,c8y_LogfileRequest'
-pub async fn get_log_file_request_executing() -> Result<Message, SMCumulocityMapperError> {
- let topic = C8yTopic::SmartRestResponse.to_topic()?;
- let smartrest_set_operation_status =
- SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest)
- .to_smartrest()?;
- Ok(Message::new(&topic, smartrest_set_operation_status))
-}
-
-/// returns a c8y message specifying to set log status to successful.
-///
-/// example message: '503,c8y_LogfileRequest,https://{c8y.url}/etc...'
-pub async fn get_log_file_request_done_message(
- binary_upload_event_url: &str,
-) -> Result<Message, SMCumulocityMapperError> {
- let topic = C8yTopic::SmartRestResponse.to_topic()?;
- let smartrest_set_operation_status =
- SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yLogFileRequest)
- .with_response_parameter(binary_upload_event_url)
- .to_smartrest()?;
-
- Ok(Message::new(&topic, smartrest_set_operation_status))
-}
-
#[cfg(test)]
mod tests {
use super::*;
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
index 703e3cad..0ade06a2 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
@@ -79,8 +79,7 @@ impl TEdgeComponent for CumulocitySoftwareManagementMapper {
#[instrument(skip(self, tedge_config), name = "sm-c8y-mapper")]
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
- let session_name = "SM-Tedge-Mapper";
- let http_proxy = JwtAuthHttpProxy::try_new(&tedge_config, &session_name).await?;
+ let http_proxy = JwtAuthHttpProxy::try_new(&tedge_config, SM_MAPPER).await?;
let mut sm_mapper =
CumulocitySoftwareManagement::try_new(&tedge_config, http_proxy, operations).await?;
diff --git a/plugins/log_request_plugin/Cargo.toml b/plugins/log_request_plugin/Cargo.toml
index ee26d4e0..456d3dce 100644
--- a/plugins/log_request_plugin/Cargo.toml
+++ b/plugins/log_request_plugin/Cargo.toml
@@ -5,7 +5,7 @@ authors = ["thin-edge.io team <info@thin-edge.io>"]
edition = "2021"
rust-version = "1.58"
license = "Apache-2.0"
-# TODO: add description?
+description = "Thin.edge.io operation plugin for Cumulocity log request"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -21,12 +21,10 @@ mockall = "0.10"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
-structopt = "0.3"
thiserror = "1.0"
tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] }
toml = "0.5"
tracing = { version = "0.1", features = ["attributes", "log"] }
-tedge_mapper = { path = "../../crates/core/tedge_mapper" }
tedge_config = { path = "../../crates/common/tedge_config" }
mqtt_channel = { path = "../../crates/common/mqtt_channel" }
diff --git a/plugins/log_request_plugin/src/main.rs b/plugins/log_request_plugin/src/main.rs
index f5109589..274090ce 100644
--- a/plugins/log_request_plugin/src/main.rs
+++ b/plugins/log_request_plugin/src/main.rs
@@ -1,12 +1,14 @@
mod smartrest;
-use c8y_api::http_proxy::{create_http_client, create_mqtt_client, C8YHttpProxy};
-use c8y_smartrest::smartrest_deserializer::SmartRestLogRequest;
-use c8y_smartrest::topic::{get_log_file_request_done_message, get_log_file_request_executing};
+use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
+use c8y_smartrest::{smartrest_deserializer::SmartRestLogRequest, topic::C8yTopic};
+use tedge_config::{get_tedge_config, ConfigSettingAccessor, MqttPortSetting};
use futures::SinkExt;
-use smartrest::read_tedge_logs;
+use smartrest::{
+ get_log_file_request_done_message, get_log_file_request_executing, read_tedge_logs,
+};
use std::time::Duration;
use tokio::time::sleep;
@@ -15,14 +17,39 @@ const AGENT_LOG_DIR: &str = "/var/log/tedge/agent";
const MQTT_SESSION_NAME: &str = "log plugin mqtt session";
const HTTP_SESSION_NAME: &str = "log plugin http session";
+/// creates an mqtt client with a given `session_name`
+pub async fn create_mqtt_client(
+ session_name: &str,
+) -> Result<mqtt_channel::Connection, anyhow::Error> {
+ let tedge_config = get_tedge_config()?;
+ let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
+ let mqtt_config = mqtt_channel::Config::default()
+ .with_port(mqtt_port)
+ .with_session_name(session_name)
+ .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked(
+ C8yTopic::SmartRestResponse.as_str(),
+ ));
+
+ let mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?;
+ Ok(mqtt_client)
+}
+
+/// creates an http client with a given `session_name`
+pub async fn create_http_client(session_name: &str) -> Result<JwtAuthHttpProxy, anyhow::Error> {
+ let config = get_tedge_config()?;
+ let mut http_proxy = JwtAuthHttpProxy::try_new(&config, session_name).await?;
+ let () = http_proxy.init().await?;
+ Ok(http_proxy)
+}
+
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
// reading payload from command line arguments
let payload = std::env::args().nth(1).expect("no payload given");
// creating required clients
- let mut mqtt_client = create_mqtt_client(&MQTT_SESSION_NAME).await?;
- let mut http_client = create_http_client(&HTTP_SESSION_NAME).await?;
+ let mut mqtt_client = create_mqtt_client(MQTT_SESSION_NAME).await?;
+ let mut http_client = create_http_client(HTTP_SESSION_NAME).await?;
// retrieve smartrest object from payload
let smartrest_obj = SmartRestLogRequest::from_smartrest(&payload)?;
@@ -51,6 +78,7 @@ async fn main() -> Result<(), anyhow::Error> {
/// NOTE: this is a quick-fix to enable step 4. in main to be executed.
/// if `do_one_second_pause()` is not called, step 4. in main
/// does not get triggered.
+// see #846
async fn do_one_second_pause() {
sleep(Duration::from_secs(1)).await;
}
diff --git a/plugins/log_request_plugin/src/smartrest.rs b/plugins/log_request_plugin/src/smartrest.rs
index b9debc19..187daa44 100644
--- a/plugins/log_request_plugin/src/smartrest.rs
+++ b/plugins/log_request_plugin/src/smartrest.rs
@@ -1,4 +1,39 @@
-use c8y_smartrest::smartrest_deserializer::{get_datetime_from_file_path, SmartRestLogRequest};
+use c8y_smartrest::{
+ smartrest_deserializer::{get_datetime_from_file_path, SmartRestLogRequest},
+ smartrest_serializer::{
+ CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting,
+ SmartRestSetOperationToSuccessful,
+ },
+ topic::C8yTopic,
+};
+use mqtt_channel::Message;
+
+/// returns a c8y message specifying to set log status to executing.
+///
+/// example message: '501,c8y_LogfileRequest'
+pub async fn get_log_file_request_executing() -> Result<Message, anyhow::Error> {
+ let topic = C8yTopic::SmartRestResponse.to_topic()?;
+ let smartrest_set_operation_status =
+ SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest)
+ .to_smartrest()?;
+ Ok(Message::new(&topic, smartrest_set_operation_status))
+}
+
+/// returns a c8y message specifying to set log status to successful.
+///
+/// example message: '503,c8y_LogfileRequest,https://{c8y.url}/etc...'
+pub async fn get_log_file_request_done_message(
+ binary_upload_event_url: &str,
+) -> Result<Message, anyhow::Error> {
+ let topic = C8yTopic::SmartRestResponse.to_topic()?;
+ let smartrest_set_operation_status =
+ SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yLogFileRequest)
+ .with_response_parameter(binary_upload_event_url)
+ .to_smartrest()?;
+
+ Ok(Message::new(&topic, smartrest_set_operation_status))
+}
+
/// Reads tedge logs according to `SmartRestLogRequest`.
///
/// If needed, logs are concatenated.
@@ -81,3 +116,72 @@ pub fn read_tedge_logs(
}
Ok(output)
}
+
+#[cfg(test)]
+mod tests {
+ use crate::smartrest::read_tedge_logs;
+ use c8y_smartrest::smartrest_deserializer::SmartRestLogRequest;
+ use std::fs::File;
+ use std::io::Write;
+
+ fn parse_file_names_from_log_content(log_content: &str) -> [&str; 5] {
+ let mut files: Vec<&str> = vec![];
+ for line in log_content.lines() {
+ if line.contains("filename: ") {
+ let filename: &str = line.split("filename: ").last().unwrap();
+ files.push(filename);
+ }
+ }
+ match files.try_into() {
+ Ok(arr) => arr,
+ Err(_) => panic!("Could not convert to Array &str, size 5"),
+ }
+ }
+
+ #[test]
+ /// testing read_tedge_logs
+ ///
+ /// this test creates 5 fake log files in a temporary directory.
+ /// files are dated 2021-01-0XT01:00Z, where X = a different day.
+ ///
+ /// this tests will assert that files are read alphanumerically from oldest to newest
+ fn test_read_logs() {
+ // order in which files are created
+ const LOG_FILE_NAMES: [&str; 5] = [
+ "software-list-2021-01-03T01:00:00Z.log",
+ "software-list-2021-01-02T01:00:00Z.log",
+ "software-list-2021-01-01T01:00:00Z.log",
+ "software-update-2021-01-03T01:00:00Z.log",
+ "software-update-2021-01-02T01:00:00Z.log",
+ ];
+
+ // expected (sorted) output
+ const EXPECTED_OUTPUT: [&str; 5] = [
+ "software-list-2021-01-01T01:00:00Z",
+ "software-list-2021-01-02T01:00:00Z",
+ "software-list-2021-01-03T01:00:00Z",
+ "software-update-2021-01-02T01:00:00Z",
+ "software-update-2021-01-03T01:00:00Z",
+ ];
+
+ let smartrest_obj = SmartRestLogRequest::from_smartrest(
+ "522,DeviceSerial,syslog,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000",
+ )
+ .unwrap();
+
+ let temp_dir = tempfile::tempdir().unwrap();
+ // creating the files
+ for (idx, file) in LOG_FILE_NAMES.iter().enumerate() {
+ let file_path = &temp_dir.path().join(file);
+ let mut file = File::create(file_path).unwrap();
+ writeln!(file, "file num {}", idx).unwrap();
+ }
+
+ // reading the logs and extracting the file names from the log output.
+ let output = read_tedge_logs(&smartrest_obj, temp_dir.path().to_str().unwrap()).unwrap();
+ let parsed_values = parse_file_names_from_log_content(&output);
+
+ // asserting the order = `EXPECTED_OUTPUT`
+ assert!(parsed_values.eq(&EXPECTED_OUTPUT));
+ }
+}