diff options
author | Dessalines <tyhou13@gmx.com> | 2020-01-12 10:31:51 -0500 |
---|---|---|
committer | Dessalines <tyhou13@gmx.com> | 2020-01-12 10:31:51 -0500 |
commit | dff8b947bb779d27d4b5953fe1f34166ac5c4104 (patch) | |
tree | 2cad663e4ef1689ae741cbce995be42095a8c500 /server/src/routes | |
parent | 677d716ae6b21ecb0569d661c1d76520dade1643 (diff) |
Trying to add r2d2 connection pooling to websockets.
Diffstat (limited to 'server/src/routes')
-rw-r--r-- | server/src/routes/feeds.rs | 122 | ||||
-rw-r--r-- | server/src/routes/nodeinfo.rs | 64 | ||||
-rw-r--r-- | server/src/routes/webfinger.rs | 93 | ||||
-rw-r--r-- | server/src/routes/websocket.rs | 20 |
4 files changed, 166 insertions, 133 deletions
diff --git a/server/src/routes/feeds.rs b/server/src/routes/feeds.rs index ae1631e2..ad0f28d5 100644 --- a/server/src/routes/feeds.rs +++ b/server/src/routes/feeds.rs @@ -7,11 +7,12 @@ use crate::db::post_view::{PostQueryBuilder, PostView}; use crate::db::site_view::SiteView; use crate::db::user::{Claims, User_}; use crate::db::user_mention_view::{UserMentionQueryBuilder, UserMentionView}; -use crate::db::{establish_connection, ListingType, SortType}; +use crate::db::{ListingType, SortType}; use crate::Settings; -use actix_web::body::Body; use actix_web::{web, HttpResponse, Result}; use chrono::{DateTime, Utc}; +use diesel::r2d2::{ConnectionManager, Pool}; +use diesel::PgConnection; use failure::Error; use rss::{CategoryBuilder, ChannelBuilder, GuidBuilder, Item, ItemBuilder}; use serde::Deserialize; @@ -37,54 +38,61 @@ pub fn config(cfg: &mut web::ServiceConfig) { .route("/feeds/all.xml", web::get().to(feeds::get_all_feed)); } -async fn get_all_feed(info: web::Query<Params>) -> HttpResponse<Body> { - let sort_type = match get_sort_type(info) { - Ok(sort_type) => sort_type, - Err(_) => return HttpResponse::BadRequest().finish(), - }; - - let feed_result = get_feed_all_data(&sort_type); - - match feed_result { - Ok(rss) => HttpResponse::Ok() +async fn get_all_feed( + info: web::Query<Params>, + db: web::Data<Pool<ConnectionManager<PgConnection>>>, +) -> Result<HttpResponse, actix_web::Error> { + let res = web::block(move || { + let conn = db.get()?; + + let sort_type = get_sort_type(info)?; + get_feed_all_data(&conn, &sort_type) + }) + .await + .map(|rss| { + HttpResponse::Ok() .content_type("application/rss+xml") - .body(rss), - Err(_) => HttpResponse::NotFound().finish(), - } + .body(rss) + }) + .map_err(|_| HttpResponse::InternalServerError())?; + Ok(res) } async fn get_feed( path: web::Path<(String, String)>, info: web::Query<Params>, -) -> HttpResponse<Body> { - let sort_type = match get_sort_type(info) { - Ok(sort_type) => sort_type, - Err(_) => return HttpResponse::BadRequest().finish(), - }; - - let request_type = match path.0.as_ref() { - "u" => RequestType::User, - "c" => RequestType::Community, - "front" => RequestType::Front, - "inbox" => RequestType::Inbox, - _ => return HttpResponse::NotFound().finish(), - }; - - let param = path.1.to_owned(); - - let feed_result = match request_type { - RequestType::User => get_feed_user(&sort_type, param), - RequestType::Community => get_feed_community(&sort_type, param), - RequestType::Front => get_feed_front(&sort_type, param), - RequestType::Inbox => get_feed_inbox(param), - }; - - match feed_result { - Ok(rss) => HttpResponse::Ok() + db: web::Data<Pool<ConnectionManager<PgConnection>>>, +) -> Result<HttpResponse, actix_web::Error> { + let res = web::block(move || { + let conn = db.get()?; + + let sort_type = get_sort_type(info)?; + + let request_type = match path.0.as_ref() { + "u" => RequestType::User, + "c" => RequestType::Community, + "front" => RequestType::Front, + "inbox" => RequestType::Inbox, + _ => return Err(format_err!("wrong_type")), + }; + + let param = path.1.to_owned(); + + match request_type { + RequestType::User => get_feed_user(&conn, &sort_type, param), + RequestType::Community => get_feed_community(&conn, &sort_type, param), + RequestType::Front => get_feed_front(&conn, &sort_type, param), + RequestType::Inbox => get_feed_inbox(&conn, param), + } + }) + .await + .map(|rss| { + HttpResponse::Ok() .content_type("application/rss+xml") - .body(rss), - Err(_) => HttpResponse::NotFound().finish(), - } + .body(rss) + }) + .map_err(|_| HttpResponse::InternalServerError())?; + Ok(res) } fn get_sort_type(info: web::Query<Params>) -> Result<SortType, ParseError> { @@ -95,9 +103,7 @@ fn get_sort_type(info: web::Query<Params>) -> Result<SortType, ParseError> { SortType::from_str(&sort_query) } -fn get_feed_all_data(sort_type: &SortType) -> Result<String, Error> { - let conn = establish_connection(); - +fn get_feed_all_data(conn: &PgConnection, sort_type: &SortType) -> Result<String, failure::Error> { let site_view = SiteView::read(&conn)?; let posts = PostQueryBuilder::create(&conn) @@ -120,9 +126,11 @@ fn get_feed_all_data(sort_type: &SortType) -> Result<String, Error> { Ok(channel_builder.build().unwrap().to_string()) } -fn get_feed_user(sort_type: &SortType, user_name: String) -> Result<String, Error> { - let conn = establish_connection(); - +fn get_feed_user( + conn: &PgConnection, + sort_type: &SortType, + user_name: String, +) -> Result<String, Error> { let site_view = SiteView::read(&conn)?; let user = User_::find_by_username(&conn, &user_name)?; let user_url = user.get_profile_url(); @@ -144,9 +152,11 @@ fn get_feed_user(sort_type: &SortType, user_name: String) -> Result<String, Erro Ok(channel_builder.build().unwrap().to_string()) } -fn get_feed_community(sort_type: &SortType, community_name: String) -> Result<String, Error> { - let conn = establish_connection(); - +fn get_feed_community( + conn: &PgConnection, + sort_type: &SortType, + community_name: String, +) -> Result<String, Error> { let site_view = SiteView::read(&conn)?; let community = Community::read_from_name(&conn, community_name)?; let community_url = community.get_url(); @@ -172,9 +182,7 @@ fn get_feed_community(sort_type: &SortType, community_name: String) -> Result<St Ok(channel_builder.build().unwrap().to_string()) } -fn get_feed_front(sort_type: &SortType, jwt: String) -> Result<String, Error> { - let conn = establish_connection(); - +fn get_feed_front(conn: &PgConnection, sort_type: &SortType, jwt: String) -> Result<String, Error> { let site_view = SiteView::read(&conn)?; let user_id = Claims::decode(&jwt)?.claims.id; @@ -199,9 +207,7 @@ fn get_feed_front(sort_type: &SortType, jwt: String) -> Result<String, Error> { Ok(channel_builder.build().unwrap().to_string()) } -fn get_feed_inbox(jwt: String) -> Result<String, Error> { - let conn = establish_connection(); - +fn get_feed_inbox(conn: &PgConnection, jwt: String) -> Result<String, Error> { let site_view = SiteView::read(&conn)?; let user_id = Claims::decode(&jwt)?.claims.id; diff --git a/server/src/routes/nodeinfo.rs b/server/src/routes/nodeinfo.rs index 2b7135fb..6ab540f9 100644 --- a/server/src/routes/nodeinfo.rs +++ b/server/src/routes/nodeinfo.rs @@ -1,10 +1,11 @@ -use crate::db::establish_connection; use crate::db::site_view::SiteView; use crate::version; use crate::Settings; use actix_web::body::Body; use actix_web::web; use actix_web::HttpResponse; +use diesel::r2d2::{ConnectionManager, Pool}; +use diesel::PgConnection; use serde_json::json; pub fn config(cfg: &mut web::ServiceConfig) { @@ -26,34 +27,39 @@ async fn node_info_well_known() -> HttpResponse<Body> { .body(json.to_string()) } -async fn node_info() -> HttpResponse<Body> { - let conn = establish_connection(); - let site_view = match SiteView::read(&conn) { - Ok(site_view) => site_view, - Err(_e) => return HttpResponse::InternalServerError().finish(), - }; - let protocols = if Settings::get().federation_enabled { - vec!["activitypub"] - } else { - vec![] - }; - let json = json!({ - "version": "2.0", - "software": { - "name": "lemmy", - "version": version::VERSION, - }, - "protocols": protocols, - "usage": { - "users": { - "total": site_view.number_of_users +async fn node_info( + db: web::Data<Pool<ConnectionManager<PgConnection>>>, +) -> Result<HttpResponse, actix_web::Error> { + let res = web::block(move || { + let conn = db.get()?; + let site_view = match SiteView::read(&conn) { + Ok(site_view) => site_view, + Err(_) => return Err(format_err!("not_found")), + }; + let protocols = if Settings::get().federation_enabled { + vec!["activitypub"] + } else { + vec![] + }; + Ok(json!({ + "version": "2.0", + "software": { + "name": "lemmy", + "version": version::VERSION, }, - "localPosts": site_view.number_of_posts, - "localComments": site_view.number_of_comments, - "openRegistrations": site_view.open_registration, + "protocols": protocols, + "usage": { + "users": { + "total": site_view.number_of_users + }, + "localPosts": site_view.number_of_posts, + "localComments": site_view.number_of_comments, + "openRegistrations": site_view.open_registration, } - }); - HttpResponse::Ok() - .content_type("application/json") - .body(json.to_string()) + })) + }) + .await + .map(|json| HttpResponse::Ok().json(json)) + .map_err(|_| HttpResponse::InternalServerError())?; + Ok(res) } diff --git a/server/src/routes/webfinger.rs b/server/src/routes/webfinger.rs index c538f5b1..20f76a9a 100644 --- a/server/src/routes/webfinger.rs +++ b/server/src/routes/webfinger.rs @@ -1,10 +1,10 @@ use crate::db::community::Community; -use crate::db::establish_connection; use crate::Settings; -use actix_web::body::Body; use actix_web::web; use actix_web::web::Query; use actix_web::HttpResponse; +use diesel::r2d2::{ConnectionManager, Pool}; +use diesel::PgConnection; use regex::Regex; use serde::Deserialize; use serde_json::json; @@ -37,54 +37,61 @@ lazy_static! { /// /// You can also view the webfinger response that Mastodon sends: /// https://radical.town/.well-known/webfinger?resource=acct:felix@radical.town -async fn get_webfinger_response(info: Query<Params>) -> HttpResponse<Body> { - let regex_parsed = WEBFINGER_COMMUNITY_REGEX - .captures(&info.resource) - .map(|c| c.get(1)); - // TODO: replace this with .flatten() once we are running rust 1.40 - let regex_parsed_flattened = match regex_parsed { - Some(s) => s, - None => None, - }; - let community_name = match regex_parsed_flattened { - Some(c) => c.as_str(), - None => return HttpResponse::NotFound().finish(), - }; +async fn get_webfinger_response( + info: Query<Params>, + db: web::Data<Pool<ConnectionManager<PgConnection>>>, +) -> Result<HttpResponse, actix_web::Error> { + let res = web::block(move || { + let conn = db.get()?; - // Make sure the requested community exists. - let conn = establish_connection(); - let community = match Community::read_from_name(&conn, community_name.to_string()) { - Ok(o) => o, - Err(_) => return HttpResponse::NotFound().finish(), - }; + let regex_parsed = WEBFINGER_COMMUNITY_REGEX + .captures(&info.resource) + .map(|c| c.get(1)); + // TODO: replace this with .flatten() once we are running rust 1.40 + let regex_parsed_flattened = match regex_parsed { + Some(s) => s, + None => None, + }; + let community_name = match regex_parsed_flattened { + Some(c) => c.as_str(), + None => return Err(format_err!("not_found")), + }; - let community_url = community.get_url(); + // Make sure the requested community exists. + let community = match Community::read_from_name(&conn, community_name.to_string()) { + Ok(o) => o, + Err(_) => return Err(format_err!("not_found")), + }; - let json = json!({ + let community_url = community.get_url(); + + Ok(json!({ "subject": info.resource, "aliases": [ community_url, ], "links": [ - { - "rel": "http://webfinger.net/rel/profile-page", - "type": "text/html", - "href": community_url - }, - { - "rel": "self", - "type": "application/activity+json", - // Yes this is correct, this link doesn't include the `.json` extension - "href": community_url - } - // TODO: this also needs to return the subscribe link once that's implemented - //{ - // "rel": "http://ostatus.org/schema/1.0/subscribe", - // "template": "https://my_instance.com/authorize_interaction?uri={uri}" - //} + { + "rel": "http://webfinger.net/rel/profile-page", + "type": "text/html", + "href": community_url + }, + { + "rel": "self", + "type": "application/activity+json", + // Yes this is correct, this link doesn't include the `.json` extension + "href": community_url + } + // TODO: this also needs to return the subscribe link once that's implemented + //{ + // "rel": "http://ostatus.org/schema/1.0/subscribe", + // "template": "https://my_instance.com/authorize_interaction?uri={uri}" + //} ] - }); - HttpResponse::Ok() - .content_type("application/activity+json") - .body(json.to_string()) + })) + }) + .await + .map(|json| HttpResponse::Ok().json(json)) + .map_err(|_| HttpResponse::InternalServerError())?; + Ok(res) } diff --git a/server/src/routes/websocket.rs b/server/src/routes/websocket.rs index 8113a613..0d953d24 100644 --- a/server/src/routes/websocket.rs +++ b/server/src/routes/websocket.rs @@ -1,13 +1,24 @@ use crate::websocket::server::*; +use crate::Settings; use actix::prelude::*; use actix_web::web; use actix_web::*; use actix_web_actors::ws; +use diesel::r2d2::{ConnectionManager, Pool}; +use diesel::PgConnection; use std::time::{Duration, Instant}; pub fn config(cfg: &mut web::ServiceConfig) { + // TODO couldn't figure out how to get this method to recieve the other pool + let settings = Settings::get(); + let manager = ConnectionManager::<PgConnection>::new(&settings.get_database_url()); + let pool = Pool::builder() + .max_size(settings.database.pool_size) + .build(manager) + .unwrap_or_else(|_| panic!("Error connecting to {}", settings.get_database_url())); + // Start chat server actor in separate thread - let server = ChatServer::default().start(); + let server = ChatServer::startup(pool).start(); cfg .data(server) .service(web::resource("/api/v1/ws").to(chat_route)); @@ -24,9 +35,11 @@ async fn chat_route( stream: web::Payload, chat_server: web::Data<Addr<ChatServer>>, ) -> Result<HttpResponse, Error> { + // TODO not sure if the blocking should be here or not ws::start( WSSession { - cs_addr: chat_server.get_ref().to_owned(), + // db: db.get_ref().clone(), + cs_addr: chat_server.get_ref().clone(), id: 0, hb: Instant::now(), ip: req @@ -51,6 +64,7 @@ struct WSSession { /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), /// otherwise we drop connection. hb: Instant, + // db: Pool<ConnectionManager<PgConnection>>, } impl Actor for WSSession { @@ -127,7 +141,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSSession { } ws::Message::Text(text) => { let m = text.trim().to_owned(); - println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id); + // println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id); self .cs_addr |