summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_agent/src/agent.rs
diff options
context:
space:
mode:
authorPradeepKiruvale <PRADEEPKIRUVALE@gmail.com>2022-02-04 19:57:56 +0530
committerGitHub <noreply@github.com>2022-02-04 19:57:56 +0530
commit43bf45a3bd868586afe83a645be67f5042d9ac5d (patch)
tree41d607b64c65750e8e7f7e65fd308f9617b351a2 /crates/core/tedge_agent/src/agent.rs
parent89f554fbb724cb4b9fe09336d58558cfa76638a1 (diff)
[699] init/clear agent and mapper sessions (#797)
* add init to tedge_agent * init mapper * init mapper and agent postinst * separate init and clear session functions * use tokio::mutex instead of std::mutex * pysys test for agent init * system test for mapper init session * use mqtt_channel init/clear session apis Co-authored-by: Pradeep Kumar K J <pradeepkumar.kj@sofwareag.com>
Diffstat (limited to 'crates/core/tedge_agent/src/agent.rs')
-rw-r--r--crates/core/tedge_agent/src/agent.rs35
1 files changed, 22 insertions, 13 deletions
diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs
index 049cab22..77c2556e 100644
--- a/crates/core/tedge_agent/src/agent.rs
+++ b/crates/core/tedge_agent/src/agent.rs
@@ -15,16 +15,12 @@ use agent_interface::{
use flockfile::{check_another_instance_is_not_running, Flockfile};
use mqtt_channel::{Connection, Message, PubChannel, StreamExt, SubChannel, Topic, TopicFilter};
use plugin_sm::plugin_manager::{ExternalPlugins, Plugins};
-use std::{
- convert::TryInto,
- fmt::Debug,
- path::PathBuf,
- sync::{Arc, Mutex},
-};
+use std::{convert::TryInto, fmt::Debug, path::PathBuf, sync::Arc};
use tedge_config::{
ConfigRepository, ConfigSettingAccessor, ConfigSettingAccessorStringExt, MqttPortSetting,
SoftwarePluginDefaultSetting, TEdgeConfigLocation, TmpPathDefaultSetting,
};
+use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument};
#[cfg(not(test))]
@@ -184,6 +180,20 @@ impl SmAgent {
}
#[instrument(skip(self), name = "sm-agent")]
+ pub async fn init_session(&mut self) -> Result<(), AgentError> {
+ info!("Initializing the tedge agent session");
+ mqtt_channel::init_session(&self.config.mqtt_config).await?;
+ Ok(())
+ }
+
+ #[instrument(skip(self), name = "sm-agent")]
+ pub async fn clear_session(&mut self) -> Result<(), AgentError> {
+ info!("Cleaning the tedge agent session");
+ mqtt_channel::clear_session(&self.config.mqtt_config).await?;
+ Ok(())
+ }
+
+ #[instrument(skip(self), name = "sm-agent")]
pub async fn start(&mut self) -> Result<(), AgentError> {
info!("Starting tedge agent");
@@ -195,8 +205,7 @@ impl SmAgent {
Some("sudo".into()),
)?));
- if plugins.lock().unwrap().empty() {
- // `unwrap` should be safe here as we only access data.
+ if plugins.lock().await.empty() {
error!("Couldn't load plugins from /etc/tedge/sm-plugins");
return Err(AgentError::NoPlugins);
}
@@ -244,10 +253,10 @@ impl SmAgent {
}
topic if topic == &self.config.request_topic_update => {
- let () = plugins.lock().unwrap().load()?; // `unwrap` should be safe here as we only access data for write.
+ let () = plugins.lock().await.load()?;
let () = plugins
.lock()
- .unwrap() // `unwrap` should be safe here as we only access data for write.
+ .await
.update_default(&get_default_plugin(&self.config.config_location)?)?;
let _success = self
@@ -340,7 +349,7 @@ impl SmAgent {
.operation_logs
.new_log_file(LogKind::SoftwareList)
.await?;
- let response = plugins.lock().unwrap().list(&request, log_file).await; // `unwrap` should be safe here as we only access data.
+ let response = plugins.lock().await.list(&request, log_file).await;
let () = responses
.publish(Message::new(response_topic, response.to_bytes()?))
@@ -399,9 +408,9 @@ impl SmAgent {
let response = plugins
.lock()
- .unwrap()
+ .await
.process(&request, log_file, &self.config.download_dir)
- .await; // `unwrap` should be safe here as we only access data.
+ .await;
let () = responses
.publish(Message::new(response_topic, response.to_bytes()?))