summaryrefslogtreecommitdiffstats
path: root/server/src/apub/shared_inbox.rs
diff options
context:
space:
mode:
authorRiley <asonix@asonix.dog>2020-07-01 07:54:29 -0500
committerGitHub <noreply@github.com>2020-07-01 08:54:29 -0400
commita074564458b8a108b77d98e5e8ce24168656763a (patch)
tree8cfb4e463b6b2dbd3c4b3ac2f312a42542f38d64 /server/src/apub/shared_inbox.rs
parent4c1cb5999cad496714cec67f101be38cd281d416 (diff)
Federation async (#848)
* Asyncify more * I guess these changed * Clean PR a bit * Convert more away from failure error * config changes for testing federation * It was DNS So actix-web's client relies on TRust DNS Resolver to figure out where to send data, but TRust DNS Resolver seems to not play nice with docker, which expressed itself as not resolving the name to an IP address _the first time_ when making a request. The fix was literally to make the request again (which I limited to 3 times total, and not exceeding the request timeout in total) * Only retry for connecterror Since TRust DNS Resolver was causing ConnectError::Timeout, this change limits the retry to only this error, returning immediately for any other error * Use http sig norm 0.4.0-alpha for actix-web 3.0 support * Blocking function, retry http requests * cargo +nightly fmt * Only create one pictrs dir * Don't yarn build * cargo +nightly fmt
Diffstat (limited to 'server/src/apub/shared_inbox.rs')
-rw-r--r--server/src/apub/shared_inbox.rs877
1 files changed, 531 insertions, 346 deletions
diff --git a/server/src/apub/shared_inbox.rs b/server/src/apub/shared_inbox.rs
index 1ada6ad1..66773252 100644
--- a/server/src/apub/shared_inbox.rs
+++ b/server/src/apub/shared_inbox.rs
@@ -16,6 +16,7 @@ use crate::{
GroupExt,
PageExt,
},
+ blocking,
db::{
activity::insert_activity,
comment::{Comment, CommentForm, CommentLike, CommentLikeForm},
@@ -34,6 +35,8 @@ use crate::{
server::{SendComment, SendCommunityRoomMessage, SendPost},
UserOperation,
},
+ DbPool,
+ LemmyError,
};
use activitystreams::{
activity::{Announce, Create, Delete, Dislike, Like, Remove, Undo, Update},
@@ -42,11 +45,10 @@ use activitystreams::{
Base,
BaseBox,
};
-use actix_web::{web, HttpRequest, HttpResponse, Result};
-use diesel::PgConnection;
-use failure::{Error, _core::fmt::Debug};
+use actix_web::{client::Client, web, HttpRequest, HttpResponse};
use log::debug;
use serde::{Deserialize, Serialize};
+use std::fmt::Debug;
#[serde(untagged)]
#[derive(Serialize, Deserialize, Debug)]
@@ -112,11 +114,13 @@ impl SharedAcceptedObjects {
pub async fn shared_inbox(
request: HttpRequest,
input: web::Json<SharedAcceptedObjects>,
- db: DbPoolParam,
+ client: web::Data<Client>,
+ pool: DbPoolParam,
chat_server: ChatServerParam,
-) -> Result<HttpResponse, Error> {
+) -> Result<HttpResponse, LemmyError> {
let activity = input.into_inner();
- let conn = &db.get().unwrap();
+ let pool = &pool;
+ let client = &client;
let json = serde_json::to_string(&activity)?;
debug!("Shared inbox received activity: {}", json);
@@ -128,112 +132,120 @@ pub async fn shared_inbox(
let to = cc.replace("/followers", "");
// TODO: this is ugly
- match get_or_fetch_and_upsert_remote_user(&sender.to_string(), &conn) {
- Ok(u) => verify(&request, &u),
+ match get_or_fetch_and_upsert_remote_user(&sender.to_string(), &client, pool).await {
+ Ok(u) => verify(&request, &u)?,
Err(_) => {
- let c = get_or_fetch_and_upsert_remote_community(&sender.to_string(), &conn)?;
- verify(&request, &c)
+ let c = get_or_fetch_and_upsert_remote_community(&sender.to_string(), &client, pool).await?;
+ verify(&request, &c)?;
}
- }?;
+ }
match (activity, object.kind()) {
(SharedAcceptedObjects::Create(c), Some("Page")) => {
- receive_create_post(&c, &conn, chat_server)?;
- announce_activity_if_valid::<Create>(*c, &to, sender, conn)
+ receive_create_post((*c).clone(), client, pool, chat_server).await?;
+ announce_activity_if_valid::<Create>(*c, &to, sender, client, pool).await
}
(SharedAcceptedObjects::Update(u), Some("Page")) => {
- receive_update_post(&u, &conn, chat_server)?;
- announce_activity_if_valid::<Update>(*u, &to, sender, conn)
+ receive_update_post((*u).clone(), client, pool, chat_server).await?;
+ announce_activity_if_valid::<Update>(*u, &to, sender, client, pool).await
}
(SharedAcceptedObjects::Like(l), Some("Page")) => {
- receive_like_post(&l, &conn, chat_server)?;
- announce_activity_if_valid::<Like>(*l, &to, sender, conn)
+ receive_like_post((*l).clone(), client, pool, chat_server).await?;
+ announce_activity_if_valid::<Like>(*l, &to, sender, client, pool).await
}
(SharedAcceptedObjects::Dislike(d), Some("Page")) => {
- receive_dislike_post(&d, &conn, chat_server)?;
- announce_activity_if_valid::<Dislike>(*d, &to, sender, conn)
+ receive_dislike_post((*d).clone(), client, pool, chat_server).await?;
+ announce_activity_if_valid::<Dislike>(*d, &to, sender, client, pool).await
}
(SharedAcceptedObjects::Delete(d), Some("Page")) => {
- receive_delete_post(&d, &conn, chat_server)?;
- announce_activity_if_valid::<Delete>(*d, &to, sender, conn)
+ receive_delete_post((*d).clone(), client, pool, chat_server).await?;
+ announce_activity_if_valid::<Delete>(*d, &to, sender, client, pool).await
}
(SharedAcceptedObjects::Remove(r), Some("Page")) => {
- receive_remove_post(&r, &conn, chat_server)?;
- announce_activity_if_valid::<Remove>(*r, &to, sender, conn)
+ receive_remove_post((*r).clone(), client, pool, chat_server).await?;
+ announce_activity_if_valid::<Remove>(*r, &to, sender, client, pool).await
}
(SharedAcceptedObjects::Create(c), Some("Note")) => {
- receive_create_comment(&c, &conn, chat_server)?;
- announce_activity_if_valid::<Create>(*c, &to, sender, conn)
+ receive_create_comment((*c).clone(), client, pool, chat_server).await?;
+ announce_activity_if_valid::<Create>(*c, &to, sender, client, pool).await
}
(SharedAcceptedObjects::Update(u), Some("Note")) => {
- receive_update_comment(&u, &conn, chat_server)?;
- announce_activity_if_valid::<Update>(*u, &to, sender, conn)
+ receive_update_comment((*u).clone(), client, pool, chat_server).await?;
+ announce_activity_if_valid::<Update>(*u, &to, sender, client, pool).await
}
(SharedAcceptedObjects::Like(l), Some("Note")) => {
- receive_like_comment(&l, &conn, chat_server)?;
- announce_activity_if_valid::<Like>(*l, &to, sender, conn)
+ receive_like_comment((*l).clone(), client, pool, chat_server).await?;
+ announce_activity_if_valid::<Like>(*l, &to, sender, client, pool).await
}
(SharedAcceptedObjects::Dislike(d), Some("Note")) => {
- receive_dislike_comment(&d, &conn, chat_server)?;
- announce_activity_if_valid::<Dislike>(*d, &to, sender, conn)
+ receive_dislike_comment((*d).clone(), client, pool, chat_server).await?;
+ announce_activity_if_valid::<Dislike>(*d, &to, sender, client, pool).await
}
(SharedAcceptedObjects::Delete(d), Some("Note")) => {
- receive_delete_comment(&d, &conn, chat_server)?;
- announce_activity_if_valid::<Delete>(*d, &to, sender, conn)
+ receive_delete_comment((*d).clone(), client, pool, chat_server).await?;
+ announce_activity_if_valid::<Delete>(*d, &to, sender, client, pool).await
}
(SharedAcceptedObjects::Remove(r), Some("Note")) => {
- receive_remove_comment(&r, &conn, chat_server)?;
- announce_activity_if_valid::<Remove>(*r, &to, sender, conn)
+ receive_remove_comment((*r).clone(), client, pool, chat_server).await?;
+ announce_activity_if_valid::<Remove>(*r, &to, sender, client, pool).await
}
(SharedAcceptedObjects::Delete(d), Some("Group")) => {
- receive_delete_community(&d, &conn, chat_server)?;
- announce_activity_if_valid::<Delete>(*d, &to, sender, conn)
+ receive_delete_community((*d).clone(), client, pool, chat_server).await?;
+ announce_activity_if_valid::<Delete>(*d, &to, sender, client, pool).await
}
(SharedAcceptedObjects::Remove(r), Some("Group")) => {
- receive_remove_community(&r, &conn, chat_server)?;
- announce_activity_if_valid::<Remove>(*r, &to, sender, conn)
+ receive_remove_community((*r).clone(), client, pool, chat_server).await?;
+ announce_activity_if_valid::<Remove>(*r, &to, sender, client, pool).await
}
(SharedAcceptedObjects::Undo(u), Some("Delete")) => {
- receive_undo_delete(&u, &conn, chat_server)?;
- announce_activity_if_valid::<Undo>(*u, &to, sender, conn)
+ receive_undo_delete((*u).clone(), client, pool, chat_server).await?;
+ announce_activity_if_valid::<Undo>(*u, &to, sender, client, pool).await
}
(SharedAcceptedObjects::Undo(u), Some("Remove")) => {
- receive_undo_remove(&u, &conn, chat_server)?;
- announce_activity_if_valid::<Undo>(*u, &to, sender, conn)
+ receive_undo_remove((*u).clone(), client, pool, chat_server).await?;
+ announce_activity_if_valid::<Undo>(*u, &to, sender, client, pool).await
}
(SharedAcceptedObjects::Undo(u), Some("Like")) => {
- receive_undo_like(&u, &conn, chat_server)?;
- announce_activity_if_valid::<Undo>(*u, &to, sender, conn)
+ receive_undo_like((*u).clone(), client, pool, chat_server).await?;
+ announce_activity_if_valid::<Undo>(*u, &to, sender, client, pool).await
}
- (SharedAcceptedObjects::Announce(a), _) => receive_announce(a, &conn, chat_server),
+ (SharedAcceptedObjects::Announce(a), _) => receive_announce(a, client, pool, chat_server).await,
(a, _) => receive_unhandled_activity(a),
}
}
// TODO: should pass in sender as ActorType, but thats a bit tricky in shared_inbox()
-fn announce_activity_if_valid<A>(
+async fn announce_activity_if_valid<A>(
activity: A,
community_uri: &str,
sender: &str,
- conn: &PgConnection,
-) -> Result<HttpResponse, Error>
+ client: &Client,
+ pool: &DbPool,
+) -> Result<HttpResponse, LemmyError>
where
A: Activity + Base + Serialize + Debug,
{
- let community = Community::read_from_actor_id(conn, &community_uri)?;
+ let community_uri = community_uri.to_owned();
+ let community = blocking(pool, move |conn| {
+ Community::read_from_actor_id(conn, &community_uri)
+ })
+ .await??;
+
if community.local {
- let sending_user = get_or_fetch_and_upsert_remote_user(&sender.to_string(), &conn)?;
- Community::do_announce(activity, &community, &sending_user, conn)
+ let sending_user = get_or_fetch_and_upsert_remote_user(sender, client, pool).await?;
+
+ Community::do_announce(activity, &community, &sending_user, client, pool).await
} else {
Ok(HttpResponse::NotFound().finish())
}
}
-fn receive_announce(
+async fn receive_announce(
announce: Box<Announce>,
- conn: &PgConnection,
+ client: &Client,
+ pool: &DbPool,
chat_server: ChatServerParam,
-) -> Result<HttpResponse, Error> {
+) -> Result<HttpResponse, LemmyError> {
let object = announce
.announce_props
.get_object_base_box()
@@ -245,8 +257,8 @@ fn receive_announce(
let create = object.into_concrete::<Create>()?;
let inner_object = create.create_props.get_object_base_box().unwrap();
match inner_object.kind() {
- Some("Page") => receive_create_post(&create, &conn, chat_server),
- Some("Note") => receive_create_comment(&create, &conn, chat_server),
+ Some("Page") => receive_create_post(create, client, pool, chat_server).await,
+ Some("Note") => receive_create_comment(create, client, pool, chat_server).await,
_ => receive_unhandled_activity(announce),
}
}
@@ -254,8 +266,8 @@ fn receive_announce(
let update = object.into_concrete::<Update>()?;
let inner_object = update.update_props.get_object_base_box().unwrap();
match inner_object.kind() {
- Some("Page") => receive_update_post(&update, &conn, chat_server),
- Some("Note") => receive_update_comment(&update, &conn, chat_server),
+ Some("Page") => receive_update_post(update, client, pool, chat_server).await,
+ Some("Note") => receive_update_comment(update, client, pool, chat_server).await,
_ => receive_unhandled_activity(announce),
}
}
@@ -263,8 +275,8 @@ fn receive_announce(
let like = object.into_concrete::<Like>()?;
let inner_object = like.like_props.get_object_base_box().unwrap();
match inner_object.kind() {
- Some("Page") => receive_like_post(&like, &conn, chat_server),
- Some("Note") => receive_like_comment(&like, &conn, chat_server),
+ Some("Page") => receive_like_post(like, client, pool, chat_server).await,
+ Some("Note") => receive_like_comment(like, client, pool, chat_server).await,
_ => receive_unhandled_activity(announce),
}
}
@@ -272,8 +284,8 @@ fn receive_announce(
let dislike = object.into_concrete::<Dislike>()?;
let inner_object = dislike.dislike_props.get_object_base_box().unwrap();
match inner_object.kind() {
- Some("Page") => receive_dislike_post(&dislike, &conn, chat_server),
- Some("Note") => receive_dislike_comment(&dislike, &conn, chat_server),
+ Some("Page") => receive_dislike_post(dislike, client, pool, chat_server).await,
+ Some("Note") => receive_dislike_comment(dislike, client, pool, chat_server).await,
_ => receive_unhandled_activity(announce),
}
}
@@ -281,8 +293,8 @@ fn receive_announce(
let delete = object.into_concrete::<Delete>()?;
let inner_object = delete.delete_props.get_object_base_box().unwrap();
match inner_object.kind() {
- Some("Page") => receive_delete_post(&delete, &conn, chat_server),
- Some("Note") => receive_delete_comment(&delete, &conn, chat_server),
+ Some("Page") => receive_delete_post(delete, client, pool, chat_server).await,
+ Some("Note") => receive_delete_comment(delete, client, pool, chat_server).await,
_ => receive_unhandled_activity(announce),
}
}
@@ -290,8 +302,8 @@ fn receive_announce(
let remove = object.into_concrete::<Remove>()?;
let inner_object = remove.remove_props.get_object_base_box().unwrap();
match inner_object.kind() {
- Some("Page") => receive_remove_post(&remove, &conn, chat_server),
- Some("Note") => receive_remove_comment(&remove, &conn, chat_server),
+ Some("Page") => receive_remove_post(remove, client, pool, chat_server).await,
+ Some("Note") => receive_remove_comment(remove, client, pool, chat_server).await,
_ => receive_unhandled_activity(announce),
}
}
@@ -299,9 +311,9 @@ fn receive_announce(
let undo = object.into_concrete::<Undo>()?;
let inner_object = undo.undo_props.get_object_base_box().unwrap();
match inner_object.kind() {
- Some("Delete") => receive_undo_delete(&undo, &conn, chat_server),
- Some("Remove") => receive_undo_remove(&undo, &conn, chat_server),
- Some("Like") => receive_undo_like(&undo, &conn, chat_server),
+ Some("Delete") => receive_undo_delete(undo, client, pool, chat_server).await,
+ Some("Remove") => receive_undo_remove(undo, client, pool, chat_server).await,
+ Some("Like") => receive_undo_like(undo, client, pool, chat_server).await,
_ => receive_unhandled_activity(announce),
}
}
@@ -309,7 +321,7 @@ fn receive_announce(
}
}
-fn receive_unhandled_activity<A>(activity: A) -> Result<HttpResponse, Error>
+fn receive_unhandled_activity<A>(activity: A) -> Result<HttpResponse, LemmyError>
where
A: Debug,
{
@@ -317,11 +329,12 @@ where
Ok(HttpResponse::NotImplemented().finish())
}
-fn receive_create_post(
- create: &Create,
- conn: &PgConnection,
+async fn receive_create_post(
+ create: Create,
+ client: &Client,
+ pool: &DbPool,
chat_server: ChatServerParam,
-) -> Result<HttpResponse, Error> {
+) -> Result<HttpResponse, LemmyError> {
let page = create
.create_props
.get_object_base_box()
@@ -336,15 +349,20 @@ fn receive_create_post(
.unwrap()
.to_string();
- let user = get_or_fetch_and_upsert_remote_user(&user_uri, &conn)?;
+ let user = get_or_fetch_and_upsert_remote_user(&user_uri, client, pool).await?;
+
+ insert_activity(user.id, create, false, pool).await?;
- insert_activity(&conn, user.id, &create, false)?;
+ let post = PostForm::from_apub(&page, client, pool).await?;
- let post = PostForm::from_apub(&page, &conn)?;
- let inserted_post = Post::create(conn, &post)?;
+ let inserted_post = blocking(pool, move |conn| Post::create(conn, &post)).await??;
// Refetch the view
- let post_view = PostView::read(&conn, inserted_post.id, None)?;
+ let inserted_post_id = inserted_post.id;
+ let post_view = blocking(pool, move |conn| {
+ PostView::read(conn, inserted_post_id, None)
+ })
+ .await??;
let res = PostResponse { post: post_view };
@@ -357,11 +375,12 @@ fn receive_create_post(
Ok(HttpResponse::Ok().finish())
}
-fn receive_create_comment(
- create: &Create,
- conn: &PgConnection,
+async fn receive_create_comment(
+ create: Create,
+ client: &Client,
+ pool: &DbPool,
chat_server: ChatServerParam,
-) -> Result<HttpResponse, Error> {
+) -> Result<HttpResponse, LemmyError> {
let note = create
.create_props
.get_object_base_box()
@@ -376,23 +395,30 @@ fn receive_create_comment(
.unwrap()
.to_string();
- let user = get_or_fetch_and_upsert_remote_user(&user_uri, &conn)?;
+ let user = get_or_fetch_and_upsert_remote_user(&user_uri, client, pool).await?;
- insert_activity(&conn, user.id, &create, false)?;
+ insert_activity(user.id, create, false, pool).await?;
- let comment = CommentForm::from_apub(&note, &conn)?;
- let inserted_comment = Comment::create(conn, &comment)?;
- let post = Post::read(&conn, inserted_comment.post_id)?;
+ let comment = CommentForm::from_apub(&note, client, pool).await?;
+
+ let inserted_comment = blocking(pool, move |conn| Comment::create(conn, &comment)).await??;
+
+ let post_id = inserted_comment.post_id;
+ let post = blocking(pool, move |conn| Post::read(conn, post_id)).await??;
// Note:
// Although mentions could be gotten from the post tags (they are included there), or the ccs,
// Its much easier to scrape them from the comment body, since the API has to do that
// anyway.
let mentions = scrape_text_for_mentions(&inserted_comment.content);
- let recipient_ids = send_local_notifs(&conn, &mentions, &inserted_comment, &user, &post);
+ let recipient_ids =
+ send_local_notifs(mentions, inserted_comment.clone(), user, post, pool).await?;
// Refetch the view
- let comment_view = CommentView::read(&conn, inserted_comment.id, None)?;
+ let comment_view = blocking(pool, move |conn| {
+ CommentView::read(conn, inserted_comment.id, None)
+ })
+ .await??;
let res = CommentResponse {
comment: comment_view,
@@ -408,11 +434,12 @@ fn receive_create_comment(
Ok(HttpResponse::Ok().finish())
}
-fn receive_update_post(
- update: &Update,
- conn: &PgConnection,
+async fn receive_update_post(
+ update: Update,
+ client: &Client,
+ pool: &DbPool,
chat_server: ChatServerParam,
-) -> Result<HttpResponse, Error> {
+) -> Result<HttpResponse, LemmyError> {
let page = update
.update_props
.get_object_base_box()
@@ -427,16 +454,20 @@ fn receive_update_post(
.unwrap()
.to_string();
- let user = get_or_fetch_and_upsert_remote_user(&user_uri, &conn)?;
+ let user = get_or_fetch_and_upsert_remote_user(&user_uri, client, pool).await?;
+
+ insert_activity(user.id, update, false, pool).await?;
+
+ let post = PostForm::from_apub(&page, client, pool).await?;
- insert_activity(&conn, user.id, &update, false)?;
+ let post_id = get_or_fetch_and_insert_remote_post(&post.ap_id, client, pool)
+ .await?
+ .id;
- let post = PostForm::from_apub(&page, conn)?;
- let post_id = get_or_fetch_and_insert_remote_post(&post.ap_id, &conn)?.id;
- Post::update(conn, post_id, &post)?;
+ blocking(pool, move |conn| Post::update(conn, post_id, &post)).await??;
// Refetch the view
- let post_view = PostView::read(&conn, post_id, None)?;
+ let post_view = blocking(pool, move |conn| PostView::read(conn, post_id, None)).await??;
let res = PostResponse { post: post_view };
@@ -449,12 +480,12 @@ fn receive_update_post(
Ok(HttpResponse::Ok().finish())
}
-fn receive_like_post(
- like: &Like,
-
- conn: &PgConnection,
+async fn receive_like_post(
+ like: Like,
+ client: &Client,
+ pool: &DbPool,
chat_server: ChatServerParam,
-) -> Result<HttpResponse, Error> {
+) -> Result<HttpResponse, LemmyError> {
let page = like
.like_props
.get_object_base_box()
@@ -465,23 +496,29 @@ fn receive_like_post(
let user_uri = like.like_props.get_actor_xsd_any_uri().unwrap().to_string();
- let user = get_or_fetch_and_upsert_remote_user(&user_uri, &conn)?;
+ let user = get_or_fetch_and_upsert_remote_user(&user_uri, client, pool).await?;
+
+ insert_activity(user.id, like, false, pool).await?;
- insert_activity(&conn, user.id, &like, false)?;
+ let post = PostForm::from_apub(&page, client, pool).await?;
- let post = PostForm::from_apub(&page, conn)?;
- let post_id = get_or_fetch_and_insert_remote_post(&post.ap_id, &conn)?.id;
+ let post_id = get_or_fetch_and_insert_remote_post(&post.ap_id, client, pool)
+ .await?
+ .id;
let like_form = PostLikeForm {
post_id,
user_id: user.id,
score: 1,
};
- PostLike::remove(&conn, &like_form)?;
- PostLike::like(&conn, &like_form)?;
+ blocking(pool, move |conn| {
+ PostLike::remove(conn, &like_form)?;
+ PostLike::like(conn, &like_form)
+ })
+ .await??;
// Refetch the view
- let post_view = PostView::read(&conn, post_id, None)?;
+ let post_view = blocking(pool, move |conn| PostView::read(conn, post_id, None)).await??;
let res = PostResponse { post: post_view };
@@ -494,12 +531,12 @@ fn receive_like_post(
Ok(HttpResponse::Ok().finish())
}
-fn receive_dislike_post(
- dislike: &Dislike,
-
- conn: &PgConnection,
+async fn receive_dislike_post(
+ dislike: Dislike,
+ client: &Client,
+ pool: &DbPool,
chat_server: ChatServerParam,
-) -> Result<HttpResponse, Error> {
+) -> Result<HttpResponse, LemmyError> {
let page = dislike
.dislike_props
.get_object_base_box()
@@ -514,23 +551,29 @@ fn receive_dislike_post(
.unwrap()
.to_string();
- let user = get_or_fetch_and_upsert_remote_user(&user_uri, &conn)?;
+ let user = get_or_fetch_and_upsert_remote_user(&user_uri, client, pool).await?;
+
+ insert_activity(user.id, dislike, false, pool).await?;
- insert_activity(&conn, user.id, &dislike, false)?;
+ let post = PostForm::from_apub(&page, client, pool).await?;
- let post = PostForm::from_apub(&page, conn)?;
- let post_id = get_or_fetch_and_insert_remote_post(&post.ap_id, &conn)?.id;
+ let post_id = get_or_fetch_and_insert_remote_post(&post.ap_id, client, pool)
+ .await?
+ .id;
let like_form = PostLikeForm {
post_id,
user_id: user.id,
score: -1,
};
- PostLike::remove(&conn, &like_form)?;
- PostLike::like(&conn, &like_form)?;
+ blocking(pool, move |conn| {
+ PostLike::remove(conn, &like_form)?;
+ PostLike::like(conn, &like_form)
+ })
+ .await??;
// Refetch the view
- let post_view = PostView::read(&conn, post_id, None)?;
+ let post_view = blocking(pool, move |conn| PostView::read(conn, post_id, None)).await??;
let res = PostResponse { post: post_view };
@@ -543,12 +586,12 @@ fn receive_dislike_post(
Ok(HttpResponse::Ok().finish())
}
-fn receive_update_comment(
- update: &Update,
-
- conn: &PgConnection,
+async fn receive_update_comment(
+ update: Update,
+ client: &Client,
+ pool: &DbPool,
chat_server: ChatServerParam,
-) -> Result<HttpResponse, Error> {
+) -> Result<HttpResponse, LemmyError> {
let note = update
.update_props
.get_object_base_box()
@@ -563,20 +606,30 @@ fn receive_update_comment(
.unwrap()
.to_string();
- let user = get_or_fetch_and_upsert_remote_user(&user_uri, &conn)?;
+ let user = get_or_fetch_and_upsert_remote_user(&user_uri, client, pool).await?;
+
+ insert_activity(user.id, update, false, pool).await?;
+
+ let comment = CommentForm::from_apub(&note, client, pool).await?;
+
+ let comment_id = get_or_fetch_and_insert_remote_comment(&comment.ap_id, client, pool)
+ .await?
+ .id;
- insert_activity(&conn, user.id, &update, false)?;
+ let updated_comment = blocking(pool, move |conn| {
+ Comment::update(conn, comment_id, &comment)
+ })
+ .await??;
- let comment = CommentForm::from_apub(&note, &conn)?;
- let comment_id = get_or_fetch_and_insert_remote_comment(&comment.ap_id, &conn)?.id;
- let updated_comment = Comment::update(conn, comment_id, &comment)?;
- let post = Post::read(&conn, updated_comment.post_id)?;
+ let post_id = updated_comment.post_id;
+ let post = blocking(pool, move |conn| Post::read(conn, post_id)).await??;
let mentions = scrape_text_for_mentions(&updated_comment.content);
- let recipient_ids = send_local_notifs(&conn, &mentions, &updated_comment, &user, &post);
+ let recipient_ids = send_local_notifs(mentions, updated_comment, user, post, pool).await?;
// Refetch the view
- let comment_view = CommentView::read(&conn, comment_id, None)?;
+ let comment_view =
+ blocking(pool, move |conn| CommentView::read(conn, comment_id, None)).await??;
let res = CommentResponse {
comment: comment_view,
@@ -592,12 +645,12 @@ fn receive_update_comment(
Ok(HttpResponse::Ok().finish())
}
-fn receive_like_comment(
- like: &Like,
-
- conn: &PgConnection,
+async fn receive_like_comment(
+ like: Like,
+ client: &Client,
+ pool: &DbPool,
chat_server: ChatServerParam,
-) -> Result<HttpResponse, Error> {
+) -> Result<HttpResponse, LemmyError> {
let note = like
.like_props
.get_object_base_box()
@@ -608,23 +661,31 @@ fn receive_like_comment(
let user_uri = like.like_props.get_actor_xsd_any_uri().unwrap().to_string();
- let user = get_or_fetch_and_upsert_remote_user(&user_uri, &conn)?;
+ let user = get_or_fetch_and_upsert_remote_user(&user_uri, client, pool).await?;
+
+ insert_activity(user.id, like, false, pool).await?;
- insert_activity(&conn, user.id, &like, false)?;
+ let comment = CommentForm::from_apub(&note, client, pool).await?;
+
+ let comment_id = get_or_fetch_and_insert_remote_comment(&comment.ap_id, client, pool)
+ .await?
+ .id;
- let comment = CommentForm::from_apub(&note, &conn)?;
- let comment_id = get_or_fetch_and_insert_remote_comment(&comment.ap_id, &conn)?.id;
let like_form = CommentLikeForm {
comment_id,
post_id: comment.post_id,
user_id: user.id,
score: 1,
};
- CommentLike::remove(&conn, &like_form)?;
- CommentLike::like(&conn, &like_form)?;
+ blocking(pool, move |conn| {
+ CommentLike::remove(conn, &like_form)?;
+ CommentLike::like(conn, &like_form)
+ })
+ .await??;
// Refetch the view
- let comment_view = CommentView::read(&conn, comment_id, None)?;
+ let comment_view =
+ blocking(pool, move |conn| CommentView::read(conn, comment_id, None)).await??;
// TODO get those recipient actor ids from somewhere
let recipient_ids = vec![];
@@ -642,12 +703,12 @@ fn receive_like_comment(
Ok(HttpResponse::Ok().finish())
}
-fn receive_dislike_comment(
- dislike: &Dislike,
-
- conn: &PgConnection,
+async fn receive_dislike_comment(
+ dislike: Dislike,
+ client: &Client,
+ pool: &DbPool,
chat_server: ChatServerParam,
-) -> Result<HttpResponse, Error> {
+) -> Result<HttpResponse, LemmyError> {
let note = dislike
.dislike_props
.get_object_base_box()
@@ -662,23 +723,31 @@ fn receive_dislike_comment(
.unwrap()
.to_string();
- let user = get_or_fetch_and_upsert_remote_user(&user_uri, &conn)?;
+ let user = get_or_fetch_and_upsert_remote_user(&user_uri, client, pool).await?;
- insert_activity(&conn, user.id, &dislike, false)?;
+ insert_activity(user.id, dislike, false, pool).await?;
+
+ let comment = CommentForm::from_apub(&note, client, pool).await?;
+
+ let comment_id = get_or_fetch_and_insert_remote_comment(&comment.ap_id, client, pool)
+ .await?
+ .id;
- let comment = CommentForm::from_apub(&note, &conn)?;
- let comment_id = get_or_fetch_and_insert_remote_comment(&comment.ap_id, &conn)?.id;
let like_form = CommentLikeForm {
comment_id,
post_id: comment.post_id,
user_id: user.id,
score: -1,
};
- CommentLike::remove(&conn, &like_form)?;
- CommentLike::like(&conn, &like_form)?;
+ blocking(pool, move |conn| {
+ CommentLike::remove(conn, &like_form)?;
+ CommentLike::like(conn, &like_form)
+ })
+ .await??;
// Refetch the view
- let comment_view = CommentView::read(&conn, comment_id, None)?;
+ let comment_view =
+ blocking(pool, move |conn| CommentView::read(conn, comment_id, None)).await??;
// TODO get those recipient actor ids from somewhere
let recipient_ids = vec![];
@@ -696,12 +765,12 @@ fn receive_dislike_comment(
Ok(HttpResponse::Ok().finish())
}
-fn receive_delete_community(
- delete: &Delete,
-
- conn: &PgConnection,
+async fn receive_delete_community(
+ delete: Delete,
+ client: &Client,
+ pool: &DbPool,
chat_server: ChatServerParam,
-) -> Result<HttpResponse, Error> {
+) -> Result<HttpResponse, LemmyError> {
let user_uri = delete
.delete_props
.get_actor_xsd_any_uri()
@@ -716,12 +785,18 @@ fn receive_delete_community(
.to_owned()
.into_concrete::<GroupExt>()?;
- let user = get_or_fetch_and_upsert_remote_user(&user_uri, &conn)?;
+ let user = get_or_fetch_and_upsert_remote_user(&user_uri, client, pool).await?;
+
+ insert_activity(user.id, delete, false, pool).await?;
- insert_activity(&conn, user.id, &delete, false)?;
+ let community_actor_id = CommunityForm::from_apub(&group, client, pool)
+ .await?
+ .actor_id;
- let community_actor_id = CommunityForm::from_apub(&group, &conn)?.actor_id;
- let community = Community::read_from_actor_id(conn, &community_actor_id)?;
+ let community = blocking(pool, move |conn| {
+ Community::read_from_actor_id(conn, &community_actor_id)
+ })
+ .await??;
let community_form = CommunityForm {
name: community.name.to_owned(),
@@ -741,28 +816,38 @@ fn receive_delete_community(
last_refreshed_at: None,
};
- Community::update(&conn, community.id, &community_form)?;
+ let community_id = community.id;
+ blocking(pool, move |conn| {
+ Community::update(conn, community_id, &community_form)
+ })
+ .await??;
+ let community_id = community.id;
let res = CommunityResponse {
- community: CommunityView::read(&conn, community.id, None)?,
+ community: blocking(pool, move |conn| {
+ CommunityView::read(conn, community_id, None)
+ })
+ .await??,
};
+ let community_id = res.community.id;
+
chat_server.do_send(SendCommunityRoomMessage {
op: UserOperation::EditCommunity,
response: res,
- community_id: community.id,
+ community_id,
my_id: None,
});
Ok(HttpResponse::Ok().finish())
}
-fn receive_remove_community(
- remove: &Remove,
-
- conn: &PgConnection,
+async fn receive_remove_community(
+ remove: Remove,
+ client: &Client,
+ pool: &DbPool,
chat_server: ChatServerParam,
-) -> Result<HttpResponse, Error> {
+) -> Result<HttpResponse, LemmyError> {
let mod_uri = remove
.remove_props
.get_actor_xsd_any_uri()
@@ -777,12 +862,18 @@ fn receive_remove_community(
.to_owned()
.into_concrete::<GroupExt>()?;
- let mod_ = get_or_fetch_and_upsert_remote_user(&mod_uri, &conn)?;
+ let mod_ = get_or_fetch_and_upsert_remote_user(&mod_uri, client, pool).await?;
- insert_activity(&conn, mod_.id, &remove, false)?;
+ insert_activity(mod_.id, remove, false, pool).await?;
- let community_actor_id = CommunityForm::from_apub(&group, &conn)?.actor_id;
- let community = Community::read_from_actor_id(conn, &community_actor_id)?;
+ let community_actor_id = CommunityForm::from_apub(&group, client, pool)
+ .await?
+ .actor_id;
+
+ let community = blocking(pool, move |conn| {
+ Community::read_from_actor_id(conn, &community_actor_id)
+ })
+ .await??;
let community_form = CommunityForm {
name: community.name.to_owned(),
@@ -802,28 +893,38 @@ fn receive_remove_community(
last_refreshed_at: None,
};
- Community::update(&conn, community.id, &community_form)?;
+ let community_id = community.id;
+ blocking(pool, move |conn| {
+ Community::update(conn, community_id, &community_form)
+ })
+ .await??;
+ let community_id = community.id;
let res = CommunityResponse {
- community: CommunityView::read(&conn, community.id, None)?,
+ community: blocking(pool, move |conn| {
+ CommunityView::read(conn, community_id, None)
+ })
+ .await??,
};
+ let community_id = res.community.id;
+
chat_server.do_send(SendCommunityRoomMessage {
op: UserOperation::EditCommunity,
response: res,
- community_id: community.id,
+ community_id,
my_id: None,
});
Ok(HttpResponse::Ok().finish())
}
-fn receive_delete_post(
- delete: &Delete,
-
- conn: &PgConnection,
+async fn receive_delete_post(
+ delete: Delete,
+ client: &Client,
+ pool: &DbPool,
chat_server: ChatServerParam,
-) -> Result<HttpResponse, Error> {
+) -> Result<HttpResponse, LemmyError> {
let user_uri = delete
.delete_props
.get_actor_xsd_any_uri()
@@ -838,12 +939,13 @@ fn receive_delete_post(
.to_owned()
.into_concrete::<PageExt>()?;
- let user = get_or_fetch_and_upsert_remote_user(&user_uri, &conn)?;
+ let user = get_or_fetch_and_upsert_remote_user(&user_uri, client, pool).await?;
+
+ insert_activity(user.id, delete, false, pool).await?;
- insert_activity(&conn, user.id, &delete, false)?;
+ let post_ap_id = PostForm::from_apub(&page, client, pool).await?.ap_id;
- let post_ap_id = PostForm::from_apub(&page, conn)?.ap_id;
- let post = get_or_fetch_and_insert_remote_post(&post_ap_id, &conn)?;
+ let post = get_or_fetch_and_insert_remote_post(&post_ap_id, client, pool).await?;
let post_form = PostForm {
name: post.name.to_owned(),
@@ -865,10 +967,12 @@ fn receive_delete_post(
local: post.local,
published: None,
};
- Post::update(&conn, post.id, &post_form)?;
+ let post_id = post.id;
+ blocking(pool, move |conn| Post::update(conn, post_id, &post_form)).await??;
// Refetch the view
- let post_view = PostView::read(&conn, post.id, None)?;
+ let post_id = post.id;
+ let post_view = blocking(pool, move |conn| PostView::read(conn, post_id, None)).await??;
let res = PostResponse { post: post_view };
@@ -881,12 +985,12 @@ fn receive_delete_post(
Ok(HttpResponse::Ok().finish())
}
-fn receive_remove_post(
- remove: &Remove,
-
- conn: &PgConnection,
+async fn receive_remove_post(
+ remove: Remove,
+ client: &Client,
+ pool: &DbPool,
chat_server: ChatServerParam,
-) -> Result<HttpResponse, Error> {
+) -> Result<HttpResponse, LemmyError> {
let mod_uri = remove
.remove_props
.get_actor_xsd_any_uri()
@@ -901,12 +1005,13 @@ fn receive_remove_post(
.to_owned()
.into_concrete::<PageExt>()?;
- let mod_ = get_or_fetch_and_upsert_remote_user(&mod_uri, &conn)?;
+ let mod_ = get_or_fetch_and_upsert_remote_user(&mod_uri, client, pool).await?;
- insert_activity(&conn, mod_.id, &remove, false)?;
+ insert_activity(mod_.id, remove, false, pool).await?;
- let