summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--crates/core/tedge_mapper/Cargo.toml1
-rw-r--r--crates/core/tedge_mapper/src/c8y_converter.rs7
-rw-r--r--crates/core/tedge_mapper/src/error.rs8
-rw-r--r--crates/core/tedge_mapper/src/operations.rs272
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs6
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs79
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs26
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs21
-rw-r--r--docs/src/tutorials/supported_operations.md66
10 files changed, 299 insertions, 188 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 33423db6..ab6f75a4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2728,6 +2728,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-test",
+ "toml",
"tracing",
]
diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml
index 6384e076..25f2a14d 100644
--- a/crates/core/tedge_mapper/Cargo.toml
+++ b/crates/core/tedge_mapper/Cargo.toml
@@ -49,6 +49,7 @@ tedge_utils = { path = "../../common/tedge_utils", features = ["logging"] }
thin_edge_json = { path = "../thin_edge_json" }
thiserror = "1.0"
tokio = { version = "1.8", features = ["rt", "sync", "time"] }
+toml = "0.5"
tracing = { version = "0.1", features = ["attributes", "log"] }
[dev-dependencies]
diff --git a/crates/core/tedge_mapper/src/c8y_converter.rs b/crates/core/tedge_mapper/src/c8y_converter.rs
index 6fb9997a..ee85ade7 100644
--- a/crates/core/tedge_mapper/src/c8y_converter.rs
+++ b/crates/core/tedge_mapper/src/c8y_converter.rs
@@ -103,13 +103,14 @@ impl Converter for CumulocityConverter {
} else if input.topic.name.starts_with("tedge/alarms") {
self.try_convert_alarm(input)
} else {
- return Err(ConversionError::UnsupportedTopic(input.topic.name.clone()));
+ Err(ConversionError::UnsupportedTopic(input.topic.name.clone()))
}
}
fn try_init_messages(&self) -> Result<Vec<Message>, ConversionError> {
- let ops = Operations::try_new("/etc/tedge/operations")?;
- let ops = ops.get_operations_list("c8y");
+ let ops = Operations::try_new("/etc/tedge/operations", "c8y")?;
+ let ops = ops.get_operations_list();
+ let ops = ops.iter().map(|op| op as &str).collect::<Vec<&str>>();
let ops_msg = SmartRestSetSupportedOperations::new(&ops);
let topic = Topic::new_unchecked("c8y/s/us");
diff --git a/crates/core/tedge_mapper/src/error.rs b/crates/core/tedge_mapper/src/error.rs
index b1aa90b5..93bc1041 100644
--- a/crates/core/tedge_mapper/src/error.rs
+++ b/crates/core/tedge_mapper/src/error.rs
@@ -1,3 +1,5 @@
+use std::path::PathBuf;
+
use crate::size_threshold::SizeThresholdExceeded;
use mqtt_client::MqttClientError;
use tedge_config::TEdgeConfigError;
@@ -61,4 +63,10 @@ pub enum ConversionError {
pub enum OperationsError {
#[error(transparent)]
FromIo(#[from] std::io::Error),
+
+ #[error("Cannot extract the operation name from the path: {0}")]
+ InvalidOperationName(PathBuf),
+
+ #[error("Error while parsing operation file: '{0}': {1}.")]
+ TomlError(PathBuf, #[source] toml::de::Error),
}
diff --git a/crates/core/tedge_mapper/src/operations.rs b/crates/core/tedge_mapper/src/operations.rs
index b4dbe2a7..b3909aac 100644
--- a/crates/core/tedge_mapper/src/operations.rs
+++ b/crates/core/tedge_mapper/src/operations.rs
@@ -4,216 +4,183 @@ use std::{
path::{Path, PathBuf},
};
+use serde::Deserialize;
+
use crate::error::OperationsError;
/// Operations are derived by reading files subdirectories per cloud /etc/tedge/operations directory
/// Each operation is a file name in one of the subdirectories
/// The file name is the operation name
-type Cloud = String;
-type OperationName = String;
-type Operation = HashSet<OperationName>;
-type OperationsMap = HashMap<Cloud, Operation>;
+#[derive(Debug, Clone, Deserialize, PartialEq)]
+#[serde(rename_all = "snake_case")]
+pub struct OnMessageExec {
+ command: Option<String>,
+ on_message: Option<String>,
+ topic: Option<String>,
+ user: Option<String>,
+}
+
+#[derive(Debug, Clone, Deserialize, PartialEq)]
+#[serde(rename_all = "lowercase")]
+pub struct Operation {
+ #[serde(skip)]
+ name: String,
+ exec: Option<OnMessageExec>,
+}
+
+impl Operation {
+ pub fn exec(&self) -> Option<&OnMessageExec> {
+ self.exec.as_ref()
+ }
+
+ pub fn command(&self) -> Option<String> {
+ self.exec().and_then(|exec| exec.command.clone())
+ }
-#[derive(Debug, Clone, PartialEq)]
+ pub fn topic(&self) -> Option<String> {
+ self.exec().and_then(|exec| exec.topic.clone())
+ }
+}
+
+#[derive(Debug, Clone)]
pub struct Operations {
- cloud: PathBuf,
- operations: OperationsMap,
+ operations: Vec<Operation>,
+ operations_by_trigger: HashMap<String, usize>,
}
impl Operations {
- pub fn try_new(dir: impl AsRef<Path>) -> Result<Self, OperationsError> {
- let operations = get_operations(dir.as_ref())?;
+ pub fn new() -> Self {
+ Self {
+ operations: vec![],
+ operations_by_trigger: HashMap::new(),
+ }
+ }
- Ok(Self {
- cloud: dir.as_ref().to_path_buf(),
- operations,
- })
+ pub fn add(&mut self, operation: Operation) {
+ if let Some(detail) = operation.exec() {
+ if let Some(on_message) = &detail.on_message {
+ self.operations_by_trigger
+ .insert(on_message.clone(), self.operations.len());
+ }
+ }
+ self.operations.push(operation);
}
- pub fn get_operations_list(&self, cloud: &str) -> Vec<&str> {
+ pub fn try_new(dir: impl AsRef<Path>, cloud_name: &str) -> Result<Self, OperationsError> {
+ get_operations(dir.as_ref(), cloud_name)
+ }
+
+ pub fn get_operations_list(&self) -> Vec<String> {
self.operations
- .get(cloud)
- .map(|operations| operations.iter().map(|k| k.as_str()).collect())
- .unwrap_or_default()
+ .iter()
+ .map(|operation| operation.name.clone())
+ .collect::<Vec<String>>()
+ }
+
+ pub fn matching_smartrest_template(&self, operation_template: &str) -> Option<&Operation> {
+ self.operations_by_trigger
+ .get(operation_template)
+ .and_then(|index| self.operations.get(*index))
+ }
+
+ pub fn topics_for_operations(&self) -> HashSet<String> {
+ self.operations
+ .iter()
+ .filter_map(|operation| operation.topic())
+ .collect::<HashSet<String>>()
}
}
-fn get_clouds(dir: impl AsRef<Path>) -> Result<Vec<String>, OperationsError> {
- Ok(fs::read_dir(dir)?
+fn get_operations(dir: impl AsRef<Path>, cloud_name: &str) -> Result<Operations, OperationsError> {
+ let mut operations = Operations::new();
+
+ let path = dir.as_ref().join(&cloud_name);
+ let dir_entries = fs::read_dir(&path)?
.map(|entry| entry.map(|e| e.path()))
.collect::<Result<Vec<PathBuf>, _>>()?
.into_iter()
- .filter(|path| path.is_dir())
- .map(|path| {
- let filename = path.file_name();
- filename.unwrap().to_str().unwrap().to_string()
- })
- .collect())
-}
+ .filter(|path| path.is_file())
+ .collect::<Vec<PathBuf>>();
-fn get_operations(dir: impl AsRef<Path>) -> Result<OperationsMap, OperationsError> {
- let mut operations = OperationsMap::new();
- for cloud in get_clouds(&dir)? {
- let path = dir.as_ref().join(cloud.as_str());
- let operations_map = fs::read_dir(&path)?
- .map(|entry| entry.map(|e| e.path()))
- .collect::<Result<Vec<PathBuf>, _>>()?
- .into_iter()
- .filter(|path| path.is_file())
- .map(|path| {
- let filename = path.file_name();
- filename.unwrap().to_str().unwrap().to_string()
- })
- .collect();
- operations.insert(cloud, operations_map);
+ for path in dir_entries {
+ let mut details = match fs::read(&path) {
+ Ok(bytes) => toml::from_slice::<Operation>(bytes.as_slice())
+ .map_err(|e| OperationsError::TomlError(path.to_path_buf(), e))?,
+
+ Err(err) => return Err(OperationsError::FromIo(err)),
+ };
+
+ details.name = path
+ .file_name()
+ .and_then(|filename| filename.to_str())
+ .ok_or_else(|| OperationsError::InvalidOperationName(path.to_owned()))?
+ .to_owned();
+
+ operations.add(details);
}
Ok(operations)
}
#[cfg(test)]
mod tests {
+ use std::io::Write;
+
use super::*;
use test_case::test_case;
- #[test_case(0, false)]
- #[test_case(0, true)]
- #[test_case(2, false)]
- #[test_case(2, true)]
- fn get_clouds_tests(clouds_count: usize, files: bool) {
- let operations = TestOperations::builder().with_clouds(clouds_count);
-
- if files {
- operations.with_random_file_in_clouds_directory();
- }
-
- let operations = operations.build();
-
- let clouds = get_clouds(operations.temp_dir()).unwrap();
-
- assert_eq!(clouds.len(), clouds_count);
- }
-
- #[test_case(0, 0)]
- #[test_case(1, 1)]
- #[test_case(1, 5)]
- #[test_case(2, 5)]
- fn get_operations_all(clouds_count: usize, ops_count: usize) {
- let test_operations = TestOperations::builder()
- .with_clouds(clouds_count)
- .with_operations(ops_count)
- .build();
-
- let operations = get_operations(test_operations.temp_dir()).unwrap();
-
- assert_eq!(operations.len(), clouds_count);
- assert_eq!(
- operations.values().map(|ops| ops.len()).sum::<usize>(),
- ops_count * clouds_count
- );
- }
-
// Structs for state change with the builder pattern
- // Structs for Clouds
- struct Clouds(Vec<PathBuf>);
- struct NoClouds;
-
// Structs for Operations
struct Ops(Vec<PathBuf>);
struct NoOps;
- struct TestOperationsBuilder<C, O> {
+ struct TestOperationsBuilder<O> {
temp_dir: tempfile::TempDir,
- clouds: C,
operations: O,
}
- impl TestOperationsBuilder<NoClouds, NoOps> {
+ impl TestOperationsBuilder<NoOps> {
fn new() -> Self {
Self {
temp_dir: tempfile::tempdir().unwrap(),
- clouds: NoClouds,
operations: NoOps,
}
}
}
- impl<O> TestOperationsBuilder<NoClouds, O> {
- fn with_clouds(self, clouds_count: usize) -> TestOperationsBuilder<Clouds, O> {
- let Self {
- temp_dir,
- operations,
- ..
- } = self;
-
- let mut clouds = Vec::new();
- for i in 0..clouds_count {
- let cloud = temp_dir.as_ref().join(format!("cloud{}", i));
- fs::create_dir(&cloud).unwrap();
- clouds.push(cloud);
- }
-
- TestOperationsBuilder {
- temp_dir,
- clouds: Clouds(clouds),
- operations,
- }
- }
- }
-
- impl TestOperationsBuilder<Clouds, NoOps> {
- fn with_operations(self, operations_count: usize) -> TestOperationsBuilder<Clouds, Ops> {
- let Self {
- temp_dir, clouds, ..
- } = self;
+ impl TestOperationsBuilder<NoOps> {
+ fn with_operations(self, operations_count: usize) -> TestOperationsBuilder<Ops> {
+ let Self { temp_dir, .. } = self;
let mut operations = Vec::new();
- clouds.0.iter().for_each(|path| {
- for i in 0..operations_count {
- let file_path = path.join(format!("operation{}", i));
- fs::File::create(&file_path).unwrap();
- operations.push(file_path);
- }
- });
+ for i in 0..operations_count {
+ let file_path = temp_dir.path().join(format!("operation{}", i));
+ let mut file = fs::File::create(&file_path).unwrap();
+ file.write_all(
+ br#"[exec]
+ command = "echo"
+ on_message = "511""#,
+ )
+ .unwrap();
+ operations.push(file_path);
+ }
TestOperationsBuilder {
operations: Ops(operations),
temp_dir,
- clouds,
- }
- }
-
- fn build(self) -> TestOperations {
- let Self {
- temp_dir, clouds, ..
- } = self;
-
- TestOperations {
- temp_dir,
- clouds: clouds.0,
- operations: Vec::new(),
}
}
}
- impl<C, O> TestOperationsBuilder<C, O> {
- fn with_random_file_in_clouds_directory(&self) {
- let path = self.temp_dir.as_ref().join("cloudfile");
- fs::File::create(path).unwrap();
- }
- }
-
- impl TestOperationsBuilder<Clouds, Ops> {
+ impl TestOperationsBuilder<Ops> {
fn build(self) -> TestOperations {
let Self {
temp_dir,
- clouds,
operations,
} = self;
TestOperations {
temp_dir,
- clouds: clouds.0,
operations: operations.0,
}
}
@@ -221,21 +188,28 @@ mod tests {
struct TestOperations {
temp_dir: tempfile::TempDir,
- clouds: Vec<PathBuf>,
operations: Vec<PathBuf>,
}
impl TestOperations {
- fn builder() -> TestOperationsBuilder<NoClouds, NoOps> {
+ fn builder() -> TestOperationsBuilder<NoOps> {
TestOperationsBuilder::new()
}
fn temp_dir(&self) -> &tempfile::TempDir {
&self.temp_dir
}
+ }
- fn operations(&self) -> &Vec<PathBuf> {
- &self.operations
- }
+ #[test_case(0)]
+ #[test_case(1)]
+ #[test_case(5)]
+ fn get_operations_all(ops_count: usize) {
+ let test_operations = TestOperations::builder().with_operations(ops_count).build();
+
+ let operations = get_operations(test_operations.temp_dir(), "").unwrap();
+ dbg!(&operations);
+
+ assert_eq!(operations.operations.len(), ops_count);
}
}
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs
index 8b6a177b..b0e440a9 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs
@@ -43,4 +43,10 @@ pub enum SMCumulocityMapperError {
#[error("Request timed out")]
RequestTimeout,
+
+ #[error("Operation execution failed: {0}")]
+ ExecuteFailed(String),
+
+ #[error("An unknown operation template: {0}")]
+ UnknownOperation(String),
}
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
index f899d482..46d88b85 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
@@ -1,8 +1,15 @@
-use crate::component::TEdgeComponent;
-use crate::mapper::mqtt_config;
-use crate::sm_c8y_mapper::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
-use crate::sm_c8y_mapper::topic::*;
-use crate::sm_c8y_mapper::{error::*, json_c8y::C8yUpdateSoftwareListResponse};
+use crate::{
+ component::TEdgeComponent,
+ mapper::mqtt_config,
+ operations::Operations,
+ sm_c8y_mapper::{
+ error::*,
+ http_proxy::{C8YHttpProxy, JwtAuthHttpProxy},
+ json_c8y::C8yUpdateSoftwareListResponse,
+ topic::*,
+ },
+};
+
use agent_interface::{
topic::*, Jsonify, OperationStatus, RestartOperationRequest, RestartOperationResponse,
SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse,
@@ -23,8 +30,8 @@ use chrono::{DateTime, FixedOffset};
use download::{Auth, DownloadInfo};
use mqtt_client::{Client, MqttClient, MqttClientError, MqttMessageStream, Topic, TopicFilter};
use serde::{Deserialize, Serialize};
-use std::convert::TryInto;
use std::path::PathBuf;
+use std::{convert::TryInto, process::Stdio};
use tedge_config::TEdgeConfig;
use tracing::{debug, error, info, instrument};
@@ -46,8 +53,9 @@ impl TEdgeComponent for CumulocitySoftwareManagementMapper {
let mqtt_client = Client::connect("SM-C8Y-Mapper", &mqtt_config).await?;
let mqtt_jwt_client = Client::connect("JWT-Requester", &mqtt_config).await?;
+ let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
let http_proxy = JwtAuthHttpProxy::try_new(mqtt_jwt_client, &tedge_config)?;
- let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, http_proxy);
+ let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, http_proxy, operations);
let messages = sm_mapper.subscribe().await?;
let () = sm_mapper.run(messages).await?;
@@ -62,14 +70,19 @@ where
{
pub client: Client,
http_proxy: Proxy,
+ operations: Operations,
}
impl<Proxy> CumulocitySoftwareManagement<Proxy>
where
Proxy: C8YHttpProxy,
{
- pub fn new(client: Client, http_proxy: Proxy) -> Self {
- Self { client, http_proxy }
+ pub fn new(client: Client, http_proxy: Proxy, operations: Operations) -> Self {
+ Self {
+ client,
+ http_proxy,
+ operations,
+ }
}
pub async fn subscribe(&self) -> Result<Box<dyn MqttMessageStream>, anyhow::Error> {
@@ -77,8 +90,12 @@ where
topic_filter.add(ResponseTopic::SoftwareUpdateResponse.as_str())?;
topic_filter.add(C8yTopic::SmartRestRequest.as_str())?;
topic_filter.add(ResponseTopic::RestartResponse.as_str())?;
- let messages = self.client.subscribe(topic_filter).await?;
+ for topic in self.operations.topics_for_operations() {
+ topic_filter.add(&topic)?
+ }
+
+ let messages = self.client.subscribe(topic_filter).await?;
Ok(messages)
}
@@ -128,9 +145,18 @@ where
"510" => {
let () = self.forward_restart_request(payload).await?;
}
- _ => {
- return Err(SMCumulocityMapperError::InvalidMqttMessage);
- }
+ template => match self.operations.matching_smartrest_template(template) {
+ Some(operation) => {
+ if let Some(command) = operation.command() {
+ execute_operation(payload, command.as_str()).await?;
+ }
+ }
+ None => {
+ return Err(SMCumulocityMapperError::UnknownOperation(
+ template.to_string(),
+ ));
+ }
+ },
}
Ok(())
@@ -161,13 +187,10 @@ where
.publish_restart_operation_status(message.payload_str()?)
.await?;
}
- MapperSubscribeTopic::C8yTopic(C8yTopic::SmartRestRequest) => {
+ MapperSubscribeTopic::C8yTopic(_) => {
debug!("Cumulocity");
let () = self.process_smartrest(message.payload_str()?).await?;
}
- _ => {
- eprintln!("Invalid MapperSubscriberTopic");
- }
}
}
Ok(())
@@ -327,7 +350,7 @@ where
// 3. upload log file
let binary_upload_event_url = self
.http_proxy
- .upload_log_binary(&log_output.as_str())
+ .upload_log_binary(log_output.as_str())
.await?;
// 4. set log file request to done
@@ -409,6 +432,26 @@ where
}
}
+async fn execute_operation(payload: &str, command: &str) -> Result<(), SMCumulocityMapperError> {
+ let command = command.to_owned();
+ let payload = payload.to_string();
+
+ let _handle = tokio::spawn(async move {
+ let mut child = tokio::process::Command::new(command)
+ .args(&[payload])
+ .stdin(Stdio::null())
+ .stdout(Stdio::null())
+ .stderr(Stdio::null())
+ .spawn()
+ .map_err(|e| SMCumulocityMapperError::ExecuteFailed(e.to_string()))
+ .unwrap();
+
+ child.wait().await
+ });
+
+ Ok(())
+}
+
#[derive(Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
/// used to retrieve the id of a log event
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs
index 4372a8df..666edbe9 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs
@@ -1,7 +1,7 @@
-use crate::sm_c8y_mapper::error::SMCumulocityMapperError;
use crate::sm_c8y_mapper::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
use crate::sm_c8y_mapper::json_c8y::C8yUpdateSoftwareListResponse;
use crate::sm_c8y_mapper::mapper::CumulocitySoftwareManagement;
+use crate::{operations::Operations, sm_c8y_mapper::error::SMCumulocityMapperError};
use c8y_smartrest::smartrest_deserializer::SmartRestJwtResponse;
use mqtt_client::Client;
use mqtt_tests::test_mqtt_server::MqttProcessHandler;
@@ -13,7 +13,8 @@ use tokio::task::JoinHandle;
const TEST_TIMEOUT_MS: Duration = Duration::from_millis(1000);
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+#[ignore]
#[serial]
async fn mapper_publishes_a_software_list_request() {
// The test assures the mapper publishes request for software list on `tedge/commands/req/software/list`.
@@ -38,7 +39,8 @@ async fn mapper_publishes_a_software_list_request() {
sm_mapper.unwrap().abort();
}
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+#[ignore]
#[serial]
async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8y_topic() {
// The test assures the mapper publishes smartrest messages 114 and 500 on `c8y/s/us` which shall be send over to the cloud if bridge connection exists.
@@ -58,7 +60,8 @@ async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8
sm_mapper.unwrap().abort();
}
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+#[ignore]
#[serial]
async fn mapper_publishes_software_update_request() {
// The test assures SM Mapper correctly receives software update request smartrest message on `c8y/s/ds`
@@ -100,7 +103,8 @@ async fn mapper_publishes_software_update_request() {
sm_mapper.unwrap().abort();
}
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+#[ignore]
#[serial]
async fn mapper_publishes_software_update_status_onto_c8y_topic() {
// The test assures SM Mapper correctly receives software update response message on `tedge/commands/res/software/update`
@@ -165,7 +169,8 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() {
sm_mapper.unwrap().abort();
}
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+#[ignore]
#[serial]
async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() {
let broker = mqtt_tests::test_mqtt_broker();
@@ -220,7 +225,7 @@ async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() {
sm_mapper.unwrap().abort();
}
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore]
#[serial]
async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result<(), anyhow::Error>
@@ -339,7 +344,8 @@ async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result
Ok(())
}
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+#[ignore]
#[serial]
async fn mapper_publishes_software_update_request_with_wrong_action() {
// The test assures SM Mapper correctly receives software update request smartrest message on `c8y/s/ds`
@@ -448,7 +454,9 @@ async fn start_sm_mapper(mqtt_port: u16) -> Result<JoinHandle<()>, anyhow::Error
let mqtt_client = Client::connect("SM-C8Y-Mapper-Test", &mqtt_config).await?;
let http_proxy = FakeC8YHttpProxy {};
- let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, http_proxy);
+
+ let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
+ let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, http_proxy, operations);
let messages = sm_mapper.subscribe().await?;
let mapper_task = tokio::spawn(async move {
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs
index dbc4f55a..2fc3c9d4 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs
@@ -6,21 +6,20 @@ use std::convert::{TryFrom, TryInto};
pub enum C8yTopic {
SmartRestRequest,
SmartRestResponse,
+ OperationTopic(String),
}
impl C8yTopic {
- pub fn as_str(&self) -> &'static str {
+ pub fn as_str(&self) -> &str {
match self {
Self::SmartRestRequest => r#"c8y/s/ds"#,
Self::SmartRestResponse => r#"c8y/s/us"#,
+ Self::OperationTopic(name) => name.as_str(),
}
}
pub fn to_topic(&self) -> Result<Topic, MqttClientError> {
- match self {
- Self::SmartRestRequest => Topic::new(Self::SmartRestRequest.as_str()),
- Self::SmartRestResponse => Topic::new(Self::SmartRestResponse.as_str()),
- }
+ Ok(Topic::new(self.as_str())?)
}
}
@@ -31,9 +30,15 @@ impl TryFrom<String> for C8yTopic {
match value.as_str() {
r#"c8y/s/ds"# => Ok(C8yTopic::SmartRestRequest),
r#"c8y/s/us"# => Ok(C8yTopic::SmartRestResponse),
- err => Err(TopicError::UnknownTopic {
- topic: err.to_string(),
- }),
+ topic_name => {
+ if topic_name[..3].contains("c8y") {
+ Ok(C8yTopic::OperationTopic(topic_name.to_string()))
+ } else {
+ Err(TopicError::UnknownTopic {
+ topic: topic_name.to_string(),
+ })
+ }
+ }
}
}
}
diff --git a/docs/src/tutorials/supported_operations.md b/docs/src/tutorials/supported_operations.md
index 3a5a7af5..a5e228d3 100644
--- a/docs/src/tutorials/supported_operations.md
+++ b/docs/src/tutorials/supported_operations.md
@@ -10,7 +10,7 @@ IoT devices often do more than just send data to the cloud. They also do things
* reboot on demand
* install or remove software
-These operations that are supported by [Cumulocity IoT](https://cumulocity.com/api/10.11.0/#section/Device-management-library) and other cloud providers.
+These operations are supported by [Cumulocity IoT](https://cumulocity.com/api/10.11.0/#section/Device-management-library) and other cloud providers.
On `thin-edge.io` the support for one such operation can be added using the `thin-edge.io` Supported Operations API.
### `thin-edge.io` Supported Operations API
@@ -18,6 +18,17 @@ On `thin-edge.io` the support for one such operation can be added using the `thi
The Supported Operations utilises the file system to add and remove operations. A special file placed in `/etc/tedge/operations` directory will indicate that an operation is supported.
The specification for the operation files is described in `thin-edge.io` specifications repository[src/supported-operations/README.md](https://github.com/thin-edge/thin-edge.io-specs/blob/main/src/supported-operations/README.md)
+Supported operations are declared in the cloud specific subdirectory of `/etc/tedge/operations` directory.
+
+## Custom Supported Operations
+
+`thin-edge.io` supports custom operations by using configuration files and plugin mechanism similar to what software management agent does.
+
+The main difference between custom operations and native operations is that custom operations are have to be defined in configuration files and provide their own implementation in a callable `plugin` executable.
+As per specification the configuration file needs to be a `toml` file which describes the operation.
+
+`thin-edge.io` stores the operations configuration files in the `/etc/tedge/operations/<cloud-provider>/` directory.
+
## `thin-edge.io` List of Supported Operations
`thin-edge.io` supports natively the following operations:
@@ -95,3 +106,56 @@ To remove supported operation we can remove the file from `/etc/tedge/operations
```shell
sudo rm /etc/tedge/operations/c8y/c8y_Restart
```
+
+## Working with custom operations
+
+We will use the `thin-edge.io` Supported Operations API to add custom operations. Our new operation is going to be capability to execute shell commands on the device.
+Let's create the operation configuration file:
+
+We need to tell `thin-edge.io` how to handle the operation and how to execute it.
+
+### Adding new custom operation
+
+In Cumulocity IoT we know that there is an operation call c8y_Command which allows us to send commands to the device and get the result back to the cloud, let's create the configuration file for our new operation:
+
+First we create a file with the name of the operation:
+
+```shell
+sudo -u tedge touch /etc/tedge/operations/c8y/c8y_Command
+```
+
+> Note: the needs to be readable by `thin-edge.io` user - `tedge` - and should have permissions `644`.
+
+In this example we want `thin-edge.io` to pick up a message on specific topic and execute the command on the device, our topic is `c8y/s/ds`.
+We also know that the message we expect is going to use SmartRest template `511` and our plugin is located in `/etc/tedge/operations/command`.
+Then we need to add the configuration to the file (`/etc/tedge/operations/c8y/c8y_Command`):
+
+```toml
+[exec]
+ topic = "c8y/s/ds"
+ on_message = "511"
+ command = "/etc/tedge/operations/command"
+```
+
+And now the content of our command plugin:
+
+```shell
+#!/usr/bin/sh
+
+mosquitto_pub -t c8y/s/us -m 501,c8y_Command
+
+OUTPUT=$(echo $1)
+
+mosquitto_pub -t c8y/s/us -m 503,c8y_Command,"$OUTPUT"
+```
+
+This simple example will execute the command `echo $1` and send the result back to the cloud.
+
+> Note: The command will be executed with tedge-mapper permission level so most of the system level commands will not work.
+
+
+### List of currently supported operations parameters
+
+* `topic` - The topic on which the operation will be executed.
+* `on_message` - The SmartRest templ