diff options
author | Lukasz Woznicki <75632179+makr11st@users.noreply.github.com> | 2022-01-19 13:05:05 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-19 13:05:05 +0000 |
commit | 31b5dd2a3b00a28fcbd85d1c8883ca2ab8283a16 (patch) | |
tree | 1940deee45ffd4103fc742b2dd02315b43fd0af0 /crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs | |
parent | 924814805cd8a0ec8897cb9b88784fd7935657f1 (diff) |
Operations execution from operations files. (#764)
* Add operations reading from operations file and execute when template
called
* Update docs
* Update docs to list supported parameters
Signed-off-by: Lukasz Woznicki <lukasz.woznicki@softwareag.com>
Diffstat (limited to 'crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs')
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs | 79 |
1 files changed, 61 insertions, 18 deletions
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs index f899d482..46d88b85 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs @@ -1,8 +1,15 @@ -use crate::component::TEdgeComponent; -use crate::mapper::mqtt_config; -use crate::sm_c8y_mapper::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; -use crate::sm_c8y_mapper::topic::*; -use crate::sm_c8y_mapper::{error::*, json_c8y::C8yUpdateSoftwareListResponse}; +use crate::{ + component::TEdgeComponent, + mapper::mqtt_config, + operations::Operations, + sm_c8y_mapper::{ + error::*, + http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}, + json_c8y::C8yUpdateSoftwareListResponse, + topic::*, + }, +}; + use agent_interface::{ topic::*, Jsonify, OperationStatus, RestartOperationRequest, RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse, @@ -23,8 +30,8 @@ use chrono::{DateTime, FixedOffset}; use download::{Auth, DownloadInfo}; use mqtt_client::{Client, MqttClient, MqttClientError, MqttMessageStream, Topic, TopicFilter}; use serde::{Deserialize, Serialize}; -use std::convert::TryInto; use std::path::PathBuf; +use std::{convert::TryInto, process::Stdio}; use tedge_config::TEdgeConfig; use tracing::{debug, error, info, instrument}; @@ -46,8 +53,9 @@ impl TEdgeComponent for CumulocitySoftwareManagementMapper { let mqtt_client = Client::connect("SM-C8Y-Mapper", &mqtt_config).await?; let mqtt_jwt_client = Client::connect("JWT-Requester", &mqtt_config).await?; + let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; let http_proxy = JwtAuthHttpProxy::try_new(mqtt_jwt_client, &tedge_config)?; - let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, http_proxy); + let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, http_proxy, operations); let messages = sm_mapper.subscribe().await?; let () = sm_mapper.run(messages).await?; @@ -62,14 +70,19 @@ where { pub client: Client, http_proxy: Proxy, + operations: Operations, } impl<Proxy> CumulocitySoftwareManagement<Proxy> where Proxy: C8YHttpProxy, { - pub fn new(client: Client, http_proxy: Proxy) -> Self { - Self { client, http_proxy } + pub fn new(client: Client, http_proxy: Proxy, operations: Operations) -> Self { + Self { + client, + http_proxy, + operations, + } } pub async fn subscribe(&self) -> Result<Box<dyn MqttMessageStream>, anyhow::Error> { @@ -77,8 +90,12 @@ where topic_filter.add(ResponseTopic::SoftwareUpdateResponse.as_str())?; topic_filter.add(C8yTopic::SmartRestRequest.as_str())?; topic_filter.add(ResponseTopic::RestartResponse.as_str())?; - let messages = self.client.subscribe(topic_filter).await?; + for topic in self.operations.topics_for_operations() { + topic_filter.add(&topic)? + } + + let messages = self.client.subscribe(topic_filter).await?; Ok(messages) } @@ -128,9 +145,18 @@ where "510" => { let () = self.forward_restart_request(payload).await?; } - _ => { - return Err(SMCumulocityMapperError::InvalidMqttMessage); - } + template => match self.operations.matching_smartrest_template(template) { + Some(operation) => { + if let Some(command) = operation.command() { + execute_operation(payload, command.as_str()).await?; + } + } + None => { + return Err(SMCumulocityMapperError::UnknownOperation( + template.to_string(), + )); + } + }, } Ok(()) @@ -161,13 +187,10 @@ where .publish_restart_operation_status(message.payload_str()?) .await?; } - MapperSubscribeTopic::C8yTopic(C8yTopic::SmartRestRequest) => { + MapperSubscribeTopic::C8yTopic(_) => { debug!("Cumulocity"); let () = self.process_smartrest(message.payload_str()?).await?; } - _ => { - eprintln!("Invalid MapperSubscriberTopic"); - } } } Ok(()) @@ -327,7 +350,7 @@ where // 3. upload log file let binary_upload_event_url = self .http_proxy - .upload_log_binary(&log_output.as_str()) + .upload_log_binary(log_output.as_str()) .await?; // 4. set log file request to done @@ -409,6 +432,26 @@ where } } +async fn execute_operation(payload: &str, command: &str) -> Result<(), SMCumulocityMapperError> { + let command = command.to_owned(); + let payload = payload.to_string(); + + let _handle = tokio::spawn(async move { + let mut child = tokio::process::Command::new(command) + .args(&[payload]) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .map_err(|e| SMCumulocityMapperError::ExecuteFailed(e.to_string())) + .unwrap(); + + child.wait().await + }); + + Ok(()) +} + #[derive(Debug, Deserialize, Serialize, PartialEq)] #[serde(rename_all = "camelCase")] /// used to retrieve the id of a log event |