From f300c67a4d9674eef05d180a787cc8352092903d Mon Sep 17 00:00:00 2001 From: Dessalines Date: Sun, 19 Apr 2020 18:08:25 -0400 Subject: Adding websocket notification system. - HTTP and APUB clients can now send live updating messages to websocket clients - Rate limiting now affects both HTTP and websockets - Rate limiting / Websocket logic is now moved into the API Perform functions. - TODO This broke getting current online users, but that will have to wait for the perform trait to be made async. - Fixes #446 --- server/src/api/site.rs | 187 +++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 167 insertions(+), 20 deletions(-) (limited to 'server/src/api/site.rs') diff --git a/server/src/api/site.rs b/server/src/api/site.rs index 3720a2c4..891f52a4 100644 --- a/server/src/api/site.rs +++ b/server/src/api/site.rs @@ -1,10 +1,5 @@ +use super::user::Register; use super::*; -use crate::api::user::Register; -use crate::api::{Oper, Perform}; -use crate::settings::Settings; -use diesel::PgConnection; -use log::info; -use std::str::FromStr; #[derive(Serialize, Deserialize)] pub struct ListCategories {} @@ -78,7 +73,7 @@ pub struct EditSite { #[derive(Serialize, Deserialize)] pub struct GetSite {} -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct SiteResponse { site: SiteView, } @@ -114,9 +109,23 @@ pub struct SaveSiteConfig { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let _data: &ListCategories = &self.data; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + let categories: Vec = Category::list_all(&conn)?; // Return the jwt @@ -125,9 +134,23 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &GetModlog = &self.data; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + let removed_posts = ModRemovePostView::list( &conn, data.community_id, @@ -198,7 +221,12 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &CreateSite = &self.data; let claims = match Claims::decode(&data.auth) { @@ -218,6 +246,15 @@ impl Perform for Oper { let user_id = claims.id; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + // Make sure user is an admin if !UserView::read(&conn, user_id)?.admin { return Err(APIError::err("not_an_admin").into()); @@ -245,7 +282,12 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &EditSite = &self.data; let claims = match Claims::decode(&data.auth) { @@ -265,6 +307,15 @@ impl Perform for Oper { let user_id = claims.id; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + // Make sure user is an admin if !UserView::read(&conn, user_id)?.admin { return Err(APIError::err("not_an_admin").into()); @@ -289,14 +340,39 @@ impl Perform for Oper { let site_view = SiteView::read(&conn)?; - Ok(SiteResponse { site: site_view }) + let res = SiteResponse { site: site_view }; + + if let Some(ws) = websocket_info { + ws.chatserver.do_send(SendAllMessage { + op: UserOperation::EditSite, + response: res.clone(), + my_id: ws.id, + }); + } + + Ok(res) } } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let _data: &GetSite = &self.data; + if let Some(rl) = &rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + + // TODO refactor this a little let site = Site::read(&conn, 1); let site_view = if site.is_ok() { Some(SiteView::read(&conn)?) @@ -309,7 +385,11 @@ impl Perform for Oper { admin: true, show_nsfw: true, }; - let login_response = Oper::new(register).perform(&conn)?; + let login_response = Oper::new(register).perform( + pool.clone(), + websocket_info.clone(), + rate_limit_info.clone(), + )?; info!("Admin {} created", setup.admin_username); let create_site = CreateSite { @@ -320,7 +400,7 @@ impl Perform for Oper { enable_nsfw: false, auth: login_response.jwt, }; - Oper::new(create_site).perform(&conn)?; + Oper::new(create_site).perform(pool, websocket_info.clone(), rate_limit_info)?; info!("Site {} created", setup.site_name); Some(SiteView::read(&conn)?) } else { @@ -337,17 +417,33 @@ impl Perform for Oper { let banned = UserView::banned(&conn)?; + let online = if let Some(_ws) = websocket_info { + // TODO + 1 + // let fut = async { + // ws.chatserver.send(GetUsersOnline).await.unwrap() + // }; + // Runtime::new().unwrap().block_on(fut) + } else { + 0 + }; + Ok(GetSiteResponse { site: site_view, admins, banned, - online: 0, + online, }) } } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &Search = &self.data; let user_id: Option = match &data.auth { @@ -371,6 +467,15 @@ impl Perform for Oper { // TODO no clean / non-nsfw searching rn + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + match type_ { SearchType::Posts => { posts = PostQueryBuilder::create(&conn) @@ -465,7 +570,12 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &TransferSite = &self.data; let claims = match Claims::decode(&data.auth) { @@ -475,6 +585,15 @@ impl Perform for Oper { let user_id = claims.id; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + let read_site = Site::read(&conn, 1)?; // Make sure user is the creator @@ -528,7 +647,12 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &GetSiteConfig = &self.data; let claims = match Claims::decode(&data.auth) { @@ -538,6 +662,15 @@ impl Perform for Oper { let user_id = claims.id; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + // Only let admins read this let admins = UserView::admins(&conn)?; let admin_ids: Vec = admins.into_iter().map(|m| m.id).collect(); @@ -553,7 +686,12 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &SaveSiteConfig = &self.data; let claims = match Claims::decode(&data.auth) { @@ -563,6 +701,15 @@ impl Perform for Oper { let user_id = claims.id; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + // Only let admins read this let admins = UserView::admins(&conn)?; let admin_ids: Vec = admins.into_iter().map(|m| m.id).collect(); -- cgit v1.2.3