diff options
author | PradeepKiruvale <PRADEEPKIRUVALE@gmail.com> | 2022-02-04 19:57:56 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-04 19:57:56 +0530 |
commit | 43bf45a3bd868586afe83a645be67f5042d9ac5d (patch) | |
tree | 41d607b64c65750e8e7f7e65fd308f9617b351a2 /crates/core/tedge_agent/src/agent.rs | |
parent | 89f554fbb724cb4b9fe09336d58558cfa76638a1 (diff) |
[699] init/clear agent and mapper sessions (#797)
* add init to tedge_agent
* init mapper
* init mapper and agent postinst
* separate init and clear session functions
* use tokio::mutex instead of std::mutex
* pysys test for agent init
* system test for mapper init session
* use mqtt_channel init/clear session apis
Co-authored-by: Pradeep Kumar K J <pradeepkumar.kj@sofwareag.com>
Diffstat (limited to 'crates/core/tedge_agent/src/agent.rs')
-rw-r--r-- | crates/core/tedge_agent/src/agent.rs | 35 |
1 files changed, 22 insertions, 13 deletions
diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index 049cab22..77c2556e 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -15,16 +15,12 @@ use agent_interface::{ use flockfile::{check_another_instance_is_not_running, Flockfile}; use mqtt_channel::{Connection, Message, PubChannel, StreamExt, SubChannel, Topic, TopicFilter}; use plugin_sm::plugin_manager::{ExternalPlugins, Plugins}; -use std::{ - convert::TryInto, - fmt::Debug, - path::PathBuf, - sync::{Arc, Mutex}, -}; +use std::{convert::TryInto, fmt::Debug, path::PathBuf, sync::Arc}; use tedge_config::{ ConfigRepository, ConfigSettingAccessor, ConfigSettingAccessorStringExt, MqttPortSetting, SoftwarePluginDefaultSetting, TEdgeConfigLocation, TmpPathDefaultSetting, }; +use tokio::sync::Mutex; use tracing::{debug, error, info, instrument}; #[cfg(not(test))] @@ -184,6 +180,20 @@ impl SmAgent { } #[instrument(skip(self), name = "sm-agent")] + pub async fn init_session(&mut self) -> Result<(), AgentError> { + info!("Initializing the tedge agent session"); + mqtt_channel::init_session(&self.config.mqtt_config).await?; + Ok(()) + } + + #[instrument(skip(self), name = "sm-agent")] + pub async fn clear_session(&mut self) -> Result<(), AgentError> { + info!("Cleaning the tedge agent session"); + mqtt_channel::clear_session(&self.config.mqtt_config).await?; + Ok(()) + } + + #[instrument(skip(self), name = "sm-agent")] pub async fn start(&mut self) -> Result<(), AgentError> { info!("Starting tedge agent"); @@ -195,8 +205,7 @@ impl SmAgent { Some("sudo".into()), )?)); - if plugins.lock().unwrap().empty() { - // `unwrap` should be safe here as we only access data. + if plugins.lock().await.empty() { error!("Couldn't load plugins from /etc/tedge/sm-plugins"); return Err(AgentError::NoPlugins); } @@ -244,10 +253,10 @@ impl SmAgent { } topic if topic == &self.config.request_topic_update => { - let () = plugins.lock().unwrap().load()?; // `unwrap` should be safe here as we only access data for write. + let () = plugins.lock().await.load()?; let () = plugins .lock() - .unwrap() // `unwrap` should be safe here as we only access data for write. + .await .update_default(&get_default_plugin(&self.config.config_location)?)?; let _success = self @@ -340,7 +349,7 @@ impl SmAgent { .operation_logs .new_log_file(LogKind::SoftwareList) .await?; - let response = plugins.lock().unwrap().list(&request, log_file).await; // `unwrap` should be safe here as we only access data. + let response = plugins.lock().await.list(&request, log_file).await; let () = responses .publish(Message::new(response_topic, response.to_bytes()?)) @@ -399,9 +408,9 @@ impl SmAgent { let response = plugins .lock() - .unwrap() + .await .process(&request, log_file, &self.config.download_dir) - .await; // `unwrap` should be safe here as we only access data. + .await; let () = responses .publish(Message::new(response_topic, response.to_bytes()?)) |