diff options
Diffstat (limited to 'server/src/main.rs')
-rw-r--r-- | server/src/main.rs | 329 |
1 files changed, 159 insertions, 170 deletions
diff --git a/server/src/main.rs b/server/src/main.rs index 48074316..75a48865 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,11 +1,13 @@ extern crate lemmy_server; -#[macro_use] extern crate diesel_migrations; +#[macro_use] +extern crate diesel_migrations; use std::time::{Instant, Duration}; use std::env; -use lemmy_server::actix::*; -use lemmy_server::actix_web::server::HttpServer; -use lemmy_server::actix_web::{ws, App, Error, HttpRequest, HttpResponse, fs::NamedFile, fs}; +use actix_web::*; +use actix::prelude::*; +use actix_files::NamedFile; +use actix_web_actors::ws; use lemmy_server::websocket::server::*; use lemmy_server::db::establish_connection; @@ -16,203 +18,190 @@ const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); /// How long before lack of client response causes a timeout const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); -/// This is our websocket route state, this state is shared with all route -/// instances via `HttpContext::state()` -struct WsChatSessionState { - addr: Addr<ChatServer>, -} /// Entry point for our route -fn chat_route(req: &HttpRequest<WsChatSessionState>) -> Result<HttpResponse, Error> { - ws::start( - req, - WSSession { - id: 0, - hb: Instant::now(), - ip: req.connection_info() - .remote() - .unwrap_or("127.0.0.1:12345") - .split(":") - .next() - .unwrap_or("127.0.0.1") - .to_string() - }, +fn chat_route(req: HttpRequest, stream: web::Payload, chat_server: web::Data<Addr<ChatServer>>) -> Result<HttpResponse, Error> { + ws::start( + WSSession { + cs_addr: chat_server.get_ref().to_owned(), + id: 0, + hb: Instant::now(), + ip: req.connection_info() + .remote() + .unwrap_or("127.0.0.1:12345") + .split(":") + .next() + .unwrap_or("127.0.0.1") + .to_string(), + }, + &req, + stream, ) } struct WSSession { - /// unique session id - id: usize, - ip: String, - /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), - /// otherwise we drop connection. - hb: Instant + cs_addr: Addr<ChatServer>, + /// unique session id + id: usize, + ip: String, + /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), + /// otherwise we drop connection. + hb: Instant, } impl Actor for WSSession { - type Context = ws::WebsocketContext<Self, WsChatSessionState>; - - /// Method is called on actor start. - /// We register ws session with ChatServer - fn started(&mut self, ctx: &mut Self::Context) { - // we'll start heartbeat process on session start. - self.hb(ctx); - - // register self in chat server. `AsyncContext::wait` register - // future within context, but context waits until this future resolves - // before processing any other events. - // HttpContext::state() is instance of WsChatSessionState, state is shared - // across all routes within application - let addr = ctx.address(); - ctx.state() - .addr - .send(Connect { - addr: addr.recipient(), - ip: self.ip.to_owned(), - }) - .into_actor(self) - .then(|res, act, ctx| { - match res { - Ok(res) => act.id = res, - // something is wrong with chat server - _ => ctx.stop(), - } - fut::ok(()) - }) - .wait(ctx); - } - - fn stopping(&mut self, ctx: &mut Self::Context) -> Running { - // notify chat server - ctx.state().addr.do_send(Disconnect { - id: self.id, - ip: self.ip.to_owned(), - }); - Running::Stop - } + type Context = ws::WebsocketContext<Self>; + + /// Method is called on actor start. + /// We register ws session with ChatServer + fn started(&mut self, ctx: &mut Self::Context) { + // we'll start heartbeat process on session start. + self.hb(ctx); + + // register self in chat server. `AsyncContext::wait` register + // future within context, but context waits until this future resolves + // before processing any other events. + // across all routes within application + let addr = ctx.address(); + self.cs_addr + .send(Connect { + addr: addr.recipient(), + ip: self.ip.to_owned(), + }) + .into_actor(self) + .then(|res, act, ctx| { + match res { + Ok(res) => act.id = res, + // something is wrong with chat server + _ => ctx.stop(), + } + fut::ok(()) + }) + .wait(ctx); + } + + fn stopping(&mut self, _ctx: &mut Self::Context) -> Running { + // notify chat server + self.cs_addr.do_send(Disconnect { + id: self.id, + ip: self.ip.to_owned(), + }); + Running::Stop + } } /// Handle messages from chat server, we simply send it to peer websocket /// These are room messages, IE sent to others in the room impl Handler<WSMessage> for WSSession { - type Result = (); + type Result = (); - fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) { - // println!("id: {} msg: {}", self.id, msg.0); - ctx.text(msg.0); - } + fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) { + // println!("id: {} msg: {}", self.id, msg.0); + ctx.text(msg.0); + } } /// WebSocket message handler impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession { - fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { - // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id); - match msg { - ws::Message::Ping(msg) => { - self.hb = Instant::now(); - ctx.pong(&msg); - } - ws::Message::Pong(_) => { - self.hb = Instant::now(); - } - ws::Message::Text(text) => { - let m = text.trim().to_owned(); - println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id); - - ctx.state() - .addr - .send(StandardMessage { - id: self.id, - msg: m, - }) - .into_actor(self) - .then(|res, _, ctx| { - match res { - Ok(res) => ctx.text(res), - Err(e) => { - eprintln!("{}", &e); - } + fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { + // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id); + match msg { + ws::Message::Ping(msg) => { + self.hb = Instant::now(); + ctx.pong(&msg); + } + ws::Message::Pong(_) => { + self.hb = Instant::now(); } - fut::ok(()) - }) - .wait(ctx); - } - ws::Message::Binary(_bin) => println!("Unexpected binary"), - ws::Message::Close(_) => { - ctx.stop(); - }, + ws::Message::Text(text) => { + let m = text.trim().to_owned(); + println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id); + + self.cs_addr + .send(StandardMessage { + id: self.id, + msg: m, + }) + .into_actor(self) + .then(|res, _, ctx| { + match res { + Ok(res) => ctx.text(res), + Err(e) => { + eprintln!("{}", &e); + } + } + fut::ok(()) + }) + .wait(ctx); + } + ws::Message::Binary(_bin) => println!("Unexpected binary"), + ws::Message::Close(_) => { + ctx.stop(); + } + _ => {} + } } - } } impl WSSession { - /// helper method that sends ping to client every second. - /// - /// also this method checks heartbeats from client - fn hb(&self, ctx: &mut ws::WebsocketContext<Self, WsChatSessionState>) { - ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { - // check client heartbeats - if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { - // heartbeat timed out - println!("Websocket Client heartbeat failed, disconnecting!"); - - // notify chat server - ctx.state() - .addr - .do_send(Disconnect { id: act.id, ip: act.ip.to_owned() }); - - // stop actor - ctx.stop(); - - // don't try to send a ping - return; - } + /// helper method that sends ping to client every second. + /// + /// also this method checks heartbeats from client + fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { + // check client heartbeats + if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { + // heartbeat timed out + println!("Websocket Client heartbeat failed, disconnecting!"); + + // notify chat server + act.cs_addr + .do_send(Disconnect { id: act.id, ip: act.ip.to_owned() }); + + // stop actor + ctx.stop(); + + // don't try to send a ping + return; + } - ctx.ping(""); - }); - } + ctx.ping(""); + }); + } } fn main() { - let _ = env_logger::init(); - let sys = actix::System::new("lemmy"); - - // Run the migrations from code - let conn = establish_connection(); - embedded_migrations::run(&conn).unwrap(); - - // Start chat server actor in separate thread - let server = Arbiter::start(|_| ChatServer::default()); - - // Create Http server with websocket support - HttpServer::new(move || { - // Websocket sessions state - let state = WsChatSessionState { - addr: server.clone(), - }; - - App::with_state(state) - // .resource("/api/v1/rest", |r| r.method(http::Method::POST).f(|_| {}) - .resource("/api/v1/ws", |r| r.route().f(chat_route)) - // static resources - .resource("/", |r| r.route().f(index)) - .handler( - "/static", - fs::StaticFiles::new(front_end_dir()).unwrap() - ) - .finish() - }).bind("0.0.0.0:8536") - .unwrap() - .start(); - - println!("Started http server: 0.0.0.0:8536"); - let _ = sys.run(); + let _ = env_logger::init(); + let sys = actix::System::new("lemmy"); + + // Run the migrations from code + let conn = establish_connection(); + embedded_migrations::run(&conn).unwrap(); + + // Start chat server actor in separate thread + let server = ChatServer::default().start(); + // Create Http server with websocket support + HttpServer::new(move || { + + App::new() + .data(server.clone()) + .service(web::resource("/api/v1/ws").to(chat_route)) +// .service(web::resource("/api/v1/rest").route(web::post().to(||{}))) + .service(web::resource("/").to(index)) + // static resources + .service(actix_files::Files::new("/static", front_end_dir())) + }).bind("0.0.0.0:8536") + .unwrap() + .start(); + + println!("Started http server: 0.0.0.0:8536"); + let _ = sys.run(); } -fn index(_req: &HttpRequest<WsChatSessionState>) -> Result<NamedFile, actix_web::error::Error> { - Ok(NamedFile::open(front_end_dir() + "/index.html")?) +fn index() -> Result<NamedFile, actix_web::error::Error> { + Ok(NamedFile::open(front_end_dir() + "/index.html")?) } fn front_end_dir() -> String { - env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string()) + env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string()) } |