summaryrefslogtreecommitdiffstats
path: root/server/src/api/post.rs
diff options
context:
space:
mode:
authorRiley <asonix@asonix.dog>2020-07-01 07:54:29 -0500
committerGitHub <noreply@github.com>2020-07-01 08:54:29 -0400
commita074564458b8a108b77d98e5e8ce24168656763a (patch)
tree8cfb4e463b6b2dbd3c4b3ac2f312a42542f38d64 /server/src/api/post.rs
parent4c1cb5999cad496714cec67f101be38cd281d416 (diff)
Federation async (#848)
* Asyncify more * I guess these changed * Clean PR a bit * Convert more away from failure error * config changes for testing federation * It was DNS So actix-web's client relies on TRust DNS Resolver to figure out where to send data, but TRust DNS Resolver seems to not play nice with docker, which expressed itself as not resolving the name to an IP address _the first time_ when making a request. The fix was literally to make the request again (which I limited to 3 times total, and not exceeding the request timeout in total) * Only retry for connecterror Since TRust DNS Resolver was causing ConnectError::Timeout, this change limits the retry to only this error, returning immediately for any other error * Use http sig norm 0.4.0-alpha for actix-web 3.0 support * Blocking function, retry http requests * cargo +nightly fmt * Only create one pictrs dir * Don't yarn build * cargo +nightly fmt
Diffstat (limited to 'server/src/api/post.rs')
-rw-r--r--server/src/api/post.rs280
1 files changed, 170 insertions, 110 deletions
diff --git a/server/src/api/post.rs b/server/src/api/post.rs
index a3ac4915..840f1530 100644
--- a/server/src/api/post.rs
+++ b/server/src/api/post.rs
@@ -1,6 +1,7 @@
use crate::{
api::{APIError, Oper, Perform},
apub::{ApubLikeableType, ApubObjectType},
+ blocking,
db::{
comment_view::*,
community_view::*,
@@ -26,12 +27,9 @@ use crate::{
UserOperation,
WebsocketInfo,
},
+ DbPool,
+ LemmyError,
};
-use diesel::{
- r2d2::{ConnectionManager, Pool},
- PgConnection,
-};
-use failure::Error;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
@@ -112,14 +110,15 @@ pub struct SavePost {
auth: String,
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<CreatePost> {
type Response = PostResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
websocket_info: Option<WebsocketInfo>,
- ) -> Result<PostResponse, Error> {
+ ) -> Result<PostResponse, LemmyError> {
let data: &CreatePost = &self.data;
let claims = match Claims::decode(&data.auth) {
@@ -139,22 +138,23 @@ impl Perform for Oper<CreatePost> {
let user_id = claims.id;
- let conn = pool.get()?;
-
// Check for a community ban
- if CommunityUserBanView::get(&conn, user_id, data.community_id).is_ok() {
+ let community_id = data.community_id;
+ let is_banned =
+ move |conn: &'_ _| CommunityUserBanView::get(conn, user_id, community_id).is_ok();
+ if blocking(pool, is_banned).await? {
return Err(APIError::err("community_ban").into());
}
// Check for a site ban
- let user = User_::read(&conn, user_id)?;
+ let user = blocking(pool, move |conn| User_::read(conn, user_id)).await??;
if user.banned {
return Err(APIError::err("site_ban").into());
}
// Fetch Iframely and pictrs cached image
let (iframely_title, iframely_description, iframely_html, pictrs_thumbnail) =
- fetch_iframely_and_pictrs_data(data.url.to_owned());
+ fetch_iframely_and_pictrs_data(&self.client, data.url.to_owned()).await;
let post_form = PostForm {
name: data.name.to_owned(),
@@ -177,7 +177,7 @@ impl Perform for Oper<CreatePost> {
published: None,
};
- let inserted_post = match Post::create(&conn, &post_form) {
+ let inserted_post = match blocking(pool, move |conn| Post::create(conn, &post_form)).await? {
Ok(post) => post,
Err(e) => {
let err_type = if e.to_string() == "value too long for type character varying(200)" {
@@ -190,12 +190,14 @@ impl Perform for Oper<CreatePost> {
}
};
- let updated_post = match Post::update_ap_id(&conn, inserted_post.id) {
- Ok(post) => post,
- Err(_e) => return Err(APIError::err("couldnt_create_post").into()),
- };
+ let inserted_post_id = inserted_post.id;
+ let updated_post =
+ match blocking(pool, move |conn| Post::update_ap_id(conn, inserted_post_id)).await? {
+ Ok(post) => post,
+ Err(_e) => return Err(APIError::err("couldnt_create_post").into()),
+ };
- updated_post.send_create(&user, &conn)?;
+ updated_post.send_create(&user, &self.client, pool).await?;
// They like their own post by default
let like_form = PostLikeForm {
@@ -204,15 +206,20 @@ impl Perform for Oper<CreatePost> {
score: 1,
};
- let _inserted_like = match PostLike::like(&conn, &like_form) {
- Ok(like) => like,
- Err(_e) => return Err(APIError::err("couldnt_like_post").into()),
- };
+ let like = move |conn: &'_ _| PostLike::like(conn, &like_form);
+ if blocking(pool, like).await?.is_err() {
+ return Err(APIError::err("couldnt_like_post").into());
+ }
- updated_post.send_like(&user, &conn)?;
+ updated_post.send_like(&user, &self.client, pool).await?;
// Refetch the view
- let post_view = match PostView::read(&conn, inserted_post.id, Some(user_id)) {
+ let inserted_post_id = inserted_post.id;
+ let post_view = match blocking(pool, move |conn| {
+ PostView::read(conn, inserted_post_id, Some(user_id))
+ })
+ .await?
+ {
Ok(post) => post,
Err(_e) => return Err(APIError::err("couldnt_find_post").into()),
};
@@ -231,14 +238,15 @@ impl Perform for Oper<CreatePost> {
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<GetPost> {
type Response = GetPostResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
websocket_info: Option<WebsocketInfo>,
- ) -> Result<GetPostResponse, Error> {
+ ) -> Result<GetPostResponse, LemmyError> {
let data: &GetPost = &self.data;
let user_id: Option<i32> = match &data.auth {
@@ -252,25 +260,38 @@ impl Perform for Oper<GetPost> {
None => None,
};
- let conn = pool.get()?;
-
- let post_view = match PostView::read(&conn, data.id, user_id) {
+ let id = data.id;
+ let post_view = match blocking(pool, move |conn| PostView::read(conn, id, user_id)).await? {
Ok(post) => post,
Err(_e) => return Err(APIError::err("couldnt_find_post").into()),
};
- let comments = CommentQueryBuilder::create(&conn)
- .for_post_id(data.id)
- .my_user_id(user_id)
- .limit(9999)
- .list()?;
+ let id = data.id;
+ let comments = blocking(pool, move |conn| {
+ CommentQueryBuilder::create(conn)
+ .for_post_id(id)
+ .my_user_id(user_id)
+ .limit(9999)
+ .list()
+ })
+ .await??;
+
+ let community_id = post_view.community_id;
+ let community = blocking(pool, move |conn| {
+ CommunityView::read(conn, community_id, user_id)
+ })
+ .await??;
- let community = CommunityView::read(&conn, post_view.community_id, user_id)?;
+ let community_id = post_view.community_id;
+ let moderators = blocking(pool, move |conn| {
+ CommunityModeratorView::for_community(conn, community_id)
+ })
+ .await??;
- let moderators = CommunityModeratorView::for_community(&conn, post_view.community_id)?;
+ let site_creator_id =
+ blocking(pool, move |conn| Site::read(conn, 1).map(|s| s.creator_id)).await??;
- let site_creator_id = Site::read(&conn, 1)?.creator_id;
- let mut admins = UserView::admins(&conn)?;
+ let mut admins = blocking(pool, move |conn| UserView::admins(conn)).await??;
let creator_index = admins.iter().position(|r| r.id == site_creator_id).unwrap();
let creator_user = admins.remove(creator_index);
admins.insert(0, creator_user);
@@ -305,14 +326,15 @@ impl Perform for Oper<GetPost> {
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<GetPosts> {
type Response = GetPostsResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
websocket_info: Option<WebsocketInfo>,
- ) -> Result<GetPostsResponse, Error> {
+ ) -> Result<GetPostsResponse, LemmyError> {
let data: &GetPosts = &self.data;
let user_claims: Option<Claims> = match &data.auth {
@@ -336,17 +358,21 @@ impl Perform for Oper<GetPosts> {
let type_ = ListingType::from_str(&data.type_)?;
let sort = SortType::from_str(&data.sort)?;
- let conn = pool.get()?;
-
- let posts = match PostQueryBuilder::create(&conn)
- .listing_type(type_)
- .sort(&sort)
- .show_nsfw(show_nsfw)
- .for_community_id(data.community_id)
- .my_user_id(user_id)
- .page(data.page)
- .limit(data.limit)
- .list()
+ let page = data.page;
+ let limit = data.limit;
+ let community_id = data.community_id;
+ let posts = match blocking(pool, move |conn| {
+ PostQueryBuilder::create(conn)
+ .listing_type(type_)
+ .sort(&sort)
+ .show_nsfw(show_nsfw)
+ .for_community_id(community_id)
+ .my_user_id(user_id)
+ .page(page)
+ .limit(limit)
+ .list()
+ })
+ .await?
{
Ok(posts) => posts,
Err(_e) => return Err(APIError::err("couldnt_get_posts").into()),
@@ -370,14 +396,15 @@ impl Perform for Oper<GetPosts> {
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<CreatePostLike> {
type Response = PostResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
websocket_info: Option<WebsocketInfo>,
- ) -> Result<PostResponse, Error> {
+ ) -> Result<PostResponse, LemmyError> {
let data: &CreatePostLike = &self.data;
let claims = match Claims::decode(&data.auth) {
@@ -387,24 +414,27 @@ impl Perform for Oper<CreatePostLike> {
let user_id = claims.id;
- let conn = pool.get()?;
-
// Don't do a downvote if site has downvotes disabled
if data.score == -1 {
- let site = SiteView::read(&conn)?;
+ let site = blocking(pool, move |conn| SiteView::read(conn)).await??;
if !site.enable_downvotes {
return Err(APIError::err("downvotes_disabled").into());
}
}
// Check for a community ban
- let post = Post::read(&conn, data.post_id)?;
- if CommunityUserBanView::get(&conn, user_id, post.community_id).is_ok() {
+ let post_id = data.post_id;
+ let post = blocking(pool, move |conn| Post::read(conn, post_id)).await??;
+
+ let community_id = post.community_id;
+ let is_banned =
+ move |conn: &'_ _| CommunityUserBanView::get(conn, user_id, community_id).is_ok();
+ if blocking(pool, is_banned).await? {
return Err(APIError::err("community_ban").into());
}
// Check for a site ban
- let user = User_::read(&conn, user_id)?;
+ let user = blocking(pool, move |conn| User_::read(conn, user_id)).await??;
if user.banned {
return Err(APIError::err("site_ban").into());
}
@@ -416,26 +446,33 @@ impl Perform for Oper<CreatePostLike> {
};
// Remove any likes first
- PostLike::remove(&conn, &like_form)?;
+ let like_form2 = like_form.clone();
+ blocking(pool, move |conn| PostLike::remove(conn, &like_form2)).await??;
// Only add the like if the score isnt 0
let do_add = like_form.score != 0 && (like_form.score == 1 || like_form.score == -1);
if do_add {
- let _inserted_like = match PostLike::like(&conn, &like_form) {
- Ok(like) => like,
- Err(_e) => return Err(APIError::err("couldnt_like_post").into()),
- };
+ let like_form2 = like_form.clone();
+ let like = move |conn: &'_ _| PostLike::like(conn, &like_form2);
+ if blocking(pool, like).await?.is_err() {
+ return Err(APIError::err("couldnt_like_post").into());
+ }
if like_form.score == 1 {
- post.send_like(&user, &conn)?;
+ post.send_like(&user, &self.client, pool).await?;
} else if like_form.score == -1 {
- post.send_dislike(&user, &conn)?;
+ post.send_dislike(&user, &self.client, pool).await?;
}
} else {
- post.send_undo_like(&user, &conn)?;
+ post.send_undo_like(&user, &self.client, pool).await?;
}
- let post_view = match PostView::read(&conn, data.post_id, Some(user_id)) {
+ let post_id = data.post_id;
+ let post_view = match blocking(pool, move |conn| {
+ PostView::read(conn, post_id, Some(user_id))
+ })
+ .await?
+ {
Ok(post) => post,
Err(_e) => return Err(APIError::err("couldnt_find_post").into()),
};
@@ -454,14 +491,15 @@ impl Perform for Oper<CreatePostLike> {
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<EditPost> {
type Response = PostResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
websocket_info: Option<WebsocketInfo>,
- ) -> Result<PostResponse, Error> {
+ ) -> Result<PostResponse, LemmyError> {
let data: &EditPost = &self.data;
if let Err(slurs) = slur_check(&data.name) {
@@ -481,37 +519,46 @@ impl Perform for Oper<EditPost> {
let user_id = claims.id;
- let conn = pool.get()?;
-
// Verify its the creator or a mod or admin
+ let community_id = data.community_id;
let mut editors: Vec<i32> = vec![data.creator_id];
editors.append(
- &mut CommunityModeratorView::for_community(&conn, data.community_id)?
- .into_iter()
- .map(|m| m.user_id)
- .collect(),
+ &mut blocking(pool, move |conn| {
+ CommunityModeratorView::for_community(conn, community_id)
+ .map(|v| v.into_iter().map(|m| m.user_id).collect())
+ })
+ .await??,
+ );
+ editors.append(
+ &mut blocking(pool, move |conn| {
+ UserView::admins(conn).map(|v| v.into_iter().map(|a| a.id).collect())
+ })
+ .await??,
);
- editors.append(&mut UserView::admins(&conn)?.into_iter().map(|a| a.id).collect());
if !editors.contains(&user_id) {
return Err(APIError::err("no_post_edit_allowed").into());
}
// Check for a community ban
- if CommunityUserBanView::get(&conn, user_id, data.community_id).is_ok() {
+ let community_id = data.community_id;
+ let is_banned =
+ move |conn: &'_ _| CommunityUserBanView::get(conn, user_id, community_id).is_ok();
+ if blocking(pool, is_banned).await? {
return Err(APIError::err("community_ban").into());
}
// Check for a site ban
- let user = User_::read(&conn, user_id)?;
+ let user = blocking(pool, move |conn| User_::read(conn, user_id)).await??;
if user.banned {
return Err(APIError::err("site_ban").into());
}
// Fetch Iframely and Pictrs cached image
let (iframely_title, iframely_description, iframely_html, pictrs_thumbnail) =
- fetch_iframely_and_pictrs_data(data.url.to_owned());
+ fetch_iframely_and_pictrs_data(&self.client, data.url.to_owned()).await;
- let read_post = Post::read(&conn, data.edit_id)?;
+ let edit_id = data.edit_id;
+ let read_post = blocking(pool, move |conn| Post::read(conn, edit_id)).await??;
let post_form = PostForm {
name: data.name.to_owned(),
@@ -534,7 +581,9 @@ impl Perform for Oper<EditPost> {
published: None,
};
- let updated_post = match Post::update(&conn, data.edit_id, &post_form) {
+ let edit_id = data.edit_id;
+ let res = blocking(pool, move |conn| Post::update(conn, edit_id, &post_form)).await?;
+ let updated_post: Post = match res {
Ok(post) => post,
Err(e) => {
let err_type = if e.to_string() == "value too long for type character varying(200)" {
@@ -555,7 +604,7 @@ impl Perform for Oper<EditPost> {
removed: Some(removed),
reason: data.reason.to_owned(),
};
- ModRemovePost::create(&conn, &form)?;
+ blocking(pool, move |conn| ModRemovePost::create(conn, &form)).await??;
}
if let Some(locked) = data.locked.to_owned() {
@@ -564,7 +613,7 @@ impl Perform for Oper<EditPost> {
post_id: data.edit_id,
locked: Some(locked),
};
- ModLockPost::create(&conn, &form)?;
+ blocking(pool, move |conn| ModLockPost::create(conn, &form)).await??;
}
if let Some(stickied) = data.stickied.to_owned() {
@@ -573,26 +622,34 @@ impl Perform for Oper<EditPost> {
post_id: data.edit_id,
stickied: Some(stickied),
};
- ModStickyPost::create(&conn, &form)?;
+ blocking(pool, move |conn| ModStickyPost::create(conn, &form)).await??;
}
if let Some(deleted) = data.deleted.to_owned() {
if deleted {
- updated_post.send_delete(&user, &conn)?;
+ updated_post.send_delete(&user, &self.client, pool).await?;
} else {
- updated_post.send_undo_delete(&user, &conn)?;
+ updated_post
+ .send_undo_delete(&user, &self.client, pool)
+ .await?;
}
} else if let Some(removed) = data.removed.to_owned() {
if removed {
- updated_post.send_remove(&user, &conn)?;
+ updated_post.send_remove(&user, &self.client, pool).await?;
} else {
- updated_post.send_undo_remove(&user, &conn)?;
+ updated_post
+ .send_undo_remove(&user, &self.client, pool)
+ .await?;
}
} else {
- updated_post.send_update(&user, &conn)?;
+ updated_post.send_update(&user, &self.client, pool).await?;
}
- let post_view = PostView::read(&conn, data.edit_id, Some(user_id))?;
+ let edit_id = data.edit_id;
+ let post_view = blocking(pool, move |conn| {
+ PostView::read(conn, edit_id, Some(user_id))
+ })
+ .await??;
let res = PostResponse { post: post_view };
@@ -608,14 +665,15 @@ impl Perform for Oper<EditPost> {
}
}
+#[async_trait::async_trait(?Send)]
impl Perform for Oper<SavePost> {
type Response = PostResponse;
- fn perform(
+ async fn perform(
&self,
- pool: Pool<ConnectionManager<PgConnection>>,
+ pool: &DbPool,
_websocket_info: Option<WebsocketInfo>,
- ) -> Result<PostResponse, Error> {
+ ) -> Result<PostResponse, LemmyError> {
let data: &SavePost = &self.data;
let claims = match Claims::decode(&data.auth) {
@@ -630,21 +688,23 @@ impl Perform for Oper<SavePost> {
user_id,
};
- let conn = pool.get()?;
-
if data.save {
- match PostSaved::save(&conn, &post_saved_form) {
- Ok(post) => post,
- Err(_e) => return Err(APIError::err("couldnt_save_post").into()),
- };
+ let save = move |conn: &'_ _| PostSaved::save(conn, &post_saved_form);
+ if blocking(pool, save).await?.is_err() {
+ return Err(APIError::err("couldnt_save_post").into());
+ }
} else {
- match PostSaved::unsave(&conn, &post_saved_form) {
- Ok(post) => post,
- Err(_e) => return Err(APIError::err("couldnt_save_post").into()),
- };
+ let unsave = move |conn: &'_ _| PostSaved::unsave(conn, &post_saved_form);
+ if blocking(pool, unsave).await?.is_err() {
+ return Err(APIError::err("couldnt_save_post").into());
+ }
}
- let post_view = PostView::read(&conn, data.post_id, Some(user_id))?;
+ let post_id = data.post_id;
+ let post_view = blocking(pool, move |conn| {
+ PostView::read(conn, post_id, Some(user_id))
+ })
+ .await??;
Ok(PostResponse { post: post_view })
}