summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorColin Reeder <vpzomtrrfrt@gmail.com>2020-11-06 20:22:17 -0700
committerColin Reeder <vpzomtrrfrt@gmail.com>2020-11-06 20:22:17 -0700
commita2c65e98e69a5f0e22dc83ff62399de6850cb94b (patch)
treed6ca0befb537ca67a375ea1a5b681102f2492383
parent50b3d3172e0e9e8613eb2cb1061fe724810bea7b (diff)
Use new ingestion for fetch_actor, merge BaseContext and RouteContext
-rw-r--r--src/apub_util/ingest.rs29
-rw-r--r--src/apub_util/mod.rs133
-rw-r--r--src/main.rs28
-rw-r--r--src/routes/api/mod.rs4
-rw-r--r--src/routes/apub/mod.rs8
-rw-r--r--src/tasks.rs13
-rw-r--r--src/worker.rs9
7 files changed, 40 insertions, 184 deletions
diff --git a/src/apub_util/ingest.rs b/src/apub_util/ingest.rs
index 32c6d6c..f603f14 100644
--- a/src/apub_util/ingest.rs
+++ b/src/apub_util/ingest.rs
@@ -27,7 +27,7 @@ impl FoundFrom {
pub async fn ingest_object(
object: Verified<KnownObject>,
found_from: FoundFrom,
- ctx: Arc<crate::RouteContext>,
+ ctx: Arc<crate::BaseContext>,
) -> Result<Option<super::ActorLocalInfo>, crate::Error> {
let db = ctx.db_pool.get().await?;
match object.into_inner() {
@@ -157,13 +157,8 @@ pub async fn ingest_object(
crate::apub_util::require_containment(activity_ap_id, follower_ap_id)?;
let follow = crate::apub_util::Contained(Cow::Borrowed(&follow));
- let follower_local_id = crate::apub_util::get_or_fetch_user_local_id(
- follower_ap_id,
- &db,
- &ctx.host_url_apub,
- &ctx.http_client,
- )
- .await?;
+ let follower_local_id =
+ crate::apub_util::get_or_fetch_user_local_id(follower_ap_id, &db, &ctx).await?;
if let Some(target) = target {
if let Some(community_id) =
@@ -363,7 +358,7 @@ pub async fn ingest_object(
pub fn ingest_object_boxed(
object: Verified<KnownObject>,
found_from: FoundFrom,
- ctx: Arc<crate::RouteContext>,
+ ctx: Arc<crate::BaseContext>,
) -> std::pin::Pin<
Box<dyn Future<Output = Result<Option<super::ActorLocalInfo>, crate::Error>> + Send>,
> {
@@ -383,9 +378,7 @@ pub async fn ingest_like(
if let Some(actor_id) = activity.actor_unchecked().as_single_id() {
super::require_containment(activity_id, actor_id)?;
- let actor_local_id =
- super::get_or_fetch_user_local_id(actor_id, &db, &ctx.host_url_apub, &ctx.http_client)
- .await?;
+ let actor_local_id = super::get_or_fetch_user_local_id(actor_id, &db, &ctx).await?;
if let Some(object_id) = activity.object().as_single_id() {
let thing_local_ref = if let Some(remaining) =
@@ -566,7 +559,7 @@ pub async fn ingest_undo(
pub async fn ingest_create(
activity: Verified<activitystreams::activity::Create>,
- ctx: Arc<crate::RouteContext>,
+ ctx: Arc<crate::BaseContext>,
) -> Result<(), crate::Error> {
for req_obj in activity.object().iter() {
let object_id = req_obj.id();
@@ -819,10 +812,7 @@ async fn handle_recieved_reply(
let db = ctx.db_pool.get().await?;
let author = match author {
- Some(author) => Some(
- super::get_or_fetch_user_local_id(&author, &db, &ctx.host_url_apub, &ctx.http_client)
- .await?,
- ),
+ Some(author) => Some(super::get_or_fetch_user_local_id(&author, &db, &ctx).await?),
None => None,
};
@@ -999,10 +989,7 @@ async fn handle_recieved_post(
) -> Result<(), crate::Error> {
let db = ctx.db_pool.get().await?;
let author = match author {
- Some(author) => Some(
- super::get_or_fetch_user_local_id(&author, &db, &ctx.host_url_apub, &ctx.http_client)
- .await?,
- ),
+ Some(author) => Some(super::get_or_fetch_user_local_id(&author, &db, &ctx).await?),
None => None,
};
diff --git a/src/apub_util/mod.rs b/src/apub_util/mod.rs
index 8925aa4..e49c2e8 100644
--- a/src/apub_util/mod.rs
+++ b/src/apub_util/mod.rs
@@ -361,116 +361,11 @@ pub async fn fetch_ap_object(
pub async fn fetch_actor(
req_ap_id: &url::Url,
- db: &tokio_postgres::Client,
- http_client: &crate::HttpClient,
+ ctx: Arc<crate::BaseContext>,
) -> Result<ActorLocalInfo, crate::Error> {
- let obj = fetch_ap_object(req_ap_id, http_client).await?;
- let ap_id = req_ap_id;
-
- match obj.deref() {
- KnownObject::Person(person) => {
- let username = person
- .preferred_username()
- .or_else(|| {
- person
- .name()
- .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
- })
- .unwrap_or("");
- let inbox = person.inbox_unchecked().as_str();
- let shared_inbox = person
- .endpoints_unchecked()
- .and_then(|endpoints| endpoints.shared_inbox)
- .map(|url| url.as_str());
- let public_key = person
- .ext_one
- .public_key
- .as_ref()
- .map(|key| key.public_key_pem.as_bytes());
- let public_key_sigalg = person
- .ext_one
- .public_key
- .as_ref()
- .and_then(|key| key.signature_algorithm.as_deref());
- let description = person
- .summary()
- .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
- .unwrap_or("");
-
- let avatar = person.icon().and_then(|icon| {
- icon.iter()
- .filter_map(|icon| {
- if icon.kind_str() == Some("Image") {
- match activitystreams::object::Image::from_any_base(icon.clone()) {
- Err(_) | Ok(None) => None,
- Ok(Some(icon)) => Some(icon),
- }
- } else {
- None
- }
- })
- .next()
- });
- let avatar = avatar
- .as_ref()
- .and_then(|icon| icon.url().and_then(|url| url.as_single_id()))
- .map(|x| x.as_str());
-
- let id = UserLocalID(db.query_one(
- "INSERT INTO person (username, local, created_local, ap_id, ap_inbox, ap_shared_inbox, public_key, public_key_sigalg, description, avatar) VALUES ($1, FALSE, localtimestamp, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (ap_id) DO UPDATE SET ap_inbox=$3, ap_shared_inbox=$4, public_key=$5, public_key_sigalg=$6, description=$7, avatar=$8 RETURNING id",
- &[&username, &ap_id.as_str(), &inbox, &shared_inbox, &public_key, &public_key_sigalg, &description, &avatar],
- ).await?.get(0));
-
- Ok(ActorLocalInfo::User {
- id,
- public_key: public_key.map(|key| PubKeyInfo {
- algorithm: get_message_digest(public_key_sigalg),
- key: key.to_owned(),
- }),
- })
- }
- KnownObject::Group(group) => {
- let name = group
- .preferred_username()
- .or_else(|| {
- group
- .name()
- .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
- })
- .unwrap_or("");
- let description = group
- .summary()
- .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
- .unwrap_or("");
- let inbox = group.inbox_unchecked().as_str();
- let shared_inbox = group
- .endpoints_unchecked()
- .and_then(|endpoints| endpoints.shared_inbox)
- .map(|url| url.as_str());
- let public_key = group
- .ext_one
- .public_key
- .as_ref()
- .map(|key| key.public_key_pem.as_bytes());
- let public_key_sigalg = group
- .ext_one
- .public_key
- .as_ref()
- .and_then(|key| key.signature_algorithm.as_deref());
-
- let id = CommunityLocalID(db.query_one(
- "INSERT INTO community (name, local, ap_id, ap_inbox, ap_shared_inbox, public_key, public_key_sigalg, description) VALUES ($1, FALSE, $2, $3, $4, $5, $6, $7) ON CONFLICT (ap_id) DO UPDATE SET ap_inbox=$3, ap_shared_inbox=$4, public_key=$5, public_key_sigalg=$6, description=$7 RETURNING id",
- &[&name, &ap_id.as_str(), &inbox, &shared_inbox, &public_key, &public_key_sigalg, &description],
- ).await?.get(0));
-
- Ok(ActorLocalInfo::Community {
- id,
- public_key: public_key.map(|key| PubKeyInfo {
- algorithm: get_message_digest(public_key_sigalg),
- key: key.to_owned(),
- }),
- })
- }
+ let obj = fetch_ap_object(req_ap_id, &ctx.http_client).await?;
+ match ingest::ingest_object_boxed(obj, ingest::FoundFrom::Other, ctx).await? {
+ Some(info) => Ok(info),
_ => Err(crate::Error::InternalStrStatic("Unrecognized actor type")),
}
}
@@ -478,10 +373,9 @@ pub async fn fetch_actor(
pub async fn get_or_fetch_user_local_id(
ap_id: &url::Url,
db: &tokio_postgres::Client,
- host_url_apub: &BaseURL,
- http_client: &crate::HttpClient,
+ ctx: &Arc<crate::BaseContext>,
) -> Result<UserLocalID, crate::Error> {
- if let Some(remaining) = try_strip_host(ap_id, host_url_apub) {
+ if let Some(remaining) = try_strip_host(ap_id, &ctx.host_url_apub) {
if remaining.starts_with("/users/") {
Ok(remaining[7..].parse()?)
} else {
@@ -499,7 +393,7 @@ pub async fn get_or_fetch_user_local_id(
None => {
// Not known yet, time to fetch
- let actor = fetch_actor(ap_id, db, http_client).await?;
+ let actor = fetch_actor(ap_id, ctx.clone()).await?;
if let ActorLocalInfo::User { id, .. } = actor {
Ok(id)
@@ -1489,7 +1383,7 @@ pub async fn check_signature_for_actor(
headers: &hyper::header::HeaderMap,
actor_ap_id: &url::Url,
db: &tokio_postgres::Client,
- http_client: &crate::HttpClient,
+ ctx: &Arc<crate::BaseContext>,
) -> Result<bool, crate::Error> {
let found_key = db.query_opt("(SELECT public_key, public_key_sigalg FROM person WHERE ap_id=$1) UNION ALL (SELECT public_key, public_key_sigalg FROM community WHERE ap_id=$1) LIMIT 1", &[&actor_ap_id.as_str()]).await?
.and_then(|row| {
@@ -1521,7 +1415,7 @@ pub async fn check_signature_for_actor(
// Either no key found or failed to verify
// Try fetching the actor/key
- let actor = fetch_actor(actor_ap_id, db, http_client).await?;
+ let actor = fetch_actor(actor_ap_id, ctx.clone()).await?;
if let Some(key_info) = actor.public_key() {
let key = openssl::pkey::PKey::public_key_from_pem(&key_info.key)?;
@@ -1544,8 +1438,7 @@ pub async fn check_signature_for_actor(
pub async fn verify_incoming_object(
mut req: hyper::Request<hyper::Body>,
db: &tokio_postgres::Client,
- http_client: &crate::HttpClient,
- apub_proxy_rewrites: bool,
+ ctx: &Arc<crate::BaseContext>,
) -> Result<Verified<KnownObject>, crate::Error> {
let req_body = hyper::body::to_bytes(req.body_mut()).await?;
@@ -1556,7 +1449,7 @@ pub async fn verify_incoming_object(
crate::Error::InternalStrStatic("Missing id in received activity")
})?;
- let res_body = fetch_ap_object(&ap_id, http_client).await?;
+ let res_body = fetch_ap_object(&ap_id, &ctx.http_client).await?;
Ok(res_body)
}
@@ -1582,7 +1475,7 @@ pub async fn verify_incoming_object(
.as_str();
// path ends up wrong with our recommended proxy config
- let path_and_query = if apub_proxy_rewrites {
+ let path_and_query = if ctx.apub_proxy_rewrites {
req.headers()
.get("x-forwarded-path")
.map(|x| x.to_str())
@@ -1599,7 +1492,7 @@ pub async fn verify_incoming_object(
&req.headers(),
&actor_ap_id,
db,
- http_client,
+ &ctx,
)
.await?
{
diff --git a/src/main.rs b/src/main.rs
index beb9dd3..91f4859 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -142,6 +142,8 @@ pub struct BaseContext {
pub api_ratelimit: henry::RatelimitBucket<std::net::IpAddr>,
pub local_hostname: String,
+
+ worker_trigger: tokio::sync::mpsc::Sender<()>,
}
impl BaseContext {
@@ -190,14 +192,7 @@ impl BaseContext {
href.into()
}
}
-}
-pub struct RouteContext {
- base: Arc<BaseContext>,
- worker_trigger: tokio::sync::mpsc::Sender<()>,
-}
-
-impl RouteContext {
pub async fn enqueue_task<T: crate::tasks::TaskDef>(
&self,
task: &T,
@@ -217,13 +212,7 @@ impl RouteContext {
}
}
-impl std::ops::Deref for RouteContext {
- type Target = BaseContext;
-
- fn deref(&self) -> &BaseContext {
- &self.base
- }
-}
+pub type RouteContext = BaseContext;
pub type RouteNode<P> = trout::Node<
P,
@@ -991,8 +980,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
panic!("SMTP_URL was provided, but SMTP_FROM was not");
}
+ let (worker_trigger, worker_rx) = tokio::sync::mpsc::channel(1);
+
let routes = Arc::new(routes::route_root());
- let base_context = Arc::new(BaseContext {
+ let context = Arc::new(BaseContext {
local_hostname: get_url_host(&host_url_apub)
.expect("Couldn't find host in HOST_URL_ACTIVITYPUB"),
@@ -1005,15 +996,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
http_client: hyper::Client::builder().build(hyper_tls::HttpsConnector::new()),
apub_proxy_rewrites,
api_ratelimit: henry::RatelimitBucket::new(300),
- });
-
- let worker_trigger = worker::start_worker(base_context.clone());
- let context = Arc::new(RouteContext {
- base: base_context,
worker_trigger,
});
+ worker::start_worker(context.clone(), worker_rx);
+
let server = hyper::Server::bind(&(std::net::Ipv6Addr::UNSPECIFIED, port).into()).serve(
hyper::service::make_service_fn(|sock: &hyper::server::conn::AddrStream| {
let addr_direct = sock.remote_addr().ip();
diff --git a/src/routes/api/mod.rs b/src/routes/api/mod.rs
index 05a338b..696b96f 100644
--- a/src/routes/api/mod.rs
+++ b/src/routes/api/mod.rs
@@ -258,8 +258,6 @@ async fn route_unstable_actors_lookup(
let (query,) = params;
println!("lookup {}", query);
- let db = ctx.db_pool.get().await?;
-
let lookup = parse_lookup(&query)?;
let uri = match lookup {
@@ -314,7 +312,7 @@ async fn route_unstable_actors_lookup(
}
};
- let actor = crate::apub_util::fetch_actor(&uri, &db, &ctx.http_client).await?;
+ let actor = crate::apub_util::fetch_actor(&uri, ctx).await?;
let info = match actor {
crate::apub_util::ActorLocalInfo::Community { id, .. } => {
diff --git a/src/routes/apub/mod.rs b/src/routes/apub/mod.rs
index c0ff8df..e5178ce 100644
--- a/src/routes/apub/mod.rs
+++ b/src/routes/apub/mod.rs
@@ -229,13 +229,7 @@ async fn inbox_common(
) -> Result<hyper::Response<hyper::Body>, crate::Error> {
let db = ctx.db_pool.get().await?;
- let object = crate::apub_util::verify_incoming_object(
- req,
- &db,
- &ctx.http_client,
- ctx.apub_proxy_rewrites,
- )
- .await?;
+ let object = crate::apub_util::verify_incoming_object(req, &db, &ctx).await?;
crate::apub_util::ingest::ingest_object(
object,
diff --git a/src/tasks.rs b/src/tasks.rs
index e336505..475fa1c 100644
--- a/src/tasks.rs
+++ b/src/tasks.rs
@@ -1,12 +1,13 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
+use std::sync::Arc;
#[async_trait]
pub trait TaskDef: Serialize + std::fmt::Debug + Sync {
const KIND: &'static str;
const MAX_ATTEMPTS: i16 = 8;
- async fn perform(self, ctx: &crate::BaseContext) -> Result<(), crate::Error>;
+ async fn perform(self, ctx: Arc<crate::BaseContext>) -> Result<(), crate::Error>;
}
#[derive(Deserialize, Serialize, Debug)]
@@ -20,7 +21,7 @@ pub struct DeliverToInbox<'a> {
impl<'a> TaskDef for DeliverToInbox<'a> {
const KIND: &'static str = "deliver_to_inbox";
- async fn perform(self, ctx: &crate::BaseContext) -> Result<(), crate::Error> {
+ async fn perform(self, ctx: Arc<crate::BaseContext>) -> Result<(), crate::Error> {
let db = ctx.db_pool.get().await?;
let signing_info: Option<(_, _)> = match self.sign_as {
@@ -75,7 +76,7 @@ pub struct DeliverToFollowers {
impl TaskDef for DeliverToFollowers {
const KIND: &'static str = "deliver_to_followers";
- async fn perform(self, ctx: &crate::BaseContext) -> Result<(), crate::Error> {
+ async fn perform(self, ctx: Arc<crate::BaseContext>) -> Result<(), crate::Error> {
let community_id = match self.actor {
crate::ActorLocalRef::Community(id) => id,
crate::ActorLocalRef::Person(_) => return Ok(()), // We don't have user followers at this point
@@ -101,10 +102,8 @@ pub struct FetchActor<'a> {
impl<'a> TaskDef for FetchActor<'a> {
const KIND: &'static str = "fetch_actor";
- async fn perform(self, ctx: &crate::BaseContext) -> Result<(), crate::Error> {
- let db = ctx.db_pool.get().await?;
-
- crate::apub_util::fetch_actor(&self.actor_ap_id, &db, &ctx.http_client).await?;
+ async fn perform(self, ctx: Arc<crate::BaseContext>) -> Result<(), crate::Error> {
+ crate::apub_util::fetch_actor(&self.actor_ap_id, ctx).await?;
Ok(())
}
diff --git a/src/worker.rs b/src/worker.rs
index 6ab7916..a5ae46e 100644
--- a/src/worker.rs
+++ b/src/worker.rs
@@ -1,10 +1,7 @@
use std::sync::Arc;
-pub fn start_worker(ctx: Arc<crate::BaseContext>) -> tokio::sync::mpsc::Sender<()> {
- let (tx, rx) = tokio::sync::mpsc::channel(1);
+pub fn start_worker(ctx: Arc<crate::BaseContext>, rx: tokio::sync::mpsc::Receiver<()>) {
crate::spawn_task(run_worker(ctx, rx));
-
- tx
}
async fn run_worker(
@@ -39,7 +36,7 @@ async fn run_worker(
let kind: &str = row.get(1);
let params: serde_json::Value = row.get(2);
- let result = perform_task(&ctx, kind, params).await;
+ let result = perform_task(ctx.clone(), kind, params).await;
if let Err(err) = result {
let err = format!("{:?}", err);
db.execute(
@@ -63,7 +60,7 @@ async fn run_worker(
}
async fn perform_task(
- ctx: &crate::BaseContext,
+ ctx: Arc<crate::BaseContext>,
kind: &str,
params: serde_json::Value,
) -> Result<(), crate::Error> {