diff options
Diffstat (limited to 'server/src')
-rw-r--r-- | server/src/actions/comment.rs | 7 | ||||
-rw-r--r-- | server/src/apub.rs | 2 | ||||
-rw-r--r-- | server/src/bin/main.rs | 270 | ||||
-rw-r--r-- | server/src/lib.rs | 22 | ||||
-rw-r--r-- | server/src/websocket_server/mod.rs | 1 | ||||
-rw-r--r-- | server/src/websocket_server/server.rs | 269 |
6 files changed, 563 insertions, 8 deletions
diff --git a/server/src/actions/comment.rs b/server/src/actions/comment.rs index 104c13f2..d23382c6 100644 --- a/server/src/actions/comment.rs +++ b/server/src/actions/comment.rs @@ -4,6 +4,13 @@ use diesel::*; use diesel::result::Error; use {Crud, Likeable}; +// WITH RECURSIVE MyTree AS ( +// SELECT * FROM comment WHERE parent_id IS NULL +// UNION ALL +// SELECT m.* FROM comment AS m JOIN MyTree AS t ON m.parent_id = t.id +// ) +// SELECT * FROM MyTree; + #[derive(Queryable, Identifiable, PartialEq, Debug)] #[table_name="comment"] pub struct Comment { diff --git a/server/src/apub.rs b/server/src/apub.rs index 16b8be1b..6272fedc 100644 --- a/server/src/apub.rs +++ b/server/src/apub.rs @@ -34,7 +34,7 @@ mod tests { use super::User_; use naive_now; - #[test] + #[test] fn test_person() { let expected_user = User_ { id: 52, diff --git a/server/src/bin/main.rs b/server/src/bin/main.rs new file mode 100644 index 00000000..25181aaa --- /dev/null +++ b/server/src/bin/main.rs @@ -0,0 +1,270 @@ +extern crate server; + +use std::time::{Instant, Duration}; +use server::actix::*; +use server::actix_web::server::HttpServer; +use server::actix_web::{fs, http, ws, App, Error, HttpRequest, HttpResponse}; + + +/// How often heartbeat pings are sent +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); +/// How long before lack of client response causes a timeout +const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); + +use server::websocket_server::server::*; +use std::str::FromStr; +// use server::websocket_server::server::UserOperation::from_str; + +/// This is our websocket route state, this state is shared with all route +/// instances via `HttpContext::state()` +struct WsChatSessionState { + addr: Addr<ChatServer>, +} + +/// Entry point for our route +fn chat_route(req: &HttpRequest<WsChatSessionState>) -> Result<HttpResponse, Error> { + ws::start( + req, + WSSession { + id: 0, + hb: Instant::now() + }, + ) +} + +struct WSSession { + /// unique session id + id: usize, + /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), + /// otherwise we drop connection. + hb: Instant +} + +impl Actor for WSSession { + type Context = ws::WebsocketContext<Self, WsChatSessionState>; + + /// Method is called on actor start. + /// We register ws session with ChatServer + fn started(&mut self, ctx: &mut Self::Context) { + // we'll start heartbeat process on session start. + self.hb(ctx); + + // register self in chat server. `AsyncContext::wait` register + // future within context, but context waits until this future resolves + // before processing any other events. + // HttpContext::state() is instance of WsChatSessionState, state is shared + // across all routes within application + let addr = ctx.address(); + ctx.state() + .addr + .send(Connect { + addr: addr.recipient(), + }) + .into_actor(self) + .then(|res, act, ctx| { + match res { + Ok(res) => act.id = res, + // something is wrong with chat server + _ => ctx.stop(), + } + fut::ok(()) + }) + .wait(ctx); + } + + fn stopping(&mut self, ctx: &mut Self::Context) -> Running { + // notify chat server + ctx.state().addr.do_send(Disconnect { id: self.id }); + Running::Stop + } +} + +/// Handle messages from chat server, we simply send it to peer websocket +impl Handler<WSMessage> for WSSession { + type Result = (); + + fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) { + ctx.text(msg.0); + } +} + +use server::serde_json::Value; +/// WebSocket message handler +impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession { + fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { + // println!("WEBSOCKET MESSAGE: {:?}", msg); + match msg { + ws::Message::Ping(msg) => { + self.hb = Instant::now(); + ctx.pong(&msg); + } + ws::Message::Pong(_) => { + self.hb = Instant::now(); + } + ws::Message::Text(text) => { + let m = text.trim(); + let json: Value = serde_json::from_str(m).unwrap(); + + // Get the OP command, and its data + let op: &str = &json["op"].as_str().unwrap(); + let data: &Value = &json["data"]; + + let user_operation: UserOperation = UserOperation::from_str(op).unwrap(); + + match user_operation { + UserOperation::Login => { + let login: Login = serde_json::from_str(&data.to_string()).unwrap(); + ctx.state() + .addr + .do_send(login); + }, + UserOperation::Register => { + let register: Register = serde_json::from_str(&data.to_string()).unwrap(); + ctx.state() + .addr + .send(register) + .into_actor(self) + .then(|res, _, ctx| { + match res { + Ok(wut) => ctx.text(wut), + _ => println!("Something is wrong"), + } + fut::ok(()) + }) + .wait(ctx) + } + _ => ctx.text(format!("!!! unknown command: {:?}", m)), + } + + // we check for /sss type of messages + // if m.starts_with('/') { + // let v: Vec<&str> = m.splitn(2, ' ').collect(); + // match v[0] { + // "/list" => { + // // Send ListRooms message to chat server and wait for + // // response + // println!("List rooms"); + // ctx.state() + // .addr + // .send(ListRooms) + // .into_actor(self) + // .then(|res, _, ctx| { + // match res { + // Ok(rooms) => { + // for room in rooms { + // ctx.text(room); + // } + // } + // _ => println!("Something is wrong"), + // } + // fut::ok(()) + // }) + // .wait(ctx) + // .wait(ctx) pauses all events in context, + // so actor wont receive any new messages until it get list + // of rooms back + // } + // "/join" => { + // if v.len() == 2 { + // self.room = v[1].to_owned(); + // ctx.state().addr.do_send(Join { + // id: self.id, + // name: self.room.clone(), + // }); + + // ctx.text("joined"); + // } else { + // ctx.text("!!! room name is required"); + // } + // } + // "/name" => { + // if v.len() == 2 { + // self.name = Some(v[1].to_owned()); + // } else { + // ctx.text("!!! name is required"); + // } + // } + // _ => ctx.text(format!("!!! unknown command: {:?}", m)), + // } + // } else { + // let msg = if let Some(ref name) = self.name { + // format!("{}: {}", name, m) + // } else { + // m.to_owned() + // }; + // send message to chat server + // ctx.state().addr.do_send(ClientMessage { + // id: self.id, + // msg: msg, + // room: self.room.clone(), + // }) + // } + } + ws::Message::Binary(_bin) => println!("Unexpected binary"), + ws::Message::Close(_) => { + ctx.stop(); + }, + } + } +} + +impl WSSession { + /// helper method that sends ping to client every second. + /// + /// also this method checks heartbeats from client + fn hb(&self, ctx: &mut ws::WebsocketContext<Self, WsChatSessionState>) { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { + // check client heartbeats + if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { + // heartbeat timed out + println!("Websocket Client heartbeat failed, disconnecting!"); + + // notify chat server + ctx.state() + .addr + .do_send(Disconnect { id: act.id }); + + // stop actor + ctx.stop(); + + // don't try to send a ping + return; + } + + ctx.ping(""); + }); + } +} + +fn main() { + let _ = env_logger::init(); + let sys = actix::System::new("rust-reddit-fediverse-server"); + + // Start chat server actor in separate thread + let server = Arbiter::start(|_| ChatServer::default()); + + // Create Http server with websocket support + HttpServer::new(move || { + // Websocket sessions state + let state = WsChatSessionState { + addr: server.clone(), + }; + + App::with_state(state) + // redirect to websocket.html + // .resource("/", |r| r.method(http::Method::GET).f(|_| { + // HttpResponse::Found() + // .header("LOCATION", "/static/websocket.html") + // .finish() + // })) + // // websocket + .resource("/service/ws", |r| r.route().f(chat_route)) + // static resources + // .handler("/static/", fs::StaticFiles::new("static/").unwrap()) + }).bind("127.0.0.1:8080") + .unwrap() + .start(); + + println!("Started http server: 127.0.0.1:8080"); + let _ = sys.run(); +} diff --git a/server/src/lib.rs b/server/src/lib.rs index b1a1f252..3daeb8d2 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,7 +1,19 @@ #[macro_use] -extern crate diesel; -extern crate dotenv; -extern crate chrono; +pub extern crate diesel; +pub extern crate dotenv; +pub extern crate chrono; +pub extern crate serde; +pub extern crate serde_json; +pub extern crate actix; +pub extern crate actix_web; +pub extern crate rand; +pub extern crate strum; +#[macro_use] pub extern crate strum_macros; + +pub mod schema; +pub mod apub; +pub mod actions; +pub mod websocket_server; use diesel::*; use diesel::pg::PgConnection; @@ -9,11 +21,7 @@ use diesel::result::Error; use dotenv::dotenv; use std::env; -pub mod schema; -pub mod apub; -pub mod actions; -// pub trait Likeable; pub trait Crud<T> { fn create(conn: &PgConnection, form: T) -> Result<Self, Error> where Self: Sized; fn read(conn: &PgConnection, id: i32) -> Self; diff --git a/server/src/websocket_server/mod.rs b/server/src/websocket_server/mod.rs new file mode 100644 index 00000000..74f47ad3 --- /dev/null +++ b/server/src/websocket_server/mod.rs @@ -0,0 +1 @@ +pub mod server; diff --git a/server/src/websocket_server/server.rs b/server/src/websocket_server/server.rs new file mode 100644 index 00000000..2d410176 --- /dev/null +++ b/server/src/websocket_server/server.rs @@ -0,0 +1,269 @@ +//! `ChatServer` is an actor. It maintains list of connection client session. +//! And manages available rooms. Peers send messages to other peers in same +//! room through `ChatServer`. + +use actix::prelude::*; +use rand::{rngs::ThreadRng, Rng}; +use std::collections::{HashMap, HashSet}; +use serde::{Deserialize, Serialize}; + +use {Crud,establish_connection}; + +#[derive(EnumString,ToString,Debug)] +pub enum UserOperation { + Login, Register, Logout, Join, Edit, Reply, Vote, Delete, NextPage, Sticky +} + +pub enum MessageType { + Comments, Users, Ping, Pong +} + + + +/// Chat server sends this messages to session +#[derive(Message)] +pub struct WSMessage(pub String); + +/// Message for chat server communications + +/// New chat session is created +#[derive(Message)] +#[rtype(usize)] +pub struct Connect { + pub addr: Recipient<WSMessage>, +} + +/// Session is disconnected +#[derive(Message)] +pub struct Disconnect { + pub id: usize, +} + +/// Send message to specific room +#[derive(Message)] +pub struct ClientMessage { + /// Id of the client session + pub id: usize, + /// Peer message + pub msg: String, + /// Room name + pub room: String, +} + +/// List of available rooms +pub struct ListRooms; + +impl actix::Message for ListRooms { + type Result = Vec<String>; +} + +/// Join room, if room does not exists create new one. +#[derive(Message)] +pub struct Join { + /// Client id + pub id: usize, + /// Room name + pub name: String, +} + +#[derive(Message)] +#[derive(Serialize, Deserialize)] +pub struct Login { + pub username: String, + pub password: String +} + +// #[derive(Message)] +#[derive(Serialize, Deserialize)] +pub struct Register { + username: String, + email: Option<String>, + password: String, + password_verify: String +} + +impl actix::Message for Register { + type Result = String; +} +/// `ChatServer` manages chat rooms and responsible for coordinating chat +/// session. implementation is super primitive +pub struct ChatServer { + sessions: HashMap<usize, Recipient<WSMessage>>, // A map from generated random ID to session addr + rooms: HashMap<String, HashSet<usize>>, // A map from room name to set of connectionIDs + rng: ThreadRng, +} + +impl Default for ChatServer { + fn default() -> ChatServer { + // default room + let mut rooms = HashMap::new(); + rooms.insert("Main".to_owned(), HashSet::new()); + + ChatServer { + sessions: HashMap::new(), + rooms: rooms, + rng: rand::thread_rng(), + } + } +} + +impl ChatServer { + /// Send message to all users in the room + fn send_room_message(&self, room: &str, message: &str, skip_id: usize) { + if let Some(sessions) = self.rooms.get(room) { + for id in sessions { + if *id != skip_id { + if let Some(addr) = self.sessions.get(id) { + let _ = addr.do_send(WSMessage(message.to_owned())); + } + } + } + } + } +} + +/// Make actor from `ChatServer` +impl Actor for ChatServer { + /// We are going to use simple Context, we just need ability to communicate + /// with other actors. + type Context = Context<Self>; +} + +/// Handler for Connect message. +/// +/// Register new session and assign unique id to this session +impl Handler<Connect> for ChatServer { + type Result = usize; + + fn handle(&mut self, msg: Connect, _: &mut Context<Self>) -> Self::Result { + println!("Someone joined"); + + // notify all users in same room + self.send_room_message(&"Main".to_owned(), "Someone joined", 0); + + // register session with random id + let id = self.rng.gen::<usize>(); + self.sessions.insert(id, msg.addr); + + // auto join session to Main room + self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id); + + // send id back + id + } +} + +/// Handler for Disconnect message. +impl Handler<Disconnect> for ChatServer { + type Result = (); + + fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) { + println!("Someone disconnected"); + + let mut rooms: Vec<String> = Vec::new(); + + // remove address + if self.sessions.remove(&msg.id).is_some() { + // remove session from all rooms + for (name, sessions) in &mut self.rooms { + if sessions.remove(&msg.id) { + rooms.push(name.to_owned()); + } + } + } + // send message to other users + for room in rooms { + self.send_room_message(&room, "Someone disconnected", 0); + } + } +} + +/// Handler for Message message. +impl Handler<ClientMessage> for ChatServer { + type Result = (); + + fn handle(&mut self, msg: ClientMessage, _: &mut Context<Self>) { + self.send_room_message(&msg.room, msg.msg.as_str(), msg.id); + } +} + +/// Handler for `ListRooms` message. +impl Handler<ListRooms> for ChatServer { + type Result = MessageResult<ListRooms>; + + fn handle(&mut self, _: ListRooms, _: &mut Context<Self>) -> Self::Result { + let mut rooms = Vec::new(); + + for key in self.rooms.keys() { + rooms.push(key.to_owned()) + } + + MessageResult(rooms) + } +} + +/// Join room, send disconnect message to old room +/// send join message to new room +impl Handler<Join> for ChatServer { + type Result = (); + + fn handle(&mut self, msg: Join, _: &mut Context<Self>) { + let Join { id, name } = msg; + let mut rooms = Vec::new(); + + // remove session from all rooms + for (n, sessions) in &mut self.rooms { + if sessions.remove(&id) { + rooms.push(n.to_owned()); + } + } + // send message to other users + for room in rooms { + self.send_room_message(&room, "Someone disconnected", 0); + } + + if self.rooms.get_mut(&name).is_none() { + self.rooms.insert(name.clone(), HashSet::new()); + } + self.send_room_message(&name, "Someone connected", id); + self.rooms.get_mut(&name).unwrap().insert(id); + } + +} + +impl Handler<Login> for ChatServer { + + type Result = (); + fn handle(&mut self, msg: Login, _: &mut Context<Self>) { + println!("{}", msg.password); + + } +} + +impl Handler<Register> for ChatServer { + + type Result = MessageResult<Register>; + fn handle(&mut self, msg: Register, _: &mut Context<Self>) -> Self::Result { + + use actions::user::*; + let conn = establish_connection(); + + // TODO figure out how to return values, and throw errors + + // Register the new user + let user_form = UserForm { + name: &msg.username, + email: msg.email.as_ref().map(|x| &**x), + password_encrypted: &msg.password, + preferred_username: None, + updated: None + }; + + let inserted_user = User_::create(&conn, user_form).unwrap(); + + + // Return the jwt + MessageResult("hi".to_string()) + + } +} |