summaryrefslogtreecommitdiffstats
path: root/crates
diff options
context:
space:
mode:
authorPradeepKiruvale <PRADEEPKIRUVALE@gmail.com>2021-12-01 11:57:00 +0530
committerGitHub <noreply@github.com>2021-12-01 11:57:00 +0530
commitff073f8acebb1e7ae80de7033750f51591e687b2 (patch)
tree262001230c728f59c2c45522afd2ffc4e790ca3c /crates
parentd461b9dc1544d560b4fe5ac0d1f6e3ac523898b3 (diff)
[Bug-540] tedge agent missing software list request (#653)
* test * fix the mqtt message missing issue * fix typos * rename function Co-authored-by: Pradeep Kumar K J <pradeepkumar.kj@sofwareag.com>
Diffstat (limited to 'crates')
-rw-r--r--crates/core/tedge_agent/src/agent.rs16
1 files changed, 10 insertions, 6 deletions
diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs
index 9217e8b5..ff4560b4 100644
--- a/crates/core/tedge_agent/src/agent.rs
+++ b/crates/core/tedge_agent/src/agent.rs
@@ -13,7 +13,7 @@ use json_sm::{
RestartOperationResponse, SoftwareError, SoftwareListRequest, SoftwareListResponse,
SoftwareRequestResponse, SoftwareType, SoftwareUpdateRequest, SoftwareUpdateResponse,
};
-use mqtt_client::{Client, Config, Message, MqttClient, Topic, TopicFilter};
+use mqtt_client::{Client, Config, Message, MqttClient, MqttMessageStream, Topic, TopicFilter};
use plugin_sm::plugin_manager::{ExternalPlugins, Plugins};
use std::{
fmt::Debug,
@@ -171,6 +171,9 @@ impl SmAgent {
pub async fn start(&mut self) -> Result<(), AgentError> {
info!("Starting tedge agent");
+ let mqtt = Client::connect(self.name.as_str(), &self.config.mqtt_client_config).await?;
+ let mut operations = mqtt.subscribe(self.config.request_topics.clone()).await?;
+
let plugins = Arc::new(Mutex::new(ExternalPlugins::open(
self.config.sm_home.join("sm-plugins"),
get_default_plugin(&self.config.config_location)?,
@@ -183,7 +186,6 @@ impl SmAgent {
return Err(AgentError::NoPlugins);
}
- let mqtt = Client::connect(self.name.as_str(), &self.config.mqtt_client_config).await?;
let mut errors = mqtt.subscribe_errors();
tokio::spawn(async move {
while let Some(error) = errors.next().await {
@@ -195,22 +197,24 @@ impl SmAgent {
// * Maybe it would be nice if mapper/registry responds
let () = publish_capabilities(&mqtt).await?;
- while let Err(error) = self.subscribe_and_process(&mqtt, &plugins).await {
+ while let Err(error) = self
+ .process_subscribed_messages(&mqtt, &mut operations, &plugins)
+ .await
+ {
error!("{}", error);
}
Ok(())
}
- async fn subscribe_and_process(
+ async fn process_subscribed_messages(
&mut self,
mqtt: &Client,
+ operations: &mut Box<dyn MqttMessageStream>,
plugins: &Arc<Mutex<ExternalPlugins>>,
) -> Result<(), AgentError> {
- let mut operations = mqtt.subscribe(self.config.request_topics.clone()).await?;
while let Some(message) = operations.next().await {
debug!("Request {:?}", message);
-
match &message.topic {
topic if topic == &self.config.request_topic_list => {
let _success = self