diff options
author | Marcel Müller <neikos@neikos.email> | 2024-03-27 15:07:15 +0100 |
---|---|---|
committer | Marcel Müller <neikos@neikos.email> | 2024-03-27 16:46:02 +0100 |
commit | 302deff0cfb7731aadfee3e2fa229f23ce219019 (patch) | |
tree | ac21c1228d92c8e5e45383384b24a33dd1700233 | |
parent | 49adee90b25acdf3324b949254a0a00409207c21 (diff) |
Add first rudimentary connection flow
Signed-off-by: Marcel Müller <neikos@neikos.email>
-rw-r--r-- | src/client.rs | 60 |
1 files changed, 53 insertions, 7 deletions
diff --git a/src/client.rs b/src/client.rs index ee11d9a..4f61cba 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,8 +4,13 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. // +use futures::SinkExt; +use futures::StreamExt; +use tokio_util::codec::Framed; + use crate::bytes::MqttBytes; use crate::client_identifier::ClientIdentifier; +use crate::codecs::MqttPacketCodec; use crate::keep_alive::KeepAlive; use crate::string::MqttString; use crate::transport::MqttConnectTransport; @@ -99,7 +104,8 @@ impl MqttClientConnector { } pub async fn connect(self) -> Result<MqttClient, ()> { - let conn: MqttConnection = self.transport.into(); + let mut conn = + tokio_util::codec::Framed::new(MqttConnection::from(self.transport), MqttPacketCodec); let conn_packet = mqtt_format::v5::packets::connect::MConnect { client_identifier: self.client_identifier.as_str(), @@ -111,6 +117,50 @@ impl MqttClientConnector { keep_alive: self.keep_alive.as_u16(), }; + conn.send(mqtt_format::v5::packets::MqttPacket::Connect(conn_packet)) + .await + .map_err(|_| ())?; + + let Some(maybe_connack) = conn.next().await else { + return Err(()); + }; + + let Ok(maybe_connack) = maybe_connack else { + return Err(()); + }; + + let connack = loop { + let can_use_auth = self.properties.authentication_data.is_some(); + let auth = match maybe_connack.get() { + mqtt_format::v5::packets::MqttPacket::Connack(connack) => break connack, + mqtt_format::v5::packets::MqttPacket::Auth(auth) => { + if can_use_auth { + auth + } else { + // MQTT-4.12.0-6 + return Err(()); + } + } + _ => { + return Err(()); + } + }; + + // TODO: Use user-provided method to authenticate further + + todo!() + }; + + // TODO: Timeout here if the server doesn't respond + + if connack.reason_code == mqtt_format::v5::packets::connack::ConnackReasonCode::Success { + // TODO: Read properties, configure client + + return Ok(MqttClient { conn }); + } + + // TODO: Do something with error code + todo!() } @@ -120,11 +170,7 @@ impl MqttClientConnector { } pub struct MqttClient { - conn: MqttConnection, + conn: Framed<MqttConnection, MqttPacketCodec>, } -impl MqttClient { - pub(crate) fn new_with_connection(conn: MqttConnection) -> Self { - MqttClient { conn } - } -} +impl MqttClient {} |