summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
diff options
context:
space:
mode:
authorLukasz Woznicki <75632179+makr11st@users.noreply.github.com>2022-01-19 13:05:05 +0000
committerGitHub <noreply@github.com>2022-01-19 13:05:05 +0000
commit31b5dd2a3b00a28fcbd85d1c8883ca2ab8283a16 (patch)
tree1940deee45ffd4103fc742b2dd02315b43fd0af0 /crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
parent924814805cd8a0ec8897cb9b88784fd7935657f1 (diff)
Operations execution from operations files. (#764)
* Add operations reading from operations file and execute when template called * Update docs * Update docs to list supported parameters Signed-off-by: Lukasz Woznicki <lukasz.woznicki@softwareag.com>
Diffstat (limited to 'crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs')
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs79
1 files changed, 61 insertions, 18 deletions
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
index f899d482..46d88b85 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
@@ -1,8 +1,15 @@
-use crate::component::TEdgeComponent;
-use crate::mapper::mqtt_config;
-use crate::sm_c8y_mapper::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
-use crate::sm_c8y_mapper::topic::*;
-use crate::sm_c8y_mapper::{error::*, json_c8y::C8yUpdateSoftwareListResponse};
+use crate::{
+ component::TEdgeComponent,
+ mapper::mqtt_config,
+ operations::Operations,
+ sm_c8y_mapper::{
+ error::*,
+ http_proxy::{C8YHttpProxy, JwtAuthHttpProxy},
+ json_c8y::C8yUpdateSoftwareListResponse,
+ topic::*,
+ },
+};
+
use agent_interface::{
topic::*, Jsonify, OperationStatus, RestartOperationRequest, RestartOperationResponse,
SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse,
@@ -23,8 +30,8 @@ use chrono::{DateTime, FixedOffset};
use download::{Auth, DownloadInfo};
use mqtt_client::{Client, MqttClient, MqttClientError, MqttMessageStream, Topic, TopicFilter};
use serde::{Deserialize, Serialize};
-use std::convert::TryInto;
use std::path::PathBuf;
+use std::{convert::TryInto, process::Stdio};
use tedge_config::TEdgeConfig;
use tracing::{debug, error, info, instrument};
@@ -46,8 +53,9 @@ impl TEdgeComponent for CumulocitySoftwareManagementMapper {
let mqtt_client = Client::connect("SM-C8Y-Mapper", &mqtt_config).await?;
let mqtt_jwt_client = Client::connect("JWT-Requester", &mqtt_config).await?;
+ let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
let http_proxy = JwtAuthHttpProxy::try_new(mqtt_jwt_client, &tedge_config)?;
- let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, http_proxy);
+ let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, http_proxy, operations);
let messages = sm_mapper.subscribe().await?;
let () = sm_mapper.run(messages).await?;
@@ -62,14 +70,19 @@ where
{
pub client: Client,
http_proxy: Proxy,
+ operations: Operations,
}
impl<Proxy> CumulocitySoftwareManagement<Proxy>
where
Proxy: C8YHttpProxy,
{
- pub fn new(client: Client, http_proxy: Proxy) -> Self {
- Self { client, http_proxy }
+ pub fn new(client: Client, http_proxy: Proxy, operations: Operations) -> Self {
+ Self {
+ client,
+ http_proxy,
+ operations,
+ }
}
pub async fn subscribe(&self) -> Result<Box<dyn MqttMessageStream>, anyhow::Error> {
@@ -77,8 +90,12 @@ where
topic_filter.add(ResponseTopic::SoftwareUpdateResponse.as_str())?;
topic_filter.add(C8yTopic::SmartRestRequest.as_str())?;
topic_filter.add(ResponseTopic::RestartResponse.as_str())?;
- let messages = self.client.subscribe(topic_filter).await?;
+ for topic in self.operations.topics_for_operations() {
+ topic_filter.add(&topic)?
+ }
+
+ let messages = self.client.subscribe(topic_filter).await?;
Ok(messages)
}
@@ -128,9 +145,18 @@ where
"510" => {
let () = self.forward_restart_request(payload).await?;
}
- _ => {
- return Err(SMCumulocityMapperError::InvalidMqttMessage);
- }
+ template => match self.operations.matching_smartrest_template(template) {
+ Some(operation) => {
+ if let Some(command) = operation.command() {
+ execute_operation(payload, command.as_str()).await?;
+ }
+ }
+ None => {
+ return Err(SMCumulocityMapperError::UnknownOperation(
+ template.to_string(),
+ ));
+ }
+ },
}
Ok(())
@@ -161,13 +187,10 @@ where
.publish_restart_operation_status(message.payload_str()?)
.await?;
}
- MapperSubscribeTopic::C8yTopic(C8yTopic::SmartRestRequest) => {
+ MapperSubscribeTopic::C8yTopic(_) => {
debug!("Cumulocity");
let () = self.process_smartrest(message.payload_str()?).await?;
}
- _ => {
- eprintln!("Invalid MapperSubscriberTopic");
- }
}
}
Ok(())
@@ -327,7 +350,7 @@ where
// 3. upload log file
let binary_upload_event_url = self
.http_proxy
- .upload_log_binary(&log_output.as_str())
+ .upload_log_binary(log_output.as_str())
.await?;
// 4. set log file request to done
@@ -409,6 +432,26 @@ where
}
}
+async fn execute_operation(payload: &str, command: &str) -> Result<(), SMCumulocityMapperError> {
+ let command = command.to_owned();
+ let payload = payload.to_string();
+
+ let _handle = tokio::spawn(async move {
+ let mut child = tokio::process::Command::new(command)
+ .args(&[payload])
+ .stdin(Stdio::null())
+ .stdout(Stdio::null())
+ .stderr(Stdio::null())
+ .spawn()
+ .map_err(|e| SMCumulocityMapperError::ExecuteFailed(e.to_string()))
+ .unwrap();
+
+ child.wait().await
+ });
+
+ Ok(())
+}
+
#[derive(Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
/// used to retrieve the id of a log event