summaryrefslogtreecommitdiffstats
path: root/server/src/routes/websocket.rs
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/routes/websocket.rs')
-rw-r--r--server/src/routes/websocket.rs179
1 files changed, 179 insertions, 0 deletions
diff --git a/server/src/routes/websocket.rs b/server/src/routes/websocket.rs
new file mode 100644
index 00000000..8ae3552c
--- /dev/null
+++ b/server/src/routes/websocket.rs
@@ -0,0 +1,179 @@
+use crate::websocket::server::*;
+use actix::prelude::*;
+use actix_web::web;
+use actix_web::*;
+use actix_web_actors::ws;
+use std::time::{Duration, Instant};
+
+pub fn config(cfg: &mut web::ServiceConfig) {
+ // Start chat server actor in separate thread
+ let server = ChatServer::default().start();
+ cfg
+ .data(server)
+ .service(web::resource("/api/v1/ws").to(chat_route));
+}
+
+/// How often heartbeat pings are sent
+const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
+/// How long before lack of client response causes a timeout
+const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
+
+/// Entry point for our route
+fn chat_route(
+ req: HttpRequest,
+ stream: web::Payload,
+ chat_server: web::Data<Addr<ChatServer>>,
+) -> Result<HttpResponse, Error> {
+ ws::start(
+ WSSession {
+ cs_addr: chat_server.get_ref().to_owned(),
+ id: 0,
+ hb: Instant::now(),
+ ip: req
+ .connection_info()
+ .remote()
+ .unwrap_or("127.0.0.1:12345")
+ .split(':')
+ .next()
+ .unwrap_or("127.0.0.1")
+ .to_string(),
+ },
+ &req,
+ stream,
+ )
+}
+
+struct WSSession {
+ cs_addr: Addr<ChatServer>,
+ /// unique session id
+ id: usize,
+ ip: String,
+ /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
+ /// otherwise we drop connection.
+ hb: Instant,
+}
+
+impl Actor for WSSession {
+ type Context = ws::WebsocketContext<Self>;
+
+ /// Method is called on actor start.
+ /// We register ws session with ChatServer
+ fn started(&mut self, ctx: &mut Self::Context) {
+ // we'll start heartbeat process on session start.
+ self.hb(ctx);
+
+ // register self in chat server. `AsyncContext::wait` register
+ // future within context, but context waits until this future resolves
+ // before processing any other events.
+ // across all routes within application
+ let addr = ctx.address();
+ self
+ .cs_addr
+ .send(Connect {
+ addr: addr.recipient(),
+ ip: self.ip.to_owned(),
+ })
+ .into_actor(self)
+ .then(|res, act, ctx| {
+ match res {
+ Ok(res) => act.id = res,
+ // something is wrong with chat server
+ _ => ctx.stop(),
+ }
+ fut::ok(())
+ })
+ .wait(ctx);
+ }
+
+ fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
+ // notify chat server
+ self.cs_addr.do_send(Disconnect {
+ id: self.id,
+ ip: self.ip.to_owned(),
+ });
+ Running::Stop
+ }
+}
+
+/// Handle messages from chat server, we simply send it to peer websocket
+/// These are room messages, IE sent to others in the room
+impl Handler<WSMessage> for WSSession {
+ type Result = ();
+
+ fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
+ // println!("id: {} msg: {}", self.id, msg.0);
+ ctx.text(msg.0);
+ }
+}
+
+/// WebSocket message handler
+impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
+ fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
+ // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
+ match msg {
+ ws::Message::Ping(msg) => {
+ self.hb = Instant::now();
+ ctx.pong(&msg);
+ }
+ ws::Message::Pong(_) => {
+ self.hb = Instant::now();
+ }
+ ws::Message::Text(text) => {
+ let m = text.trim().to_owned();
+ println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id);
+
+ self
+ .cs_addr
+ .send(StandardMessage {
+ id: self.id,
+ msg: m,
+ })
+ .into_actor(self)
+ .then(|res, _, ctx| {
+ match res {
+ Ok(res) => ctx.text(res),
+ Err(e) => {
+ eprintln!("{}", &e);
+ }
+ }
+ fut::ok(())
+ })
+ .wait(ctx);
+ }
+ ws::Message::Binary(_bin) => println!("Unexpected binary"),
+ ws::Message::Close(_) => {
+ ctx.stop();
+ }
+ _ => {}
+ }
+ }
+}
+
+impl WSSession {
+ /// helper method that sends ping to client every second.
+ ///
+ /// also this method checks heartbeats from client
+ fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
+ ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
+ // check client heartbeats
+ if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
+ // heartbeat timed out
+ println!("Websocket Client heartbeat failed, disconnecting!");
+
+ // notify chat server
+ act.cs_addr.do_send(Disconnect {
+ id: act.id,
+ ip: act.ip.to_owned(),
+ });
+
+ // stop actor
+ ctx.stop();
+
+ // don't try to send a ping
+ return;
+ }
+
+ ctx.ping("");
+ });
+ }
+}