summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarcel Müller <neikos@neikos.email>2024-03-27 15:07:15 +0100
committerMarcel Müller <neikos@neikos.email>2024-03-27 16:46:02 +0100
commit302deff0cfb7731aadfee3e2fa229f23ce219019 (patch)
treeac21c1228d92c8e5e45383384b24a33dd1700233
parent49adee90b25acdf3324b949254a0a00409207c21 (diff)
Add first rudimentary connection flow
Signed-off-by: Marcel Müller <neikos@neikos.email>
-rw-r--r--src/client.rs60
1 files changed, 53 insertions, 7 deletions
diff --git a/src/client.rs b/src/client.rs
index ee11d9a..4f61cba 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -4,8 +4,13 @@
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
+use futures::SinkExt;
+use futures::StreamExt;
+use tokio_util::codec::Framed;
+
use crate::bytes::MqttBytes;
use crate::client_identifier::ClientIdentifier;
+use crate::codecs::MqttPacketCodec;
use crate::keep_alive::KeepAlive;
use crate::string::MqttString;
use crate::transport::MqttConnectTransport;
@@ -99,7 +104,8 @@ impl MqttClientConnector {
}
pub async fn connect(self) -> Result<MqttClient, ()> {
- let conn: MqttConnection = self.transport.into();
+ let mut conn =
+ tokio_util::codec::Framed::new(MqttConnection::from(self.transport), MqttPacketCodec);
let conn_packet = mqtt_format::v5::packets::connect::MConnect {
client_identifier: self.client_identifier.as_str(),
@@ -111,6 +117,50 @@ impl MqttClientConnector {
keep_alive: self.keep_alive.as_u16(),
};
+ conn.send(mqtt_format::v5::packets::MqttPacket::Connect(conn_packet))
+ .await
+ .map_err(|_| ())?;
+
+ let Some(maybe_connack) = conn.next().await else {
+ return Err(());
+ };
+
+ let Ok(maybe_connack) = maybe_connack else {
+ return Err(());
+ };
+
+ let connack = loop {
+ let can_use_auth = self.properties.authentication_data.is_some();
+ let auth = match maybe_connack.get() {
+ mqtt_format::v5::packets::MqttPacket::Connack(connack) => break connack,
+ mqtt_format::v5::packets::MqttPacket::Auth(auth) => {
+ if can_use_auth {
+ auth
+ } else {
+ // MQTT-4.12.0-6
+ return Err(());
+ }
+ }
+ _ => {
+ return Err(());
+ }
+ };
+
+ // TODO: Use user-provided method to authenticate further
+
+ todo!()
+ };
+
+ // TODO: Timeout here if the server doesn't respond
+
+ if connack.reason_code == mqtt_format::v5::packets::connack::ConnackReasonCode::Success {
+ // TODO: Read properties, configure client
+
+ return Ok(MqttClient { conn });
+ }
+
+ // TODO: Do something with error code
+
todo!()
}
@@ -120,11 +170,7 @@ impl MqttClientConnector {
}
pub struct MqttClient {
- conn: MqttConnection,
+ conn: Framed<MqttConnection, MqttPacketCodec>,
}
-impl MqttClient {
- pub(crate) fn new_with_connection(conn: MqttConnection) -> Self {
- MqttClient { conn }
- }
-}
+impl MqttClient {}