summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorinitard <alex.solomes@softwareag.com>2021-12-06 17:42:17 +0000
committerGitHub <noreply@github.com>2021-12-06 17:42:17 +0000
commit07f7d72b3c207c7c9e651fee5438b6f89fc97371 (patch)
tree3f25b4979428466a4aaaaf883958c40b6c9d8d2d
parent64f727c2ecebbc9026dd58622a405ed69f67173f (diff)
Feature/599/restart device cloud operation (#662)
* cloud restart operation #599 Signed-off-by: initard <solo@softwareag.com> * operation set to executing c8y #599 Signed-off-by: initard <solo@softwareag.com> * cargo fmt Signed-off-by: initard <solo@softwareag.com> * removing debug hard-coded echo Signed-off-by: initard <solo@softwareag.com> * updated handle_restart_opeartion signature in test #599 Signed-off-by: initard <solo@softwareag.com> * test update for c8y_Restart Signed-off-by: initard <solo@softwareag.com> * removed panic on empty restart request #599 Signed-off-by: initard <solo@softwareag.com> Co-authored-by: initard <solo@softwareag.com>
-rw-r--r--crates/core/c8y_smartrest/src/error.rs3
-rw-r--r--crates/core/c8y_smartrest/src/smartrest_deserializer.rs28
-rw-r--r--crates/core/c8y_smartrest/src/smartrest_serializer.rs11
-rw-r--r--crates/core/json_sm/src/messages.rs10
-rw-r--r--crates/core/tedge_agent/src/agent.rs27
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs63
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs8
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs6
8 files changed, 144 insertions, 12 deletions
diff --git a/crates/core/c8y_smartrest/src/error.rs b/crates/core/c8y_smartrest/src/error.rs
index 03fd5125..4539ac02 100644
--- a/crates/core/c8y_smartrest/src/error.rs
+++ b/crates/core/c8y_smartrest/src/error.rs
@@ -32,4 +32,7 @@ pub enum SmartRestDeserializerError {
parameter: String,
hint: String,
},
+
+ #[error("Empty request")]
+ EmptyRequest,
}
diff --git a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
index d9868e81..49638301 100644
--- a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
+++ b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
@@ -228,6 +228,27 @@ impl SmartRestLogRequest {
}
}
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
+pub struct SmartRestRestartRequest {
+ pub message_id: String,
+ pub device: String,
+}
+
+impl SmartRestRestartRequest {
+ pub fn from_smartrest(smartrest: &str) -> Result<Self, SmartRestDeserializerError> {
+ let mut rdr = ReaderBuilder::new()
+ .has_headers(false)
+ .flexible(true)
+ .from_reader(smartrest.as_bytes());
+
+ match rdr.deserialize().next() {
+ Some(Ok(record)) => Ok(record),
+ Some(Err(err)) => Err(err)?,
+ None => Err(SmartRestDeserializerError::EmptyRequest),
+ }
+ }
+}
+
type JwtToken = String;
#[derive(Debug, Deserialize, PartialEq)]
@@ -540,4 +561,11 @@ mod tests {
let log = SmartRestLogRequest::from_smartrest(&smartrest);
assert!(log.is_ok());
}
+
+ #[test]
+ fn deserialize_smartrest_restart_request_operation() {
+ let smartrest = String::from(&format!("510,user"));
+ let log = SmartRestRestartRequest::from_smartrest(&smartrest);
+ assert!(log.is_ok());
+ }
}
diff --git a/crates/core/c8y_smartrest/src/smartrest_serializer.rs b/crates/core/c8y_smartrest/src/smartrest_serializer.rs
index 6504567d..ce48c1a3 100644
--- a/crates/core/c8y_smartrest/src/smartrest_serializer.rs
+++ b/crates/core/c8y_smartrest/src/smartrest_serializer.rs
@@ -9,6 +9,7 @@ type SmartRest = String;
pub enum CumulocitySupportedOperations {
C8ySoftwareUpdate,
C8yLogFileRequest,
+ C8yRestartRequest,
}
impl From<CumulocitySupportedOperations> for &'static str {
@@ -16,6 +17,7 @@ impl From<CumulocitySupportedOperations> for &'static str {
match op {
CumulocitySupportedOperations::C8ySoftwareUpdate => "c8y_SoftwareUpdate",
CumulocitySupportedOperations::C8yLogFileRequest => "c8y_LogfileRequest",
+ CumulocitySupportedOperations::C8yRestartRequest => "c8y_Restart",
}
}
}
@@ -59,6 +61,7 @@ impl Default for SmartRestSetSupportedOperations {
supported_operations: vec![
CumulocitySupportedOperations::C8ySoftwareUpdate.into(),
CumulocitySupportedOperations::C8yLogFileRequest.into(),
+ CumulocitySupportedOperations::C8yRestartRequest.into(),
],
}
}
@@ -160,7 +163,7 @@ pub struct SmartRestSetOperationToFailed {
}
impl SmartRestSetOperationToFailed {
- fn new(operation: CumulocitySupportedOperations, reason: String) -> Self {
+ pub fn new(operation: CumulocitySupportedOperations, reason: String) -> Self {
Self {
message_id: "502",
operation: operation.into(),
@@ -208,11 +211,15 @@ mod tests {
use json_sm::*;
#[test]
+ // NOTE: this test always needs changing when a new operation is added
fn serialize_smartrest_supported_operations() {
let smartrest = SmartRestSetSupportedOperations::default()
.to_smartrest()
.unwrap();
- assert_eq!(smartrest, "114,c8y_SoftwareUpdate,c8y_LogfileRequest\n");
+ assert_eq!(
+ smartrest,
+ "114,c8y_SoftwareUpdate,c8y_LogfileRequest,c8y_Restart\n"
+ );
}
#[test]
diff --git a/crates/core/json_sm/src/messages.rs b/crates/core/json_sm/src/messages.rs
index d7ffad69..ca31ed78 100644
--- a/crates/core/json_sm/src/messages.rs
+++ b/crates/core/json_sm/src/messages.rs
@@ -505,6 +505,12 @@ impl From<SoftwareError> for Option<SoftwareModuleItem> {
}
}
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
+pub enum RestartOperation {
+ Request(RestartOperationRequest),
+ Response(RestartOperationResponse),
+}
+
/// Message payload definition for restart operation request.
#[derive(Debug, Deserialize, Serialize, PartialEq)]
#[serde(deny_unknown_fields)]
@@ -553,6 +559,10 @@ impl RestartOperationResponse {
pub fn topic_name() -> &'static str {
"tedge/commands/res/control/restart"
}
+
+ pub fn status(&self) -> OperationStatus {
+ self.status
+ }
}
#[cfg(test)]
diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs
index ff4560b4..1402d8f3 100644
--- a/crates/core/tedge_agent/src/agent.rs
+++ b/crates/core/tedge_agent/src/agent.rs
@@ -252,7 +252,10 @@ impl SmAgent {
topic if topic == &self.config.request_topic_restart => {
let request = self.match_restart_operation_payload(mqtt, &message).await?;
- if let Err(error) = self.handle_restart_operation().await {
+ if let Err(error) = self
+ .handle_restart_operation(mqtt, &self.config.response_topic_restart)
+ .await
+ {
error!("{}", error);
self.persistance_store.clear().await?;
@@ -425,11 +428,20 @@ impl SmAgent {
Ok(request)
}
- async fn handle_restart_operation(&self) -> Result<(), AgentError> {
+ async fn handle_restart_operation(
+ &self,
+ mqtt: &Client,
+ topic: &Topic,
+ ) -> Result<(), AgentError> {
self.persistance_store
.update(&StateStatus::Restart(RestartOperationStatus::Restarting))
.await?;
+ // update status to executing.
+ let executing_response = RestartOperationResponse::new(&RestartOperationRequest::new());
+ let _ = mqtt
+ .publish(Message::new(&topic, executing_response.to_bytes()?))
+ .await?;
let () = restart_operation::create_slash_run_file()?;
let _process_result = std::process::Command::new("sudo").arg("sync").status();
@@ -542,7 +554,16 @@ mod tests {
.unwrap();
// calling handle_restart_operation should create a file in /run/tedge_agent_restart
- let () = agent.handle_restart_operation().await?;
+ let mqtt = Client::connect(
+ "sm-agent-test",
+ &mqtt_client::Config::default().with_packet_size(10 * 1024 * 1024),
+ )
+ .await?;
+ let response_topic_restart =
+ Topic::new(RestartOperationResponse::topic_name()).expect("Invalid topic");
+ let () = agent
+ .handle_restart_operation(&mqtt, &response_topic_restart)
+ .await?;
assert!(std::path::Path::new(&SLASH_RUN_PATH_TEDGE_AGENT_RESTART).exists());
// removing the file
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
index f40b1873..e838c508 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
@@ -3,7 +3,7 @@ use crate::sm_c8y_mapper::json_c8y::{C8yCreateEvent, C8yManagedObject};
use crate::sm_c8y_mapper::{error::*, json_c8y::C8yUpdateSoftwareListResponse, topic::*};
use crate::{component::TEdgeComponent, sm_c8y_mapper::json_c8y::InternalIdResponse};
use async_trait::async_trait;
-use c8y_smartrest::smartrest_deserializer::SmartRestLogRequest;
+use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRestartRequest};
use c8y_smartrest::smartrest_serializer::CumulocitySupportedOperations;
use c8y_smartrest::{
error::SmartRestDeserializerError,
@@ -16,8 +16,8 @@ use c8y_smartrest::{
};
use chrono::{DateTime, FixedOffset, Local};
use json_sm::{
- Auth, DownloadInfo, Jsonify, OperationStatus, SoftwareListRequest, SoftwareListResponse,
- SoftwareUpdateResponse,
+ Auth, DownloadInfo, Jsonify, OperationStatus, RestartOperationRequest,
+ RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse,
};
use mqtt_client::{Client, MqttClient, MqttClientError, MqttMessageStream, Topic, TopicFilter};
use reqwest::Url;
@@ -77,6 +77,7 @@ impl CumulocitySoftwareManagement {
let mut topic_filter = TopicFilter::new(IncomingTopic::SoftwareListResponse.as_str())?;
topic_filter.add(IncomingTopic::SoftwareUpdateResponse.as_str())?;
topic_filter.add(IncomingTopic::SmartRestRequest.as_str())?;
+ topic_filter.add(IncomingTopic::RestartResponse.as_str())?;
let messages = self.client.subscribe(topic_filter).await?;
Ok(messages)
@@ -140,6 +141,9 @@ impl CumulocitySoftwareManagement {
"522" => {
let () = self.forward_log_request(payload).await?;
}
+ "510" => {
+ let () = self.forward_restart_request(payload).await?;
+ }
_ => {
return Err(SMCumulocityMapperError::InvalidMqttMessage);
}
@@ -171,6 +175,11 @@ impl CumulocitySoftwareManagement {
.publish_operation_status(message.payload_str()?)
.await?;
}
+ IncomingTopic::RestartResponse => {
+ let () = self
+ .publish_restart_operation_status(message.payload_str()?)
+ .await?;
+ }
IncomingTopic::SmartRestRequest => {
debug!("Cumulocity");
let () = self.process_smartrest(message.payload_str()?).await?;
@@ -269,6 +278,41 @@ impl CumulocitySoftwareManagement {
Ok(())
}
+ async fn publish_restart_operation_status(
+ &self,
+ json_response: &str,
+ ) -> Result<(), SMCumulocityMapperError> {
+ let response = RestartOperationResponse::from_json(json_response)?;
+ let topic = OutgoingTopic::SmartRestResponse.to_topic()?;
+
+ match response.status() {
+ OperationStatus::Executing => {
+ let smartrest_set_operation = SmartRestSetOperationToExecuting::new(
+ CumulocitySupportedOperations::C8yRestartRequest,
+ )
+ .to_smartrest()?;
+
+ let () = self.publish(&topic, smartrest_set_operation).await?;
+ }
+ OperationStatus::Successful => {
+ let smartrest_set_operation = SmartRestSetOperationToSuccessful::new(
+ CumulocitySupportedOperations::C8yRestartRequest,
+ )
+ .to_smartrest()?;
+ let () = self.publish(&topic, smartrest_set_operation).await?;
+ }
+ OperationStatus::Failed => {
+ let smartrest_set_operation = SmartRestSetOperationToFailed::new(
+ CumulocitySupportedOperations::C8yRestartRequest,
+ "Restart Failed".into(),
+ )
+ .to_smartrest()?;
+ let () = self.publish(&topic, smartrest_set_operation).await?;
+ }
+ }
+ Ok(())
+ }
+
async fn set_log_file_request_executing(&self) -> Result<(), SMCumulocityMapperError> {
let topic = OutgoingTopic::SmartRestResponse.to_topic()?;
let smartrest_set_operation_status =
@@ -367,6 +411,19 @@ impl CumulocitySoftwareManagement {
Ok(())
}
+ async fn forward_restart_request(
+ &self,
+ smartrest: &str,
+ ) -> Result<(), SMCumulocityMapperError> {
+ let topic = OutgoingTopic::RestartRequest.to_topic()?;
+ let _ = SmartRestRestartRequest::from_smartrest(smartrest)?;
+
+ let request = RestartOperationRequest::new();
+ let () = self.publish(&topic, request.to_json()?).await?;
+
+ Ok(())
+ }
+
async fn publish(&self, topic: &Topic, payload: String) -> Result<(), MqttClientError> {
let () = self
.client
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs
index 964214ba..bd3515e8 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs
@@ -49,7 +49,7 @@ async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8
&mut messages,
TEST_TIMEOUT_MS,
vec![
- "114,c8y_SoftwareUpdate,c8y_LogfileRequest\n",
+ "114,c8y_SoftwareUpdate,c8y_LogfileRequest,c8y_Restart\n",
"118,software-management\n",
"500\n",
],
@@ -116,7 +116,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() {
&mut messages,
TEST_TIMEOUT_MS,
vec![
- "114,c8y_SoftwareUpdate,c8y_LogfileRequest\n",
+ "114,c8y_SoftwareUpdate,c8y_LogfileRequest,c8y_Restart\n",
"118,software-management\n",
"500\n",
],
@@ -181,7 +181,7 @@ async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() {
&mut messages,
TEST_TIMEOUT_MS,
vec![
- "114,c8y_SoftwareUpdate,c8y_LogfileRequest\n",
+ "114,c8y_SoftwareUpdate,c8y_LogfileRequest,c8y_Restart\n",
"118,software-management\n",
"500\n",
],
@@ -365,7 +365,7 @@ async fn mapper_publishes_software_update_request_with_wrong_action() {
&mut messages,
TEST_TIMEOUT_MS,
vec![
- "114,c8y_SoftwareUpdate,c8y_LogfileRequest\n",
+ "114,c8y_SoftwareUpdate,c8y_LogfileRequest,c8y_Restart\n",
"118,software-management\n",
"500\n",
],
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs
index 16cbcf74..d87ee33e 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs
@@ -7,6 +7,7 @@ pub(crate) enum IncomingTopic {
SoftwareListResponse,
SoftwareUpdateResponse,
SmartRestRequest,
+ RestartResponse,
}
impl IncomingTopic {
@@ -15,6 +16,7 @@ impl IncomingTopic {
Self::SoftwareListResponse => r#"tedge/commands/res/software/list"#,
Self::SoftwareUpdateResponse => r#"tedge/commands/res/software/update"#,
Self::SmartRestRequest => r#"c8y/s/ds"#,
+ Self::RestartResponse => r#"tedge/commands/res/control/restart"#,
}
}
}
@@ -27,6 +29,7 @@ impl TryFrom<String> for IncomingTopic {
r#"tedge/commands/res/software/list"# => Ok(IncomingTopic::SoftwareListResponse),
r#"tedge/commands/res/software/update"# => Ok(IncomingTopic::SoftwareUpdateResponse),
r#"c8y/s/ds"# => Ok(IncomingTopic::SmartRestRequest),
+ r#"tedge/commands/res/control/restart"# => Ok(IncomingTopic::RestartResponse),
err => Err(MapperTopicError::UnknownTopic {
topic: err.to_string(),
}),
@@ -55,6 +58,7 @@ pub(crate) enum OutgoingTopic {
SoftwareListRequest,
SoftwareUpdateRequest,
SmartRestResponse,
+ RestartRequest,
}
impl OutgoingTopic {
@@ -63,6 +67,7 @@ impl OutgoingTopic {
Self::SoftwareListRequest => r#"tedge/commands/req/software/list"#,
Self::SoftwareUpdateRequest => r#"tedge/commands/req/software/update"#,
Self::SmartRestResponse => r#"c8y/s/us"#,
+ Self::RestartRequest => r#"tedge/commands/req/control/restart"#,
}
}
@@ -71,6 +76,7 @@ impl OutgoingTopic {
Self::SoftwareListRequest => Topic::new(Self::SoftwareListRequest.as_str()),
Self::SoftwareUpdateRequest => Topic::new(Self::SoftwareUpdateRequest.as_str()),
Self::SmartRestResponse => Topic::new(Self::SmartRestResponse.as_str()),
+ Self::RestartRequest => Topic::new(Self::RestartRequest.as_str()),
}
}
}