summaryrefslogtreecommitdiffstats
path: root/server/src/api/user.rs
diff options
context:
space:
mode:
authorRiley <asonix@asonix.dog>2020-07-01 07:54:29 -0500
committerGitHub <noreply@github.com>2020-07-01 08:54:29 -0400
commita074564458b8a108b77d98e5e8ce24168656763a (patch)
tree8cfb4e463b6b2dbd3c4b3ac2f312a42542f38d64 /server/src/api/user.rs
parent4c1cb5999cad496714cec67f101be38cd281d416 (diff)
Federation async (#848)
* Asyncify more * I guess these changed * Clean PR a bit * Convert more away from failure error * config changes for testing federation * It was DNS So actix-web's client relies on TRust DNS Resolver to figure out where to send data, but TRust DNS Resolver seems to not play nice with docker, which expressed itself as not resolving the name to an IP address _the first time_ when making a request. The fix was literally to make the request again (which I limited to 3 times total, and not exceeding the request timeout in total) * Only retry for connecterror Since TRust DNS Resolver was causing ConnectError::Timeout, this change limits the retry to only this error, returning immediately for any other error * Use http sig norm 0.4.0-alpha for actix-web 3.0 support * Blocking function, retry http requests * cargo +nightly fmt * Only create one pictrs dir * Don't yarn build * cargo +nightly fmt
Diffstat (limited to 'server/src/api/user.rs')
-rw-r--r--server/src/api/user.rs623
1 files changed, 368 insertions, 255 deletions
diff --git a/server/src/api/user.rs b/server/src/api/user.rs
index 0b6458e7..e7c27def 100644
--- a/server/src/api/user.rs
+++ b/server/src/api/user.rs
@@ -6,6 +6,7 @@ use crate::{
ApubObjectType,
EndpointType,
},
+ blocking,
db::{
comment::*,
comment_view::*,
@@ -43,13 +44,10 @@ use crate::{
UserOperation,
WebsocketInfo,
},
+ DbPool,
+ LemmyError,
};
use bcrypt::verify;
-use diesel::{
- r2d2::{ConnectionManager, Pool},
- PgConnection,
-};
-use failure::Error;
use log::error;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
@@ -252,20 +250,24 @@ pub struct UserJoinResponse {
pub user_id: i32,
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<Login> {
type Response = LoginResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
_websocket_info: Option<WebsocketInfo>,
- ) -> Result<LoginResponse, Error> {
+ ) -> Result<LoginResponse, LemmyError> {
let data: &Login = &self.data;
- let conn = pool.get()?;
-
// Fetch that username / email
- let user: User_ = match User_::find_by_email_or_username(&conn, &data.username_or_email) {
+ let username_or_email = data.username_or_email.clone();
+ let user = match blocking(pool, move |conn| {
+ User_::find_by_email_or_username(conn, &username_or_email)
+ })
+ .await?
+ {
Ok(user) => user,
Err(_e) => return Err(APIError::err("couldnt_find_that_username_or_email").into()),
};
@@ -281,20 +283,20 @@ impl Perform for Oper<Login> {
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<Register> {
type Response = LoginResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
_websocket_info: Option<WebsocketInfo>,
- ) -> Result<LoginResponse, Error> {
+ ) -> Result<LoginResponse, LemmyError> {
let data: &Register = &self.data;
- let conn = pool.get()?;
-
// Make sure site has open registration
- if let Ok(site) = SiteView::read(&conn) {
+ if let Ok(site) = blocking(pool, move |conn| SiteView::read(conn)).await? {
+ let site: SiteView = site;
if !site.open_registration {
return Err(APIError::err("registration_closed").into());
}
@@ -310,7 +312,11 @@ impl Perform for Oper<Register> {
}
// Make sure there are no admins
- if data.admin && !UserView::admins(&conn)?.is_empty() {
+ let any_admins = blocking(pool, move |conn| {
+ UserView::admins(conn).map(|a| a.is_empty())
+ })
+ .await??;
+ if data.admin && !any_admins {
return Err(APIError::err("admin_already_created").into());
}
@@ -346,7 +352,7 @@ impl Perform for Oper<Register> {
};
// Create the user
- let inserted_user = match User_::register(&conn, &user_form) {
+ let inserted_user = match blocking(pool, move |conn| User_::register(conn, &user_form)).await? {
Ok(user) => user,
Err(e) => {
let err_type = if e.to_string()
@@ -364,7 +370,7 @@ impl Perform for Oper<Register> {
let main_community_keypair = generate_actor_keypair()?;
// Create the main community if it doesn't exist
- let main_community: Community = match Community::read(&conn, 2) {
+ let main_community = match blocking(pool, move |conn| Community::read(conn, 2)).await? {
Ok(c) => c,
Err(_e) => {
let default_community_name = "main";
@@ -385,7 +391,7 @@ impl Perform for Oper<Register> {
last_refreshed_at: None,
published: None,
};
- Community::create(&conn, &community_form).unwrap()
+ blocking(pool, move |conn| Community::create(conn, &community_form)).await??
}
};
@@ -395,11 +401,10 @@ impl Perform for Oper<Register> {
user_id: inserted_user.id,
};
- let _inserted_community_follower =
- match CommunityFollower::follow(&conn, &community_follower_form) {
- Ok(user) => user,
- Err(_e) => return Err(APIError::err("community_follower_already_exists").into()),
- };
+ let follow = move |conn: &'_ _| CommunityFollower::follow(conn, &community_follower_form);
+ if blocking(pool, follow).await?.is_err() {
+ return Err(APIError::err("community_follower_already_exists").into());
+ };
// If its an admin, add them as a mod and follower to main
if data.admin {
@@ -408,11 +413,10 @@ impl Perform for Oper<Register> {
user_id: inserted_user.id,
};
- let _inserted_community_moderator =
- match CommunityModerator::join(&conn, &community_moderator_form) {
- Ok(user) => user,
- Err(_e) => return Err(APIError::err("community_moderator_already_exists").into()),
- };
+ let join = move |conn: &'_ _| CommunityModerator::join(conn, &community_moderator_form);
+ if blocking(pool, join).await?.is_err() {
+ return Err(APIError::err("community_moderator_already_exists").into());
+ }
}
// Return the jwt
@@ -422,14 +426,15 @@ impl Perform for Oper<Register> {
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<SaveUserSettings> {
type Response = LoginResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
_websocket_info: Option<WebsocketInfo>,
- ) -> Result<LoginResponse, Error> {
+ ) -> Result<LoginResponse, LemmyError> {
let data: &SaveUserSettings = &self.data;
let claims = match Claims::decode(&data.auth) {
@@ -439,9 +444,7 @@ impl Perform for Oper<SaveUserSettings> {
let user_id = claims.id;
- let conn = pool.get()?;
-
- let read_user = User_::read(&conn, user_id)?;
+ let read_user = blocking(pool, move |conn| User_::read(conn, user_id)).await??;
let email = match &data.email {
Some(email) => Some(email.to_owned()),
@@ -465,7 +468,12 @@ impl Perform for Oper<SaveUserSettings> {
if !valid {
return Err(APIError::err("password_incorrect").into());
}
- User_::update_password(&conn, user_id, &new_password)?.password_encrypted
+ let new_password = new_password.to_owned();
+ let user = blocking(pool, move |conn| {
+ User_::update_password(conn, user_id, &new_password)
+ })
+ .await??;
+ user.password_encrypted
}
None => return Err(APIError::err("password_incorrect").into()),
}
@@ -501,7 +509,8 @@ impl Perform for Oper<SaveUserSettings> {
last_refreshed_at: None,
};
- let updated_user = match User_::update(&conn, user_id, &user_form) {
+ let res = blocking(pool, move |conn| User_::update(conn, user_id, &user_form)).await?;
+ let updated_user: User_ = match res {
Ok(user) => user,
Err(e) => {
let err_type = if e.to_string()
@@ -523,18 +532,17 @@ impl Perform for Oper<SaveUserSettings> {
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<GetUserDetails> {
type Response = GetUserDetailsResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
_websocket_info: Option<WebsocketInfo>,
- ) -> Result<GetUserDetailsResponse, Error> {
+ ) -> Result<GetUserDetailsResponse, LemmyError> {
let data: &GetUserDetails = &self.data;
- let conn = pool.get()?;
-
let user_claims: Option<Claims> = match &data.auth {
Some(auth) => match Claims::decode(&auth) {
Ok(claims) => Some(claims.claims),
@@ -555,54 +563,71 @@ impl Perform for Oper<GetUserDetails> {
let sort = SortType::from_str(&data.sort)?;
+ let username = data
+ .username
+ .to_owned()
+ .unwrap_or_else(|| "admin".to_string());
let user_details_id = match data.user_id {
Some(id) => id,
None => {
- match User_::read_from_name(
- &conn,
- &data
- .username
- .to_owned()
- .unwrap_or_else(|| "admin".to_string()),
- ) {
+ let user = blocking(pool, move |conn| User_::read_from_name(conn, &username)).await?;
+ match user {
Ok(user) => user.id,
Err(_e) => return Err(APIError::err("couldnt_find_that_username_or_email").into()),
}
}
};
- let mut user_view = UserView::read(&conn, user_details_id)?;
-
- let mut posts_query = PostQueryBuilder::create(&conn)
- .sort(&sort)
- .show_nsfw(show_nsfw)
- .saved_only(data.saved_only)
- .for_community_id(data.community_id)
- .my_user_id(user_id)
- .page(data.page)
- .limit(data.limit);
-
- let mut comments_query = CommentQueryBuilder::create(&conn)
- .sort(&sort)
- .saved_only(data.saved_only)
- .my_user_id(user_id)
- .page(data.page)
- .limit(data.limit);
-
- // If its saved only, you don't care what creator it was
- // Or, if its not saved, then you only want it for that specific creator
- if !data.saved_only {
- posts_query = posts_query.for_creator_id(user_details_id);
- comments_query = comments_query.for_creator_id(user_details_id);
- }
+ let mut user_view = blocking(pool, move |conn| UserView::read(conn, user_details_id)).await??;
+
+ let page = data.page;
+ let limit = data.limit;
+ let saved_only = data.saved_only;
+ let community_id = data.community_id;
+ let (posts, comments) = blocking(pool, move |conn| {
+ let mut posts_query = PostQueryBuilder::create(conn)
+ .sort(&sort)
+ .show_nsfw(show_nsfw)
+ .saved_only(saved_only)
+ .for_community_id(community_id)
+ .my_user_id(user_id)
+ .page(page)
+ .limit(limit);
+
+ let mut comments_query = CommentQueryBuilder::create(conn)
+ .sort(&sort)
+ .saved_only(saved_only)
+ .my_user_id(user_id)
+ .page(page)
+ .limit(limit);
+
+ // If its saved only, you don't care what creator it was
+ // Or, if its not saved, then you only want it for that specific creator
+ if !saved_only {
+ posts_query = posts_query.for_creator_id(user_details_id);
+ comments_query = comments_query.for_creator_id(user_details_id);
+ }
+
+ let posts = posts_query.list()?;
+ let comments = comments_query.list()?;
+
+ Ok((posts, comments)) as Result<_, LemmyError>
+ })
+ .await??;
+
+ let follows = blocking(pool, move |conn| {
+ CommunityFollowerView::for_user(conn, user_details_id)
+ })
+ .await??;
+ let moderates = blocking(pool, move |conn| {
+ CommunityModeratorView::for_user(conn, user_details_id)
+ })
+ .await??;
- let posts = posts_query.list()?;
- let comments = comments_query.list()?;
+ let site_creator_id =
+ blocking(pool, move |conn| Site::read(conn, 1).map(|s| s.creator_id)).await??;
- let follows = CommunityFollowerView::for_user(&conn, user_details_id)?;
- let moderates = CommunityModeratorView::for_user(&conn, user_details_id)?;
- let site_creator_id = Site::read(&conn, 1)?.creator_id;
- let mut admins = UserView::admins(&conn)?;
+ let mut admins = blocking(pool, move |conn| UserView::admins(conn)).await??;
let creator_index = admins.iter().position(|r| r.id == site_creator_id).unwrap();
let creator_user = admins.remove(creator_index);
admins.insert(0, creator_user);
@@ -628,14 +653,15 @@ impl Perform for Oper<GetUserDetails> {
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<AddAdmin> {
type Response = AddAdminResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
websocket_info: Option<WebsocketInfo>,
- ) -> Result<AddAdminResponse, Error> {
+ ) -> Result<AddAdminResponse, LemmyError> {
let data: &AddAdmin = &self.data;
let claims = match Claims::decode(&data.auth) {
@@ -645,17 +671,17 @@ impl Perform for Oper<AddAdmin> {
let user_id = claims.id;
- let conn = pool.get()?;
-
// Make sure user is an admin
- if !UserView::read(&conn, user_id)?.admin {
+ let is_admin = move |conn: &'_ _| UserView::read(conn, user_id).map(|u| u.admin);
+ if !blocking(pool, is_admin).await?? {
return Err(APIError::err("not_an_admin").into());
}
- match User_::add_admin(&conn, user_id, data.added) {
- Ok(user) => user,
- Err(_e) => return Err(APIError::err("couldnt_update_user").into()),
- };
+ let added = data.added;
+ let add_admin = move |conn: &'_ _| User_::add_admin(conn, user_id, added);
+ if blocking(pool, add_admin).await?.is_err() {
+ return Err(APIError::err("couldnt_update_user").into());
+ }
// Mod tables
let form = ModAddForm {
@@ -664,10 +690,12 @@ impl Perform for Oper<AddAdmin> {
removed: Some(!data.added),
};
- ModAdd::create(&conn, &form)?;
+ blocking(pool, move |conn| ModAdd::create(conn, &form)).await??;
- let site_creator_id = Site::read(&conn, 1)?.creator_id;
- let mut admins = UserView::admins(&conn)?;
+ let site_creator_id =
+ blocking(pool, move |conn| Site::read(conn, 1).map(|s| s.creator_id)).await??;
+
+ let mut admins = blocking(pool, move |conn| UserView::admins(conn)).await??;
let creator_index = admins.iter().position(|r| r.id == site_creator_id).unwrap();
let creator_user = admins.remove(creator_index);
admins.insert(0, creator_user);
@@ -686,14 +714,15 @@ impl Perform for Oper<AddAdmin> {
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<BanUser> {
type Response = BanUserResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
websocket_info: Option<WebsocketInfo>,
- ) -> Result<BanUserResponse, Error> {
+ ) -> Result<BanUserResponse, LemmyError> {
let data: &BanUser = &self.data;
let claims = match Claims::decode(&data.auth) {
@@ -703,17 +732,17 @@ impl Perform for Oper<BanUser> {
let user_id = claims.id;
- let conn = pool.get()?;
-
// Make sure user is an admin
- if !UserView::read(&conn, user_id)?.admin {
+ let is_admin = move |conn: &'_ _| UserView::read(conn, user_id).map(|u| u.admin);
+ if !blocking(pool, is_admin).await?? {
return Err(APIError::err("not_an_admin").into());
}
- match User_::ban_user(&conn, user_id, data.ban) {
- Ok(user) => user,
- Err(_e) => return Err(APIError::err("couldnt_update_user").into()),
- };
+ let ban = data.ban;
+ let ban_user = move |conn: &'_ _| User_::ban_user(conn, user_id, ban);
+ if blocking(pool, ban_user).await?.is_err() {
+ return Err(APIError::err("couldnt_update_user").into());
+ }
// Mod tables
let expires = match data.expires {
@@ -729,9 +758,10 @@ impl Perform for Oper<BanUser> {
expires,
};
- ModBan::create(&conn, &form)?;
+ blocking(pool, move |conn| ModBan::create(conn, &form)).await??;
- let user_view = UserView::read(&conn, data.user_id)?;
+ let user_id = data.user_id;
+ let user_view = blocking(pool, move |conn| UserView::read(conn, user_id)).await??;
let res = BanUserResponse {
user: user_view,
@@ -750,14 +780,15 @@ impl Perform for Oper<BanUser> {
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<GetReplies> {
type Response = GetRepliesResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
_websocket_info: Option<WebsocketInfo>,
- ) -> Result<GetRepliesResponse, Error> {
+ ) -> Result<GetRepliesResponse, LemmyError> {
let data: &GetReplies = &self.data;
let claims = match Claims::decode(&data.auth) {
@@ -769,27 +800,32 @@ impl Perform for Oper<GetReplies> {
let sort = SortType::from_str(&data.sort)?;
- let conn = pool.get()?;
-
- let replies = ReplyQueryBuilder::create(&conn, user_id)
- .sort(&sort)
- .unread_only(data.unread_only)
- .page(data.page)
- .limit(data.limit)
- .list()?;
+ let page = data.page;
+ let limit = data.limit;
+ let unread_only = data.unread_only;
+ let replies = blocking(pool, move |conn| {
+ ReplyQueryBuilder::create(conn, user_id)
+ .sort(&sort)
+ .unread_only(unread_only)
+ .page(page)
+ .limit(limit)
+ .list()
+ })
+ .await??;
Ok(GetRepliesResponse { replies })
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<GetUserMentions> {
type Response = GetUserMentionsResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
_websocket_info: Option<WebsocketInfo>,
- ) -> Result<GetUserMentionsResponse, Error> {
+ ) -> Result<GetUserMentionsResponse, LemmyError> {
let data: &GetUserMentions = &self.data;
let claims = match Claims::decode(&data.auth) {
@@ -801,27 +837,32 @@ impl Perform for Oper<GetUserMentions> {
let sort = SortType::from_str(&data.sort)?;
- let conn = pool.get()?;
-
- let mentions = UserMentionQueryBuilder::create(&conn, user_id)
- .sort(&sort)
- .unread_only(data.unread_only)
- .page(data.page)
- .limit(data.limit)
- .list()?;
+ let page = data.page;
+ let limit = data.limit;
+ let unread_only = data.unread_only;
+ let mentions = blocking(pool, move |conn| {
+ UserMentionQueryBuilder::create(conn, user_id)
+ .sort(&sort)
+ .unread_only(unread_only)
+ .page(page)
+ .limit(limit)
+ .list()
+ })
+ .await??;
Ok(GetUserMentionsResponse { mentions })
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<EditUserMention> {
type Response = UserMentionResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
_websocket_info: Option<WebsocketInfo>,
- ) -> Result<UserMentionResponse, Error> {
+ ) -> Result<UserMentionResponse, LemmyError> {
let data: &EditUserMention = &self.data;
let claims = match Claims::decode(&data.auth) {
@@ -831,9 +872,9 @@ impl Perform for Oper<EditUserMention> {
let user_id = claims.id;
- let conn = pool.get()?;
-
- let user_mention = UserMention::read(&conn, data.user_mention_id)?;
+ let user_mention_id = data.user_mention_id;
+ let user_mention =
+ blocking(pool, move |conn| UserMention::read(conn, user_mention_id)).await??;
let user_mention_form = UserMentionForm {
recipient_id: user_id,
@@ -841,13 +882,18 @@ impl Perform for Oper<EditUserMention> {
read: data.read.to_owned(),
};
- let _updated_user_mention =
- match UserMention::update(&conn, user_mention.id, &user_mention_form) {
- Ok(comment) => comment,
- Err(_e) => return Err(APIError::err("couldnt_update_comment").into()),
- };
+ let user_mention_id = user_mention.id;
+ let update_mention =
+ move |conn: &'_ _| UserMention::update(conn, user_mention_id, &user_mention_form);
+ if blocking(pool, update_mention).await?.is_err() {
+ return Err(APIError::err("couldnt_update_comment").into());
+ };
- let user_mention_view = UserMentionView::read(&conn, user_mention.id, user_id)?;
+ let user_mention_id = user_mention.id;
+ let user_mention_view = blocking(pool, move |conn| {
+ UserMentionView::read(conn, user_mention_id, user_id)
+ })
+ .await??;
Ok(UserMentionResponse {
mention: user_mention_view,
@@ -855,14 +901,15 @@ impl Perform for Oper<EditUserMention> {
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<MarkAllAsRead> {
type Response = GetRepliesResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
_websocket_info: Option<WebsocketInfo>,
- ) -> Result<GetRepliesResponse, Error> {
+ ) -> Result<GetRepliesResponse, LemmyError> {
let data: &MarkAllAsRead = &self.data;
let claims = match Claims::decode(&data.auth) {
@@ -872,28 +919,35 @@ impl Perform for Oper<MarkAllAsRead> {
let user_id = claims.id;
- let conn = pool.get()?;
-
- let replies = ReplyQueryBuilder::create(&conn, user_id)
- .unread_only(true)
- .page(1)
- .limit(999)
- .list()?;
+ let replies = blocking(pool, move |conn| {
+ ReplyQueryBuilder::create(conn, user_id)
+ .unread_only(true)
+ .page(1)
+ .limit(999)
+ .list()
+ })
+ .await??;
+ // TODO: this should probably be a bulk operation
for reply in &replies {
- match Comment::mark_as_read(&conn, reply.id) {
- Ok(comment) => comment,
- Err(_e) => return Err(APIError::err("couldnt_update_comment").into()),
- };
+ let reply_id = reply.id;
+ let mark_as_read = move |conn: &'_ _| Comment::mark_as_read(conn, reply_id);
+ if blocking(pool, mark_as_read).await?.is_err() {
+ return Err(APIError::err("couldnt_update_comment").into());
+ }
}
// Mentions
- let mentions = UserMentionQueryBuilder::create(&conn, user_id)
- .unread_only(true)
- .page(1)
- .limit(999)
- .list()?;
+ let mentions = blocking(pool, move |conn| {
+ UserMentionQueryBuilder::create(conn, user_id)
+ .unread_only(true)
+ .page(1)
+ .limit(999)
+ .list()
+ })
+ .await??;
+ // TODO: this should probably be a bulk operation
for mention in &mentions {
let mention_form = UserMentionForm {
recipient_id: mention.to_owned().recipient_id,
@@ -901,20 +955,25 @@ impl Perform for Oper<MarkAllAsRead> {
read: Some(true),
};
- let _updated_mention =
- match UserMention::update(&conn, mention.user_mention_id, &mention_form) {
- Ok(mention) => mention,
- Err(_e) => return Err(APIError::err("couldnt_update_comment").into()),
- };
+ let user_mention_id = mention.user_mention_id;
+ let update_mention =
+ move |conn: &'_ _| UserMention::update(conn, user_mention_id, &mention_form);
+ if blocking(pool, update_mention).await?.is_err() {
+ return Err(APIError::err("couldnt_update_comment").into());
+ }
}
// messages
- let messages = PrivateMessageQueryBuilder::create(&conn, user_id)
- .page(1)
- .limit(999)
- .unread_only(true)
- .list()?;
+ let messages = blocking(pool, move |conn| {
+ PrivateMessageQueryBuilder::create(conn, user_id)
+ .page(1)
+ .limit(999)
+ .unread_only(true)
+ .list()
+ })
+ .await??;
+ // TODO: this should probably be a bulk operation
for message in &messages {
let private_message_form = PrivateMessageForm {
content: message.to_owned().content,
@@ -928,25 +987,27 @@ impl Perform for Oper<MarkAllAsRead> {
published: None,
};
- let _updated_message = match PrivateMessage::update(&conn, message.id, &private_message_form)
- {
- Ok(message) => message,
- Err(_e) => return Err(APIError::err("couldnt_update_private_message").into()),
- };
+ let message_id = message.id;
+ let update_pm =
+ move |conn: &'_ _| PrivateMessage::update(conn, message_id, &private_message_form);
+ if blocking(pool, update_pm).await?.is_err() {
+ return Err(APIError::err("couldnt_update_private_message").into());
+ }
}
Ok(GetRepliesResponse { replies: vec![] })
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<DeleteAccount> {
type Response = LoginResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
_websocket_info: Option<WebsocketInfo>,
- ) -> Result<LoginResponse, Error> {
+ ) -> Result<LoginResponse, LemmyError> {
let data: &DeleteAccount = &self.data;
let claims = match Claims::decode(&data.auth) {
@@ -956,9 +1017,7 @@ impl Perform for Oper<DeleteAccount> {
let user_id = claims.id;
- let conn = pool.get()?;
-
- let user: User_ = User_::read(&conn, user_id)?;
+ let user = blocking(pool, move |conn| User_::read(conn, user_id)).await??;
// Verify the password
let valid: bool = verify(&data.password, &user.password_encrypted).unwrap_or(false);
@@ -967,30 +1026,40 @@ impl Perform for Oper<DeleteAccount> {
}
// Comments
- let comments = CommentQueryBuilder::create(&conn)
- .for_creator_id(user_id)
- .limit(std::i64::MAX)
- .list()?;
+ let comments = blocking(pool, move |conn| {
+ CommentQueryBuilder::create(conn)
+ .for_creator_id(user_id)
+ .limit(std::i64::MAX)
+ .list()
+ })
+ .await??;
+ // TODO: this should probably be a bulk operation
for comment in &comments {
- let _updated_comment = match Comment::permadelete(&conn, comment.id) {
- Ok(comment) => comment,
- Err(_e) => return Err(APIError::err("couldnt_update_comment").into()),
- };
+ let comment_id = comment.id;
+ let permadelete = move |conn: &'_ _| Comment::permadelete(conn, comment_id);
+ if blocking(pool, permadelete).await?.is_err() {
+ return Err(APIError::err("couldnt_update_comment").into());
+ }
}
// Posts
- let posts = PostQueryBuilder::create(&conn)
- .sort(&SortType::New)
- .for_creator_id(user_id)
- .limit(std::i64::MAX)
- .list()?;
+ let posts = blocking(pool, move |conn| {
+ PostQueryBuilder::create(conn)
+ .sort(&SortType::New)
+ .for_creator_id(user_id)
+ .limit(std::i64::MAX)
+ .list()
+ })
+ .await??;
+ // TODO: this should probably be a bulk operation
for post in &posts {
- let _updated_post = match Post::permadelete(&conn, post.id) {
- Ok(post) => post,
- Err(_e) => return Err(APIError::err("couldnt_update_post").into()),
- };
+ let post_id = post.id;
+ let permadelete = move |conn: &'_ _| Post::permadelete(conn, post_id);
+ if blocking(pool, permadelete).await?.is_err() {
+ return Err(APIError::err("couldnt_update_post").into());
+ }
}
Ok(LoginResponse {
@@ -999,20 +1068,20 @@ impl Perform for Oper<DeleteAccount> {
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<PasswordReset> {
type Response = PasswordResetResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
_websocket_info: Option<WebsocketInfo>,
- ) -> Result<PasswordResetResponse, Error> {
+ ) -> Result<PasswordResetResponse, LemmyError> {
let data: &PasswordReset = &self.data;
- let conn = pool.get()?;
-
// Fetch that email
- let user: User_ = match User_::find_by_email(&conn, &data.email) {
+ let email = data.email.clone();
+ let user = match blocking(pool, move |conn| User_::find_by_email(conn, &email)).await? {
Ok(user) => user,
Err(_e) => return Err(APIError::err("couldnt_find_that_username_or_email").into()),
};
@@ -1021,7 +1090,12 @@ impl Perform for Oper<PasswordReset> {
let token = generate_random_string();
// Insert the row
- PasswordResetRequest::create_token(&conn, user.id, &token)?;
+ let token2 = token.clone();
+ let user_id = user.id;
+ blocking(pool, move |conn| {
+ PasswordResetRequest::create_token(conn, user_id, &token2)
+ })
+ .await??;
// Email the pure token to the user.
// TODO no i18n support here.
@@ -1038,20 +1112,23 @@ impl Perform for Oper<PasswordReset> {
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<PasswordChange> {
type Response = LoginResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
_websocket_info: Option<WebsocketInfo>,
- ) -> Result<LoginResponse, Error> {
+ ) -> Result<LoginResponse, LemmyError> {
let data: &PasswordChange = &self.data;
- let conn = pool.get()?;
-
// Fetch the user_id from the token
- let user_id = PasswordResetRequest::read_from_token(&conn, &data.token)?.user_id;
+ let token = data.token.clone();
+ let user_id = blocking(pool, move |conn| {
+ PasswordResetRequest::read_from_token(conn, &token).map(|p| p.user_id)
+ })
+ .await??;
// Make sure passwords match
if data.password != data.password_verify {
@@ -1059,7 +1136,12 @@ impl Perform for Oper<PasswordChange> {
}
// Update the user with the new password
- let updated_user = match User_::update_password(&conn, user_id, &data.password) {
+ let password = data.password.clone();
+ let updated_user = match blocking(pool, move |conn| {
+ User_::update_password(conn, user_id, &password)
+ })
+ .await?
+ {
Ok(user) => user,
Err(_e) => return Err(APIError::err("couldnt_update_user").into()),
};
@@ -1071,14 +1153,15 @@ impl Perform for Oper<PasswordChange> {
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<CreatePrivateMessage> {
type Response = PrivateMessageResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
websocket_info: Option<WebsocketInfo>,
- ) -> Result<PrivateMessageResponse, Error> {
+ ) -> Result<PrivateMessageResponse, LemmyError> {
let data: &CreatePrivateMessage = &self.data;
let claims = match Claims::decode(&data.auth) {
@@ -1090,10 +1173,8 @@ impl Perform for Oper<CreatePrivateMessage> {
let hostname = &format!("https://{}", Settings::get().hostname);
- let conn = pool.get()?;
-
// Check for a site ban
- let user = User_::read(&conn, user_id)?;
+ let user = blocking(pool, move |conn| User_::read(conn, user_id)).await??;
if user.banned {
return Err(APIError::err("site_ban").into());
}
@@ -1112,23 +1193,34 @@ impl Perform for Oper<CreatePrivateMessage> {
published: None,
};
- let inserted_private_message = match PrivateMessage::create(&conn, &private_message_form) {
+ let inserted_private_message = match blocking(pool, move |conn| {
+ PrivateMessage::create(conn, &private_message_form)
+ })
+ .await?
+ {
Ok(private_message) => private_message,
Err(_e) => {
return Err(APIError::err("couldnt_create_private_message").into());
}
};
- let updated_private_message =
- match PrivateMessage::update_ap_id(&conn, inserted_private_message.id) {
- Ok(private_message) => private_message,
- Err(_e) => return Err(APIError::err("couldnt_create_private_message").into()),
- };
+ let inserted_private_message_id = inserted_private_message.id;
+ let updated_private_message = match blocking(pool, move |conn| {
+ PrivateMessage::update_ap_id(&conn, inserted_private_message_id)
+ })
+ .await?
+ {
+ Ok(private_message) => private_message,
+ Err(_e) => return Err(APIError::err("couldnt_create_private_message").into()),
+ };
- updated_private_message.send_create(&user, &conn)?;
+ updated_private_message
+ .send_create(&user, &self.client, pool)
+ .await?;
// Send notifications to the recipient
- let recipient_user = User_::read(&conn, data.recipient_id)?;
+ let recipient_id = data.recipient_id;
+ let recipient_user = blocking(pool, move |conn| User_::read(conn, recipient_id)).await??;
if recipient_user.send_notifications_to_email {
if let Some(email) = recipient_user.email {
let subject = &format!(
@@ -1147,7 +1239,10 @@ impl Perform for Oper<CreatePrivateMessage> {
}
}
- let message = PrivateMessageView::read(&conn, inserted_private_message.id)?;
+ let message = blocking(pool, move |conn| {
+ PrivateMessageView::read(conn, inserted_private_message.id)
+ })
+ .await??;
let res = PrivateMessageResponse { message };
@@ -1164,14 +1259,15 @@ impl Perform for Oper<CreatePrivateMessage> {
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<EditPrivateMessage> {
type Response = PrivateMessageResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
websocket_info: Option<WebsocketInfo>,
- ) -> Result<PrivateMessageResponse, Error> {
+ ) -> Result<PrivateMessageResponse, LemmyError> {
let data: &EditPrivateMessage = &self.data;
let claims = match Claims::decode(&data.auth) {
@@ -1181,12 +1277,12 @@ impl Perform for Oper<EditPrivateMessage> {
let user_id = claims.id;
- let conn = pool.get()?;
-
- let orig_private_message = PrivateMessage::read(&conn, data.edit_id)?;
+ let edit_id = data.edit_id;
+ let orig_private_message =
+ blocking(pool, move |conn| PrivateMessage::read(conn, edit_id)).await??;
// Check for a site ban
- let user = User_::read(&conn, user_id)?;
+ let user = blocking(pool, move |conn| User_::read(conn, user_id)).await??;