diff options
-rw-r--r-- | crates/tests/mqtt_tests/src/test_mqtt_client.rs | 60 |
1 files changed, 18 insertions, 42 deletions
diff --git a/crates/tests/mqtt_tests/src/test_mqtt_client.rs b/crates/tests/mqtt_tests/src/test_mqtt_client.rs index 9d0909cb..c12047f6 100644 --- a/crates/tests/mqtt_tests/src/test_mqtt_client.rs +++ b/crates/tests/mqtt_tests/src/test_mqtt_client.rs @@ -23,7 +23,7 @@ pub async fn messages_published_on(mqtt_port: u16, topic: &str) -> UnboundedRece continue; } Err(err) => { - let msg = format!("Error: {:?}", err).to_string(); + let msg = format!("Error: {:?}", err); let _ = sender.send(msg); return recv; } @@ -134,14 +134,8 @@ impl TestCon { self.client.subscribe(topic, qos).await?; loop { - match self.eventloop.poll().await { - Ok(Event::Incoming(Packet::SubAck(_))) => { - return Ok(()); - } - Err(err) => { - return Err(err)?; - } - _ => {} + if let Event::Incoming(Packet::SubAck(_)) = self.eventloop.poll().await? { + return Ok(()); } } } @@ -156,14 +150,8 @@ impl TestCon { self.client.publish(topic, qos, retain, payload).await?; loop { - match self.eventloop.poll().await { - Ok(Event::Incoming(Packet::PubAck(_))) => { - return Ok(()); - } - Err(err) => { - return Err(err)?; - } - _ => {} + if let Event::Incoming(Packet::PubAck(_)) = self.eventloop.poll().await? { + return Ok(()); } } } @@ -175,12 +163,12 @@ impl TestCon { let msg = std::str::from_utf8(&response.payload) .unwrap_or("Error: non-utf8-payload") .to_string(); - if let Err(_) = sender.send(msg).await { + if sender.send(msg).await.is_err() { break; } } Err(err) => { - let msg = format!("Error: {:?}", err).to_string(); + let msg = format!("Error: {:?}", err); let _ = sender.send(msg).await; break; } @@ -192,35 +180,23 @@ impl TestCon { pub async fn next_message(&mut self) -> Result<String, anyhow::Error> { loop { - match self.eventloop.poll().await { - Ok(Event::Incoming(Packet::Publish(packet))) => { - let msg = std::str::from_utf8(&packet.payload) - .unwrap_or("Error: non-utf8-payload") - .to_string(); - return Ok(msg); - } - Err(err) => { - return Err(err)?; - } - _ => {} + if let Event::Incoming(Packet::Publish(packet)) = self.eventloop.poll().await? { + let msg = std::str::from_utf8(&packet.payload) + .unwrap_or("Error: non-utf8-payload") + .to_string(); + return Ok(msg); } } } pub async fn next_topic_payload(&mut self) -> Result<(String, String), anyhow::Error> { loop { - match self.eventloop.poll().await { - Ok(Event::Incoming(Packet::Publish(packet))) => { - let topic = packet.topic.clone(); - let msg = std::str::from_utf8(&packet.payload) - .unwrap_or("Error: non-utf8-payload") - .to_string(); - return Ok((topic, msg)); - } - Err(err) => { - return Err(err)?; - } - _ => {} + if let Event::Incoming(Packet::Publish(packet)) = self.eventloop.poll().await? { + let topic = packet.topic.clone(); + let msg = std::str::from_utf8(&packet.payload) + .unwrap_or("Error: non-utf8-payload") + .to_string(); + return Ok((topic, msg)); } } } |