diff options
Diffstat (limited to 'server/src/websocket/server.rs')
-rw-r--r-- | server/src/websocket/server.rs | 338 |
1 files changed, 249 insertions, 89 deletions
diff --git a/server/src/websocket/server.rs b/server/src/websocket/server.rs index f26ae0ec..9b53f21d 100644 --- a/server/src/websocket/server.rs +++ b/server/src/websocket/server.rs @@ -22,6 +22,12 @@ use crate::api::*; use crate::websocket::UserOperation; use crate::Settings; +type ConnectionId = usize; +type PostId = i32; +type CommunityId = i32; +type UserId = i32; +type IPAddr = String; + /// Chat server sends this messages to session #[derive(Message)] #[rtype(result = "()")] @@ -34,35 +40,22 @@ pub struct WSMessage(pub String); #[rtype(usize)] pub struct Connect { pub addr: Recipient<WSMessage>, - pub ip: String, + pub ip: IPAddr, } /// Session is disconnected #[derive(Message)] #[rtype(result = "()")] pub struct Disconnect { - pub id: usize, - pub ip: String, -} - -// TODO this is unused rn -/// Send message to specific room -#[derive(Message)] -#[rtype(result = "()")] -pub struct ClientMessage { - /// Id of the client session - pub id: usize, - /// Peer message - pub msg: String, - /// Room name - pub room: String, + pub id: ConnectionId, + pub ip: IPAddr, } #[derive(Serialize, Deserialize, Message)] #[rtype(String)] pub struct StandardMessage { /// Id of the client session - pub id: usize, + pub id: ConnectionId, /// Peer message pub msg: String, } @@ -75,83 +68,146 @@ pub struct RateLimitBucket { pub struct SessionInfo { pub addr: Recipient<WSMessage>, - pub ip: String, + pub ip: IPAddr, } /// `ChatServer` manages chat rooms and responsible for coordinating chat -/// session. implementation is super primitive +/// session. pub struct ChatServer { - sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr - rate_limits: HashMap<String, RateLimitBucket>, - rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs + /// A map from generated random ID to session addr + sessions: HashMap<ConnectionId, SessionInfo>, + + /// A map from post_id to set of connectionIDs + post_rooms: HashMap<PostId, HashSet<ConnectionId>>, + + /// A map from community to set of connectionIDs + community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>, + + /// A map from user id to its connection ID for joined users. Remember a user can have multiple + /// sessions (IE clients) + user_rooms: HashMap<UserId, HashSet<ConnectionId>>, + + /// Rate limiting based on IP addr + rate_limits: HashMap<IPAddr, RateLimitBucket>, + rng: ThreadRng, db: Pool<ConnectionManager<PgConnection>>, } +// TODO show online users for communities too +// TODO GetPosts is the community / front page join. +// What is sent: New posts, post edits, post removes, post likes, community edits, community mod adds. Notifs for new posts? +// GetPost is the PostJoin, LeavePost is the leave +// What is sent: New comments, comment edits, comment likes +// UserJoin is the user join, a disconnect should remove you from all the scopes impl ChatServer { pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer { - // default room - let rooms = HashMap::new(); - ChatServer { sessions: HashMap::new(), rate_limits: HashMap::new(), - rooms, + post_rooms: HashMap::new(), + community_rooms: HashMap::new(), + user_rooms: HashMap::new(), rng: rand::thread_rng(), db, } } - /// Send message to all users in the room - fn send_room_message(&self, room: i32, message: &str, skip_id: usize) { - if let Some(sessions) = self.rooms.get(&room) { - for id in sessions { - if *id != skip_id { - if let Some(info) = self.sessions.get(id) { - let _ = info.addr.do_send(WSMessage(message.to_owned())); - } - } - } + fn join_community_room(&mut self, community_id: CommunityId, id: ConnectionId) { + // remove session from all rooms + for sessions in self.community_rooms.values_mut() { + sessions.remove(&id); } + + // If the room doesn't exist yet + if self.community_rooms.get_mut(&community_id).is_none() { + self.community_rooms.insert(community_id, HashSet::new()); + } + + self + .community_rooms + .get_mut(&community_id) + .unwrap() + .insert(id); } - fn join_room(&mut self, room_id: i32, id: usize) { + fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) { // remove session from all rooms - for sessions in self.rooms.values_mut() { + for sessions in self.post_rooms.values_mut() { sessions.remove(&id); } // If the room doesn't exist yet - if self.rooms.get_mut(&room_id).is_none() { - self.rooms.insert(room_id, HashSet::new()); + if self.post_rooms.get_mut(&post_id).is_none() { + self.post_rooms.insert(post_id, HashSet::new()); } - self.rooms.get_mut(&room_id).unwrap().insert(id); + self.post_rooms.get_mut(&post_id).unwrap().insert(id); } - fn send_community_message( - &self, - community_id: i32, - message: &str, - skip_id: usize, - ) -> Result<(), Error> { - use crate::db::post_view::*; - use crate::db::*; + fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) { + // remove session from all rooms + for sessions in self.user_rooms.values_mut() { + sessions.remove(&id); + } + + // If the room doesn't exist yet + if self.user_rooms.get_mut(&user_id).is_none() { + self.user_rooms.insert(user_id, HashSet::new()); + } - let conn = self.db.get()?; + self.user_rooms.get_mut(&user_id).unwrap().insert(id); + } - let posts = PostQueryBuilder::create(&conn) - .listing_type(ListingType::Community) - .sort(&SortType::New) - .for_community_id(community_id) - .limit(9999) - .list()?; + fn send_post_room_message(&self, post_id: PostId, message: &str, skip_id: ConnectionId) { + if let Some(sessions) = self.post_rooms.get(&post_id) { + for id in sessions { + if *id != skip_id { + if let Some(info) = self.sessions.get(id) { + let _ = info.addr.do_send(WSMessage(message.to_owned())); + } + } + } + } + } - for post in posts { - self.send_room_message(post.id, message, skip_id); + fn send_community_room_message( + &self, + community_id: CommunityId, + message: &str, + skip_id: ConnectionId, + ) { + if let Some(sessions) = self.community_rooms.get(&community_id) { + for id in sessions { + if *id != skip_id { + if let Some(info) = self.sessions.get(id) { + let _ = info.addr.do_send(WSMessage(message.to_owned())); + } + } + } + } + } + + fn send_user_room_message(&self, user_id: UserId, message: &str, skip_id: ConnectionId) { + if let Some(sessions) = self.user_rooms.get(&user_id) { + for id in sessions { + if *id != skip_id { + if let Some(info) = self.sessions.get(id) { + let _ = info.addr.do_send(WSMessage(message.to_owned())); + } + } + } } + } - Ok(()) + fn send_all_message(&self, message: &str, skip_id: ConnectionId) { + for id in self.sessions.keys() { + if *id != skip_id { + if let Some(info) = self.sessions.get(id) { + let _ = info.addr.do_send(WSMessage(message.to_owned())); + } + } + } } fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> { @@ -233,9 +289,6 @@ impl Handler<Connect> for ChatServer { type Result = usize; fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result { - // notify all users in same room - // self.send_room_message(&"Main".to_owned(), "Someone joined", 0); - // register session with random id let id = self.rng.gen::<usize>(); println!("{} joined", &msg.ip); @@ -267,15 +320,18 @@ impl Handler<Disconnect> for ChatServer { type Result = (); fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) { - // let mut rooms: Vec<i32> = Vec::new(); - - // remove address + // Remove connections from sessions and all 3 scopes if self.sessions.remove(&msg.id).is_some() { - // remove session from all rooms - for sessions in self.rooms.values_mut() { - if sessions.remove(&msg.id) { - // rooms.push(*id); - } + for sessions in self.user_rooms.values_mut() { + sessions.remove(&msg.id); + } + + for sessions in self.post_rooms.values_mut() { + sessions.remove(&msg.id); + } + + for sessions in self.community_rooms.values_mut() { + sessions.remove(&msg.id); } } } @@ -354,10 +410,18 @@ fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<Str do_user_operation::<SaveUserSettings, LoginResponse>(user_operation, data, &conn) } UserOperation::AddAdmin => { - do_user_operation::<AddAdmin, AddAdminResponse>(user_operation, data, &conn) + let add_admin: AddAdmin = serde_json::from_str(data)?; + let res = Oper::new(add_admin).perform(&conn)?; + let res_str = to_json_string(&user_operation, &res)?; + chat.send_all_message(&res_str, msg.id); + Ok(res_str) } UserOperation::BanUser => { - do_user_operation::<BanUser, BanUserResponse>(user_operation, data, &conn) + let ban_user: BanUser = serde_json::from_str(data)?; + let res = Oper::new(ban_user).perform(&conn)?; + let res_str = to_json_string(&user_operation, &res)?; + chat.send_all_message(&res_str, msg.id); + Ok(res_str) } UserOperation::GetReplies => { do_user_operation::<GetReplies, GetRepliesResponse>(user_operation, data, &conn) @@ -372,7 +436,19 @@ fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<Str do_user_operation::<MarkAllAsRead, GetRepliesResponse>(user_operation, data, &conn) } UserOperation::GetCommunity => { - do_user_operation::<GetCommunity, GetCommunityResponse>(user_operation, data, &conn) + let get_community: GetCommunity = serde_json::from_str(data)?; + let mut res = Oper::new(get_community).perform(&conn)?; + let community_id = res.community.id; + + chat.join_community_room(community_id, msg.id); + + res.online = if let Some(community_users) = chat.community_rooms.get(&community_id) { + community_users.len() + } else { + 0 + }; + + to_json_string(&user_operation, &res) } UserOperation::ListCommunities => { do_user_operation::<ListCommunities, ListCommunitiesResponse>(user_operation, data, &conn) @@ -388,7 +464,7 @@ fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<Str community_sent.community.user_id = None; community_sent.community.subscribed = None; let community_sent_str = to_json_string(&user_operation, &community_sent)?; - chat.send_community_message(community_sent.community.id, &community_sent_str, msg.id)?; + chat.send_community_room_message(community_sent.community.id, &community_sent_str, msg.id); to_json_string(&user_operation, &res) } UserOperation::FollowCommunity => { @@ -403,7 +479,7 @@ fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<Str let community_id = ban_from_community.community_id; let res = Oper::new(ban_from_community).perform(&conn)?; let res_str = to_json_string(&user_operation, &res)?; - chat.send_community_message(community_id, &res_str, msg.id)?; + chat.send_community_room_message(community_id, &res_str, msg.id); Ok(res_str) } UserOperation::AddModToCommunity => { @@ -411,7 +487,7 @@ fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<Str let community_id = mod_add_to_community.community_id; let res = Oper::new(mod_add_to_community).perform(&conn)?; let res_str = to_json_string(&user_operation, &res)?; - chat.send_community_message(community_id, &res_str, msg.id)?; + chat.send_community_room_message(community_id, &res_str, msg.id); Ok(res_str) } UserOperation::ListCategories => { @@ -419,28 +495,82 @@ fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<Str } UserOperation::CreatePost => { chat.check_rate_limit_post(msg.id)?; - do_user_operation::<CreatePost, PostResponse>(user_operation, data, &conn) + let create_post: CreatePost = serde_json::from_str(data)?; + let community_id = create_post.community_id; + let res = Oper::new(create_post).perform(&conn)?; + let res_str = to_json_string(&user_operation, &res)?; + + // Don't send my data with it + let mut post_sent = res.clone(); + post_sent.post.my_vote = None; + post_sent.post.user_id = None; + let post_sent_str = to_json_string(&user_operation, &post_sent)?; + + // Send it to /c/all and that community + chat.send_community_room_message(0, &post_sent_str, msg.id); + chat.send_community_room_message(community_id, &post_sent_str, msg.id); + + Ok(res_str) } UserOperation::GetPost => { let get_post: GetPost = serde_json::from_str(data)?; - chat.join_room(get_post.id, msg.id); - let res = Oper::new(get_post).perform(&conn)?; + let post_id = get_post.id; + chat.join_post_room(post_id, msg.id); + let mut res = Oper::new(get_post).perform(&conn)?; + + res.online = if let Some(post_users) = chat.post_rooms.get(&post_id) { + post_users.len() + } else { + 0 + }; + to_json_string(&user_operation, &res) } UserOperation::GetPosts => { - do_user_operation::<GetPosts, GetPostsResponse>(user_operation, data, &conn) + let get_posts: GetPosts = serde_json::from_str(data)?; + if get_posts.community_id.is_none() { + // 0 is the "all" community + chat.join_community_room(0, msg.id); + } + let res = Oper::new(get_posts).perform(&conn)?; + to_json_string(&user_operation, &res) + } + UserOperation::UserJoin => { + let user_join: UserJoin = serde_json::from_str(data)?; + let res = Oper::new(user_join).perform(&conn)?; + chat.join_user_room(res.user_id, msg.id); + to_json_string(&user_operation, &res) } UserOperation::CreatePostLike => { chat.check_rate_limit_message(msg.id)?; - do_user_operation::<CreatePostLike, CreatePostLikeResponse>(user_operation, data, &conn) + let create_post_like: CreatePostLike = serde_json::from_str(data)?; + let res = Oper::new(create_post_like).perform(&conn)?; + let community_id = res.post.community_id; + let res_str = to_json_string(&user_operation, &res)?; + + // Don't send my data with it + let mut post_sent = res.clone(); + post_sent.post.my_vote = None; + post_sent.post.user_id = None; + let post_sent_str = to_json_string(&user_operation, &post_sent)?; + + // Send it to /c/all and that community + chat.send_community_room_message(0, &post_sent_str, msg.id); + chat.send_community_room_message(community_id, &post_sent_str, msg.id); + + Ok(res_str) } UserOperation::EditPost => { let edit_post: EditPost = serde_json::from_str(data)?; let res = Oper::new(edit_post).perform(&conn)?; let mut post_sent = res.clone(); post_sent.post.my_vote = None; + post_sent.post.user_id = None; let post_sent_str = to_json_string(&user_operation, &post_sent)?; - chat.send_room_message(post_sent.post.id, &post_sent_str, msg.id); + + // Send it to /c/all and that community + chat.send_community_room_message(0, &post_sent_str, msg.id); + chat.send_community_room_message(post_sent.post.community_id, &post_sent_str, msg.id); to_json_string(&user_operation, &res) } UserOperation::SavePost => { @@ -455,7 +585,15 @@ fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<Str comment_sent.comment.my_vote = None; comment_sent.comment.user_id = None; let comment_sent_str = to_json_string(&user_operation, &comment_sent)?; - chat.send_room_message(post_id, &comment_sent_str, msg.id); + + // Send it to the post room + chat.send_post_room_message(post_id, &comment_sent_str, msg.id); + + // Send it to the recipient(s) including the mentioned users + for recipient_id in comment_sent.recipient_ids { + chat.send_user_room_message(recipient_id, &comment_sent_str, msg.id); + } + to_json_string(&user_operation, &res) } UserOperation::EditComment => { @@ -466,7 +604,14 @@ fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<Str comment_sent.comment.my_vote = None; comment_sent.comment.user_id = None; let comment_sent_str = to_json_string(&user_operation, &comment_sent)?; - chat.send_room_message(post_id, &comment_sent_str, msg.id); + + chat.send_post_room_message(post_id, &comment_sent_str, msg.id); + + // Send it to the recipient(s) including the mentioned users + for recipient_id in comment_sent.recipient_ids { + chat.send_user_room_message(recipient_id, &comment_sent_str, msg.id); + } + to_json_string(&user_operation, &res) } UserOperation::SaveComment => { @@ -481,7 +626,13 @@ fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<Str comment_sent.comment.my_vote = None; comment_sent.comment.user_id = None; let comment_sent_str = to_json_string(&user_operation, &comment_sent)?; - chat.send_room_message(post_id, &comment_sent_str, msg.id); + + chat.send_post_room_message(post_id, &comment_sent_str, msg.id); + + // Send it to the recipient(s) including the mentioned users + for recipient_id in comment_sent.recipient_ids { + chat.send_user_room_message(recipient_id, &comment_sent_str, msg.id); + } to_json_string(&user_operation, &res) } UserOperation::GetModlog => { @@ -491,13 +642,16 @@ fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<Str do_user_operation::<CreateSite, SiteResponse>(user_operation, data, &conn) } UserOperation::EditSite => { - do_user_operation::<EditSite, SiteResponse>(user_operation, data, &conn) + let edit_site: EditSite = serde_json::from_str(data)?; + let res = Oper::new(edit_site).perform(&conn)?; + let res_str = to_json_string(&user_operation, &res)?; + chat.send_all_message(&res_str, msg.id); + Ok(res_str) } UserOperation::GetSite => { - let online: usize = chat.sessions.len(); let get_site: GetSite = serde_json::from_str(data)?; let mut res = Oper::new(get_site).perform(&conn)?; - res.online = online; + res.online = chat.sessions.len(); to_json_string(&user_operation, &res) } UserOperation::Search => { @@ -520,7 +674,13 @@ fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<Str } UserOperation::CreatePrivateMessage => { chat.check_rate_limit_message(msg.id)?; - do_user_operation::<CreatePrivateMessage, PrivateMessageResponse>(user_operation, data, &conn) + let create_private_message: CreatePrivateMessage = serde_json::from_str(data)?; + let recipient_id = create_private_message.recipient_id; + let res = Oper::new(create_private_message).perform(&conn)?; + let res_str = to_json_string(&user_operation, &res)?; + + chat.send_user_room_message(recipient_id, &res_str, msg.id); + Ok(res_str) } UserOperation::EditPrivateMessage => { do_user_operation::<EditPrivateMessage, PrivateMessageResponse>(user_operation, data, &conn) |