From f300c67a4d9674eef05d180a787cc8352092903d Mon Sep 17 00:00:00 2001 From: Dessalines Date: Sun, 19 Apr 2020 18:08:25 -0400 Subject: Adding websocket notification system. - HTTP and APUB clients can now send live updating messages to websocket clients - Rate limiting now affects both HTTP and websockets - Rate limiting / Websocket logic is now moved into the API Perform functions. - TODO This broke getting current online users, but that will have to wait for the perform trait to be made async. - Fixes #446 --- server/src/api/user.rs | 325 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 297 insertions(+), 28 deletions(-) (limited to 'server/src/api/user.rs') diff --git a/server/src/api/user.rs b/server/src/api/user.rs index 40e09969..31a0a4e7 100644 --- a/server/src/api/user.rs +++ b/server/src/api/user.rs @@ -1,10 +1,5 @@ use super::*; -use crate::settings::Settings; -use crate::{generate_random_string, send_email}; use bcrypt::verify; -use diesel::PgConnection; -use log::error; -use std::str::FromStr; #[derive(Serialize, Deserialize, Debug)] pub struct Login { @@ -89,7 +84,7 @@ pub struct AddAdmin { auth: String, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct AddAdminResponse { admins: Vec, } @@ -103,7 +98,7 @@ pub struct BanUser { auth: String, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct BanUserResponse { user: UserView, banned: bool, @@ -205,9 +200,23 @@ pub struct UserJoinResponse { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &Login = &self.data; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + // Fetch that username / email let user: User_ = match User_::find_by_email_or_username(&conn, &data.username_or_email) { Ok(user) => user, @@ -226,9 +235,23 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &Register = &self.data; + if let Some(rl) = &rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_register(&rl.ip, true)?; + } + + let conn = pool.get()?; + // Make sure site has open registration if let Ok(site) = SiteView::read(&conn) { if !site.open_registration { @@ -332,6 +355,13 @@ impl Perform for Oper { }; } + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_register(&rl.ip, false)?; + } + // Return the jwt Ok(LoginResponse { jwt: inserted_user.jwt(), @@ -340,7 +370,12 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &SaveUserSettings = &self.data; let claims = match Claims::decode(&data.auth) { @@ -350,6 +385,15 @@ impl Perform for Oper { let user_id = claims.id; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + let read_user = User_::read(&conn, user_id)?; let email = match &data.email { @@ -428,9 +472,23 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &GetUserDetails = &self.data; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + let user_claims: Option = match &data.auth { Some(auth) => match Claims::decode(&auth) { Ok(claims) => Some(claims.claims), @@ -525,7 +583,12 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &AddAdmin = &self.data; let claims = match Claims::decode(&data.auth) { @@ -535,6 +598,15 @@ impl Perform for Oper { let user_id = claims.id; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + // Make sure user is an admin if !UserView::read(&conn, user_id)?.admin { return Err(APIError::err("not_an_admin").into()); @@ -583,12 +655,27 @@ impl Perform for Oper { let creator_user = admins.remove(creator_index); admins.insert(0, creator_user); - Ok(AddAdminResponse { admins }) + let res = AddAdminResponse { admins }; + + if let Some(ws) = websocket_info { + ws.chatserver.do_send(SendAllMessage { + op: UserOperation::AddAdmin, + response: res.clone(), + my_id: ws.id, + }); + } + + Ok(res) } } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &BanUser = &self.data; let claims = match Claims::decode(&data.auth) { @@ -598,6 +685,15 @@ impl Perform for Oper { let user_id = claims.id; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + // Make sure user is an admin if !UserView::read(&conn, user_id)?.admin { return Err(APIError::err("not_an_admin").into()); @@ -649,15 +745,30 @@ impl Perform for Oper { let user_view = UserView::read(&conn, data.user_id)?; - Ok(BanUserResponse { + let res = BanUserResponse { user: user_view, banned: data.ban, - }) + }; + + if let Some(ws) = websocket_info { + ws.chatserver.do_send(SendAllMessage { + op: UserOperation::BanUser, + response: res.clone(), + my_id: ws.id, + }); + } + + Ok(res) } } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &GetReplies = &self.data; let claims = match Claims::decode(&data.auth) { @@ -669,6 +780,15 @@ impl Perform for Oper { let sort = SortType::from_str(&data.sort)?; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + let replies = ReplyQueryBuilder::create(&conn, user_id) .sort(&sort) .unread_only(data.unread_only) @@ -681,7 +801,12 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &GetUserMentions = &self.data; let claims = match Claims::decode(&data.auth) { @@ -693,6 +818,15 @@ impl Perform for Oper { let sort = SortType::from_str(&data.sort)?; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + let mentions = UserMentionQueryBuilder::create(&conn, user_id) .sort(&sort) .unread_only(data.unread_only) @@ -705,7 +839,12 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &EditUserMention = &self.data; let claims = match Claims::decode(&data.auth) { @@ -715,6 +854,15 @@ impl Perform for Oper { let user_id = claims.id; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + let user_mention = UserMention::read(&conn, data.user_mention_id)?; let user_mention_form = UserMentionForm { @@ -738,7 +886,12 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &MarkAllAsRead = &self.data; let claims = match Claims::decode(&data.auth) { @@ -748,6 +901,15 @@ impl Perform for Oper { let user_id = claims.id; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + let replies = ReplyQueryBuilder::create(&conn, user_id) .unread_only(true) .page(1) @@ -822,7 +984,12 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &DeleteAccount = &self.data; let claims = match Claims::decode(&data.auth) { @@ -832,6 +999,15 @@ impl Perform for Oper { let user_id = claims.id; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + let user: User_ = User_::read(&conn, user_id)?; // Verify the password @@ -903,9 +1079,23 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &PasswordReset = &self.data; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + // Fetch that email let user: User_ = match User_::find_by_email(&conn, &data.email) { Ok(user) => user, @@ -934,9 +1124,23 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &PasswordChange = &self.data; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + // Fetch the user_id from the token let user_id = PasswordResetRequest::read_from_token(&conn, &data.token)?.user_id; @@ -959,7 +1163,12 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &CreatePrivateMessage = &self.data; let claims = match Claims::decode(&data.auth) { @@ -971,6 +1180,15 @@ impl Perform for Oper { let hostname = &format!("https://{}", Settings::get().hostname); + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + // Check for a site ban if UserView::read(&conn, user_id)?.banned { return Err(APIError::err("site_ban").into()); @@ -1016,12 +1234,28 @@ impl Perform for Oper { let message = PrivateMessageView::read(&conn, inserted_private_message.id)?; - Ok(PrivateMessageResponse { message }) + let res = PrivateMessageResponse { message }; + + if let Some(ws) = websocket_info { + ws.chatserver.do_send(SendUserRoomMessage { + op: UserOperation::CreatePrivateMessage, + response: res.clone(), + recipient_id: recipient_user.id, + my_id: ws.id, + }); + } + + Ok(res) } } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &EditPrivateMessage = &self.data; let claims = match Claims::decode(&data.auth) { @@ -1031,6 +1265,15 @@ impl Perform for Oper { let user_id = claims.id; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + let orig_private_message = PrivateMessage::read(&conn, data.edit_id)?; // Check for a site ban @@ -1076,7 +1319,12 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, conn: &PgConnection) -> Result { + fn perform( + &self, + pool: Pool>, + _websocket_info: Option, + rate_limit_info: Option, + ) -> Result { let data: &GetPrivateMessages = &self.data; let claims = match Claims::decode(&data.auth) { @@ -1086,6 +1334,15 @@ impl Perform for Oper { let user_id = claims.id; + if let Some(rl) = rate_limit_info { + rl.rate_limiter + .lock() + .unwrap() + .check_rate_limit_message(&rl.ip, false)?; + } + + let conn = pool.get()?; + let messages = PrivateMessageQueryBuilder::create(&conn, user_id) .page(data.page) .limit(data.limit) @@ -1097,7 +1354,12 @@ impl Perform for Oper { } impl Perform for Oper { - fn perform(&self, _conn: &PgConnection) -> Result { + fn perform( + &self, + _pool: Pool>, + websocket_info: Option, + _rate_limit_info: Option, + ) -> Result { let data: &UserJoin = &self.data; let claims = match Claims::decode(&data.auth) { @@ -1106,6 +1368,13 @@ impl Perform for Oper { }; let user_id = claims.id; + + if let Some(ws) = websocket_info { + if let Some(id) = ws.id { + ws.chatserver.do_send(JoinUserRoom { user_id, id }); + } + } + Ok(UserJoinResponse { user_id }) } } -- cgit v1.2.3