diff options
Diffstat (limited to 'server/src/websocket/server.rs')
-rw-r--r-- | server/src/websocket/server.rs | 486 |
1 files changed, 486 insertions, 0 deletions
diff --git a/server/src/websocket/server.rs b/server/src/websocket/server.rs new file mode 100644 index 00000000..cd931e1f --- /dev/null +++ b/server/src/websocket/server.rs @@ -0,0 +1,486 @@ +//! `ChatServer` is an actor. It maintains list of connection client session. +//! And manages available rooms. Peers send messages to other peers in same +//! room through `ChatServer`. + +use actix::prelude::*; +use rand::{rngs::ThreadRng, Rng}; +use std::collections::{HashMap, HashSet}; +use serde::{Deserialize, Serialize}; +use serde_json::{Value}; +use std::str::FromStr; +use failure::Error; +use std::time::{SystemTime}; + +use api::*; +use api::user::*; +use api::community::*; +use api::post::*; +use api::comment::*; +use api::site::*; + +const RATE_LIMIT_MESSAGES: i32 = 30; +const RATE_LIMIT_PER_SECOND: i32 = 60; +const RATE_LIMIT_REGISTER_MESSAGES: i32 = 1; +const RATE_LIMIT_REGISTER_PER_SECOND: i32 = 60; + + +/// Chat server sends this messages to session +#[derive(Message)] +pub struct WSMessage(pub String); + +/// Message for chat server communications + +/// New chat session is created +#[derive(Message)] +#[rtype(usize)] +pub struct Connect { + pub addr: Recipient<WSMessage>, + pub ip: String, +} + +/// Session is disconnected +#[derive(Message)] +pub struct Disconnect { + pub id: usize, + pub ip: String, +} + +/// Send message to specific room +#[derive(Message)] +pub struct ClientMessage { + /// Id of the client session + pub id: usize, + /// Peer message + pub msg: String, + /// Room name + pub room: String, +} + +#[derive(Serialize, Deserialize)] +pub struct StandardMessage { + /// Id of the client session + pub id: usize, + /// Peer message + pub msg: String, +} + +impl actix::Message for StandardMessage { + type Result = String; +} + +#[derive(Debug)] +pub struct RateLimitBucket { + last_checked: SystemTime, + allowance: f64 +} + +pub struct SessionInfo { + pub addr: Recipient<WSMessage>, + pub ip: String, +} + +/// `ChatServer` manages chat rooms and responsible for coordinating chat +/// session. implementation is super primitive +pub struct ChatServer { + sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr + rate_limits: HashMap<String, RateLimitBucket>, + rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs + rng: ThreadRng, +} + +impl Default for ChatServer { + fn default() -> ChatServer { + // default room + let rooms = HashMap::new(); + + ChatServer { + sessions: HashMap::new(), + rate_limits: HashMap::new(), + rooms: rooms, + rng: rand::thread_rng(), + } + } +} + +impl ChatServer { + /// Send message to all users in the room + fn send_room_message(&self, room: &i32, message: &str, skip_id: usize) { + if let Some(sessions) = self.rooms.get(room) { + for id in sessions { + if *id != skip_id { + if let Some(info) = self.sessions.get(id) { + let _ = info.addr.do_send(WSMessage(message.to_owned())); + } + } + } + } + } + + fn join_room(&mut self, room_id: i32, id: usize) { + // remove session from all rooms + for (_n, mut sessions) in &mut self.rooms { + sessions.remove(&id); + } + + // If the room doesn't exist yet + if self.rooms.get_mut(&room_id).is_none() { + self.rooms.insert(room_id, HashSet::new()); + } + + &self.rooms.get_mut(&room_id).unwrap().insert(id); + } + + fn send_community_message(&self, community_id: &i32, message: &str, skip_id: usize) -> Result<(), Error> { + use db::*; + use db::post_view::*; + let conn = establish_connection(); + let posts = PostView::list(&conn, + PostListingType::Community, + &SortType::New, + Some(*community_id), + None, + None, + None, + false, + false, + None, + Some(9999))?; + for post in posts { + self.send_room_message(&post.id, message, skip_id); + } + + Ok(()) + } + + fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> { + self.check_rate_limit_full(id, RATE_LIMIT_REGISTER_MESSAGES, RATE_LIMIT_REGISTER_PER_SECOND) + } + + fn check_rate_limit(&mut self, id: usize) -> Result<(), Error> { + self.check_rate_limit_full(id, RATE_LIMIT_MESSAGES, RATE_LIMIT_PER_SECOND) + } + + fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> { + if let Some(info) = self.sessions.get(&id) { + if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) { + // The initial value + if rate_limit.allowance == -2f64 { + rate_limit.allowance = rate as f64; + }; + + let current = SystemTime::now(); + let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64; + rate_limit.last_checked = current; + rate_limit.allowance += time_passed * (rate as f64 / per as f64); + if rate_limit.allowance > rate as f64 { + rate_limit.allowance = rate as f64; + } + + if rate_limit.allowance < 1.0 { + println!("Rate limited IP: {}, time_passed: {}, allowance: {}", &info.ip, time_passed, rate_limit.allowance); + Err(APIError { + op: "Rate Limit".to_string(), + message: format!("Too many requests. {} per {} seconds", rate, per), + })? + } else { + rate_limit.allowance -= 1.0; + Ok(()) + } + } else { + Ok(()) + } + } else { + Ok(()) + } + } +} + + +/// Make actor from `ChatServer` +impl Actor for ChatServer { + /// We are going to use simple Context, we just need ability to communicate + /// with other actors. + type Context = Context<Self>; +} + +/// Handler for Connect message. +/// +/// Register new session and assign unique id to this session +impl Handler<Connect> for ChatServer { + type Result = usize; + + fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result { + + // notify all users in same room + // self.send_room_message(&"Main".to_owned(), "Someone joined", 0); + + // register session with random id + let id = self.rng.gen::<usize>(); + println!("{} joined", &msg.ip); + + self.sessions.insert(id, SessionInfo { + addr: msg.addr, + ip: msg.ip.to_owned(), + }); + + if self.rate_limits.get(&msg.ip).is_none() { + self.rate_limits.insert(msg.ip, RateLimitBucket { + last_checked: SystemTime::now(), + allowance: -2f64, + }); + } + + // for (k,v) in &self.rate_limits { + // println!("{}: {:?}", k,v); + // } + + // auto join session to Main room + // self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id); + + // send id back + id + } +} + +/// Handler for Disconnect message. +impl Handler<Disconnect> for ChatServer { + type Result = (); + + fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) { + + // let mut rooms: Vec<i32> = Vec::new(); + + // remove address + if self.sessions.remove(&msg.id).is_some() { + // remove session from all rooms + for (_id, sessions) in &mut self.rooms { + if sessions.remove(&msg.id) { + // rooms.push(*id); + } + } + } + } +} + +/// Handler for Message message. +impl Handler<StandardMessage> for ChatServer { + type Result = MessageResult<StandardMessage>; + + + fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result { + + let msg_out = match parse_json_message(self, msg) { + Ok(m) => m, + Err(e) => e.to_string() + }; + + MessageResult(msg_out) + } +} + +fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> { + + let json: Value = serde_json::from_str(&msg.msg)?; + let data = &json["data"].to_string(); + let op = &json["op"].as_str().unwrap(); + + let user_operation: UserOperation = UserOperation::from_str(&op)?; + + match user_operation { + UserOperation::Login => { + let login: Login = serde_json::from_str(data)?; + let res = Oper::new(user_operation, login).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::Register => { + chat.check_rate_limit_register(msg.id)?; + let register: Register = serde_json::from_str(data)?; + let res = Oper::new(user_operation, register).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::GetUserDetails => { + let get_user_details: GetUserDetails = serde_json::from_str(data)?; + let res = Oper::new(user_operation, get_user_details).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::AddAdmin => { + let add_admin: AddAdmin = serde_json::from_str(data)?; + let res = Oper::new(user_operation, add_admin).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::BanUser => { + let ban_user: BanUser = serde_json::from_str(data)?; + let res = Oper::new(user_operation, ban_user).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::GetReplies => { + let get_replies: GetReplies = serde_json::from_str(data)?; + let res = Oper::new(user_operation, get_replies).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::MarkAllAsRead => { + let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?; + let res = Oper::new(user_operation, mark_all_as_read).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::GetCommunity => { + let get_community: GetCommunity = serde_json::from_str(data)?; + let res = Oper::new(user_operation, get_community).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::ListCommunities => { + let list_communities: ListCommunities = serde_json::from_str(data)?; + let res = Oper::new(user_operation, list_communities).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::CreateCommunity => { + chat.check_rate_limit_register(msg.id)?; + let create_community: CreateCommunity = serde_json::from_str(data)?; + let res = Oper::new(user_operation, create_community).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::EditCommunity => { + let edit_community: EditCommunity = serde_json::from_str(data)?; + let res = Oper::new(user_operation, edit_community).perform()?; + let mut community_sent: CommunityResponse = res.clone(); + community_sent.community.user_id = None; + community_sent.community.subscribed = None; + let community_sent_str = serde_json::to_string(&community_sent)?; + chat.send_community_message(&community_sent.community.id, &community_sent_str, msg.id)?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::FollowCommunity => { + let follow_community: FollowCommunity = serde_json::from_str(data)?; + let res = Oper::new(user_operation, follow_community).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::GetFollowedCommunities => { + let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?; + let res = Oper::new(user_operation, followed_communities).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::BanFromCommunity => { + let ban_from_community: BanFromCommunity = serde_json::from_str(data)?; + let community_id = ban_from_community.community_id; + let res = Oper::new(user_operation, ban_from_community).perform()?; + let res_str = serde_json::to_string(&res)?; + chat.send_community_message(&community_id, &res_str, msg.id)?; + Ok(res_str) + }, + UserOperation::AddModToCommunity => { + let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?; + let community_id = mod_add_to_community.community_id; + let res = Oper::new(user_operation, mod_add_to_community).perform()?; + let res_str = serde_json::to_string(&res)?; + chat.send_community_message(&community_id, &res_str, msg.id)?; + Ok(res_str) + }, + UserOperation::ListCategories => { + let list_categories: ListCategories = ListCategories; + let res = Oper::new(user_operation, list_categories).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::CreatePost => { + chat.check_rate_limit_register(msg.id)?; + let create_post: CreatePost = serde_json::from_str(data)?; + let res = Oper::new(user_operation, create_post).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::GetPost => { + let get_post: GetPost = serde_json::from_str(data)?; + chat.join_room(get_post.id, msg.id); + let res = Oper::new(user_operation, get_post).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::GetPosts => { + let get_posts: GetPosts = serde_json::from_str(data)?; + let res = Oper::new(user_operation, get_posts).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::CreatePostLike => { + chat.check_rate_limit(msg.id)?; + let create_post_like: CreatePostLike = serde_json::from_str(data)?; + let res = Oper::new(user_operation, create_post_like).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::EditPost => { + let edit_post: EditPost = serde_json::from_str(data)?; + let res = Oper::new(user_operation, edit_post).perform()?; + let mut post_sent = res.clone(); + post_sent.post.my_vote = None; + let post_sent_str = serde_json::to_string(&post_sent)?; + chat.send_room_message(&post_sent.post.id, &post_sent_str, msg.id); + Ok(serde_json::to_string(&res)?) + }, + UserOperation::SavePost => { + let save_post: SavePost = serde_json::from_str(data)?; + let res = Oper::new(user_operation, save_post).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::CreateComment => { + chat.check_rate_limit(msg.id)?; + let create_comment: CreateComment = serde_json::from_str(data)?; + let post_id = create_comment.post_id; + let res = Oper::new(user_operation, create_comment).perform()?; + let mut comment_sent = res.clone(); + comment_sent.comment.my_vote = None; + comment_sent.comment.user_id = None; + let comment_sent_str = serde_json::to_string(&comment_sent)?; + chat.send_room_message(&post_id, &comment_sent_str, msg.id); + Ok(serde_json::to_string(&res)?) + }, + UserOperation::EditComment => { + let edit_comment: EditComment = serde_json::from_str(data)?; + let post_id = edit_comment.post_id; + let res = Oper::new(user_operation, edit_comment).perform()?; + let mut comment_sent = res.clone(); + comment_sent.comment.my_vote = None; + comment_sent.comment.user_id = None; + let comment_sent_str = serde_json::to_string(&comment_sent)?; + chat.send_room_message(&post_id, &comment_sent_str, msg.id); + Ok(serde_json::to_string(&res)?) + }, + UserOperation::SaveComment => { + let save_comment: SaveComment = serde_json::from_str(data)?; + let res = Oper::new(user_operation, save_comment).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::CreateCommentLike => { + chat.check_rate_limit(msg.id)?; + let create_comment_like: CreateCommentLike = serde_json::from_str(data)?; + let post_id = create_comment_like.post_id; + let res = Oper::new(user_operation, create_comment_like).perform()?; + let mut comment_sent = res.clone(); + comment_sent.comment.my_vote = None; + comment_sent.comment.user_id = None; + let comment_sent_str = serde_json::to_string(&comment_sent)?; + chat.send_room_message(&post_id, &comment_sent_str, msg.id); + Ok(serde_json::to_string(&res)?) + }, + UserOperation::GetModlog => { + let get_modlog: GetModlog = serde_json::from_str(data)?; + let res = Oper::new(user_operation, get_modlog).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::CreateSite => { + let create_site: CreateSite = serde_json::from_str(data)?; + let res = Oper::new(user_operation, create_site).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::EditSite => { + let edit_site: EditSite = serde_json::from_str(data)?; + let res = Oper::new(user_operation, edit_site).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::GetSite => { + let get_site: GetSite = serde_json::from_str(data)?; + let res = Oper::new(user_operation, get_site).perform()?; + Ok(serde_json::to_string(&res)?) + }, + UserOperation::Search => { + let search: Search = serde_json::from_str(data)?; + let res = Oper::new(user_operation, search).perform()?; + Ok(serde_json::to_string(&res)?) + }, + } +} |