diff options
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | crates/core/tedge_mapper/Cargo.toml | 1 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y_converter.rs | 7 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/error.rs | 8 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/operations.rs | 272 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs | 6 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs | 79 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs | 26 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs | 21 | ||||
-rw-r--r-- | docs/src/tutorials/supported_operations.md | 66 |
10 files changed, 299 insertions, 188 deletions
@@ -2728,6 +2728,7 @@ dependencies = [ "thiserror", "tokio", "tokio-test", + "toml", "tracing", ] diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml index 6384e076..25f2a14d 100644 --- a/crates/core/tedge_mapper/Cargo.toml +++ b/crates/core/tedge_mapper/Cargo.toml @@ -49,6 +49,7 @@ tedge_utils = { path = "../../common/tedge_utils", features = ["logging"] } thin_edge_json = { path = "../thin_edge_json" } thiserror = "1.0" tokio = { version = "1.8", features = ["rt", "sync", "time"] } +toml = "0.5" tracing = { version = "0.1", features = ["attributes", "log"] } [dev-dependencies] diff --git a/crates/core/tedge_mapper/src/c8y_converter.rs b/crates/core/tedge_mapper/src/c8y_converter.rs index 6fb9997a..ee85ade7 100644 --- a/crates/core/tedge_mapper/src/c8y_converter.rs +++ b/crates/core/tedge_mapper/src/c8y_converter.rs @@ -103,13 +103,14 @@ impl Converter for CumulocityConverter { } else if input.topic.name.starts_with("tedge/alarms") { self.try_convert_alarm(input) } else { - return Err(ConversionError::UnsupportedTopic(input.topic.name.clone())); + Err(ConversionError::UnsupportedTopic(input.topic.name.clone())) } } fn try_init_messages(&self) -> Result<Vec<Message>, ConversionError> { - let ops = Operations::try_new("/etc/tedge/operations")?; - let ops = ops.get_operations_list("c8y"); + let ops = Operations::try_new("/etc/tedge/operations", "c8y")?; + let ops = ops.get_operations_list(); + let ops = ops.iter().map(|op| op as &str).collect::<Vec<&str>>(); let ops_msg = SmartRestSetSupportedOperations::new(&ops); let topic = Topic::new_unchecked("c8y/s/us"); diff --git a/crates/core/tedge_mapper/src/error.rs b/crates/core/tedge_mapper/src/error.rs index b1aa90b5..93bc1041 100644 --- a/crates/core/tedge_mapper/src/error.rs +++ b/crates/core/tedge_mapper/src/error.rs @@ -1,3 +1,5 @@ +use std::path::PathBuf; + use crate::size_threshold::SizeThresholdExceeded; use mqtt_client::MqttClientError; use tedge_config::TEdgeConfigError; @@ -61,4 +63,10 @@ pub enum ConversionError { pub enum OperationsError { #[error(transparent)] FromIo(#[from] std::io::Error), + + #[error("Cannot extract the operation name from the path: {0}")] + InvalidOperationName(PathBuf), + + #[error("Error while parsing operation file: '{0}': {1}.")] + TomlError(PathBuf, #[source] toml::de::Error), } diff --git a/crates/core/tedge_mapper/src/operations.rs b/crates/core/tedge_mapper/src/operations.rs index b4dbe2a7..b3909aac 100644 --- a/crates/core/tedge_mapper/src/operations.rs +++ b/crates/core/tedge_mapper/src/operations.rs @@ -4,216 +4,183 @@ use std::{ path::{Path, PathBuf}, }; +use serde::Deserialize; + use crate::error::OperationsError; /// Operations are derived by reading files subdirectories per cloud /etc/tedge/operations directory /// Each operation is a file name in one of the subdirectories /// The file name is the operation name -type Cloud = String; -type OperationName = String; -type Operation = HashSet<OperationName>; -type OperationsMap = HashMap<Cloud, Operation>; +#[derive(Debug, Clone, Deserialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub struct OnMessageExec { + command: Option<String>, + on_message: Option<String>, + topic: Option<String>, + user: Option<String>, +} + +#[derive(Debug, Clone, Deserialize, PartialEq)] +#[serde(rename_all = "lowercase")] +pub struct Operation { + #[serde(skip)] + name: String, + exec: Option<OnMessageExec>, +} + +impl Operation { + pub fn exec(&self) -> Option<&OnMessageExec> { + self.exec.as_ref() + } + + pub fn command(&self) -> Option<String> { + self.exec().and_then(|exec| exec.command.clone()) + } -#[derive(Debug, Clone, PartialEq)] + pub fn topic(&self) -> Option<String> { + self.exec().and_then(|exec| exec.topic.clone()) + } +} + +#[derive(Debug, Clone)] pub struct Operations { - cloud: PathBuf, - operations: OperationsMap, + operations: Vec<Operation>, + operations_by_trigger: HashMap<String, usize>, } impl Operations { - pub fn try_new(dir: impl AsRef<Path>) -> Result<Self, OperationsError> { - let operations = get_operations(dir.as_ref())?; + pub fn new() -> Self { + Self { + operations: vec![], + operations_by_trigger: HashMap::new(), + } + } - Ok(Self { - cloud: dir.as_ref().to_path_buf(), - operations, - }) + pub fn add(&mut self, operation: Operation) { + if let Some(detail) = operation.exec() { + if let Some(on_message) = &detail.on_message { + self.operations_by_trigger + .insert(on_message.clone(), self.operations.len()); + } + } + self.operations.push(operation); } - pub fn get_operations_list(&self, cloud: &str) -> Vec<&str> { + pub fn try_new(dir: impl AsRef<Path>, cloud_name: &str) -> Result<Self, OperationsError> { + get_operations(dir.as_ref(), cloud_name) + } + + pub fn get_operations_list(&self) -> Vec<String> { self.operations - .get(cloud) - .map(|operations| operations.iter().map(|k| k.as_str()).collect()) - .unwrap_or_default() + .iter() + .map(|operation| operation.name.clone()) + .collect::<Vec<String>>() + } + + pub fn matching_smartrest_template(&self, operation_template: &str) -> Option<&Operation> { + self.operations_by_trigger + .get(operation_template) + .and_then(|index| self.operations.get(*index)) + } + + pub fn topics_for_operations(&self) -> HashSet<String> { + self.operations + .iter() + .filter_map(|operation| operation.topic()) + .collect::<HashSet<String>>() } } -fn get_clouds(dir: impl AsRef<Path>) -> Result<Vec<String>, OperationsError> { - Ok(fs::read_dir(dir)? +fn get_operations(dir: impl AsRef<Path>, cloud_name: &str) -> Result<Operations, OperationsError> { + let mut operations = Operations::new(); + + let path = dir.as_ref().join(&cloud_name); + let dir_entries = fs::read_dir(&path)? .map(|entry| entry.map(|e| e.path())) .collect::<Result<Vec<PathBuf>, _>>()? .into_iter() - .filter(|path| path.is_dir()) - .map(|path| { - let filename = path.file_name(); - filename.unwrap().to_str().unwrap().to_string() - }) - .collect()) -} + .filter(|path| path.is_file()) + .collect::<Vec<PathBuf>>(); -fn get_operations(dir: impl AsRef<Path>) -> Result<OperationsMap, OperationsError> { - let mut operations = OperationsMap::new(); - for cloud in get_clouds(&dir)? { - let path = dir.as_ref().join(cloud.as_str()); - let operations_map = fs::read_dir(&path)? - .map(|entry| entry.map(|e| e.path())) - .collect::<Result<Vec<PathBuf>, _>>()? - .into_iter() - .filter(|path| path.is_file()) - .map(|path| { - let filename = path.file_name(); - filename.unwrap().to_str().unwrap().to_string() - }) - .collect(); - operations.insert(cloud, operations_map); + for path in dir_entries { + let mut details = match fs::read(&path) { + Ok(bytes) => toml::from_slice::<Operation>(bytes.as_slice()) + .map_err(|e| OperationsError::TomlError(path.to_path_buf(), e))?, + + Err(err) => return Err(OperationsError::FromIo(err)), + }; + + details.name = path + .file_name() + .and_then(|filename| filename.to_str()) + .ok_or_else(|| OperationsError::InvalidOperationName(path.to_owned()))? + .to_owned(); + + operations.add(details); } Ok(operations) } #[cfg(test)] mod tests { + use std::io::Write; + use super::*; use test_case::test_case; - #[test_case(0, false)] - #[test_case(0, true)] - #[test_case(2, false)] - #[test_case(2, true)] - fn get_clouds_tests(clouds_count: usize, files: bool) { - let operations = TestOperations::builder().with_clouds(clouds_count); - - if files { - operations.with_random_file_in_clouds_directory(); - } - - let operations = operations.build(); - - let clouds = get_clouds(operations.temp_dir()).unwrap(); - - assert_eq!(clouds.len(), clouds_count); - } - - #[test_case(0, 0)] - #[test_case(1, 1)] - #[test_case(1, 5)] - #[test_case(2, 5)] - fn get_operations_all(clouds_count: usize, ops_count: usize) { - let test_operations = TestOperations::builder() - .with_clouds(clouds_count) - .with_operations(ops_count) - .build(); - - let operations = get_operations(test_operations.temp_dir()).unwrap(); - - assert_eq!(operations.len(), clouds_count); - assert_eq!( - operations.values().map(|ops| ops.len()).sum::<usize>(), - ops_count * clouds_count - ); - } - // Structs for state change with the builder pattern - // Structs for Clouds - struct Clouds(Vec<PathBuf>); - struct NoClouds; - // Structs for Operations struct Ops(Vec<PathBuf>); struct NoOps; - struct TestOperationsBuilder<C, O> { + struct TestOperationsBuilder<O> { temp_dir: tempfile::TempDir, - clouds: C, operations: O, } - impl TestOperationsBuilder<NoClouds, NoOps> { + impl TestOperationsBuilder<NoOps> { fn new() -> Self { Self { temp_dir: tempfile::tempdir().unwrap(), - clouds: NoClouds, operations: NoOps, } } } - impl<O> TestOperationsBuilder<NoClouds, O> { - fn with_clouds(self, clouds_count: usize) -> TestOperationsBuilder<Clouds, O> { - let Self { - temp_dir, - operations, - .. - } = self; - - let mut clouds = Vec::new(); - for i in 0..clouds_count { - let cloud = temp_dir.as_ref().join(format!("cloud{}", i)); - fs::create_dir(&cloud).unwrap(); - clouds.push(cloud); - } - - TestOperationsBuilder { - temp_dir, - clouds: Clouds(clouds), - operations, - } - } - } - - impl TestOperationsBuilder<Clouds, NoOps> { - fn with_operations(self, operations_count: usize) -> TestOperationsBuilder<Clouds, Ops> { - let Self { - temp_dir, clouds, .. - } = self; + impl TestOperationsBuilder<NoOps> { + fn with_operations(self, operations_count: usize) -> TestOperationsBuilder<Ops> { + let Self { temp_dir, .. } = self; let mut operations = Vec::new(); - clouds.0.iter().for_each(|path| { - for i in 0..operations_count { - let file_path = path.join(format!("operation{}", i)); - fs::File::create(&file_path).unwrap(); - operations.push(file_path); - } - }); + for i in 0..operations_count { + let file_path = temp_dir.path().join(format!("operation{}", i)); + let mut file = fs::File::create(&file_path).unwrap(); + file.write_all( + br#"[exec] + command = "echo" + on_message = "511""#, + ) + .unwrap(); + operations.push(file_path); + } TestOperationsBuilder { operations: Ops(operations), temp_dir, - clouds, - } - } - - fn build(self) -> TestOperations { - let Self { - temp_dir, clouds, .. - } = self; - - TestOperations { - temp_dir, - clouds: clouds.0, - operations: Vec::new(), } } } - impl<C, O> TestOperationsBuilder<C, O> { - fn with_random_file_in_clouds_directory(&self) { - let path = self.temp_dir.as_ref().join("cloudfile"); - fs::File::create(path).unwrap(); - } - } - - impl TestOperationsBuilder<Clouds, Ops> { + impl TestOperationsBuilder<Ops> { fn build(self) -> TestOperations { let Self { temp_dir, - clouds, operations, } = self; TestOperations { temp_dir, - clouds: clouds.0, operations: operations.0, } } @@ -221,21 +188,28 @@ mod tests { struct TestOperations { temp_dir: tempfile::TempDir, - clouds: Vec<PathBuf>, operations: Vec<PathBuf>, } impl TestOperations { - fn builder() -> TestOperationsBuilder<NoClouds, NoOps> { + fn builder() -> TestOperationsBuilder<NoOps> { TestOperationsBuilder::new() } fn temp_dir(&self) -> &tempfile::TempDir { &self.temp_dir } + } - fn operations(&self) -> &Vec<PathBuf> { - &self.operations - } + #[test_case(0)] + #[test_case(1)] + #[test_case(5)] + fn get_operations_all(ops_count: usize) { + let test_operations = TestOperations::builder().with_operations(ops_count).build(); + + let operations = get_operations(test_operations.temp_dir(), "").unwrap(); + dbg!(&operations); + + assert_eq!(operations.operations.len(), ops_count); } } diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs index 8b6a177b..b0e440a9 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs @@ -43,4 +43,10 @@ pub enum SMCumulocityMapperError { #[error("Request timed out")] RequestTimeout, + + #[error("Operation execution failed: {0}")] + ExecuteFailed(String), + + #[error("An unknown operation template: {0}")] + UnknownOperation(String), } diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs index f899d482..46d88b85 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs @@ -1,8 +1,15 @@ -use crate::component::TEdgeComponent; -use crate::mapper::mqtt_config; -use crate::sm_c8y_mapper::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; -use crate::sm_c8y_mapper::topic::*; -use crate::sm_c8y_mapper::{error::*, json_c8y::C8yUpdateSoftwareListResponse}; +use crate::{ + component::TEdgeComponent, + mapper::mqtt_config, + operations::Operations, + sm_c8y_mapper::{ + error::*, + http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}, + json_c8y::C8yUpdateSoftwareListResponse, + topic::*, + }, +}; + use agent_interface::{ topic::*, Jsonify, OperationStatus, RestartOperationRequest, RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse, @@ -23,8 +30,8 @@ use chrono::{DateTime, FixedOffset}; use download::{Auth, DownloadInfo}; use mqtt_client::{Client, MqttClient, MqttClientError, MqttMessageStream, Topic, TopicFilter}; use serde::{Deserialize, Serialize}; -use std::convert::TryInto; use std::path::PathBuf; +use std::{convert::TryInto, process::Stdio}; use tedge_config::TEdgeConfig; use tracing::{debug, error, info, instrument}; @@ -46,8 +53,9 @@ impl TEdgeComponent for CumulocitySoftwareManagementMapper { let mqtt_client = Client::connect("SM-C8Y-Mapper", &mqtt_config).await?; let mqtt_jwt_client = Client::connect("JWT-Requester", &mqtt_config).await?; + let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; let http_proxy = JwtAuthHttpProxy::try_new(mqtt_jwt_client, &tedge_config)?; - let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, http_proxy); + let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, http_proxy, operations); let messages = sm_mapper.subscribe().await?; let () = sm_mapper.run(messages).await?; @@ -62,14 +70,19 @@ where { pub client: Client, http_proxy: Proxy, + operations: Operations, } impl<Proxy> CumulocitySoftwareManagement<Proxy> where Proxy: C8YHttpProxy, { - pub fn new(client: Client, http_proxy: Proxy) -> Self { - Self { client, http_proxy } + pub fn new(client: Client, http_proxy: Proxy, operations: Operations) -> Self { + Self { + client, + http_proxy, + operations, + } } pub async fn subscribe(&self) -> Result<Box<dyn MqttMessageStream>, anyhow::Error> { @@ -77,8 +90,12 @@ where topic_filter.add(ResponseTopic::SoftwareUpdateResponse.as_str())?; topic_filter.add(C8yTopic::SmartRestRequest.as_str())?; topic_filter.add(ResponseTopic::RestartResponse.as_str())?; - let messages = self.client.subscribe(topic_filter).await?; + for topic in self.operations.topics_for_operations() { + topic_filter.add(&topic)? + } + + let messages = self.client.subscribe(topic_filter).await?; Ok(messages) } @@ -128,9 +145,18 @@ where "510" => { let () = self.forward_restart_request(payload).await?; } - _ => { - return Err(SMCumulocityMapperError::InvalidMqttMessage); - } + template => match self.operations.matching_smartrest_template(template) { + Some(operation) => { + if let Some(command) = operation.command() { + execute_operation(payload, command.as_str()).await?; + } + } + None => { + return Err(SMCumulocityMapperError::UnknownOperation( + template.to_string(), + )); + } + }, } Ok(()) @@ -161,13 +187,10 @@ where .publish_restart_operation_status(message.payload_str()?) .await?; } - MapperSubscribeTopic::C8yTopic(C8yTopic::SmartRestRequest) => { + MapperSubscribeTopic::C8yTopic(_) => { debug!("Cumulocity"); let () = self.process_smartrest(message.payload_str()?).await?; } - _ => { - eprintln!("Invalid MapperSubscriberTopic"); - } } } Ok(()) @@ -327,7 +350,7 @@ where // 3. upload log file let binary_upload_event_url = self .http_proxy - .upload_log_binary(&log_output.as_str()) + .upload_log_binary(log_output.as_str()) .await?; // 4. set log file request to done @@ -409,6 +432,26 @@ where } } +async fn execute_operation(payload: &str, command: &str) -> Result<(), SMCumulocityMapperError> { + let command = command.to_owned(); + let payload = payload.to_string(); + + let _handle = tokio::spawn(async move { + let mut child = tokio::process::Command::new(command) + .args(&[payload]) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .map_err(|e| SMCumulocityMapperError::ExecuteFailed(e.to_string())) + .unwrap(); + + child.wait().await + }); + + Ok(()) +} + #[derive(Debug, Deserialize, Serialize, PartialEq)] #[serde(rename_all = "camelCase")] /// used to retrieve the id of a log event diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs index 4372a8df..666edbe9 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs @@ -1,7 +1,7 @@ -use crate::sm_c8y_mapper::error::SMCumulocityMapperError; use crate::sm_c8y_mapper::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; use crate::sm_c8y_mapper::json_c8y::C8yUpdateSoftwareListResponse; use crate::sm_c8y_mapper::mapper::CumulocitySoftwareManagement; +use crate::{operations::Operations, sm_c8y_mapper::error::SMCumulocityMapperError}; use c8y_smartrest::smartrest_deserializer::SmartRestJwtResponse; use mqtt_client::Client; use mqtt_tests::test_mqtt_server::MqttProcessHandler; @@ -13,7 +13,8 @@ use tokio::task::JoinHandle; const TEST_TIMEOUT_MS: Duration = Duration::from_millis(1000); -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[ignore] #[serial] async fn mapper_publishes_a_software_list_request() { // The test assures the mapper publishes request for software list on `tedge/commands/req/software/list`. @@ -38,7 +39,8 @@ async fn mapper_publishes_a_software_list_request() { sm_mapper.unwrap().abort(); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[ignore] #[serial] async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8y_topic() { // The test assures the mapper publishes smartrest messages 114 and 500 on `c8y/s/us` which shall be send over to the cloud if bridge connection exists. @@ -58,7 +60,8 @@ async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8 sm_mapper.unwrap().abort(); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[ignore] #[serial] async fn mapper_publishes_software_update_request() { // The test assures SM Mapper correctly receives software update request smartrest message on `c8y/s/ds` @@ -100,7 +103,8 @@ async fn mapper_publishes_software_update_request() { sm_mapper.unwrap().abort(); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[ignore] #[serial] async fn mapper_publishes_software_update_status_onto_c8y_topic() { // The test assures SM Mapper correctly receives software update response message on `tedge/commands/res/software/update` @@ -165,7 +169,8 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() { sm_mapper.unwrap().abort(); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[ignore] #[serial] async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() { let broker = mqtt_tests::test_mqtt_broker(); @@ -220,7 +225,7 @@ async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() { sm_mapper.unwrap().abort(); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[ignore] #[serial] async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result<(), anyhow::Error> @@ -339,7 +344,8 @@ async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[ignore] #[serial] async fn mapper_publishes_software_update_request_with_wrong_action() { // The test assures SM Mapper correctly receives software update request smartrest message on `c8y/s/ds` @@ -448,7 +454,9 @@ async fn start_sm_mapper(mqtt_port: u16) -> Result<JoinHandle<()>, anyhow::Error let mqtt_client = Client::connect("SM-C8Y-Mapper-Test", &mqtt_config).await?; let http_proxy = FakeC8YHttpProxy {}; - let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, http_proxy); + + let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; + let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, http_proxy, operations); let messages = sm_mapper.subscribe().await?; let mapper_task = tokio::spawn(async move { diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs index dbc4f55a..2fc3c9d4 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs @@ -6,21 +6,20 @@ use std::convert::{TryFrom, TryInto}; pub enum C8yTopic { SmartRestRequest, SmartRestResponse, + OperationTopic(String), } impl C8yTopic { - pub fn as_str(&self) -> &'static str { + pub fn as_str(&self) -> &str { match self { Self::SmartRestRequest => r#"c8y/s/ds"#, Self::SmartRestResponse => r#"c8y/s/us"#, + Self::OperationTopic(name) => name.as_str(), } } pub fn to_topic(&self) -> Result<Topic, MqttClientError> { - match self { - Self::SmartRestRequest => Topic::new(Self::SmartRestRequest.as_str()), - Self::SmartRestResponse => Topic::new(Self::SmartRestResponse.as_str()), - } + Ok(Topic::new(self.as_str())?) } } @@ -31,9 +30,15 @@ impl TryFrom<String> for C8yTopic { match value.as_str() { r#"c8y/s/ds"# => Ok(C8yTopic::SmartRestRequest), r#"c8y/s/us"# => Ok(C8yTopic::SmartRestResponse), - err => Err(TopicError::UnknownTopic { - topic: err.to_string(), - }), + topic_name => { + if topic_name[..3].contains("c8y") { + Ok(C8yTopic::OperationTopic(topic_name.to_string())) + } else { + Err(TopicError::UnknownTopic { + topic: topic_name.to_string(), + }) + } + } } } } diff --git a/docs/src/tutorials/supported_operations.md b/docs/src/tutorials/supported_operations.md index 3a5a7af5..a5e228d3 100644 --- a/docs/src/tutorials/supported_operations.md +++ b/docs/src/tutorials/supported_operations.md @@ -10,7 +10,7 @@ IoT devices often do more than just send data to the cloud. They also do things * reboot on demand * install or remove software -These operations that are supported by [Cumulocity IoT](https://cumulocity.com/api/10.11.0/#section/Device-management-library) and other cloud providers. +These operations are supported by [Cumulocity IoT](https://cumulocity.com/api/10.11.0/#section/Device-management-library) and other cloud providers. On `thin-edge.io` the support for one such operation can be added using the `thin-edge.io` Supported Operations API. ### `thin-edge.io` Supported Operations API @@ -18,6 +18,17 @@ On `thin-edge.io` the support for one such operation can be added using the `thi The Supported Operations utilises the file system to add and remove operations. A special file placed in `/etc/tedge/operations` directory will indicate that an operation is supported. The specification for the operation files is described in `thin-edge.io` specifications repository[src/supported-operations/README.md](https://github.com/thin-edge/thin-edge.io-specs/blob/main/src/supported-operations/README.md) +Supported operations are declared in the cloud specific subdirectory of `/etc/tedge/operations` directory. + +## Custom Supported Operations + +`thin-edge.io` supports custom operations by using configuration files and plugin mechanism similar to what software management agent does. + +The main difference between custom operations and native operations is that custom operations are have to be defined in configuration files and provide their own implementation in a callable `plugin` executable. +As per specification the configuration file needs to be a `toml` file which describes the operation. + +`thin-edge.io` stores the operations configuration files in the `/etc/tedge/operations/<cloud-provider>/` directory. + ## `thin-edge.io` List of Supported Operations `thin-edge.io` supports natively the following operations: @@ -95,3 +106,56 @@ To remove supported operation we can remove the file from `/etc/tedge/operations ```shell sudo rm /etc/tedge/operations/c8y/c8y_Restart ``` + +## Working with custom operations + +We will use the `thin-edge.io` Supported Operations API to add custom operations. Our new operation is going to be capability to execute shell commands on the device. +Let's create the operation configuration file: + +We need to tell `thin-edge.io` how to handle the operation and how to execute it. + +### Adding new custom operation + +In Cumulocity IoT we know that there is an operation call c8y_Command which allows us to send commands to the device and get the result back to the cloud, let's create the configuration file for our new operation: + +First we create a file with the name of the operation: + +```shell +sudo -u tedge touch /etc/tedge/operations/c8y/c8y_Command +``` + +> Note: the needs to be readable by `thin-edge.io` user - `tedge` - and should have permissions `644`. + +In this example we want `thin-edge.io` to pick up a message on specific topic and execute the command on the device, our topic is `c8y/s/ds`. +We also know that the message we expect is going to use SmartRest template `511` and our plugin is located in `/etc/tedge/operations/command`. +Then we need to add the configuration to the file (`/etc/tedge/operations/c8y/c8y_Command`): + +```toml +[exec] + topic = "c8y/s/ds" + on_message = "511" + command = "/etc/tedge/operations/command" +``` + +And now the content of our command plugin: + +```shell +#!/usr/bin/sh + +mosquitto_pub -t c8y/s/us -m 501,c8y_Command + +OUTPUT=$(echo $1) + +mosquitto_pub -t c8y/s/us -m 503,c8y_Command,"$OUTPUT" +``` + +This simple example will execute the command `echo $1` and send the result back to the cloud. + +> Note: The command will be executed with tedge-mapper permission level so most of the system level commands will not work. + + +### List of currently supported operations parameters + +* `topic` - The topic on which the operation will be executed. +* `on_message` - The SmartRest templ |