diff options
author | Colin Reeder <vpzomtrrfrt@gmail.com> | 2020-11-06 20:22:17 -0700 |
---|---|---|
committer | Colin Reeder <vpzomtrrfrt@gmail.com> | 2020-11-06 20:22:17 -0700 |
commit | a2c65e98e69a5f0e22dc83ff62399de6850cb94b (patch) | |
tree | d6ca0befb537ca67a375ea1a5b681102f2492383 | |
parent | 50b3d3172e0e9e8613eb2cb1061fe724810bea7b (diff) |
Use new ingestion for fetch_actor, merge BaseContext and RouteContext
-rw-r--r-- | src/apub_util/ingest.rs | 29 | ||||
-rw-r--r-- | src/apub_util/mod.rs | 133 | ||||
-rw-r--r-- | src/main.rs | 28 | ||||
-rw-r--r-- | src/routes/api/mod.rs | 4 | ||||
-rw-r--r-- | src/routes/apub/mod.rs | 8 | ||||
-rw-r--r-- | src/tasks.rs | 13 | ||||
-rw-r--r-- | src/worker.rs | 9 |
7 files changed, 40 insertions, 184 deletions
diff --git a/src/apub_util/ingest.rs b/src/apub_util/ingest.rs index 32c6d6c..f603f14 100644 --- a/src/apub_util/ingest.rs +++ b/src/apub_util/ingest.rs @@ -27,7 +27,7 @@ impl FoundFrom { pub async fn ingest_object( object: Verified<KnownObject>, found_from: FoundFrom, - ctx: Arc<crate::RouteContext>, + ctx: Arc<crate::BaseContext>, ) -> Result<Option<super::ActorLocalInfo>, crate::Error> { let db = ctx.db_pool.get().await?; match object.into_inner() { @@ -157,13 +157,8 @@ pub async fn ingest_object( crate::apub_util::require_containment(activity_ap_id, follower_ap_id)?; let follow = crate::apub_util::Contained(Cow::Borrowed(&follow)); - let follower_local_id = crate::apub_util::get_or_fetch_user_local_id( - follower_ap_id, - &db, - &ctx.host_url_apub, - &ctx.http_client, - ) - .await?; + let follower_local_id = + crate::apub_util::get_or_fetch_user_local_id(follower_ap_id, &db, &ctx).await?; if let Some(target) = target { if let Some(community_id) = @@ -363,7 +358,7 @@ pub async fn ingest_object( pub fn ingest_object_boxed( object: Verified<KnownObject>, found_from: FoundFrom, - ctx: Arc<crate::RouteContext>, + ctx: Arc<crate::BaseContext>, ) -> std::pin::Pin< Box<dyn Future<Output = Result<Option<super::ActorLocalInfo>, crate::Error>> + Send>, > { @@ -383,9 +378,7 @@ pub async fn ingest_like( if let Some(actor_id) = activity.actor_unchecked().as_single_id() { super::require_containment(activity_id, actor_id)?; - let actor_local_id = - super::get_or_fetch_user_local_id(actor_id, &db, &ctx.host_url_apub, &ctx.http_client) - .await?; + let actor_local_id = super::get_or_fetch_user_local_id(actor_id, &db, &ctx).await?; if let Some(object_id) = activity.object().as_single_id() { let thing_local_ref = if let Some(remaining) = @@ -566,7 +559,7 @@ pub async fn ingest_undo( pub async fn ingest_create( activity: Verified<activitystreams::activity::Create>, - ctx: Arc<crate::RouteContext>, + ctx: Arc<crate::BaseContext>, ) -> Result<(), crate::Error> { for req_obj in activity.object().iter() { let object_id = req_obj.id(); @@ -819,10 +812,7 @@ async fn handle_recieved_reply( let db = ctx.db_pool.get().await?; let author = match author { - Some(author) => Some( - super::get_or_fetch_user_local_id(&author, &db, &ctx.host_url_apub, &ctx.http_client) - .await?, - ), + Some(author) => Some(super::get_or_fetch_user_local_id(&author, &db, &ctx).await?), None => None, }; @@ -999,10 +989,7 @@ async fn handle_recieved_post( ) -> Result<(), crate::Error> { let db = ctx.db_pool.get().await?; let author = match author { - Some(author) => Some( - super::get_or_fetch_user_local_id(&author, &db, &ctx.host_url_apub, &ctx.http_client) - .await?, - ), + Some(author) => Some(super::get_or_fetch_user_local_id(&author, &db, &ctx).await?), None => None, }; diff --git a/src/apub_util/mod.rs b/src/apub_util/mod.rs index 8925aa4..e49c2e8 100644 --- a/src/apub_util/mod.rs +++ b/src/apub_util/mod.rs @@ -361,116 +361,11 @@ pub async fn fetch_ap_object( pub async fn fetch_actor( req_ap_id: &url::Url, - db: &tokio_postgres::Client, - http_client: &crate::HttpClient, + ctx: Arc<crate::BaseContext>, ) -> Result<ActorLocalInfo, crate::Error> { - let obj = fetch_ap_object(req_ap_id, http_client).await?; - let ap_id = req_ap_id; - - match obj.deref() { - KnownObject::Person(person) => { - let username = person - .preferred_username() - .or_else(|| { - person - .name() - .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next()) - }) - .unwrap_or(""); - let inbox = person.inbox_unchecked().as_str(); - let shared_inbox = person - .endpoints_unchecked() - .and_then(|endpoints| endpoints.shared_inbox) - .map(|url| url.as_str()); - let public_key = person - .ext_one - .public_key - .as_ref() - .map(|key| key.public_key_pem.as_bytes()); - let public_key_sigalg = person - .ext_one - .public_key - .as_ref() - .and_then(|key| key.signature_algorithm.as_deref()); - let description = person - .summary() - .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next()) - .unwrap_or(""); - - let avatar = person.icon().and_then(|icon| { - icon.iter() - .filter_map(|icon| { - if icon.kind_str() == Some("Image") { - match activitystreams::object::Image::from_any_base(icon.clone()) { - Err(_) | Ok(None) => None, - Ok(Some(icon)) => Some(icon), - } - } else { - None - } - }) - .next() - }); - let avatar = avatar - .as_ref() - .and_then(|icon| icon.url().and_then(|url| url.as_single_id())) - .map(|x| x.as_str()); - - let id = UserLocalID(db.query_one( - "INSERT INTO person (username, local, created_local, ap_id, ap_inbox, ap_shared_inbox, public_key, public_key_sigalg, description, avatar) VALUES ($1, FALSE, localtimestamp, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (ap_id) DO UPDATE SET ap_inbox=$3, ap_shared_inbox=$4, public_key=$5, public_key_sigalg=$6, description=$7, avatar=$8 RETURNING id", - &[&username, &ap_id.as_str(), &inbox, &shared_inbox, &public_key, &public_key_sigalg, &description, &avatar], - ).await?.get(0)); - - Ok(ActorLocalInfo::User { - id, - public_key: public_key.map(|key| PubKeyInfo { - algorithm: get_message_digest(public_key_sigalg), - key: key.to_owned(), - }), - }) - } - KnownObject::Group(group) => { - let name = group - .preferred_username() - .or_else(|| { - group - .name() - .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next()) - }) - .unwrap_or(""); - let description = group - .summary() - .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next()) - .unwrap_or(""); - let inbox = group.inbox_unchecked().as_str(); - let shared_inbox = group - .endpoints_unchecked() - .and_then(|endpoints| endpoints.shared_inbox) - .map(|url| url.as_str()); - let public_key = group - .ext_one - .public_key - .as_ref() - .map(|key| key.public_key_pem.as_bytes()); - let public_key_sigalg = group - .ext_one - .public_key - .as_ref() - .and_then(|key| key.signature_algorithm.as_deref()); - - let id = CommunityLocalID(db.query_one( - "INSERT INTO community (name, local, ap_id, ap_inbox, ap_shared_inbox, public_key, public_key_sigalg, description) VALUES ($1, FALSE, $2, $3, $4, $5, $6, $7) ON CONFLICT (ap_id) DO UPDATE SET ap_inbox=$3, ap_shared_inbox=$4, public_key=$5, public_key_sigalg=$6, description=$7 RETURNING id", - &[&name, &ap_id.as_str(), &inbox, &shared_inbox, &public_key, &public_key_sigalg, &description], - ).await?.get(0)); - - Ok(ActorLocalInfo::Community { - id, - public_key: public_key.map(|key| PubKeyInfo { - algorithm: get_message_digest(public_key_sigalg), - key: key.to_owned(), - }), - }) - } + let obj = fetch_ap_object(req_ap_id, &ctx.http_client).await?; + match ingest::ingest_object_boxed(obj, ingest::FoundFrom::Other, ctx).await? { + Some(info) => Ok(info), _ => Err(crate::Error::InternalStrStatic("Unrecognized actor type")), } } @@ -478,10 +373,9 @@ pub async fn fetch_actor( pub async fn get_or_fetch_user_local_id( ap_id: &url::Url, db: &tokio_postgres::Client, - host_url_apub: &BaseURL, - http_client: &crate::HttpClient, + ctx: &Arc<crate::BaseContext>, ) -> Result<UserLocalID, crate::Error> { - if let Some(remaining) = try_strip_host(ap_id, host_url_apub) { + if let Some(remaining) = try_strip_host(ap_id, &ctx.host_url_apub) { if remaining.starts_with("/users/") { Ok(remaining[7..].parse()?) } else { @@ -499,7 +393,7 @@ pub async fn get_or_fetch_user_local_id( None => { // Not known yet, time to fetch - let actor = fetch_actor(ap_id, db, http_client).await?; + let actor = fetch_actor(ap_id, ctx.clone()).await?; if let ActorLocalInfo::User { id, .. } = actor { Ok(id) @@ -1489,7 +1383,7 @@ pub async fn check_signature_for_actor( headers: &hyper::header::HeaderMap, actor_ap_id: &url::Url, db: &tokio_postgres::Client, - http_client: &crate::HttpClient, + ctx: &Arc<crate::BaseContext>, ) -> Result<bool, crate::Error> { let found_key = db.query_opt("(SELECT public_key, public_key_sigalg FROM person WHERE ap_id=$1) UNION ALL (SELECT public_key, public_key_sigalg FROM community WHERE ap_id=$1) LIMIT 1", &[&actor_ap_id.as_str()]).await? .and_then(|row| { @@ -1521,7 +1415,7 @@ pub async fn check_signature_for_actor( // Either no key found or failed to verify // Try fetching the actor/key - let actor = fetch_actor(actor_ap_id, db, http_client).await?; + let actor = fetch_actor(actor_ap_id, ctx.clone()).await?; if let Some(key_info) = actor.public_key() { let key = openssl::pkey::PKey::public_key_from_pem(&key_info.key)?; @@ -1544,8 +1438,7 @@ pub async fn check_signature_for_actor( pub async fn verify_incoming_object( mut req: hyper::Request<hyper::Body>, db: &tokio_postgres::Client, - http_client: &crate::HttpClient, - apub_proxy_rewrites: bool, + ctx: &Arc<crate::BaseContext>, ) -> Result<Verified<KnownObject>, crate::Error> { let req_body = hyper::body::to_bytes(req.body_mut()).await?; @@ -1556,7 +1449,7 @@ pub async fn verify_incoming_object( crate::Error::InternalStrStatic("Missing id in received activity") })?; - let res_body = fetch_ap_object(&ap_id, http_client).await?; + let res_body = fetch_ap_object(&ap_id, &ctx.http_client).await?; Ok(res_body) } @@ -1582,7 +1475,7 @@ pub async fn verify_incoming_object( .as_str(); // path ends up wrong with our recommended proxy config - let path_and_query = if apub_proxy_rewrites { + let path_and_query = if ctx.apub_proxy_rewrites { req.headers() .get("x-forwarded-path") .map(|x| x.to_str()) @@ -1599,7 +1492,7 @@ pub async fn verify_incoming_object( &req.headers(), &actor_ap_id, db, - http_client, + &ctx, ) .await? { diff --git a/src/main.rs b/src/main.rs index beb9dd3..91f4859 100644 --- a/src/main.rs +++ b/src/main.rs @@ -142,6 +142,8 @@ pub struct BaseContext { pub api_ratelimit: henry::RatelimitBucket<std::net::IpAddr>, pub local_hostname: String, + + worker_trigger: tokio::sync::mpsc::Sender<()>, } impl BaseContext { @@ -190,14 +192,7 @@ impl BaseContext { href.into() } } -} -pub struct RouteContext { - base: Arc<BaseContext>, - worker_trigger: tokio::sync::mpsc::Sender<()>, -} - -impl RouteContext { pub async fn enqueue_task<T: crate::tasks::TaskDef>( &self, task: &T, @@ -217,13 +212,7 @@ impl RouteContext { } } -impl std::ops::Deref for RouteContext { - type Target = BaseContext; - - fn deref(&self) -> &BaseContext { - &self.base - } -} +pub type RouteContext = BaseContext; pub type RouteNode<P> = trout::Node< P, @@ -991,8 +980,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { panic!("SMTP_URL was provided, but SMTP_FROM was not"); } + let (worker_trigger, worker_rx) = tokio::sync::mpsc::channel(1); + let routes = Arc::new(routes::route_root()); - let base_context = Arc::new(BaseContext { + let context = Arc::new(BaseContext { local_hostname: get_url_host(&host_url_apub) .expect("Couldn't find host in HOST_URL_ACTIVITYPUB"), @@ -1005,15 +996,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { http_client: hyper::Client::builder().build(hyper_tls::HttpsConnector::new()), apub_proxy_rewrites, api_ratelimit: henry::RatelimitBucket::new(300), - }); - - let worker_trigger = worker::start_worker(base_context.clone()); - let context = Arc::new(RouteContext { - base: base_context, worker_trigger, }); + worker::start_worker(context.clone(), worker_rx); + let server = hyper::Server::bind(&(std::net::Ipv6Addr::UNSPECIFIED, port).into()).serve( hyper::service::make_service_fn(|sock: &hyper::server::conn::AddrStream| { let addr_direct = sock.remote_addr().ip(); diff --git a/src/routes/api/mod.rs b/src/routes/api/mod.rs index 05a338b..696b96f 100644 --- a/src/routes/api/mod.rs +++ b/src/routes/api/mod.rs @@ -258,8 +258,6 @@ async fn route_unstable_actors_lookup( let (query,) = params; println!("lookup {}", query); - let db = ctx.db_pool.get().await?; - let lookup = parse_lookup(&query)?; let uri = match lookup { @@ -314,7 +312,7 @@ async fn route_unstable_actors_lookup( } }; - let actor = crate::apub_util::fetch_actor(&uri, &db, &ctx.http_client).await?; + let actor = crate::apub_util::fetch_actor(&uri, ctx).await?; let info = match actor { crate::apub_util::ActorLocalInfo::Community { id, .. } => { diff --git a/src/routes/apub/mod.rs b/src/routes/apub/mod.rs index c0ff8df..e5178ce 100644 --- a/src/routes/apub/mod.rs +++ b/src/routes/apub/mod.rs @@ -229,13 +229,7 @@ async fn inbox_common( ) -> Result<hyper::Response<hyper::Body>, crate::Error> { let db = ctx.db_pool.get().await?; - let object = crate::apub_util::verify_incoming_object( - req, - &db, - &ctx.http_client, - ctx.apub_proxy_rewrites, - ) - .await?; + let object = crate::apub_util::verify_incoming_object(req, &db, &ctx).await?; crate::apub_util::ingest::ingest_object( object, diff --git a/src/tasks.rs b/src/tasks.rs index e336505..475fa1c 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -1,12 +1,13 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::borrow::Cow; +use std::sync::Arc; #[async_trait] pub trait TaskDef: Serialize + std::fmt::Debug + Sync { const KIND: &'static str; const MAX_ATTEMPTS: i16 = 8; - async fn perform(self, ctx: &crate::BaseContext) -> Result<(), crate::Error>; + async fn perform(self, ctx: Arc<crate::BaseContext>) -> Result<(), crate::Error>; } #[derive(Deserialize, Serialize, Debug)] @@ -20,7 +21,7 @@ pub struct DeliverToInbox<'a> { impl<'a> TaskDef for DeliverToInbox<'a> { const KIND: &'static str = "deliver_to_inbox"; - async fn perform(self, ctx: &crate::BaseContext) -> Result<(), crate::Error> { + async fn perform(self, ctx: Arc<crate::BaseContext>) -> Result<(), crate::Error> { let db = ctx.db_pool.get().await?; let signing_info: Option<(_, _)> = match self.sign_as { @@ -75,7 +76,7 @@ pub struct DeliverToFollowers { impl TaskDef for DeliverToFollowers { const KIND: &'static str = "deliver_to_followers"; - async fn perform(self, ctx: &crate::BaseContext) -> Result<(), crate::Error> { + async fn perform(self, ctx: Arc<crate::BaseContext>) -> Result<(), crate::Error> { let community_id = match self.actor { crate::ActorLocalRef::Community(id) => id, crate::ActorLocalRef::Person(_) => return Ok(()), // We don't have user followers at this point @@ -101,10 +102,8 @@ pub struct FetchActor<'a> { impl<'a> TaskDef for FetchActor<'a> { const KIND: &'static str = "fetch_actor"; - async fn perform(self, ctx: &crate::BaseContext) -> Result<(), crate::Error> { - let db = ctx.db_pool.get().await?; - - crate::apub_util::fetch_actor(&self.actor_ap_id, &db, &ctx.http_client).await?; + async fn perform(self, ctx: Arc<crate::BaseContext>) -> Result<(), crate::Error> { + crate::apub_util::fetch_actor(&self.actor_ap_id, ctx).await?; Ok(()) } diff --git a/src/worker.rs b/src/worker.rs index 6ab7916..a5ae46e 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,10 +1,7 @@ use std::sync::Arc; -pub fn start_worker(ctx: Arc<crate::BaseContext>) -> tokio::sync::mpsc::Sender<()> { - let (tx, rx) = tokio::sync::mpsc::channel(1); +pub fn start_worker(ctx: Arc<crate::BaseContext>, rx: tokio::sync::mpsc::Receiver<()>) { crate::spawn_task(run_worker(ctx, rx)); - - tx } async fn run_worker( @@ -39,7 +36,7 @@ async fn run_worker( let kind: &str = row.get(1); let params: serde_json::Value = row.get(2); - let result = perform_task(&ctx, kind, params).await; + let result = perform_task(ctx.clone(), kind, params).await; if let Err(err) = result { let err = format!("{:?}", err); db.execute( @@ -63,7 +60,7 @@ async fn run_worker( } async fn perform_task( - ctx: &crate::BaseContext, + ctx: Arc<crate::BaseContext>, kind: &str, params: serde_json::Value, ) -> Result<(), crate::Error> { |