diff options
author | PradeepKiruvale <PRADEEPKIRUVALE@gmail.com> | 2021-12-01 11:57:00 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-01 11:57:00 +0530 |
commit | ff073f8acebb1e7ae80de7033750f51591e687b2 (patch) | |
tree | 262001230c728f59c2c45522afd2ffc4e790ca3c /crates | |
parent | d461b9dc1544d560b4fe5ac0d1f6e3ac523898b3 (diff) |
[Bug-540] tedge agent missing software list request (#653)
* test
* fix the mqtt message missing issue
* fix typos
* rename function
Co-authored-by: Pradeep Kumar K J <pradeepkumar.kj@sofwareag.com>
Diffstat (limited to 'crates')
-rw-r--r-- | crates/core/tedge_agent/src/agent.rs | 16 |
1 files changed, 10 insertions, 6 deletions
diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index 9217e8b5..ff4560b4 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -13,7 +13,7 @@ use json_sm::{ RestartOperationResponse, SoftwareError, SoftwareListRequest, SoftwareListResponse, SoftwareRequestResponse, SoftwareType, SoftwareUpdateRequest, SoftwareUpdateResponse, }; -use mqtt_client::{Client, Config, Message, MqttClient, Topic, TopicFilter}; +use mqtt_client::{Client, Config, Message, MqttClient, MqttMessageStream, Topic, TopicFilter}; use plugin_sm::plugin_manager::{ExternalPlugins, Plugins}; use std::{ fmt::Debug, @@ -171,6 +171,9 @@ impl SmAgent { pub async fn start(&mut self) -> Result<(), AgentError> { info!("Starting tedge agent"); + let mqtt = Client::connect(self.name.as_str(), &self.config.mqtt_client_config).await?; + let mut operations = mqtt.subscribe(self.config.request_topics.clone()).await?; + let plugins = Arc::new(Mutex::new(ExternalPlugins::open( self.config.sm_home.join("sm-plugins"), get_default_plugin(&self.config.config_location)?, @@ -183,7 +186,6 @@ impl SmAgent { return Err(AgentError::NoPlugins); } - let mqtt = Client::connect(self.name.as_str(), &self.config.mqtt_client_config).await?; let mut errors = mqtt.subscribe_errors(); tokio::spawn(async move { while let Some(error) = errors.next().await { @@ -195,22 +197,24 @@ impl SmAgent { // * Maybe it would be nice if mapper/registry responds let () = publish_capabilities(&mqtt).await?; - while let Err(error) = self.subscribe_and_process(&mqtt, &plugins).await { + while let Err(error) = self + .process_subscribed_messages(&mqtt, &mut operations, &plugins) + .await + { error!("{}", error); } Ok(()) } - async fn subscribe_and_process( + async fn process_subscribed_messages( &mut self, mqtt: &Client, + operations: &mut Box<dyn MqttMessageStream>, plugins: &Arc<Mutex<ExternalPlugins>>, ) -> Result<(), AgentError> { - let mut operations = mqtt.subscribe(self.config.request_topics.clone()).await?; while let Some(message) = operations.next().await { debug!("Request {:?}", message); - match &message.topic { topic if topic == &self.config.request_topic_list => { let _success = self |