summaryrefslogtreecommitdiffstats
path: root/server/src/routes
diff options
context:
space:
mode:
authorDessalines <tyhou13@gmx.com>2020-01-12 10:31:51 -0500
committerDessalines <tyhou13@gmx.com>2020-01-12 10:31:51 -0500
commitdff8b947bb779d27d4b5953fe1f34166ac5c4104 (patch)
tree2cad663e4ef1689ae741cbce995be42095a8c500 /server/src/routes
parent677d716ae6b21ecb0569d661c1d76520dade1643 (diff)
Trying to add r2d2 connection pooling to websockets.
Diffstat (limited to 'server/src/routes')
-rw-r--r--server/src/routes/feeds.rs122
-rw-r--r--server/src/routes/nodeinfo.rs64
-rw-r--r--server/src/routes/webfinger.rs93
-rw-r--r--server/src/routes/websocket.rs20
4 files changed, 166 insertions, 133 deletions
diff --git a/server/src/routes/feeds.rs b/server/src/routes/feeds.rs
index ae1631e2..ad0f28d5 100644
--- a/server/src/routes/feeds.rs
+++ b/server/src/routes/feeds.rs
@@ -7,11 +7,12 @@ use crate::db::post_view::{PostQueryBuilder, PostView};
use crate::db::site_view::SiteView;
use crate::db::user::{Claims, User_};
use crate::db::user_mention_view::{UserMentionQueryBuilder, UserMentionView};
-use crate::db::{establish_connection, ListingType, SortType};
+use crate::db::{ListingType, SortType};
use crate::Settings;
-use actix_web::body::Body;
use actix_web::{web, HttpResponse, Result};
use chrono::{DateTime, Utc};
+use diesel::r2d2::{ConnectionManager, Pool};
+use diesel::PgConnection;
use failure::Error;
use rss::{CategoryBuilder, ChannelBuilder, GuidBuilder, Item, ItemBuilder};
use serde::Deserialize;
@@ -37,54 +38,61 @@ pub fn config(cfg: &mut web::ServiceConfig) {
.route("/feeds/all.xml", web::get().to(feeds::get_all_feed));
}
-async fn get_all_feed(info: web::Query<Params>) -> HttpResponse<Body> {
- let sort_type = match get_sort_type(info) {
- Ok(sort_type) => sort_type,
- Err(_) => return HttpResponse::BadRequest().finish(),
- };
-
- let feed_result = get_feed_all_data(&sort_type);
-
- match feed_result {
- Ok(rss) => HttpResponse::Ok()
+async fn get_all_feed(
+ info: web::Query<Params>,
+ db: web::Data<Pool<ConnectionManager<PgConnection>>>,
+) -> Result<HttpResponse, actix_web::Error> {
+ let res = web::block(move || {
+ let conn = db.get()?;
+
+ let sort_type = get_sort_type(info)?;
+ get_feed_all_data(&conn, &sort_type)
+ })
+ .await
+ .map(|rss| {
+ HttpResponse::Ok()
.content_type("application/rss+xml")
- .body(rss),
- Err(_) => HttpResponse::NotFound().finish(),
- }
+ .body(rss)
+ })
+ .map_err(|_| HttpResponse::InternalServerError())?;
+ Ok(res)
}
async fn get_feed(
path: web::Path<(String, String)>,
info: web::Query<Params>,
-) -> HttpResponse<Body> {
- let sort_type = match get_sort_type(info) {
- Ok(sort_type) => sort_type,
- Err(_) => return HttpResponse::BadRequest().finish(),
- };
-
- let request_type = match path.0.as_ref() {
- "u" => RequestType::User,
- "c" => RequestType::Community,
- "front" => RequestType::Front,
- "inbox" => RequestType::Inbox,
- _ => return HttpResponse::NotFound().finish(),
- };
-
- let param = path.1.to_owned();
-
- let feed_result = match request_type {
- RequestType::User => get_feed_user(&sort_type, param),
- RequestType::Community => get_feed_community(&sort_type, param),
- RequestType::Front => get_feed_front(&sort_type, param),
- RequestType::Inbox => get_feed_inbox(param),
- };
-
- match feed_result {
- Ok(rss) => HttpResponse::Ok()
+ db: web::Data<Pool<ConnectionManager<PgConnection>>>,
+) -> Result<HttpResponse, actix_web::Error> {
+ let res = web::block(move || {
+ let conn = db.get()?;
+
+ let sort_type = get_sort_type(info)?;
+
+ let request_type = match path.0.as_ref() {
+ "u" => RequestType::User,
+ "c" => RequestType::Community,
+ "front" => RequestType::Front,
+ "inbox" => RequestType::Inbox,
+ _ => return Err(format_err!("wrong_type")),
+ };
+
+ let param = path.1.to_owned();
+
+ match request_type {
+ RequestType::User => get_feed_user(&conn, &sort_type, param),
+ RequestType::Community => get_feed_community(&conn, &sort_type, param),
+ RequestType::Front => get_feed_front(&conn, &sort_type, param),
+ RequestType::Inbox => get_feed_inbox(&conn, param),
+ }
+ })
+ .await
+ .map(|rss| {
+ HttpResponse::Ok()
.content_type("application/rss+xml")
- .body(rss),
- Err(_) => HttpResponse::NotFound().finish(),
- }
+ .body(rss)
+ })
+ .map_err(|_| HttpResponse::InternalServerError())?;
+ Ok(res)
}
fn get_sort_type(info: web::Query<Params>) -> Result<SortType, ParseError> {
@@ -95,9 +103,7 @@ fn get_sort_type(info: web::Query<Params>) -> Result<SortType, ParseError> {
SortType::from_str(&sort_query)
}
-fn get_feed_all_data(sort_type: &SortType) -> Result<String, Error> {
- let conn = establish_connection();
-
+fn get_feed_all_data(conn: &PgConnection, sort_type: &SortType) -> Result<String, failure::Error> {
let site_view = SiteView::read(&conn)?;
let posts = PostQueryBuilder::create(&conn)
@@ -120,9 +126,11 @@ fn get_feed_all_data(sort_type: &SortType) -> Result<String, Error> {
Ok(channel_builder.build().unwrap().to_string())
}
-fn get_feed_user(sort_type: &SortType, user_name: String) -> Result<String, Error> {
- let conn = establish_connection();
-
+fn get_feed_user(
+ conn: &PgConnection,
+ sort_type: &SortType,
+ user_name: String,
+) -> Result<String, Error> {
let site_view = SiteView::read(&conn)?;
let user = User_::find_by_username(&conn, &user_name)?;
let user_url = user.get_profile_url();
@@ -144,9 +152,11 @@ fn get_feed_user(sort_type: &SortType, user_name: String) -> Result<String, Erro
Ok(channel_builder.build().unwrap().to_string())
}
-fn get_feed_community(sort_type: &SortType, community_name: String) -> Result<String, Error> {
- let conn = establish_connection();
-
+fn get_feed_community(
+ conn: &PgConnection,
+ sort_type: &SortType,
+ community_name: String,
+) -> Result<String, Error> {
let site_view = SiteView::read(&conn)?;
let community = Community::read_from_name(&conn, community_name)?;
let community_url = community.get_url();
@@ -172,9 +182,7 @@ fn get_feed_community(sort_type: &SortType, community_name: String) -> Result<St
Ok(channel_builder.build().unwrap().to_string())
}
-fn get_feed_front(sort_type: &SortType, jwt: String) -> Result<String, Error> {
- let conn = establish_connection();
-
+fn get_feed_front(conn: &PgConnection, sort_type: &SortType, jwt: String) -> Result<String, Error> {
let site_view = SiteView::read(&conn)?;
let user_id = Claims::decode(&jwt)?.claims.id;
@@ -199,9 +207,7 @@ fn get_feed_front(sort_type: &SortType, jwt: String) -> Result<String, Error> {
Ok(channel_builder.build().unwrap().to_string())
}
-fn get_feed_inbox(jwt: String) -> Result<String, Error> {
- let conn = establish_connection();
-
+fn get_feed_inbox(conn: &PgConnection, jwt: String) -> Result<String, Error> {
let site_view = SiteView::read(&conn)?;
let user_id = Claims::decode(&jwt)?.claims.id;
diff --git a/server/src/routes/nodeinfo.rs b/server/src/routes/nodeinfo.rs
index 2b7135fb..6ab540f9 100644
--- a/server/src/routes/nodeinfo.rs
+++ b/server/src/routes/nodeinfo.rs
@@ -1,10 +1,11 @@
-use crate::db::establish_connection;
use crate::db::site_view::SiteView;
use crate::version;
use crate::Settings;
use actix_web::body::Body;
use actix_web::web;
use actix_web::HttpResponse;
+use diesel::r2d2::{ConnectionManager, Pool};
+use diesel::PgConnection;
use serde_json::json;
pub fn config(cfg: &mut web::ServiceConfig) {
@@ -26,34 +27,39 @@ async fn node_info_well_known() -> HttpResponse<Body> {
.body(json.to_string())
}
-async fn node_info() -> HttpResponse<Body> {
- let conn = establish_connection();
- let site_view = match SiteView::read(&conn) {
- Ok(site_view) => site_view,
- Err(_e) => return HttpResponse::InternalServerError().finish(),
- };
- let protocols = if Settings::get().federation_enabled {
- vec!["activitypub"]
- } else {
- vec![]
- };
- let json = json!({
- "version": "2.0",
- "software": {
- "name": "lemmy",
- "version": version::VERSION,
- },
- "protocols": protocols,
- "usage": {
- "users": {
- "total": site_view.number_of_users
+async fn node_info(
+ db: web::Data<Pool<ConnectionManager<PgConnection>>>,
+) -> Result<HttpResponse, actix_web::Error> {
+ let res = web::block(move || {
+ let conn = db.get()?;
+ let site_view = match SiteView::read(&conn) {
+ Ok(site_view) => site_view,
+ Err(_) => return Err(format_err!("not_found")),
+ };
+ let protocols = if Settings::get().federation_enabled {
+ vec!["activitypub"]
+ } else {
+ vec![]
+ };
+ Ok(json!({
+ "version": "2.0",
+ "software": {
+ "name": "lemmy",
+ "version": version::VERSION,
},
- "localPosts": site_view.number_of_posts,
- "localComments": site_view.number_of_comments,
- "openRegistrations": site_view.open_registration,
+ "protocols": protocols,
+ "usage": {
+ "users": {
+ "total": site_view.number_of_users
+ },
+ "localPosts": site_view.number_of_posts,
+ "localComments": site_view.number_of_comments,
+ "openRegistrations": site_view.open_registration,
}
- });
- HttpResponse::Ok()
- .content_type("application/json")
- .body(json.to_string())
+ }))
+ })
+ .await
+ .map(|json| HttpResponse::Ok().json(json))
+ .map_err(|_| HttpResponse::InternalServerError())?;
+ Ok(res)
}
diff --git a/server/src/routes/webfinger.rs b/server/src/routes/webfinger.rs
index c538f5b1..20f76a9a 100644
--- a/server/src/routes/webfinger.rs
+++ b/server/src/routes/webfinger.rs
@@ -1,10 +1,10 @@
use crate::db::community::Community;
-use crate::db::establish_connection;
use crate::Settings;
-use actix_web::body::Body;
use actix_web::web;
use actix_web::web::Query;
use actix_web::HttpResponse;
+use diesel::r2d2::{ConnectionManager, Pool};
+use diesel::PgConnection;
use regex::Regex;
use serde::Deserialize;
use serde_json::json;
@@ -37,54 +37,61 @@ lazy_static! {
///
/// You can also view the webfinger response that Mastodon sends:
/// https://radical.town/.well-known/webfinger?resource=acct:felix@radical.town
-async fn get_webfinger_response(info: Query<Params>) -> HttpResponse<Body> {
- let regex_parsed = WEBFINGER_COMMUNITY_REGEX
- .captures(&info.resource)
- .map(|c| c.get(1));
- // TODO: replace this with .flatten() once we are running rust 1.40
- let regex_parsed_flattened = match regex_parsed {
- Some(s) => s,
- None => None,
- };
- let community_name = match regex_parsed_flattened {
- Some(c) => c.as_str(),
- None => return HttpResponse::NotFound().finish(),
- };
+async fn get_webfinger_response(
+ info: Query<Params>,
+ db: web::Data<Pool<ConnectionManager<PgConnection>>>,
+) -> Result<HttpResponse, actix_web::Error> {
+ let res = web::block(move || {
+ let conn = db.get()?;
- // Make sure the requested community exists.
- let conn = establish_connection();
- let community = match Community::read_from_name(&conn, community_name.to_string()) {
- Ok(o) => o,
- Err(_) => return HttpResponse::NotFound().finish(),
- };
+ let regex_parsed = WEBFINGER_COMMUNITY_REGEX
+ .captures(&info.resource)
+ .map(|c| c.get(1));
+ // TODO: replace this with .flatten() once we are running rust 1.40
+ let regex_parsed_flattened = match regex_parsed {
+ Some(s) => s,
+ None => None,
+ };
+ let community_name = match regex_parsed_flattened {
+ Some(c) => c.as_str(),
+ None => return Err(format_err!("not_found")),
+ };
- let community_url = community.get_url();
+ // Make sure the requested community exists.
+ let community = match Community::read_from_name(&conn, community_name.to_string()) {
+ Ok(o) => o,
+ Err(_) => return Err(format_err!("not_found")),
+ };
- let json = json!({
+ let community_url = community.get_url();
+
+ Ok(json!({
"subject": info.resource,
"aliases": [
community_url,
],
"links": [
- {
- "rel": "http://webfinger.net/rel/profile-page",
- "type": "text/html",
- "href": community_url
- },
- {
- "rel": "self",
- "type": "application/activity+json",
- // Yes this is correct, this link doesn't include the `.json` extension
- "href": community_url
- }
- // TODO: this also needs to return the subscribe link once that's implemented
- //{
- // "rel": "http://ostatus.org/schema/1.0/subscribe",
- // "template": "https://my_instance.com/authorize_interaction?uri={uri}"
- //}
+ {
+ "rel": "http://webfinger.net/rel/profile-page",
+ "type": "text/html",
+ "href": community_url
+ },
+ {
+ "rel": "self",
+ "type": "application/activity+json",
+ // Yes this is correct, this link doesn't include the `.json` extension
+ "href": community_url
+ }
+ // TODO: this also needs to return the subscribe link once that's implemented
+ //{
+ // "rel": "http://ostatus.org/schema/1.0/subscribe",
+ // "template": "https://my_instance.com/authorize_interaction?uri={uri}"
+ //}
]
- });
- HttpResponse::Ok()
- .content_type("application/activity+json")
- .body(json.to_string())
+ }))
+ })
+ .await
+ .map(|json| HttpResponse::Ok().json(json))
+ .map_err(|_| HttpResponse::InternalServerError())?;
+ Ok(res)
}
diff --git a/server/src/routes/websocket.rs b/server/src/routes/websocket.rs
index 8113a613..0d953d24 100644
--- a/server/src/routes/websocket.rs
+++ b/server/src/routes/websocket.rs
@@ -1,13 +1,24 @@
use crate::websocket::server::*;
+use crate::Settings;
use actix::prelude::*;
use actix_web::web;
use actix_web::*;
use actix_web_actors::ws;
+use diesel::r2d2::{ConnectionManager, Pool};
+use diesel::PgConnection;
use std::time::{Duration, Instant};
pub fn config(cfg: &mut web::ServiceConfig) {
+ // TODO couldn't figure out how to get this method to recieve the other pool
+ let settings = Settings::get();
+ let manager = ConnectionManager::<PgConnection>::new(&settings.get_database_url());
+ let pool = Pool::builder()
+ .max_size(settings.database.pool_size)
+ .build(manager)
+ .unwrap_or_else(|_| panic!("Error connecting to {}", settings.get_database_url()));
+
// Start chat server actor in separate thread
- let server = ChatServer::default().start();
+ let server = ChatServer::startup(pool).start();
cfg
.data(server)
.service(web::resource("/api/v1/ws").to(chat_route));
@@ -24,9 +35,11 @@ async fn chat_route(
stream: web::Payload,
chat_server: web::Data<Addr<ChatServer>>,
) -> Result<HttpResponse, Error> {
+ // TODO not sure if the blocking should be here or not
ws::start(
WSSession {
- cs_addr: chat_server.get_ref().to_owned(),
+ // db: db.get_ref().clone(),
+ cs_addr: chat_server.get_ref().clone(),
id: 0,
hb: Instant::now(),
ip: req
@@ -51,6 +64,7 @@ struct WSSession {
/// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
/// otherwise we drop connection.
hb: Instant,
+ // db: Pool<ConnectionManager<PgConnection>>,
}
impl Actor for WSSession {
@@ -127,7 +141,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSSession {
}
ws::Message::Text(text) => {
let m = text.trim().to_owned();
- println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id);
+ // println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id);
self
.cs_addr