summaryrefslogtreecommitdiffstats
path: root/server/src/apub/activities.rs
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/apub/activities.rs')
-rw-r--r--server/src/apub/activities.rs59
1 files changed, 37 insertions, 22 deletions
diff --git a/server/src/apub/activities.rs b/server/src/apub/activities.rs
index b5bb9d76..e5dc7045 100644
--- a/server/src/apub/activities.rs
+++ b/server/src/apub/activities.rs
@@ -1,20 +1,22 @@
use crate::{
apub::{extensions::signatures::sign, is_apub_id_valid, ActorType},
db::{activity::insert_activity, community::Community, user::User_},
+ request::retry_custom,
+ DbPool,
+ LemmyError,
};
use activitystreams::{context, object::properties::ObjectProperties, public, Activity, Base};
-use diesel::PgConnection;
-use failure::{Error, _core::fmt::Debug};
-use isahc::prelude::*;
+use actix_web::client::Client;
use log::debug;
use serde::Serialize;
+use std::fmt::Debug;
use url::Url;
pub fn populate_object_props(
props: &mut ObjectProperties,
addressed_ccs: Vec<String>,
object_id: &str,
-) -> Result<(), Error> {
+) -> Result<(), LemmyError> {
props
.set_context_xsd_any_uri(context())?
// TODO: the activity needs a seperate id from the object
@@ -26,48 +28,61 @@ pub fn populate_object_props(
Ok(())
}
-pub fn send_activity_to_community<A>(
+pub async fn send_activity_to_community<A>(
creator: &User_,
- conn: &PgConnection,
community: &Community,
to: Vec<String>,
activity: A,
-) -> Result<(), Error>
+ client: &Client,
+ pool: &DbPool,
+) -> Result<(), LemmyError>
where
- A: Activity + Base + Serialize + Debug,
+ A: Activity + Base + Serialize + Debug + Clone + Send + 'static,
{
- insert_activity(&conn, creator.id, &activity, true)?;
+ insert_activity(creator.id, activity.clone(), true, pool).await?;
// if this is a local community, we need to do an announce from the community instead
if community.local {
- Community::do_announce(activity, &community, creator, conn)?;
+ Community::do_announce(activity, &community, creator, client, pool).await?;
} else {
- send_activity(&activity, creator, to)?;
+ send_activity(client, &activity, creator, to).await?;
}
+
Ok(())
}
/// Send an activity to a list of recipients, using the correct headers etc.
-pub fn send_activity<A>(activity: &A, actor: &dyn ActorType, to: Vec<String>) -> Result<(), Error>
+pub async fn send_activity<A>(
+ client: &Client,
+ activity: &A,
+ actor: &dyn ActorType,
+ to: Vec<String>,
+) -> Result<(), LemmyError>
where
- A: Serialize + Debug,
+ A: Serialize,
{
- let json = serde_json::to_string(&activity)?;
- debug!("Sending activitypub activity {} to {:?}", json, to);
+ let activity = serde_json::to_string(&activity)?;
+ debug!("Sending activitypub activity {} to {:?}", activity, to);
+
for t in to {
let to_url = Url::parse(&t)?;
if !is_apub_id_valid(&to_url) {
debug!("Not sending activity to {} (invalid or blocklisted)", t);
continue;
}
- let request = Request::post(t).header("Host", to_url.domain().unwrap());
- let signature = sign(&request, actor)?;
- let res = request
- .header("Signature", signature)
- .header("Content-Type", "application/json")
- .body(json.to_owned())?
- .send()?;
+
+ let res = retry_custom(|| async {
+ let request = client.post(&t).header("Content-Type", "application/json");
+
+ match sign(request, actor, activity.clone()).await {
+ Ok(signed) => Ok(signed.send().await),
+ Err(e) => Err(e),
+ }
+ })
+ .await?;
+
debug!("Result for activity send: {:?}", res);
}
+
Ok(())
}