summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarcel Müller <neikos@neikos.email>2023-01-04 14:29:15 +0100
committerMarcel Müller <neikos@neikos.email>2023-01-04 14:29:39 +0100
commitb1a2c2d9a1723de358b34786f8194f66d17f42c2 (patch)
treef057d4d407b3aa9b1f610c47e45fcae330e0ea78
parent80574cc1d6c7d4b922b570e451c58c5c1cf5f118 (diff)
Add simple subscription handler
Signed-off-by: Marcel Müller <neikos@neikos.email>
-rw-r--r--src/bin/cloudmqtt-server.rs24
1 files changed, 22 insertions, 2 deletions
diff --git a/src/bin/cloudmqtt-server.rs b/src/bin/cloudmqtt-server.rs
index fc384c4..37ad878 100644
--- a/src/bin/cloudmqtt-server.rs
+++ b/src/bin/cloudmqtt-server.rs
@@ -6,8 +6,10 @@
use std::sync::Arc;
-use cloudmqtt::server::handler::{LoginError, LoginHandler};
+use cloudmqtt::server::handler::{LoginError, LoginHandler, SubscriptionHandler};
use cloudmqtt::server::{ClientId, MqttServer};
+use mqtt_format::v3::qos::MQualityOfService;
+use mqtt_format::v3::subscription_request::MSubscriptionRequest;
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
@@ -32,6 +34,23 @@ impl LoginHandler for SimpleLoginHandler {
}
}
+struct SimpleSubscriptionHandler;
+
+#[async_trait::async_trait]
+impl SubscriptionHandler for SimpleSubscriptionHandler {
+ async fn allow_subscription(
+ &self,
+ _client_id: Arc<ClientId>,
+ subscription: MSubscriptionRequest<'_>,
+ ) -> Option<MQualityOfService> {
+ if &*subscription.topic == "forbidden" {
+ return None;
+ }
+
+ Some(subscription.qos)
+ }
+}
+
#[tokio::main]
async fn main() {
let fmt_layer = tracing_subscriber::fmt::layer()
@@ -50,7 +69,8 @@ async fn main() {
let server = MqttServer::serve_v3_unsecured_tcp("0.0.0.0:1883")
.await
.unwrap()
- .with_login_handler(SimpleLoginHandler);
+ .with_login_handler(SimpleLoginHandler)
+ .with_subscription_handler(SimpleSubscriptionHandler);
Arc::new(server).accept_new_clients().await.unwrap();
}