summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorinitard <solo@softwareag.com>2022-02-01 17:49:53 +0100
committerinitard <solo@softwareag.com>2022-02-15 11:43:49 +0000
commit2411743f17e14fa6031d4940d6a0135abdc00baf (patch)
treeddc7782f8e72a4a3b4531585ec93779bfaf0c05c
parentae402f67527d022a3cedd60f049d73724079850a (diff)
c8y_api lib, tedge config, mqtt/http utilities (#790)
Preparing the repo for the log request plugin. Restructuring folders, moving code out of sm_c8y_mapper and into c8y_api, c8y_smartrest or tedge_config. Signed-off-by: initard <solo@softwareag.com>
-rw-r--r--Cargo.lock48
-rw-r--r--Cargo.toml2
-rw-r--r--crates/common/tedge_config/src/tedge_config.rs7
-rw-r--r--crates/core/c8y_api/Cargo.toml38
-rw-r--r--crates/core/c8y_api/src/error.rs1
-rw-r--r--crates/core/c8y_api/src/http_proxy.rs (renamed from crates/core/tedge_mapper/src/sm_c8y_mapper/http_proxy.rs)52
-rw-r--r--crates/core/c8y_api/src/json_c8y.rs (renamed from crates/core/tedge_mapper/src/sm_c8y_mapper/json_c8y.rs)0
-rw-r--r--crates/core/c8y_api/src/lib.rs12
-rw-r--r--crates/core/c8y_smartrest/Cargo.toml6
-rw-r--r--crates/core/c8y_smartrest/src/error.rs70
-rw-r--r--crates/core/c8y_smartrest/src/lib.rs2
-rw-r--r--crates/core/c8y_smartrest/src/operations.rs215
-rw-r--r--crates/core/c8y_smartrest/src/smartrest_deserializer.rs79
-rw-r--r--crates/core/c8y_smartrest/src/topic.rs (renamed from crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs)39
-rw-r--r--crates/core/tedge_mapper/Cargo.toml1
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs299
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs4
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs10
18 files changed, 574 insertions, 311 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 0632b98a..1654ca24 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -330,6 +330,40 @@ dependencies = [
]
[[package]]
+name = "c8y_api"
+version = "0.1.0"
+dependencies = [
+ "agent_interface",
+ "async-trait",
+ "batcher",
+ "c8y_smartrest",
+ "c8y_translator",
+ "chrono",
+ "clock",
+ "csv",
+ "download",
+ "flockfile",
+ "futures",
+ "mockall",
+ "mqtt_channel",
+ "reqwest",
+ "serde",
+ "serde_json",
+ "structopt",
+ "tedge_config",
+ "tedge_users",
+ "tedge_utils",
+ "tempfile",
+ "test-case",
+ "thin_edge_json",
+ "thiserror",
+ "time 0.3.5",
+ "tokio",
+ "toml",
+ "tracing",
+]
+
+[[package]]
name = "c8y_smartrest"
version = "0.5.2"
dependencies = [
@@ -339,12 +373,18 @@ dependencies = [
"assert_matches",
"csv",
"download",
+ "mqtt_channel",
+ "reqwest",
"serde",
"serde_json",
+ "tedge_config",
+ "tempfile",
"test-case",
"thin_edge_json",
"thiserror",
"time 0.3.5",
+ "tokio",
+ "toml",
]
[[package]]
@@ -424,6 +464,7 @@ dependencies = [
"libc",
"num-integer",
"num-traits",
+ "time 0.1.43",
"winapi",
]
@@ -2700,6 +2741,7 @@ dependencies = [
"assert_matches",
"async-trait",
"batcher",
+ "c8y_api",
"c8y_smartrest",
"c8y_translator",
"clock",
@@ -2753,13 +2795,13 @@ dependencies = [
[[package]]
name = "tempfile"
-version = "3.2.0"
+version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22"
+checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4"
dependencies = [
"cfg-if 1.0.0",
+ "fastrand",
"libc",
- "rand",
"redox_syscall",
"remove_dir_all",
"winapi",
diff --git a/Cargo.toml b/Cargo.toml
index 0bd9467e..9cba68c9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,7 +6,7 @@ members = [
"crates/tests/*",
"plugins/tedge_apt_plugin",
"plugins/tedge_dummy_plugin",
- "plugins/tedge_apama_plugin"
+ "plugins/tedge_apama_plugin",
]
[profile.release]
diff --git a/crates/common/tedge_config/src/tedge_config.rs b/crates/common/tedge_config/src/tedge_config.rs
index 52e0178e..32587a15 100644
--- a/crates/common/tedge_config/src/tedge_config.rs
+++ b/crates/common/tedge_config/src/tedge_config.rs
@@ -2,6 +2,13 @@ use crate::*;
use certificate::{CertificateError, PemCertificate};
use std::convert::{TryFrom, TryInto};
+/// loads tedge config from system default
+pub fn get_tedge_config() -> Result<TEdgeConfig, TEdgeConfigError> {
+ let tedge_config_location = TEdgeConfigLocation::from_default_system_location();
+ let config_repository = TEdgeConfigRepository::new(tedge_config_location);
+ Ok(config_repository.load()?)
+}
+
/// Represents the complete configuration of a thin edge device.
/// This configuration is a wrapper over the device specific configurations
/// as well as the IoT cloud provider specific configurations.
diff --git a/crates/core/c8y_api/Cargo.toml b/crates/core/c8y_api/Cargo.toml
new file mode 100644
index 00000000..1e3cb0f5
--- /dev/null
+++ b/crates/core/c8y_api/Cargo.toml
@@ -0,0 +1,38 @@
+[package]
+name = "c8y_api"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+agent_interface = { path = "../agent_interface"}
+async-trait = "0.1"
+batcher = { path = "../../common/batcher" }
+c8y_smartrest = { path = "../c8y_smartrest" }
+c8y_translator = { path = "../c8y_translator" }
+chrono = "0.4"
+clock = { path = "../../common/clock" }
+csv = "1.1"
+download = { path = "../../common/download" }
+flockfile = { path = "../../common/flockfile" }
+futures = "0.3"
+mockall = "0.10"
+mqtt_channel = { path = "../../common/mqtt_channel" }
+reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+structopt = "0.3"
+tedge_config = { path = "../../common/tedge_config" }
+tedge_users = { path = "../../common/tedge_users" }
+tedge_utils = { path = "../../common/tedge_utils", features = ["logging"] }
+thin_edge_json = { path = "../thin_edge_json" }
+thiserror = "1.0"
+time = { version = "0.3", features = ["formatting"] }
+tokio = { version = "1.8", features = ["rt", "sync", "time"] }
+toml = "0.5"
+tracing = { version = "0.1", features = ["attributes", "log"] }
+
+[dev-dependencies]
+tempfile = "3.3"
+test-case = "1.2"
diff --git a/crates/core/c8y_api/src/error.rs b/crates/core/c8y_api/src/error.rs
new file mode 100644
index 00000000..8b137891
--- /dev/null
+++ b/crates/core/c8y_api/src/error.rs
@@ -0,0 +1 @@
+
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs
index 4013bc0a..2a56946a 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/http_proxy.rs
+++ b/crates/core/c8y_api/src/http_proxy.rs
@@ -1,22 +1,51 @@
-use crate::sm_c8y_mapper::error::SMCumulocityMapperError;
-use crate::sm_c8y_mapper::json_c8y::{
+use crate::json_c8y::{
C8yCreateEvent, C8yManagedObject, C8yUpdateSoftwareListResponse, InternalIdResponse,
};
-use crate::sm_c8y_mapper::mapper::SmartRestLogEvent;
use async_trait::async_trait;
-use c8y_smartrest::smartrest_deserializer::SmartRestJwtResponse;
+use c8y_smartrest::{
+ error::SMCumulocityMapperError, smartrest_deserializer::SmartRestJwtResponse, topic::C8yTopic,
+};
use mqtt_channel::{Connection, PubChannel, StreamExt, Topic, TopicFilter};
use reqwest::Url;
use std::time::Duration;
use tedge_config::{
- C8yUrlSetting, ConfigSettingAccessor, ConfigSettingAccessorStringExt, DeviceIdSetting,
- MqttPortSetting, TEdgeConfig,
+ get_tedge_config, C8yUrlSetting, ConfigSettingAccessor, ConfigSettingAccessorStringExt,
+ DeviceIdSetting, MqttPortSetting, TEdgeConfig,
};
use time::{format_description, OffsetDateTime};
+
+use serde::{Deserialize, Serialize};
use tracing::{error, info, instrument};
const RETRY_TIMEOUT_SECS: u64 = 60;
+/// creates an mqtt client with a given `session_name`
+pub async fn create_mqtt_client(
+ session_name: &str,
+) -> Result<mqtt_channel::Connection, SMCumulocityMapperError> {
+ let tedge_config = get_tedge_config()?;
+ let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
+ let mqtt_config = mqtt_channel::Config::default()
+ .with_port(mqtt_port)
+ .with_session_name(session_name)
+ .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked(
+ C8yTopic::SmartRestResponse.as_str(),
+ ));
+
+ let mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?;
+ Ok(mqtt_client)
+}
+
+/// creates an http client with a given `session_name`
+pub async fn create_http_client(
+ session_name: &str,
+) -> Result<JwtAuthHttpProxy, SMCumulocityMapperError> {
+ let config = get_tedge_config()?;
+ let mut http_proxy = JwtAuthHttpProxy::try_new(&config, &session_name).await?;
+ let () = http_proxy.init().await?;
+ Ok(http_proxy)
+}
+
/// An HttpProxy handles http requests to C8y on behalf of the device.
#[async_trait]
pub trait C8YHttpProxy {
@@ -117,6 +146,13 @@ impl C8yEndPoint {
}
}
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
+#[serde(rename_all = "camelCase")]
+/// used to retrieve the id of a log event
+pub struct SmartRestLogEvent {
+ pub id: String,
+}
+
/// An HttpProxy that uses MQTT to retrieve JWT tokens and authenticate the device
///
/// - Keep the connection info to c8y and the internal Id of the device
@@ -147,6 +183,7 @@ impl JwtAuthHttpProxy {
pub async fn try_new(
tedge_config: &TEdgeConfig,
+ session_name: &str,
) -> Result<JwtAuthHttpProxy, SMCumulocityMapperError> {
let c8y_host = tedge_config.query_string(C8yUrlSetting)?;
let device_id = tedge_config.query_string(DeviceIdSetting)?;
@@ -157,7 +194,7 @@ impl JwtAuthHttpProxy {
let mqtt_config = mqtt_channel::Config::default()
.with_port(mqtt_port)
.with_clean_session(true)
- .with_session_name("JWT-Requester")
+ .with_session_name(session_name)
.with_subscriptions(topic);
let mut mqtt_con = Connection::new(&mqtt_config).await?;
@@ -233,6 +270,7 @@ impl JwtAuthHttpProxy {
.build()?;
let response = self.http_con.execute(request).await?;
+ dbg!(&response);
let event_response_body = response.json::<SmartRestLogEvent>().await?;
Ok(event_response_body.id)
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs
index 8566a84c..8566a84c 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/json_c8y.rs
+++ b/crates/core/c8y_api/src/json_c8y.rs
diff --git a/crates/core/c8y_api/src/lib.rs b/crates/core/c8y_api/src/lib.rs
new file mode 100644
index 00000000..2a0286e0
--- /dev/null
+++ b/crates/core/c8y_api/src/lib.rs
@@ -0,0 +1,12 @@
+pub mod error;
+pub mod http_proxy;
+pub mod json_c8y;
+
+#[cfg(test)]
+mod tests {
+ #[test]
+ fn it_works() {
+ let result = 2 + 2;
+ assert_eq!(result, 4);
+ }
+}
diff --git a/crates/core/c8y_smartrest/Cargo.toml b/crates/core/c8y_smartrest/Cargo.toml
index 03f575bb..a25ef0da 100644
--- a/crates/core/c8y_smartrest/Cargo.toml
+++ b/crates/core/c8y_smartrest/Cargo.toml
@@ -9,14 +9,20 @@ rust-version = "1.58.1"
agent_interface = { path = "../agent_interface" }
csv = "1.1"
download = { path = "../../common/download" }
+mqtt_channel = { path = "../../common/mqtt_channel" }
+reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }
+tedge_config = { path = "../../common/tedge_config" }
thin_edge_json = { path = "../thin_edge_json" }
thiserror = "1.0"
time = { version = "0.3", features = ["formatting", "macros", "parsing", "serde"] }
+tokio = { version = "1.8", features = ["rt", "sync", "time"] }
+toml = "0.5"
[dev-dependencies]
anyhow = "1.0"
assert_matches = "1.5"
assert-json-diff = "2.0"
serde_json = "1.0"
+tempfile = "3.3"
test-case = "1.2.1"
diff --git a/crates/core/c8y_smartrest/src/error.rs b/crates/core/c8y_smartrest/src/error.rs
index ee52f984..9531f372 100644
--- a/crates/core/c8y_smartrest/src/error.rs
+++ b/crates/core/c8y_smartrest/src/error.rs
@@ -1,4 +1,5 @@
use agent_interface::SoftwareUpdateResponse;
+use std::path::PathBuf;
#[derive(thiserror::Error, Debug)]
pub enum SmartRestSerializerError {
@@ -39,3 +40,72 @@ pub enum SmartRestDeserializerError {
#[error("Empty request")]
EmptyRequest,
}
+
+#[derive(Debug, thiserror::Error)]
+pub enum OperationsError {
+ #[error(transparent)]
+ FromIo(#[from] std::io::Error),
+
+ #[error("Cannot extract the operation name from the path: {0}")]
+ InvalidOperationName(PathBuf),
+
+ #[error("Error while parsing operation file: '{0}': {1}.")]
+ TomlError(PathBuf, #[source] toml::de::Error),
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum SMCumulocityMapperError {
+ #[error("Invalid MQTT Message.")]
+ InvalidMqttMessage,
+
+ #[error(transparent)]
+ InvalidTopicError(#[from] agent_interface::TopicError),
+
+ #[error(transparent)]
+ InvalidThinEdgeJson(#[from] agent_interface::SoftwareError),
+
+ #[error(transparent)]
+ FromElapsed(#[from] tokio::time::error::Elapsed),
+
+ #[error(transparent)]
+ FromMqttClient(#[from] mqtt_channel::MqttError),
+
+ #[error(transparent)]
+ FromReqwest(#[from] reqwest::Error),
+
+ #[error(transparent)]
+ FromSmartRestSerializer(#[from] SmartRestSerializerError),
+
+ #[error(transparent)]
+ FromSmartRestDeserializer(#[from] SmartRestDeserializerError),
+
+ #[error(transparent)]
+ FromTedgeConfig(#[from] tedge_config::ConfigSettingError),
+
+ #[error(transparent)]
+ FromLoadTedgeConfigError(#[from] tedge_config::TEdgeConfigError),
+
+ #[error("Invalid date in file name: {0}")]
+ InvalidDateInFileName(String),
+
+ #[error("Invalid path. Not UTF-8.")]
+ InvalidUtf8Path,
+
+ #[error(transparent)]
+ FromIo(#[from] std::io::Error),
+
+ #[error("Request timed out")]
+ RequestTimeout,
+
+ #[error("Operation execution failed: {0}")]
+ ExecuteFailed(String),
+
+ #[error("An unknown operation template: {0}")]
+ UnknownOperation(String),
+
+ #[error(transparent)]
+ FromTimeFormat(#[from] time::error::Format),
+
+ #[error(transparent)]
+ FromTimeParse(#[from] time::error::Parse),
+}
diff --git a/crates/core/c8y_smartrest/src/lib.rs b/crates/core/c8y_smartrest/src/lib.rs
index 53976e8d..b6338b06 100644
--- a/crates/core/c8y_smartrest/src/lib.rs
+++ b/crates/core/c8y_smartrest/src/lib.rs
@@ -1,5 +1,7 @@
pub mod alarm;
pub mod error;
pub mod event;
+pub mod operations;
pub mod smartrest_deserializer;
pub mod smartrest_serializer;
+pub mod topic;
diff --git a/crates/core/c8y_smartrest/src/operations.rs b/crates/core/c8y_smartrest/src/operations.rs
new file mode 100644
index 00000000..b3909aac
--- /dev/null
+++ b/crates/core/c8y_smartrest/src/operations.rs
@@ -0,0 +1,215 @@
+use std::{
+ collections::{HashMap, HashSet},
+ fs,
+ path::{Path, PathBuf},
+};
+
+use serde::Deserialize;
+
+use crate::error::OperationsError;
+
+/// Operations are derived by reading files subdirectories per cloud /etc/tedge/operations directory
+/// Each operation is a file name in one of the subdirectories
+/// The file name is the operation name
+
+#[derive(Debug, Clone, Deserialize, PartialEq)]
+#[serde(rename_all = "snake_case")]
+pub struct OnMessageExec {
+ command: Option<String>,
+ on_message: Option<String>,
+ topic: Option<String>,
+ user: Option<String>,
+}
+
+#[derive(Debug, Clone, Deserialize, PartialEq)]
+#[serde(rename_all = "lowercase")]
+pub struct Operation {
+ #[serde(skip)]
+ name: String,
+ exec: Option<OnMessageExec>,
+}
+
+impl Operation {
+ pub fn exec(&self) -> Option<&OnMessageExec> {
+ self.exec.as_ref()
+ }
+
+ pub fn command(&self) -> Option<String> {
+ self.exec().and_then(|exec| exec.command.clone())
+ }
+
+ pub fn topic(&self) -> Option<String> {
+ self.exec().and_then(|exec| exec.topic.clone())
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct Operations {
+ operations: Vec<Operation>,
+ operations_by_trigger: HashMap<String, usize>,
+}
+
+impl Operations {
+ pub fn new() -> Self {
+ Self {
+ operations: vec![],
+ operations_by_trigger: HashMap::new(),
+ }
+ }
+
+ pub fn add(&mut self, operation: Operation) {
+ if let Some(detail) = operation.exec() {
+ if let Some(on_message) = &detail.on_message {
+ self.operations_by_trigger
+ .insert(on_message.clone(), self.operations.len());
+ }
+ }
+ self.operations.push(operation);
+ }
+
+ pub fn try_new(dir: impl AsRef<Path>, cloud_name: &str) -> Result<Self, OperationsError> {
+ get_operations(dir.as_ref(), cloud_name)
+ }
+
+ pub fn get_operations_list(&self) -> Vec<String> {
+ self.operations
+ .iter()
+ .map(|operation| operation.name.clone())
+ .collect::<Vec<String>>()
+ }
+
+ pub fn matching_smartrest_template(&self, operation_template: &str) -> Option<&Operation> {
+ self.operations_by_trigger
+ .get(operation_template)
+ .and_then(|index| self.operations.get(*index))
+ }
+
+ pub fn topics_for_operations(&self) -> HashSet<String> {
+ self.operations
+ .iter()
+ .filter_map(|operation| operation.topic())
+ .collect::<HashSet<String>>()
+ }
+}
+
+fn get_operations(dir: impl AsRef<Path>, cloud_name: &str) -> Result<Operations, OperationsError> {
+ let mut operations = Operations::new();
+
+ let path = dir.as_ref().join(&cloud_name);
+ let dir_entries = fs::read_dir(&path)?
+ .map(|entry| entry.map(|e| e.path()))
+ .collect::<Result<Vec<PathBuf>, _>>()?
+ .into_iter()
+ .filter(|path| path.is_file())
+ .collect::<Vec<PathBuf>>();
+
+ for path in dir_entries {
+ let mut details = match fs::read(&path) {
+ Ok(bytes) => toml::from_slice::<Operation>(bytes.as_slice())
+ .map_err(|e| OperationsError::TomlError(path.to_path_buf(), e))?,
+
+ Err(err) => return Err(OperationsError::FromIo(err)),
+ };
+
+ details.name = path
+ .file_name()
+ .and_then(|filename| filename.to_str())
+ .ok_or_else(|| OperationsError::InvalidOperationName(path.to_owned()))?
+ .to_owned();
+
+ operations.add(details);
+ }
+ Ok(operations)
+}
+
+#[cfg(test)]
+mod tests {
+ use std::io::Write;
+
+ use super::*;
+ use test_case::test_case;
+
+ // Structs for state change with the builder pattern
+ // Structs for Operations
+ struct Ops(Vec<PathBuf>);
+ struct NoOps;
+
+ struct TestOperationsBuilder<O> {
+ temp_dir: tempfile::TempDir,
+ operations: O,
+ }
+
+ impl TestOperationsBuilder<NoOps> {
+ fn new() -> Self {
+ Self {
+ temp_dir: tempfile::tempdir().unwrap(),
+ operations: NoOps,
+ }
+ }
+ }
+
+ impl TestOperationsBuilder<NoOps> {
+ fn with_operations(self, operations_count: usize) -> TestOperationsBuilder<Ops> {
+ let Self { temp_dir, .. } = self;
+
+ let mut operations = Vec::new();
+ for i in 0..operations_count {
+ let file_path = temp_dir.path().join(format!("operation{}", i));
+ let mut file = fs::File::create(&file_path).unwrap();
+ file.write_all(
+ br#"[exec]
+ command = "echo"
+ on_message = "511""#,
+ )
+ .unwrap();
+ operations.push(file_path);
+ }
+
+ TestOperationsBuilder {
+ operations: Ops(operations),
+ temp_dir,
+ }
+ }
+ }
+
+ impl TestOperationsBuilder<Ops> {
+ fn build(self) -> TestOperations {
+ let Self {
+ temp_dir,
+ operations,
+ } = self;
+
+ TestOperations {
+ temp_dir,
+ operations: operations.0,
+ }
+ }
+ }
+
+ struct TestOperations {
+ temp_dir: tempfile::TempDir,
+ operations: Vec<PathBuf>,
+ }
+
+ impl TestOperations {
+ fn builder() -> TestOperationsBuilder<NoOps> {
+ TestOperationsBuilder::new()
+ }
+
+ fn temp_dir(&self) -> &tempfile::TempDir {
+ &self.temp_dir
+ }
+ }
+
+ #[test_case(0)]
+ #[test_case(1)]
+ #[test_case(5)]
+ fn get_operations_all(ops_count: usize) {
+ let test_operations = TestOperations::builder().with_operations(ops_count).build();
+
+ let operations = get_operations(test_operations.temp_dir(), "").unwrap();
+ dbg!(&operations);
+
+ assert_eq!(operations.operations.len(), ops_count);
+ }
+}
diff --git a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
index c995c7d1..fea01538 100644
--- a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
+++ b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
@@ -1,10 +1,11 @@
-use crate::error::SmartRestDeserializerError;
+use crate::error::{SMCumulocityMapperError, SmartRestDeserializerError};
use agent_interface::{SoftwareModule, SoftwareModuleUpdate, SoftwareUpdateRequest};
use csv::ReaderBuilder;
use download::DownloadInfo;
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize};
use std::convert::{TryFrom, TryInto};
+use std::path::PathBuf;
use time::{format_description, OffsetDateTime};
#[derive(Debug)]
@@ -287,12 +288,50 @@ impl SmartRestJwtResponse {
}
}
+/// Returns a date time object from a file path or file-path-like string
+/// a typical file stem looks like this: "software-list-2021-10-27T10:29:58Z"
+///
+/// # Examples:
+/// ```
+/// use std::path::PathBuf;
+/// use crate::c8y_smartrest::smartrest_deserializer::get_datetime_from_file_path;
+///
+/// let mut path = PathBuf::new();
+/// path.push("/path/to/file/with/date/in/path-2021-10-27T10:29:58Z");
+/// let path_bufdate_time = get_datetime_from_file_path(&path).unwrap();
+/// ```
+pub fn get_datetime_from_file_path(
+ log_path: &PathBuf,
+) -> Result<OffsetDateTime, SMCumulocityMapperError> {
+ if let Some(stem_string) = log_path.file_stem().and_then(|s| s.to_str()) {
+ // a typical file stem looks like this: software-list-2021-10-27T10:29:58Z.
+ // to extract the date, rsplit string on "-" and take (last) 3
+ let mut stem_string_vec = stem_string.rsplit('-').take(3).collect::<Vec<_>>();
+ // reverse back the order (because of rsplit)
+ stem_string_vec.reverse();
+ // join on '-' to get the date string
+ let date_string = stem_string_vec.join("-");
+ let dt = OffsetDateTime::parse(&date_string, &format_description::well_known::Rfc3339)?;
+
+ return Ok(dt);
+ }
+ match log_path.to_str() {
+ Some(path) => Err(SMCumulocityMapperError::InvalidDateInFileName(
+ path.to_string(),
+ ))?,
+ None => Err(SMCumulocityMapperError::InvalidUtf8Path)?,
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
use agent_interface::*;
use assert_json_diff::*;
use serde_json::json;
+ use std::fs::File;
+ use std::io::Write;
+ use std::str::FromStr;
use test_case::test_case;
// To avoid using an ID randomly generated, which is not convenient for testing.
@@ -568,4 +607,42 @@ mod tests {
let log = SmartRestRestartRequest::from_smartrest(&smartrest);
assert!(log.is_ok());
}
+
+ #[test_case("/path/to/software-list-2021-10-27T10:44:44Z.log")]
+ #[test_case("/path/to/tedge/agent/software-update-2021-10-25T07:45:41Z.log")]
+ #[test_case("/path/to/another-variant-2021-10-25T07:45:41Z.log")]
+ #[test_case("/yet-another-variant-2021-10-25T07:45:41Z.log")]
+ fn test_datetime_parsing_from_path(file_path: &str) {
+ // checking that `get_date_from_file_path` unwraps a `chrono::NaiveDateTime` object.
+ // this should return an Ok Result.
+ let path_buf = PathBuf::from_str(file_path).unwrap();
+ let path_buf_datetime = get_datetime_from_file_path(&path_buf);
+ assert!(path_buf_datetime.is_ok());
+ }
+
+ #[test_case("/path/to/software-list-2021-10-27-10:44:44Z.log")]
+ #[test_case("/path/to/tedge/agent/software-update-10-25-2021T07:45:41Z.log")]
+ #[test_case("/path/to/another-variant-07:45:41Z-2021-10-25T.log")]
+ #[test_case("/yet-another-variant-2021-10-25T07:45Z.log")]
+ fn test_datetime_parsing_from_path_fail(file_path: &str) {
+ // checking that `get_date_from_file_path` unwraps a `chrono::NaiveDateTime` object.
+ // this should return an err.
+ let path_buf = PathBuf::from_str(file_path).unwrap();
+ let path_buf_datetime = get_datetime_from_file_path(&path_buf);
+ assert!(path_buf_datetime.is_err());
+ }
+
+ fn parse_file_names_from_log_content(log_content: &str) -> [&str; 5] {
+ let mut files: Vec<&str> = vec![];
+ for line in log_content.lines() {
+ if line.contains("filename: ") {
+ let filename: &str = line.split("filename: ").last().unwrap();
+ files.push(filename);
+ }
+ }
+ match files.try_into() {
+ Ok(arr) => arr,
+ Err(_) => panic!("Could not convert to Array &str, size 5"),
+ }
+ }
}
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs b/crates/core/c8y_smartrest/src/topic.rs
index 4fe0069e..491c7a4b 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs
+++ b/crates/core/c8y_smartrest/src/topic.rs
@@ -1,6 +1,13 @@
-use agent_interface::{error::*, topic::ResponseTopic};
-use mqtt_channel::{MqttError, Topic};
-use std::convert::{TryFrom, TryInto};
+use agent_interface::topic::ResponseTopic;
+use agent_interface::TopicError;
+use mqtt_channel::Topic;
+use mqtt_channel::{Message, MqttError};
+
+use crate::error::SMCumulocityMapperError;
+use crate::smartrest_serializer::{
+ CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting,
+ SmartRestSetOperationToSuccessful,
+};
#[derive(Debug, Clone, PartialEq)]
pub enum C8yTopic {
@@ -93,6 +100,32 @@ impl TryFrom<Topic> for MapperSubscribeTopic {
}
}
+/// returns a c8y message specifying to set log status to executing.
+///
+/// example message: '501,c8y_LogfileRequest'
+pub async fn get_log_file_request_executing() -> Result<Message, SMCumulocityMapperError> {
+ let topic = C8yTopic::SmartRestResponse.to_topic()?;
+ let smartrest_set_operation_status =
+ SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest)
+ .to_smartrest()?;
+ Ok(Message::new(&topic, smartrest_set_operation_status))
+}
+
+/// returns a c8y message specifying to set log status to successful.
+///
+/// example message: '503,c8y_LogfileRequest,https://{c8y.url}/etc...'
+pub async fn get_log_file_request_done_message(
+ binary_upload_event_url: &str,
+) -> Result<Message, SMCumulocityMapperError> {
+ let topic = C8yTopic::SmartRestResponse.to_topic()?;
+ let smartrest_set_operation_status =
+ SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yLogFileRequest)
+ .with_response_parameter(binary_upload_event_url)