From f300c67a4d9674eef05d180a787cc8352092903d Mon Sep 17 00:00:00 2001 From: Dessalines Date: Sun, 19 Apr 2020 18:08:25 -0400 Subject: Adding websocket notification system. - HTTP and APUB clients can now send live updating messages to websocket clients - Rate limiting now affects both HTTP and websockets - Rate limiting / Websocket logic is now moved into the API Perform functions. - TODO This broke getting current online users, but that will have to wait for the perform trait to be made async. - Fixes #446 --- server/src/api/post.rs | 176 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 163 insertions(+), 13 deletions(-) (limited to 'server/src/api/post.rs') diff --git a/server/src/api/post.rs b/server/src/api/post.rs index fb022589..19f16014 100644 --- a/server/src/api/post.rs +++ b/server/src/api/post.rs @@ -1,6 +1,4 @@ use super::*; -use diesel::PgConnection; -use std::str::FromStr; #[derive(Serialize, Deserialize)] pub struct CreatePost { @@ -80,7 +78,12 @@ pub struct SavePost { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &CreatePost = &self.data; let claims = match Claims::decode(&data.auth) { @@ -100,6 +103,15 @@ impl Perform for Oper { let user_id = claims.id; + if let Some(rl) = &rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_post(&rl.ip, true)?; + } + + let conn = pool.get()?; + // Check for a community ban if CommunityUserBanView::get(&conn, user_id, data.community_id).is_ok() { return Err(APIError::err("community_ban").into()); @@ -164,12 +176,34 @@ impl Perform for Oper { Err(_e) => return Err(APIError::err("couldnt_find_post").into()), }; - Ok(PostResponse { post: post_view }) + if let Some(rl) = &rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_post(&rl.ip, false)?; + } + + let res = PostResponse { post: post_view }; + + if let Some(ws) = websocket_info { + ws.chatserver.do_send(SendPost { + op: UserOperation::CreatePost, + post: res.clone(), + my_id: ws.id, + }); + } + + Ok(res) } } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &GetPost = &self.data; let user_id: Option = match &data.auth { @@ -183,6 +217,15 @@ impl Perform for Oper { None => None, }; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + let post_view = match PostView::read(&conn, data.id, user_id) { Ok(post) => post, Err(_e) => return Err(APIError::err("couldnt_find_post").into()), @@ -204,6 +247,24 @@ impl Perform for Oper { let creator_user = admins.remove(creator_index); admins.insert(0, creator_user); + let online = if let Some(ws) = websocket_info { + if let Some(id) = ws.id { + ws.chatserver.do_send(JoinPostRoom { + post_id: data.id, + id, + }); + } + + // TODO + 1 + // let fut = async { + // ws.chatserver.send(GetPostUsersOnline {post_id: data.id}).await.unwrap() + // }; + // Runtime::new().unwrap().block_on(fut) + } else { + 0 + }; + // Return the jwt Ok(GetPostResponse { post: post_view, @@ -211,13 +272,18 @@ impl Perform for Oper { community, moderators, admins, - online: 0, + online, }) } } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &GetPosts = &self.data; let user_claims: Option = match &data.auth { @@ -241,6 +307,15 @@ impl Perform for Oper { let type_ = ListingType::from_str(&data.type_)?; let sort = SortType::from_str(&data.sort)?; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + let posts = match PostQueryBuilder::create(&conn) .listing_type(type_) .sort(&sort) @@ -255,12 +330,31 @@ impl Perform for Oper { Err(_e) => return Err(APIError::err("couldnt_get_posts").into()), }; + if let Some(ws) = websocket_info { + // You don't need to join the specific community room, bc this is already handled by + // GetCommunity + if data.community_id.is_none() { + if let Some(id) = ws.id { + // 0 is the "all" community + ws.chatserver.do_send(JoinCommunityRoom { + community_id: 0, + id, + }); + } + } + } + Ok(GetPostsResponse { posts }) } } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &CreatePostLike = &self.data; let claims = match Claims::decode(&data.auth) { @@ -270,6 +364,15 @@ impl Perform for Oper { let user_id = claims.id; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + // Don't do a downvote if site has downvotes disabled if data.score == -1 { let site = SiteView::read(&conn)?; @@ -312,13 +415,27 @@ impl Perform for Oper { Err(_e) => return Err(APIError::err("couldnt_find_post").into()), }; - // just output the score - Ok(PostResponse { post: post_view }) + let res = PostResponse { post: post_view }; + + if let Some(ws) = websocket_info { + ws.chatserver.do_send(SendPost { + op: UserOperation::CreatePostLike, + post: res.clone(), + my_id: ws.id, + }); + } + + Ok(res) } } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &EditPost = &self.data; if let Err(slurs) = slur_check(&data.name) { @@ -338,6 +455,15 @@ impl Perform for Oper { let user_id = claims.id; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + // Verify its the creator or a mod or admin let mut editors: Vec = vec![data.creator_id]; editors.append( @@ -427,12 +553,27 @@ impl Perform for Oper { let post_view = PostView::read(&conn, data.edit_id, Some(user_id))?; - Ok(PostResponse { post: post_view }) + let res = PostResponse { post: post_view }; + + if let Some(ws) = websocket_info { + ws.chatserver.do_send(SendPost { + op: UserOperation::EditPost, + post: res.clone(), + my_id: ws.id, + }); + } + + Ok(res) } } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &SavePost = &self.data; let claims = match Claims::decode(&data.auth) { @@ -447,6 +588,15 @@ impl Perform for Oper { user_id, }; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + if data.save { match PostSaved::save(&conn, &post_saved_form) { Ok(post) => post, -- cgit v1.2.3