summaryrefslogtreecommitdiffstats
path: root/crates
diff options
context:
space:
mode:
authorinitard <alex.solomes@softwareag.com>2021-11-26 15:30:42 +0000
committerGitHub <noreply@github.com>2021-11-26 15:30:42 +0000
commit07052edcb05d176f565db421b04d5f2453abf590 (patch)
tree3e7f0bbc28435aad40ed5c9b51d26afc12589589 /crates
parent47c52a679d8e0ca6c63c693cf1e21eeccd02230c (diff)
[CIT-659] restart device local operation (#591)
* [CIT-659] operation status rename Signed-off-by: Alex <solo@softwareag.com> * [CIT-659] OperationStatus rename Signed-off-by: Alex <solo@softwareag.com> * [CIT-659] adding new structs to lib import Signed-off-by: Alex <solo@softwareag.com> * [CIT-659] restart operation implementation Signed-off-by: Alex <solo@softwareag.com> * [CIT-659] restart operation check (WIP) Signed-off-by: Alex <solo@softwareag.com> * [CIT-659] wip Signed-off-by: Alex <solo@softwareag.com> * [CIT-659] restart operation check when /run/tedge_agent_restart persists after operation Signed-off-by: Alex <solo@softwareag.com> * [CIT-659] stash merge Signed-off-by: initard <solo@softwareag.com> * [CIT-659] restart operation refactoring, - chrono dependency Signed-off-by: initard <solo@softwareag.com> * [CIT-659] removed package feature, made file creation more explicit Signed-off-by: initard <solo@softwareag.com> * [CIT-659] dealing with some unwraps and error handling Signed-off-by: initard <solo@softwareag.com> * [CIT-659] reverting changes to operation_logs + error handling Signed-off-by: initard <solo@softwareag.com> * [CIT-659] changed operation from echo 6 to init 6 Signed-off-by: initard <solo@softwareag.com> * [CIT-659] fixing restart check logic Signed-off-by: initard <solo@softwareag.com> * [CIT-659] refactored error handling and added unit test for /run/tedge_agent_restart Signed-off-by: initard <solo@softwareag.com> * [CIT-659] refactoring of tests Signed-off-by: initard <solo@softwareag.com> * [CIT-659] ignored failed test Signed-off-by: initard <solo@softwareag.com> Co-authored-by: Alex <solo@softwareag.com>
Diffstat (limited to 'crates')
-rw-r--r--crates/core/c8y_smartrest/src/smartrest_deserializer.rs1
-rw-r--r--crates/core/c8y_smartrest/src/smartrest_serializer.rs8
-rw-r--r--crates/core/json_sm/src/lib.rs14
-rw-r--r--crates/core/json_sm/src/messages.rs80
-rw-r--r--crates/core/tedge_agent/Cargo.toml3
-rw-r--r--crates/core/tedge_agent/src/agent.rs201
-rw-r--r--crates/core/tedge_agent/src/error.rs12
-rw-r--r--crates/core/tedge_agent/src/main.rs1
-rw-r--r--crates/core/tedge_agent/src/restart_operation_handler.rs107
-rw-r--r--crates/core/tedge_agent/src/state.rs75
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs16
11 files changed, 443 insertions, 75 deletions
diff --git a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
index 841915b7..d9868e81 100644
--- a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
+++ b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
@@ -177,6 +177,7 @@ where
// 2021-10-23T19:03:26+01:00
// so we add a ':'
let mut date_string: String = Deserialize::deserialize(deserializer)?;
+
let str_size = date_string.len();
// check if `date_string` does not have a colon.
let date_string_end = &date_string.split('+').last();
diff --git a/crates/core/c8y_smartrest/src/smartrest_serializer.rs b/crates/core/c8y_smartrest/src/smartrest_serializer.rs
index 81e7ed4f..6504567d 100644
--- a/crates/core/c8y_smartrest/src/smartrest_serializer.rs
+++ b/crates/core/c8y_smartrest/src/smartrest_serializer.rs
@@ -1,6 +1,6 @@
use crate::error::SmartRestSerializerError;
use csv::{QuoteStyle, WriterBuilder};
-use json_sm::{SoftwareOperationStatus, SoftwareUpdateResponse};
+use json_sm::{OperationStatus, SoftwareUpdateResponse};
use serde::{Deserialize, Serialize, Serializer};
type SmartRest = String;
@@ -104,7 +104,7 @@ impl SmartRestSetOperationToExecuting {
response: SoftwareUpdateResponse,
) -> Result<Self, SmartRestSerializerError> {
match response.status() {
- SoftwareOperationStatus::Executing => {
+ OperationStatus::Executing => {
Ok(Self::new(CumulocitySupportedOperations::C8ySoftwareUpdate))
}
_ => Err(SmartRestSerializerError::UnsupportedOperationStatus { response }),
@@ -141,7 +141,7 @@ impl SmartRestSetOperationToSuccessful {
response: SoftwareUpdateResponse,
) -> Result<Self, SmartRestSerializerError> {
match response.status() {
- SoftwareOperationStatus::Successful => {
+ OperationStatus::Successful => {
Ok(Self::new(CumulocitySupportedOperations::C8ySoftwareUpdate))
}
_ => Err(SmartRestSerializerError::UnsupportedOperationStatus { response }),
@@ -172,7 +172,7 @@ impl SmartRestSetOperationToFailed {
response: SoftwareUpdateResponse,
) -> Result<Self, SmartRestSerializerError> {
match &response.status() {
- SoftwareOperationStatus::Failed => Ok(Self::new(
+ OperationStatus::Failed => Ok(Self::new(
CumulocitySupportedOperations::C8ySoftwareUpdate,
response.error().unwrap_or_else(|| "".to_string()),
)),
diff --git a/crates/core/json_sm/src/lib.rs b/crates/core/json_sm/src/lib.rs
index d2153a19..2755647a 100644
--- a/crates/core/json_sm/src/lib.rs
+++ b/crates/core/json_sm/src/lib.rs
@@ -4,9 +4,9 @@ mod software;
pub use error::*;
pub use messages::{
- software_filter_topic, Auth, DownloadInfo, Jsonify, SoftwareListRequest, SoftwareListResponse,
- SoftwareOperationStatus, SoftwareRequestResponse, SoftwareUpdateRequest,
- SoftwareUpdateResponse,
+ control_filter_topic, software_filter_topic, Auth, DownloadInfo, Jsonify, OperationStatus,
+ RestartOperationRequest, RestartOperationResponse, SoftwareListRequest, SoftwareListResponse,
+ SoftwareRequestResponse, SoftwareUpdateRequest, SoftwareUpdateResponse,
};
pub use software::*;
@@ -153,7 +153,7 @@ mod tests {
SoftwareListResponse::from_json(json_response).expect("Failed to deserialize");
assert_eq!(response.id(), "123");
- assert_eq!(response.status(), SoftwareOperationStatus::Successful);
+ assert_eq!(response.status(), OperationStatus::Successful);
assert_eq!(response.error(), None);
// The mapper can use then the current list of modules
@@ -227,7 +227,7 @@ mod tests {
SoftwareListResponse::from_json(json_response).expect("Failed to deserialize");
assert_eq!(response.id(), "123");
- assert_eq!(response.status(), SoftwareOperationStatus::Failed);
+ assert_eq!(response.status(), OperationStatus::Failed);
assert_eq!(response.error(), Some("Request timed-out".into()));
assert_eq!(response.modules(), vec![]);
}
@@ -592,7 +592,7 @@ mod tests {
SoftwareUpdateResponse::from_json(json_response).expect("Failed to deserialize");
assert_eq!(response.id(), "123".to_string());
- assert_eq!(response.status(), SoftwareOperationStatus::Executing);
+ assert_eq!(response.status(), OperationStatus::Executing);
assert_eq!(response.error(), None);
assert_eq!(response.modules(), vec![]);
}
@@ -864,7 +864,7 @@ mod tests {
SoftwareUpdateResponse::from_json(json_response).expect("Failed to deserialize");
assert_eq!(response.id(), "123");
- assert_eq!(response.status(), SoftwareOperationStatus::Failed);
+ assert_eq!(response.status(), OperationStatus::Failed);
assert_eq!(
response.error(),
Some("2 errors: fail to install [ collectd ] fail to remove [ mongodb ]".into())
diff --git a/crates/core/json_sm/src/messages.rs b/crates/core/json_sm/src/messages.rs
index 8e09bd71..d7ffad69 100644
--- a/crates/core/json_sm/src/messages.rs
+++ b/crates/core/json_sm/src/messages.rs
@@ -28,6 +28,10 @@ pub const fn software_filter_topic() -> &'static str {
"tedge/commands/req/software/#"
}
+pub const fn control_filter_topic() -> &'static str {
+ "tedge/commands/req/control/#"
+}
+
/// Message payload definition for SoftwareList request.
#[derive(Debug, Deserialize, Serialize, PartialEq)]
#[serde(deny_unknown_fields)]
@@ -171,7 +175,7 @@ pub struct SoftwareRequestResponseSoftwareList {
/// Possible statuses for result of Software operation.
#[derive(Debug, Deserialize, Serialize, PartialEq, Copy, Clone)]
#[serde(rename_all = "camelCase")]
-pub enum SoftwareOperationStatus {
+pub enum OperationStatus {
Successful,
Failed,
Executing,
@@ -189,7 +193,7 @@ impl<'a> Jsonify<'a> for SoftwareListResponse {}
impl SoftwareListResponse {
pub fn new(req: &SoftwareListRequest) -> SoftwareListResponse {
SoftwareListResponse {
- response: SoftwareRequestResponse::new(&req.id, SoftwareOperationStatus::Executing),
+ response: SoftwareRequestResponse::new(&req.id, OperationStatus::Executing),
}
}
@@ -208,7 +212,7 @@ impl SoftwareListResponse {
}
pub fn set_error(&mut self, reason: &str) {
- self.response.status = SoftwareOperationStatus::Failed;
+ self.response.status = OperationStatus::Failed;
self.response.reason = Some(reason.into());
}
@@ -216,7 +220,7 @@ impl SoftwareListResponse {
&self.response.id
}
- pub fn status(&self) -> SoftwareOperationStatus {
+ pub fn status(&self) -> OperationStatus {
self.response.status
}
@@ -241,7 +245,7 @@ impl<'a> Jsonify<'a> for SoftwareUpdateResponse {}
impl SoftwareUpdateResponse {
pub fn new(req: &SoftwareUpdateRequest) -> SoftwareUpdateResponse {
SoftwareUpdateResponse {
- response: SoftwareRequestResponse::new(&req.id, SoftwareOperationStatus::Executing),
+ response: SoftwareRequestResponse::new(&req.id, OperationStatus::Executing),
}
}
@@ -270,7 +274,7 @@ impl SoftwareUpdateResponse {
}
pub fn set_error(&mut self, reason: &str) {
- self.response.status = SoftwareOperationStatus::Failed;
+ self.response.status = OperationStatus::Failed;
self.response.reason = Some(reason.into());
}
@@ -278,7 +282,7 @@ impl SoftwareUpdateResponse {
&self.response.id
}
- pub fn status(&self) -> SoftwareOperationStatus {
+ pub fn status(&self) -> OperationStatus {
self.response.status
}
@@ -373,7 +377,7 @@ pub struct SoftwareModuleItem {
#[serde(rename_all = "camelCase")]
pub struct SoftwareRequestResponse {
pub id: String,
- pub status: SoftwareOperationStatus,
+ pub status: OperationStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
@@ -388,7 +392,7 @@ pub struct SoftwareRequestResponse {
impl<'a> Jsonify<'a> for SoftwareRequestResponse {}
impl SoftwareRequestResponse {
- pub fn new(id: &str, status: SoftwareOperationStatus) -> Self {
+ pub fn new(id: &str, status: OperationStatus) -> Self {
SoftwareRequestResponse {
id: id.to_string(),
status,
@@ -400,7 +404,7 @@ impl SoftwareRequestResponse {
pub fn add_modules(&mut self, plugin_type: SoftwareType, modules: Vec<SoftwareModuleItem>) {
if self.failures.is_empty() {
- self.status = SoftwareOperationStatus::Successful;
+ self.status = OperationStatus::Successful;
}
if self.current_software_list.is_none() {
@@ -416,7 +420,7 @@ impl SoftwareRequestResponse {
}
pub fn add_errors(&mut self, plugin_type: SoftwareType, modules: Vec<SoftwareModuleItem>) {
- self.status = SoftwareOperationStatus::Failed;
+ self.status = OperationStatus::Failed;
self.failures.push(SoftwareRequestResponseSoftwareList {
plugin_type,
@@ -501,6 +505,56 @@ impl From<SoftwareError> for Option<SoftwareModuleItem> {
}
}
+/// Message payload definition for restart operation request.
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
+#[serde(deny_unknown_fields)]
+#[serde(rename_all = "camelCase")]
+pub struct RestartOperationRequest {
+ pub id: String,
+}
+
+impl<'a> Jsonify<'a> for RestartOperationRequest {}
+
+impl RestartOperationRequest {
+ pub fn new() -> RestartOperationRequest {
+ let id = nanoid!();
+ RestartOperationRequest { id }
+ }
+
+ pub fn new_with_id(id: &str) -> RestartOperationRequest {
+ RestartOperationRequest { id: id.to_string() }
+ }
+
+ pub fn topic_name() -> &'static str {
+ "tedge/commands/req/control/restart"
+ }
+}
+
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
+pub struct RestartOperationResponse {
+ pub id: String,
+ pub status: OperationStatus,
+}
+
+impl<'a> Jsonify<'a> for RestartOperationResponse {}
+
+impl RestartOperationResponse {
+ pub fn new(req: &RestartOperationRequest) -> Self {
+ Self {
+ id: req.id.clone(),
+ status: OperationStatus::Executing,
+ }
+ }
+
+ pub fn with_status(self, status: OperationStatus) -> Self {
+ Self { status, ..self }
+ }
+
+ pub fn topic_name() -> &'static str {
+ "tedge/commands/res/control/restart"
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -576,7 +630,7 @@ mod tests {
fn serde_software_list_empty_successful() {
let request = SoftwareRequestResponse {
id: "1234".to_string(),
- status: SoftwareOperationStatus::Successful,
+ status: OperationStatus::Successful,
reason: None,
current_software_list: Some(vec![]),
failures: vec![],
@@ -609,7 +663,7 @@ mod tests {
let request = SoftwareRequestResponse {
id: "1234".to_string(),
- status: SoftwareOperationStatus::Successful,
+ status: OperationStatus::Successful,
reason: None,
current_software_list: Some(vec![docker_module1]),
failures: vec![],
diff --git a/crates/core/tedge_agent/Cargo.toml b/crates/core/tedge_agent/Cargo.toml
index 5ff002cd..4c0a502e 100644
--- a/crates/core/tedge_agent/Cargo.toml
+++ b/crates/core/tedge_agent/Cargo.toml
@@ -35,14 +35,17 @@ serde_json = "1.0"
structopt = "0.3"
tedge_config = { path = "../../common/tedge_config" }
tedge_utils = { path = "../../common/tedge_utils", features = ["logging"] }
+time = { version = "0.3", features = ["formatting"] }
thiserror = "1.0"
tokio = { version = "1.8", features = ["fs","process", "rt"] }
toml = "0.5"
tracing = { version = "0.1", features = ["attributes", "log"] }
[dev-dependencies]
+anyhow = "1.0"
assert_cmd = "2.0"
once_cell = "1.8"
predicates = "2.0"
tempfile = "3.2"
+tokio-test = "0.4"
serial_test = "0.5"
diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs
index 16148ab1..9217e8b5 100644
--- a/crates/core/tedge_agent/src/agent.rs
+++ b/crates/core/tedge_agent/src/agent.rs
@@ -1,18 +1,22 @@
use crate::{
error::AgentError,
- state::{AgentStateRepository, State, StateRepository},
+ restart_operation_handler::restart_operation,
+ state::{
+ AgentStateRepository, RestartOperationStatus, SoftwareOperationVariants, State,
+ StateRepository, StateStatus,
+ },
};
-
use flockfile::{check_another_instance_is_not_running, Flockfile};
use json_sm::{
- software_filter_topic, Jsonify, SoftwareError, SoftwareListRequest, SoftwareListResponse,
- SoftwareOperationStatus, SoftwareRequestResponse, SoftwareType, SoftwareUpdateRequest,
- SoftwareUpdateResponse,
+ control_filter_topic, software_filter_topic, Jsonify, OperationStatus, RestartOperationRequest,
+ RestartOperationResponse, SoftwareError, SoftwareListRequest, SoftwareListResponse,
+ SoftwareRequestResponse, SoftwareType, SoftwareUpdateRequest, SoftwareUpdateResponse,
};
use mqtt_client::{Client, Config, Message, MqttClient, Topic, TopicFilter};
use plugin_sm::plugin_manager::{ExternalPlugins, Plugins};
use std::{
+ fmt::Debug,
path::PathBuf,
sync::{Arc, Mutex},
};
@@ -24,6 +28,12 @@ use tedge_config::{
SoftwarePluginDefaultSetting, TEdgeConfigLocation,
};
+#[cfg(not(test))]
+const INIT_COMMAND: &'static str = "init";
+
+#[cfg(test)]
+const INIT_COMMAND: &'static str = "echo";
+
#[derive(Debug)]
pub struct SmAgentConfig {
pub errors_topic: Topic,
@@ -31,8 +41,10 @@ pub struct SmAgentConfig {
pub request_topic_list: Topic,
pub request_topic_update: Topic,
pub request_topics: TopicFilter,
+ pub request_topic_restart: Topic,
pub response_topic_list: Topic,
pub response_topic_update: Topic,
+ pub response_topic_restart: Topic,
pub sm_home: PathBuf,
pub log_dir: PathBuf,
config_location: TEdgeConfigLocation,
@@ -44,7 +56,10 @@ impl Default for SmAgentConfig {
let mqtt_client_config = mqtt_client::Config::default().with_packet_size(10 * 1024 * 1024);
- let request_topics = TopicFilter::new(software_filter_topic()).expect("Invalid topic");
+ let mut request_topics = TopicFilter::new(software_filter_topic()).expect("Invalid topic");
+ let () = request_topics
+ .add(control_filter_topic())
+ .expect("Invalid topic filter");
let request_topic_list =
Topic::new(SoftwareListRequest::topic_name()).expect("Invalid topic");
@@ -58,6 +73,12 @@ impl Default for SmAgentConfig {
let response_topic_update =
Topic::new(SoftwareUpdateResponse::topic_name()).expect("Invalid topic");
+ let request_topic_restart =
+ Topic::new(RestartOperationRequest::topic_name()).expect("Invalid topic");
+
+ let response_topic_restart =
+ Topic::new(RestartOperationResponse::topic_name()).expect("Invalid topic");
+
let sm_home = PathBuf::from("/etc/tedge");
let log_dir = PathBuf::from("/var/log/tedge/agent");
@@ -72,6 +93,8 @@ impl Default for SmAgentConfig {
request_topics,
response_topic_list,
response_topic_update,
+ request_topic_restart,
+ response_topic_restart,
sm_home,
log_dir,
config_location,
@@ -168,12 +191,13 @@ impl SmAgent {
}
});
- let () = self.fail_pending_operation(&mqtt).await?;
+ let () = self.process_pending_operation(&mqtt).await?;
// * Maybe it would be nice if mapper/registry responds
let () = publish_capabilities(&mqtt).await?;
-
- let () = self.subscribe_and_process(&mqtt, &plugins).await?;
+ while let Err(error) = self.subscribe_and_process(&mqtt, &plugins).await {
+ error!("{}", error);
+ }
Ok(())
}
@@ -222,6 +246,23 @@ impl SmAgent {
});
}
+ topic if topic == &self.config.request_topic_restart => {
+ let request = self.match_restart_operation_payload(mqtt, &message).await?;
+ if let Err(error) = self.handle_restart_operation().await {
+ error!("{}", error);
+
+ self.persistance_store.clear().await?;
+ let status = OperationStatus::Failed;
+ let response = RestartOperationResponse::new(&request).with_status(status);
+ let () = mqtt
+ .publish(Message::new(
+ &self.config.response_topic_restart,
+ response.to_bytes()?,
+ ))
+ .await?;
+ }
+ }
+
_ => error!("Unknown operation. Discarded."),
}
}
@@ -242,7 +283,7 @@ impl SmAgent {
.persistance_store
.store(&State {
operation_id: Some(request.id.clone()),
- operation: Some("list".into()),
+ operation: Some(StateStatus::Software(SoftwareOperationVariants::List)),
})
.await?;
@@ -283,7 +324,7 @@ impl SmAgent {
.publish(Message::new(response_topic, response.to_bytes()?))
.await?;
- let _state = self.persistance_store.clear().await?;
+ let _state: State = self.persistance_store.clear().await?;
Ok(())
}
@@ -301,7 +342,7 @@ impl SmAgent {
.persistance_store
.store(&State {
operation_id: Some(request.id.clone()),
- operation: Some("update".into()),
+ operation: Some(StateStatus::Software(SoftwareOperationVariants::Update)),
})
.await?;
@@ -345,29 +386,111 @@ impl SmAgent {
Ok(())
}
- async fn fail_pending_operation(&self, mqtt: &Client) -> Result<(), AgentError> {
+ async fn match_restart_operation_payload(
+ &self,
+ mqtt: &Client,
+ message: &Message,
+ ) -> Result<RestartOperationRequest, AgentError> {
+ let request = match RestartOperationRequest::from_slice(message.payload_trimmed()) {
+ Ok(request) => {
+ let () = self
+ .persistance_store
+ .store(&State {
+ operation_id: Some(request.id.clone()),
+ operation: Some(StateStatus::Restart(RestartOperationStatus::Restarting)),
+ })
+ .await?;
+ request
+ }
+
+ Err(error) => {
+ error!("Parsing error: {}", error);
+ let _ = mqtt
+ .publish(Message::new(
+ &self.config.errors_topic,
+ format!("{}", error),
+ ))
+ .await?;
+
+ return Err(SoftwareError::ParseError {
+ reason: "Parsing failed".into(),
+ }
+ .into());
+ }
+ };
+ Ok(request)
+ }
+
+ async fn handle_restart_operation(&self) -> Result<(), AgentError> {
+ self.persistance_store
+ .update(&StateStatus::Restart(RestartOperationStatus::Restarting))
+ .await?;
+
+ let () = restart_operation::create_slash_run_file()?;
+
+ let _process_result = std::process::Command::new("sudo").arg("sync").status();
+ // state = "Restarting"
+ match std::process::Command::new("sudo")
+ .arg(INIT_COMMAND)
+ .arg("6")
+ .status()
+ {
+ Ok(process_status) => {
+ if !process_status.success() {
+ return Err(AgentError::CommandFailed);
+ }
+ }
+ Err(e) => {
+ return Err(AgentError::FromIo(e));
+ }
+ }
+
+ Ok(())
+ }
+
+ async fn process_pending_operation(&self, mqtt: &Client) -> Result<(), AgentError> {
+ let state: Result<State, _> = self.persistance_store.load().await;
+ let mut status = OperationStatus::Failed;
+
if let State {
operation_id: Some(id),
- operation: Some(operation_string),
- } = match self.persistance_store.load().await {
+ operation: Some(operation),
+ } = match state {
Ok(state) => state,
Err(_) => State {
operation_id: None,
operation: None,
},
} {
- let topic = match operation_string.into() {
- SoftwareOperation::CurrentSoftwareList => &self.config.response_topic_list,
+ let topic = match operation {
+ StateStatus::Software(SoftwareOperationVariants::List) => {
+ &self.config.response_topic_list
+ }
- SoftwareOperation::SoftwareUpdates => &self.config.response_topic_update,
+ StateStatus::Software(SoftwareOperationVariants::Update) => {
+ &self.config.response_topic_update
+ }
- SoftwareOperation::UnknownOperation => {
+ StateStatus::Restart(RestartOperationStatus::Pending) => {
+ &self.config.response_topic_restart
+ }
+
+ StateStatus::Restart(RestartOperationStatus::Restarting) => {
+ let _state = self.persistance_store.clear().await?;
+ if restart_operation::has_rebooted()? {
+ info!("Device restart successful.");
+ status = OperationStatus::Successful;
+ }
+ &self.config.response_topic_restart
+ }
+
+ StateStatus::UnknownOperation => {
error!("UnknownOperation in store.");
&self.config.errors_topic
}
};
- let response = SoftwareRequestResponse::new(&id, SoftwareOperationStatus::Failed);
+ let response = SoftwareRequestResponse::new(&id, status);
let () = mqtt
.publish(Message::new(topic, response.to_bytes()?))
@@ -397,20 +520,30 @@ async fn publish_capabilities(mqtt: &Client) -> Result<(), AgentError> {
Ok(())
}
-/// Variants of supported software operations.
-#[derive(Debug, Clone, PartialEq)]
-pub enum SoftwareOperation {
- CurrentSoftwareList,
- SoftwareUpdates,
- UnknownOperation,
-}
+#[cfg(test)]
+mod tests {
+ use super::*;
+ const SLASH_RUN_PATH_TEDGE_AGENT_RESTART: &str = "/run/tedge_agent_restart";
+
+ #[ignore]
+ #[tokio::test]
+ async fn check_agent_restart_file_is_created() -> Result<(), AgentError> {
+ assert_eq!(INIT_COMMAND, "echo");
+ let tedge_config_location =
+ tedge_config::TEdgeConfigLocation::from_default_system_location();
+ let agent = SmAgent::try_new(
+ "tedge_agent_test",
+ SmAgentConfig::try_new(tedge_config_location).unwrap(),
+ )
+ .unwrap();
+
+ // calling handle_restart_operation should create a file in /run/tedge_agent_restart
+ let () = agent.handle_restart_operation().await?;
+ assert!(std::path::Path::new(&SLASH_RUN_PATH_TEDGE_AGENT_RESTART).exists());
+
+ // removing the file
+ let () = std::fs::remove_file(&SLASH_RUN_PATH_TEDGE_AGENT_RESTART).unwrap();
-impl From<String> for SoftwareOperation {
- fn from(s: String) -> Self {
- match s.as_str() {
- r#"list"# => Self::CurrentSoftwareList,
- r#"update"# => Self::SoftwareUpdates,
- _ => Self::UnknownOperation,
- }
+ Ok(())
}
}
diff --git a/crates/core/tedge_agent/src/error.rs b/crates/core/tedge_agent/src/error.rs
index 5f99dd82..043c74ca 100644
--- a/crates/core/tedge_agent/src/error.rs
+++ b/crates/core/tedge_agent/src/error.rs
@@ -34,6 +34,18 @@ pub enum AgentError {
#[error(transparent)]
FromFlockfileError(#[from] FlockfileError),
+
+ #[error("Command returned non 0 exit code.")]
+ CommandFailed,
+
+ #[error("Failed parsing /proc/uptime")]
+ UptimeParserError,
+
+ #[error("Failed to cast string to float.")]
+ FloatCastingError,
+
+ #[error("Could not convert {timestamp:?} to unix timestamp. Error message: {}")]
+ TimestampConversionError { timestamp: i64, error_msg: String },
}
#[derive(Debug, thiserror::Error)]
diff --git a/crates/core/tedge_agent/src/main.rs b/crates/core/tedge_agent/src/main.rs
index 88a3d132..423a703a 100644
--- a/crates/core/tedge_agent/src/main.rs
+++ b/crates/core/tedge_agent/src/main.rs
@@ -4,6 +4,7 @@ use structopt::*;
mod agent;
mod error;
mod operation_logs;
+mod restart_operation_handler;
mod state;
#[derive(Debug, StructOpt)]
diff --git a/crates/core/tedge_agent/src/restart_operation_handler.rs b/crates/core/tedge_agent/src/restart_operation_handler.rs
new file mode 100644
index 00000000..1f9d85f6
--- /dev/null
+++ b/crates/core/tedge_agent/src/restart_operation_handler.rs
@@ -0,0 +1,107 @@
+pub mod restart_operation {
+
+ use crate::error::AgentError;
+ use std::{fs::File, fs::OpenOptions, io::Read, io::Write, path::Path};
+ use time::OffsetDateTime;
+
+ const SLASH_RUN_PATH_TEDGE_AGENT_RESTART: &str = "/run/tedge_agent_restart";
+ const SLASH_PROC_UPTIME: &str = "/proc/uptime";
+
+ /// creates an empty file in /run
+ /// the file name defined by `SLASH_RUN_PATH_TEDGE_AGENT_RESTART`
+ ///
+ /// # Example
+ /// ```
+ /// let () = RestartOperationHelper::create_slash_run_file()?;
+ /// ```
+ pub fn create_slash_run_file() -> Result<(), AgentError> {
+ let path = Path::new(SLASH_RUN_PATH_TEDGE_AGENT_RESTART);
+
+ let mut file = match OpenOptions::new()
+ .create(true)
+ .read(true)
+ .write(true)
+ .open(&path)
+ {
+ Ok(file) => file,
+ Err(err) => {
+ return Err(AgentError::FromIo(err));
+ }
+ };
+ let date_utc = OffsetDateTime::now_utc().unix_timestamp();
+ file.write_all(date_utc.to_string().as_bytes())?;
+ Ok(())
+ }
+
+ pub fn slash_run_file_exists() -> bool {
+ std::path::Path::new(&SLASH_RUN_PATH_TEDGE_AGENT_RESTART.to_string()).exists()
+ }
+
+ /// returns the datetime of `SLASH_RUN_PATH_TEDGE_AGENT_RESTART` "modified at".
+ fn get_restart_file_datetime() -> Result<time::OffsetDateTime, AgentError> {
+ let mut file = File::open(&SLASH_RUN_PATH_TEDGE_AGENT_RESTART)?;
+ let mut contents = String::new();
+ file.read_to_string(&mut contents)?;
+
+ let unix_timestamp = contents
+ .parse::<i64>()
+ .expect("Could not parse unix timestamp");
+
+ let dt = match OffsetDateTime::from_unix_timestamp(unix_timestamp) {
+ Ok(result) => result,
+ Err(error) => {
+ return Err(AgentError::TimestampConversionError {
+ timestamp: unix_timestamp,
+ error_msg: error.to_string(),
+ });
+ }
+ };
+ Ok(dt)
+ }
+
+ /// computes the time of last reboot.
+ ///
+ /// where "time of last reboot" is defined as:
+ /// current datetime - number of seconds the system has been up.
+ ///
+ /// number of seconds the system has been up are obtained from /proc/uptime
+ fn get_system_uptime() -> Result<time::OffsetDateTime, AgentError> {
+ // reading uptime
+ let uptime_file = std::fs::File::open(std::path::Path::new(SLASH_PROC_UPTIM