summaryrefslogtreecommitdiffstats
path: root/server/src
diff options
context:
space:
mode:
Diffstat (limited to 'server/src')
-rw-r--r--server/src/actions/comment.rs7
-rw-r--r--server/src/apub.rs2
-rw-r--r--server/src/bin/main.rs270
-rw-r--r--server/src/lib.rs22
-rw-r--r--server/src/websocket_server/mod.rs1
-rw-r--r--server/src/websocket_server/server.rs269
6 files changed, 563 insertions, 8 deletions
diff --git a/server/src/actions/comment.rs b/server/src/actions/comment.rs
index 104c13f2..d23382c6 100644
--- a/server/src/actions/comment.rs
+++ b/server/src/actions/comment.rs
@@ -4,6 +4,13 @@ use diesel::*;
use diesel::result::Error;
use {Crud, Likeable};
+// WITH RECURSIVE MyTree AS (
+// SELECT * FROM comment WHERE parent_id IS NULL
+// UNION ALL
+// SELECT m.* FROM comment AS m JOIN MyTree AS t ON m.parent_id = t.id
+// )
+// SELECT * FROM MyTree;
+
#[derive(Queryable, Identifiable, PartialEq, Debug)]
#[table_name="comment"]
pub struct Comment {
diff --git a/server/src/apub.rs b/server/src/apub.rs
index 16b8be1b..6272fedc 100644
--- a/server/src/apub.rs
+++ b/server/src/apub.rs
@@ -34,7 +34,7 @@ mod tests {
use super::User_;
use naive_now;
- #[test]
+ #[test]
fn test_person() {
let expected_user = User_ {
id: 52,
diff --git a/server/src/bin/main.rs b/server/src/bin/main.rs
new file mode 100644
index 00000000..25181aaa
--- /dev/null
+++ b/server/src/bin/main.rs
@@ -0,0 +1,270 @@
+extern crate server;
+
+use std::time::{Instant, Duration};
+use server::actix::*;
+use server::actix_web::server::HttpServer;
+use server::actix_web::{fs, http, ws, App, Error, HttpRequest, HttpResponse};
+
+
+/// How often heartbeat pings are sent
+const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
+/// How long before lack of client response causes a timeout
+const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
+
+use server::websocket_server::server::*;
+use std::str::FromStr;
+// use server::websocket_server::server::UserOperation::from_str;
+
+/// This is our websocket route state, this state is shared with all route
+/// instances via `HttpContext::state()`
+struct WsChatSessionState {
+ addr: Addr<ChatServer>,
+}
+
+/// Entry point for our route
+fn chat_route(req: &HttpRequest<WsChatSessionState>) -> Result<HttpResponse, Error> {
+ ws::start(
+ req,
+ WSSession {
+ id: 0,
+ hb: Instant::now()
+ },
+ )
+}
+
+struct WSSession {
+ /// unique session id
+ id: usize,
+ /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
+ /// otherwise we drop connection.
+ hb: Instant
+}
+
+impl Actor for WSSession {
+ type Context = ws::WebsocketContext<Self, WsChatSessionState>;
+
+ /// Method is called on actor start.
+ /// We register ws session with ChatServer
+ fn started(&mut self, ctx: &mut Self::Context) {
+ // we'll start heartbeat process on session start.
+ self.hb(ctx);
+
+ // register self in chat server. `AsyncContext::wait` register
+ // future within context, but context waits until this future resolves
+ // before processing any other events.
+ // HttpContext::state() is instance of WsChatSessionState, state is shared
+ // across all routes within application
+ let addr = ctx.address();
+ ctx.state()
+ .addr
+ .send(Connect {
+ addr: addr.recipient(),
+ })
+ .into_actor(self)
+ .then(|res, act, ctx| {
+ match res {
+ Ok(res) => act.id = res,
+ // something is wrong with chat server
+ _ => ctx.stop(),
+ }
+ fut::ok(())
+ })
+ .wait(ctx);
+ }
+
+ fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
+ // notify chat server
+ ctx.state().addr.do_send(Disconnect { id: self.id });
+ Running::Stop
+ }
+}
+
+/// Handle messages from chat server, we simply send it to peer websocket
+impl Handler<WSMessage> for WSSession {
+ type Result = ();
+
+ fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
+ ctx.text(msg.0);
+ }
+}
+
+use server::serde_json::Value;
+/// WebSocket message handler
+impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
+ fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
+ // println!("WEBSOCKET MESSAGE: {:?}", msg);
+ match msg {
+ ws::Message::Ping(msg) => {
+ self.hb = Instant::now();
+ ctx.pong(&msg);
+ }
+ ws::Message::Pong(_) => {
+ self.hb = Instant::now();
+ }
+ ws::Message::Text(text) => {
+ let m = text.trim();
+ let json: Value = serde_json::from_str(m).unwrap();
+
+ // Get the OP command, and its data
+ let op: &str = &json["op"].as_str().unwrap();
+ let data: &Value = &json["data"];
+
+ let user_operation: UserOperation = UserOperation::from_str(op).unwrap();
+
+ match user_operation {
+ UserOperation::Login => {
+ let login: Login = serde_json::from_str(&data.to_string()).unwrap();
+ ctx.state()
+ .addr
+ .do_send(login);
+ },
+ UserOperation::Register => {
+ let register: Register = serde_json::from_str(&data.to_string()).unwrap();
+ ctx.state()
+ .addr
+ .send(register)
+ .into_actor(self)
+ .then(|res, _, ctx| {
+ match res {
+ Ok(wut) => ctx.text(wut),
+ _ => println!("Something is wrong"),
+ }
+ fut::ok(())
+ })
+ .wait(ctx)
+ }
+ _ => ctx.text(format!("!!! unknown command: {:?}", m)),
+ }
+
+ // we check for /sss type of messages
+ // if m.starts_with('/') {
+ // let v: Vec<&str> = m.splitn(2, ' ').collect();
+ // match v[0] {
+ // "/list" => {
+ // // Send ListRooms message to chat server and wait for
+ // // response
+ // println!("List rooms");
+ // ctx.state()
+ // .addr
+ // .send(ListRooms)
+ // .into_actor(self)
+ // .then(|res, _, ctx| {
+ // match res {
+ // Ok(rooms) => {
+ // for room in rooms {
+ // ctx.text(room);
+ // }
+ // }
+ // _ => println!("Something is wrong"),
+ // }
+ // fut::ok(())
+ // })
+ // .wait(ctx)
+ // .wait(ctx) pauses all events in context,
+ // so actor wont receive any new messages until it get list
+ // of rooms back
+ // }
+ // "/join" => {
+ // if v.len() == 2 {
+ // self.room = v[1].to_owned();
+ // ctx.state().addr.do_send(Join {
+ // id: self.id,
+ // name: self.room.clone(),
+ // });
+
+ // ctx.text("joined");
+ // } else {
+ // ctx.text("!!! room name is required");
+ // }
+ // }
+ // "/name" => {
+ // if v.len() == 2 {
+ // self.name = Some(v[1].to_owned());
+ // } else {
+ // ctx.text("!!! name is required");
+ // }
+ // }
+ // _ => ctx.text(format!("!!! unknown command: {:?}", m)),
+ // }
+ // } else {
+ // let msg = if let Some(ref name) = self.name {
+ // format!("{}: {}", name, m)
+ // } else {
+ // m.to_owned()
+ // };
+ // send message to chat server
+ // ctx.state().addr.do_send(ClientMessage {
+ // id: self.id,
+ // msg: msg,
+ // room: self.room.clone(),
+ // })
+ // }
+ }
+ ws::Message::Binary(_bin) => println!("Unexpected binary"),
+ ws::Message::Close(_) => {
+ ctx.stop();
+ },
+ }
+ }
+}
+
+impl WSSession {
+ /// helper method that sends ping to client every second.
+ ///
+ /// also this method checks heartbeats from client
+ fn hb(&self, ctx: &mut ws::WebsocketContext<Self, WsChatSessionState>) {
+ ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
+ // check client heartbeats
+ if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
+ // heartbeat timed out
+ println!("Websocket Client heartbeat failed, disconnecting!");
+
+ // notify chat server
+ ctx.state()
+ .addr
+ .do_send(Disconnect { id: act.id });
+
+ // stop actor
+ ctx.stop();
+
+ // don't try to send a ping
+ return;
+ }
+
+ ctx.ping("");
+ });
+ }
+}
+
+fn main() {
+ let _ = env_logger::init();
+ let sys = actix::System::new("rust-reddit-fediverse-server");
+
+ // Start chat server actor in separate thread
+ let server = Arbiter::start(|_| ChatServer::default());
+
+ // Create Http server with websocket support
+ HttpServer::new(move || {
+ // Websocket sessions state
+ let state = WsChatSessionState {
+ addr: server.clone(),
+ };
+
+ App::with_state(state)
+ // redirect to websocket.html
+ // .resource("/", |r| r.method(http::Method::GET).f(|_| {
+ // HttpResponse::Found()
+ // .header("LOCATION", "/static/websocket.html")
+ // .finish()
+ // }))
+ // // websocket
+ .resource("/service/ws", |r| r.route().f(chat_route))
+ // static resources
+ // .handler("/static/", fs::StaticFiles::new("static/").unwrap())
+ }).bind("127.0.0.1:8080")
+ .unwrap()
+ .start();
+
+ println!("Started http server: 127.0.0.1:8080");
+ let _ = sys.run();
+}
diff --git a/server/src/lib.rs b/server/src/lib.rs
index b1a1f252..3daeb8d2 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -1,7 +1,19 @@
#[macro_use]
-extern crate diesel;
-extern crate dotenv;
-extern crate chrono;
+pub extern crate diesel;
+pub extern crate dotenv;
+pub extern crate chrono;
+pub extern crate serde;
+pub extern crate serde_json;
+pub extern crate actix;
+pub extern crate actix_web;
+pub extern crate rand;
+pub extern crate strum;
+#[macro_use] pub extern crate strum_macros;
+
+pub mod schema;
+pub mod apub;
+pub mod actions;
+pub mod websocket_server;
use diesel::*;
use diesel::pg::PgConnection;
@@ -9,11 +21,7 @@ use diesel::result::Error;
use dotenv::dotenv;
use std::env;
-pub mod schema;
-pub mod apub;
-pub mod actions;
-// pub trait Likeable;
pub trait Crud<T> {
fn create(conn: &PgConnection, form: T) -> Result<Self, Error> where Self: Sized;
fn read(conn: &PgConnection, id: i32) -> Self;
diff --git a/server/src/websocket_server/mod.rs b/server/src/websocket_server/mod.rs
new file mode 100644
index 00000000..74f47ad3
--- /dev/null
+++ b/server/src/websocket_server/mod.rs
@@ -0,0 +1 @@
+pub mod server;
diff --git a/server/src/websocket_server/server.rs b/server/src/websocket_server/server.rs
new file mode 100644
index 00000000..2d410176
--- /dev/null
+++ b/server/src/websocket_server/server.rs
@@ -0,0 +1,269 @@
+//! `ChatServer` is an actor. It maintains list of connection client session.
+//! And manages available rooms. Peers send messages to other peers in same
+//! room through `ChatServer`.
+
+use actix::prelude::*;
+use rand::{rngs::ThreadRng, Rng};
+use std::collections::{HashMap, HashSet};
+use serde::{Deserialize, Serialize};
+
+use {Crud,establish_connection};
+
+#[derive(EnumString,ToString,Debug)]
+pub enum UserOperation {
+ Login, Register, Logout, Join, Edit, Reply, Vote, Delete, NextPage, Sticky
+}
+
+pub enum MessageType {
+ Comments, Users, Ping, Pong
+}
+
+
+
+/// Chat server sends this messages to session
+#[derive(Message)]
+pub struct WSMessage(pub String);
+
+/// Message for chat server communications
+
+/// New chat session is created
+#[derive(Message)]
+#[rtype(usize)]
+pub struct Connect {
+ pub addr: Recipient<WSMessage>,
+}
+
+/// Session is disconnected
+#[derive(Message)]
+pub struct Disconnect {
+ pub id: usize,
+}
+
+/// Send message to specific room
+#[derive(Message)]
+pub struct ClientMessage {
+ /// Id of the client session
+ pub id: usize,
+ /// Peer message
+ pub msg: String,
+ /// Room name
+ pub room: String,
+}
+
+/// List of available rooms
+pub struct ListRooms;
+
+impl actix::Message for ListRooms {
+ type Result = Vec<String>;
+}
+
+/// Join room, if room does not exists create new one.
+#[derive(Message)]
+pub struct Join {
+ /// Client id
+ pub id: usize,
+ /// Room name
+ pub name: String,
+}
+
+#[derive(Message)]
+#[derive(Serialize, Deserialize)]
+pub struct Login {
+ pub username: String,
+ pub password: String
+}
+
+// #[derive(Message)]
+#[derive(Serialize, Deserialize)]
+pub struct Register {
+ username: String,
+ email: Option<String>,
+ password: String,
+ password_verify: String
+}
+
+impl actix::Message for Register {
+ type Result = String;
+}
+/// `ChatServer` manages chat rooms and responsible for coordinating chat
+/// session. implementation is super primitive
+pub struct ChatServer {
+ sessions: HashMap<usize, Recipient<WSMessage>>, // A map from generated random ID to session addr
+ rooms: HashMap<String, HashSet<usize>>, // A map from room name to set of connectionIDs
+ rng: ThreadRng,
+}
+
+impl Default for ChatServer {
+ fn default() -> ChatServer {
+ // default room
+ let mut rooms = HashMap::new();
+ rooms.insert("Main".to_owned(), HashSet::new());
+
+ ChatServer {
+ sessions: HashMap::new(),
+ rooms: rooms,
+ rng: rand::thread_rng(),
+ }
+ }
+}
+
+impl ChatServer {
+ /// Send message to all users in the room
+ fn send_room_message(&self, room: &str, message: &str, skip_id: usize) {
+ if let Some(sessions) = self.rooms.get(room) {
+ for id in sessions {
+ if *id != skip_id {
+ if let Some(addr) = self.sessions.get(id) {
+ let _ = addr.do_send(WSMessage(message.to_owned()));
+ }
+ }
+ }
+ }
+ }
+}
+
+/// Make actor from `ChatServer`
+impl Actor for ChatServer {
+ /// We are going to use simple Context, we just need ability to communicate
+ /// with other actors.
+ type Context = Context<Self>;
+}
+
+/// Handler for Connect message.
+///
+/// Register new session and assign unique id to this session
+impl Handler<Connect> for ChatServer {
+ type Result = usize;
+
+ fn handle(&mut self, msg: Connect, _: &mut Context<Self>) -> Self::Result {
+ println!("Someone joined");
+
+ // notify all users in same room
+ self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
+
+ // register session with random id
+ let id = self.rng.gen::<usize>();
+ self.sessions.insert(id, msg.addr);
+
+ // auto join session to Main room
+ self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id);
+
+ // send id back
+ id
+ }
+}
+
+/// Handler for Disconnect message.
+impl Handler<Disconnect> for ChatServer {
+ type Result = ();
+
+ fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
+ println!("Someone disconnected");
+
+ let mut rooms: Vec<String> = Vec::new();
+
+ // remove address
+ if self.sessions.remove(&msg.id).is_some() {
+ // remove session from all rooms
+ for (name, sessions) in &mut self.rooms {
+ if sessions.remove(&msg.id) {
+ rooms.push(name.to_owned());
+ }
+ }
+ }
+ // send message to other users
+ for room in rooms {
+ self.send_room_message(&room, "Someone disconnected", 0);
+ }
+ }
+}
+
+/// Handler for Message message.
+impl Handler<ClientMessage> for ChatServer {
+ type Result = ();
+
+ fn handle(&mut self, msg: ClientMessage, _: &mut Context<Self>) {
+ self.send_room_message(&msg.room, msg.msg.as_str(), msg.id);
+ }
+}
+
+/// Handler for `ListRooms` message.
+impl Handler<ListRooms> for ChatServer {
+ type Result = MessageResult<ListRooms>;
+
+ fn handle(&mut self, _: ListRooms, _: &mut Context<Self>) -> Self::Result {
+ let mut rooms = Vec::new();
+
+ for key in self.rooms.keys() {
+ rooms.push(key.to_owned())
+ }
+
+ MessageResult(rooms)
+ }
+}
+
+/// Join room, send disconnect message to old room
+/// send join message to new room
+impl Handler<Join> for ChatServer {
+ type Result = ();
+
+ fn handle(&mut self, msg: Join, _: &mut Context<Self>) {
+ let Join { id, name } = msg;
+ let mut rooms = Vec::new();
+
+ // remove session from all rooms
+ for (n, sessions) in &mut self.rooms {
+ if sessions.remove(&id) {
+ rooms.push(n.to_owned());
+ }
+ }
+ // send message to other users
+ for room in rooms {
+ self.send_room_message(&room, "Someone disconnected", 0);
+ }
+
+ if self.rooms.get_mut(&name).is_none() {
+ self.rooms.insert(name.clone(), HashSet::new());
+ }
+ self.send_room_message(&name, "Someone connected", id);
+ self.rooms.get_mut(&name).unwrap().insert(id);
+ }
+
+}
+
+impl Handler<Login> for ChatServer {
+
+ type Result = ();
+ fn handle(&mut self, msg: Login, _: &mut Context<Self>) {
+ println!("{}", msg.password);
+
+ }
+}
+
+impl Handler<Register> for ChatServer {
+
+ type Result = MessageResult<Register>;
+ fn handle(&mut self, msg: Register, _: &mut Context<Self>) -> Self::Result {
+
+ use actions::user::*;
+ let conn = establish_connection();
+
+ // TODO figure out how to return values, and throw errors
+
+ // Register the new user
+ let user_form = UserForm {
+ name: &msg.username,
+ email: msg.email.as_ref().map(|x| &**x),
+ password_encrypted: &msg.password,
+ preferred_username: None,
+ updated: None
+ };
+
+ let inserted_user = User_::create(&conn, user_form).unwrap();
+
+
+ // Return the jwt
+ MessageResult("hi".to_string())
+
+ }
+}