summaryrefslogtreecommitdiffstats
path: root/server/src/apub/community.rs
diff options
context:
space:
mode:
authorRiley <asonix@asonix.dog>2020-07-01 07:54:29 -0500
committerGitHub <noreply@github.com>2020-07-01 08:54:29 -0400
commita074564458b8a108b77d98e5e8ce24168656763a (patch)
tree8cfb4e463b6b2dbd3c4b3ac2f312a42542f38d64 /server/src/apub/community.rs
parent4c1cb5999cad496714cec67f101be38cd281d416 (diff)
Federation async (#848)
* Asyncify more * I guess these changed * Clean PR a bit * Convert more away from failure error * config changes for testing federation * It was DNS So actix-web's client relies on TRust DNS Resolver to figure out where to send data, but TRust DNS Resolver seems to not play nice with docker, which expressed itself as not resolving the name to an IP address _the first time_ when making a request. The fix was literally to make the request again (which I limited to 3 times total, and not exceeding the request timeout in total) * Only retry for connecterror Since TRust DNS Resolver was causing ConnectError::Timeout, this change limits the retry to only this error, returning immediately for any other error * Use http sig norm 0.4.0-alpha for actix-web 3.0 support * Blocking function, retry http requests * cargo +nightly fmt * Only create one pictrs dir * Don't yarn build * cargo +nightly fmt
Diffstat (limited to 'server/src/apub/community.rs')
-rw-r--r--server/src/apub/community.rs204
1 files changed, 139 insertions, 65 deletions
diff --git a/server/src/apub/community.rs b/server/src/apub/community.rs
index 8c8c3b28..f866511c 100644
--- a/server/src/apub/community.rs
+++ b/server/src/apub/community.rs
@@ -12,6 +12,7 @@ use crate::{
GroupExt,
ToApub,
},
+ blocking,
convert_datetime,
db::{
activity::insert_activity,
@@ -21,6 +22,8 @@ use crate::{
},
naive_now,
routes::DbPoolParam,
+ DbPool,
+ LemmyError,
};
use activitystreams::{
activity::{Accept, Announce, Delete, Remove, Undo},
@@ -35,22 +38,22 @@ use activitystreams::{
};
use activitystreams_ext::Ext3;
use activitystreams_new::{activity::Follow, object::Tombstone};
-use actix_web::{body::Body, web::Path, HttpResponse, Result};
-use diesel::PgConnection;
-use failure::{Error, _core::fmt::Debug};
+use actix_web::{body::Body, client::Client, web, HttpResponse};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
+use std::fmt::Debug;
#[derive(Deserialize)]
pub struct CommunityQuery {
community_name: String,
}
+#[async_trait::async_trait(?Send)]
impl ToApub for Community {
type Response = GroupExt;
// Turn a Lemmy Community into an ActivityPub group that can be sent out over the network.
- fn to_apub(&self, conn: &PgConnection) -> Result<GroupExt, Error> {
+ async fn to_apub(&self, pool: &DbPool) -> Result<GroupExt, LemmyError> {
let mut group = Group::default();
let oprops: &mut ObjectProperties = group.as_mut();
@@ -58,10 +61,12 @@ impl ToApub for Community {
// then the rest of the moderators
// TODO Technically the instance admins can mod the community, but lets
// ignore that for now
- let moderators = CommunityModeratorView::for_community(&conn, self.id)?
- .into_iter()
- .map(|m| m.user_actor_id)
- .collect();
+ let id = self.id;
+ let moderators = blocking(pool, move |conn| {
+ CommunityModeratorView::for_community(&conn, id)
+ })
+ .await??;
+ let moderators = moderators.into_iter().map(|m| m.user_actor_id).collect();
oprops
.set_context_xsd_any_uri(context())?
@@ -92,7 +97,12 @@ impl ToApub for Community {
.set_endpoints(endpoint_props)?
.set_followers(self.get_followers_url())?;
- let group_extension = GroupExtension::new(conn, self.category_id, self.nsfw)?;
+ let nsfw = self.nsfw;
+ let category_id = self.category_id;
+ let group_extension = blocking(pool, move |conn| {
+ GroupExtension::new(conn, category_id, nsfw)
+ })
+ .await??;
Ok(Ext3::new(
group,
@@ -102,7 +112,7 @@ impl ToApub for Community {
))
}
- fn to_tombstone(&self) -> Result<Tombstone, Error> {
+ fn to_tombstone(&self) -> Result<Tombstone, LemmyError> {
create_tombstone(
self.deleted,
&self.actor_id,
@@ -112,6 +122,7 @@ impl ToApub for Community {
}
}
+#[async_trait::async_trait(?Send)]
impl ActorType for Community {
fn actor_id(&self) -> String {
self.actor_id.to_owned()
@@ -125,7 +136,12 @@ impl ActorType for Community {
}
/// As a local community, accept the follow request from a remote user.
- fn send_accept_follow(&self, follow: &Follow, conn: &PgConnection) -> Result<(), Error> {
+ async fn send_accept_follow(
+ &self,
+ follow: &Follow,
+ client: &Client,
+ pool: &DbPool,
+ ) -> Result<(), LemmyError> {
let actor_uri = follow.actor.as_single_xsd_any_uri().unwrap().to_string();
let id = format!("{}/accept/{}", self.actor_id, uuid::Uuid::new_v4());
@@ -140,14 +156,20 @@ impl ActorType for Community {
.set_object_base_box(BaseBox::from_concrete(follow.clone())?)?;
let to = format!("{}/inbox", actor_uri);
- insert_activity(&conn, self.creator_id, &accept, true)?;
+ insert_activity(self.creator_id, accept.clone(), true, pool).await?;
- send_activity(&accept, self, vec![to])?;
+ send_activity(client, &accept, self, vec![to]).await?;
Ok(())
}
- fn send_delete(&self, creator: &User_, conn: &PgConnection) -> Result<(), Error> {
- let group = self.to_apub(conn)?;
+ async fn send_delete(
+ &self,
+ creator: &User_,
+ client: &Client,
+ pool: &DbPool,
+ ) -> Result<(), LemmyError> {
+ let group = self.to_apub(pool).await?;
+
let id = format!("{}/delete/{}", self.actor_id, uuid::Uuid::new_v4());
let mut delete = Delete::default();
@@ -162,17 +184,25 @@ impl ActorType for Community {
.set_actor_xsd_any_uri(creator.actor_id.to_owned())?
.set_object_base_box(BaseBox::from_concrete(group)?)?;
- insert_activity(&conn, self.creator_id, &delete, true)?;
+ insert_activity(self.creator_id, delete.clone(), true, pool).await?;
+
+ let inboxes = self.get_follower_inboxes(pool).await?;
// Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor.
// But for delete, the creator is the actor, and does the signing
- send_activity(&delete, creator, self.get_follower_inboxes(&conn)?)?;
+ send_activity(client, &delete, creator, inboxes).await?;
Ok(())
}
- fn send_undo_delete(&self, creator: &User_, conn: &PgConnection) -> Result<(), Error> {
- let group = self.to_apub(conn)?;
+ async fn send_undo_delete(
+ &self,
+ creator: &User_,
+ client: &Client,
+ pool: &DbPool,
+ ) -> Result<(), LemmyError> {
+ let group = self.to_apub(pool).await?;
+
let id = format!("{}/delete/{}", self.actor_id, uuid::Uuid::new_v4());
let mut delete = Delete::default();
@@ -203,17 +233,25 @@ impl ActorType for Community {
.set_actor_xsd_any_uri(creator.actor_id.to_owned())?
.set_object_base_box(delete)?;
- insert_activity(&conn, self.creator_id, &undo, true)?;
+ insert_activity(self.creator_id, undo.clone(), true, pool).await?;
+
+ let inboxes = self.get_follower_inboxes(pool).await?;
// Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor.
// But for delete, the creator is the actor, and does the signing
- send_activity(&undo, creator, self.get_follower_inboxes(&conn)?)?;
+ send_activity(client, &undo, creator, inboxes).await?;
Ok(())
}
- fn send_remove(&self, mod_: &User_, conn: &PgConnection) -> Result<(), Error> {
- let group = self.to_apub(conn)?;
+ async fn send_remove(
+ &self,
+ mod_: &User_,
+ client: &Client,
+ pool: &DbPool,
+ ) -> Result<(), LemmyError> {
+ let group = self.to_apub(pool).await?;
+
let id = format!("{}/remove/{}", self.actor_id, uuid::Uuid::new_v4());
let mut remove = Remove::default();
@@ -228,17 +266,25 @@ impl ActorType for Community {
.set_actor_xsd_any_uri(mod_.actor_id.to_owned())?
.set_object_base_box(BaseBox::from_concrete(group)?)?;
- insert_activity(&conn, mod_.id, &remove, true)?;
+ insert_activity(mod_.id, remove.clone(), true, pool).await?;
+
+ let inboxes = self.get_follower_inboxes(pool).await?;
// Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor.
// But for delete, the creator is the actor, and does the signing
- send_activity(&remove, mod_, self.get_follower_inboxes(&conn)?)?;
+ send_activity(client, &remove, mod_, inboxes).await?;
Ok(())
}
- fn send_undo_remove(&self, mod_: &User_, conn: &PgConnection) -> Result<(), Error> {
- let group = self.to_apub(conn)?;
+ async fn send_undo_remove(
+ &self,
+ mod_: &User_,
+ client: &Client,
+ pool: &DbPool,
+ ) -> Result<(), LemmyError> {
+ let group = self.to_apub(pool).await?;
+
let id = format!("{}/remove/{}", self.actor_id, uuid::Uuid::new_v4());
let mut remove = Remove::default();
@@ -268,51 +314,69 @@ impl ActorType for Community {
.set_actor_xsd_any_uri(mod_.actor_id.to_owned())?
.set_object_base_box(remove)?;
- insert_activity(&conn, mod_.id, &undo, true)?;
+ insert_activity(mod_.id, undo.clone(), true, pool).await?;
+
+ let inboxes = self.get_follower_inboxes(pool).await?;
// Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor.
// But for remove , the creator is the actor, and does the signing
- send_activity(&undo, mod_, self.get_follower_inboxes(&conn)?)?;
+ send_activity(client, &undo, mod_, inboxes).await?;
Ok(())
}
/// For a given community, returns the inboxes of all followers.
- fn get_follower_inboxes(&self, conn: &PgConnection) -> Result<Vec<String>, Error> {
- Ok(
- CommunityFollowerView::for_community(conn, self.id)?
- .into_iter()
- .map(|c| get_shared_inbox(&c.user_actor_id))
- .filter(|s| !s.is_empty())
- .unique()
- .collect(),
- )
+ async fn get_follower_inboxes(&self, pool: &DbPool) -> Result<Vec<String>, LemmyError> {
+ let id = self.id;
+
+ let inboxes = blocking(pool, move |conn| {
+ CommunityFollowerView::for_community(conn, id)
+ })
+ .await??;
+ let inboxes = inboxes
+ .into_iter()
+ .map(|c| get_shared_inbox(&c.user_actor_id))
+ .filter(|s| !s.is_empty())
+ .unique()
+ .collect();
+
+ Ok(inboxes)
}
- fn send_follow(&self, _follow_actor_id: &str, _conn: &PgConnection) -> Result<(), Error> {
+ async fn send_follow(
+ &self,
+ _follow_actor_id: &str,
+ _client: &Client,
+ _pool: &DbPool,
+ ) -> Result<(), LemmyError> {
unimplemented!()
}
- fn send_unfollow(&self, _follow_actor_id: &str, _conn: &PgConnection) -> Result<(), Error> {
+ async fn send_unfollow(
+ &self,
+ _follow_actor_id: &str,
+ _client: &Client,
+ _pool: &DbPool,
+ ) -> Result<(), LemmyError> {
unimplemented!()
}
}
+#[async_trait::async_trait(?Send)]
impl FromApub for CommunityForm {
type ApubType = GroupExt;
/// Parse an ActivityPub group received from another instance into a Lemmy community.
- fn from_apub(group: &GroupExt, conn: &PgConnection) -> Result<Self, Error> {
+ async fn from_apub(group: &GroupExt, client: &Client, pool: &DbPool) -> Result<Self, LemmyError> {
let group_extensions: &GroupExtension = &group.ext_one;
let oprops = &group.inner.object_props;
let aprops = &group.ext_two;
let public_key: &PublicKey = &group.ext_three.public_key;
let mut creator_and_moderator_uris = oprops.get_many_attributed_to_xsd_any_uris().unwrap();
- let creator = creator_and_moderator_uris
- .next()
- .map(|c| get_or_fetch_and_upsert_remote_user(&c.to_string(), &conn).unwrap())
- .unwrap();
+ let creator_uri = creator_and_moderator_uris.next().unwrap();
+
+ let creator = get_or_fetch_and_upsert_remote_user(creator_uri.as_str(), client, pool).await?;
Ok(CommunityForm {
name: oprops.get_name_xsd_string().unwrap().to_string(),
@@ -342,14 +406,18 @@ impl FromApub for CommunityForm {
/// Return the community json over HTTP.
pub async fn get_apub_community_http(
- info: Path<CommunityQuery>,
+ info: web::Path<CommunityQuery>,
db: DbPoolParam,
-) -> Result<HttpResponse<Body>, Error> {
- let community = Community::read_from_name(&&db.get()?, &info.community_name)?;
+) -> Result<HttpResponse<Body>, LemmyError> {
+ let community = blocking(&db, move |conn| {
+ Community::read_from_name(conn, &info.community_name)
+ })
+ .await??;
+
if !community.deleted {
- Ok(create_apub_response(
- &community.to_apub(&db.get().unwrap())?,
- ))
+ let apub = community.to_apub(&db).await?;
+
+ Ok(create_apub_response(&apub))
} else {
Ok(create_apub_tombstone_response(&community.to_tombstone()?))
}
@@ -357,15 +425,19 @@ pub async fn get_apub_community_http(
/// Returns an empty followers collection, only populating the size (for privacy).
pub async fn get_apub_community_followers(
- info: Path<CommunityQuery>,
+ info: web::Path<CommunityQuery>,
db: DbPoolParam,
-) -> Result<HttpResponse<Body>, Error> {
- let community = Community::read_from_name(&&db.get()?, &info.community_name)?;
-
- let conn = db.get()?;
-
- //As we are an object, we validated that the community id was valid
- let community_followers = CommunityFollowerView::for_community(&conn, community.id).unwrap();
+) -> Result<HttpResponse<Body>, LemmyError> {
+ let community = blocking(&db, move |conn| {
+ Community::read_from_name(&conn, &info.community_name)
+ })
+ .await??;
+
+ let community_id = community.id;
+ let community_followers = blocking(&db, move |conn| {
+ CommunityFollowerView::for_community(&conn, community_id)
+ })
+ .await??;
let mut collection = UnorderedCollection::default();
let oprops: &mut ObjectProperties = collection.as_mut();
@@ -379,12 +451,13 @@ pub async fn get_apub_community_followers(
}
impl Community {
- pub fn do_announce<A>(
+ pub async fn do_announce<A>(
activity: A,
community: &Community,
sender: &dyn ActorType,
- conn: &PgConnection,
- ) -> Result<HttpResponse, Error>
+ client: &Client,
+ pool: &DbPool,
+ ) -> Result<HttpResponse, LemmyError>
where
A: Activity + Base + Serialize + Debug,
{
@@ -399,15 +472,16 @@ impl Community {
.set_actor_xsd_any_uri(community.actor_id.to_owned())?
.set_object_base_box(BaseBox::from_concrete(activity)?)?;
- insert_activity(&conn, community.creator_id, &announce, true)?;
+ insert_activity(community.creator_id, announce.clone(), true, pool).await?;
// dont send to the instance where the activity originally came from, because that would result
// in a database error (same data inserted twice)
- let mut to = community.get_follower_inboxes(&conn)?;
+ let mut to = community.get_follower_inboxes(pool).await?;
+
// this seems to be the "easiest" stable alternative for remove_item()
to.retain(|x| *x != sender.get_shared_inbox_url());
- send_activity(&announce, community, to)?;
+ send_activity(client, &announce, community, to).await?;
Ok(HttpResponse::Ok().finish())
}