From 9e4ddaeaf3ffcc6794c6669fd6a2019d3a113cbd Mon Sep 17 00:00:00 2001 From: jq-rs Date: Sat, 29 Dec 2018 17:16:30 +0200 Subject: examples: single-threaded chat combinator example (#794) --- examples/chat-combinator-current-thread.rs | 167 +++++++++++++++++++++++++++++ 1 file changed, 167 insertions(+) create mode 100644 examples/chat-combinator-current-thread.rs (limited to 'examples') diff --git a/examples/chat-combinator-current-thread.rs b/examples/chat-combinator-current-thread.rs new file mode 100644 index 00000000..c528eeec --- /dev/null +++ b/examples/chat-combinator-current-thread.rs @@ -0,0 +1,167 @@ +//! A chat server that broadcasts a message to all connections. +//! +//! This is a line-based server which accepts connections, reads lines from +//! those connections, and broadcasts the lines to all other connected clients. +//! +//! This example is similar to chat.rs, but uses combinators and a much more +//! functional style. +//! +//! Because we are here running the reactor/executor on the same thread instead +//! of a threadpool, we can avoid full synchronization with Arc + Mutex and use +//! Rc + RefCell instead. The max performance is however limited to a CPU HW +//! thread. +//! +//! You can test this out by running: +//! +//! cargo run --example chat-combinator-current-thread +//! +//! And then in another window run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! You can run the second command in multiple windows and then chat between the +//! two, seeing the messages from the other client as they're received. For all +//! connected clients they'll all join the same room and see everyone else's +//! messages. + +#![deny(warnings)] + +extern crate tokio; +extern crate futures; + +use tokio::io; +use tokio::net::TcpListener; +use tokio::prelude::*; +use tokio::runtime::current_thread::{Runtime, TaskExecutor}; + +use std::collections::HashMap; +use std::iter; +use std::env; +use std::io::{BufReader}; +use std::rc::Rc; +use std::cell::RefCell; + + +fn main() -> Result<(), Box> { + let mut runtime = Runtime::new().unwrap(); + + // Create the TCP listener we'll accept connections on. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse()?; + + let socket = TcpListener::bind(&addr)?; + println!("Listening on: {}", addr); + + // This is running on the Tokio current_thread runtime, so it will be single- + // threaded. The `Rc>` allows state to be shared across the tasks. + let connections = Rc::new(RefCell::new(HashMap::new())); + + // The server task asynchronously iterates over and processes each incoming + // connection. + let srv = socket.incoming() + .map_err(|e| {println!("failed to accept socket; error = {:?}", e); e}) + .for_each(move |stream| { + // The client's socket address + let addr = stream.peer_addr()?; + + println!("New Connection: {}", addr); + + // Split the TcpStream into two separate handles. One handle for reading + // and one handle for writing. This lets us use separate tasks for + // reading and writing. + let (reader, writer) = stream.split(); + + // Create a channel for our stream, which other sockets will use to + // send us messages. Then register our address with the stream to send + // data to us. + let (tx, rx) = futures::sync::mpsc::unbounded(); + let mut conns = connections.borrow_mut(); + conns.insert(addr, tx); + + // Define here what we do for the actual I/O. That is, read a bunch of + // lines from the socket and dispatch them while we also write any lines + // from other sockets. + let connections_inner = connections.clone(); + let reader = BufReader::new(reader); + + // Model the read portion of this socket by mapping an infinite + // iterator to each line off the socket. This "loop" is then + // terminated with an error once we hit EOF on the socket. + let iter = stream::iter_ok::<_, io::Error>(iter::repeat(())); + + let socket_reader = iter.fold(reader, move |reader, _| { + // Read a line off the socket, failing if we're at EOF + let line = io::read_until(reader, b'\n', Vec::new()); + let line = line.and_then(|(reader, vec)| { + if vec.len() == 0 { + Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")) + } else { + Ok((reader, vec)) + } + }); + + // Convert the bytes we read into a string, and then send that + // string to all other connected clients. + let line = line.map(|(reader, vec)| { + (reader, String::from_utf8(vec)) + }); + + // Move the connection state into the closure below. + let connections = connections_inner.clone(); + + line.map(move |(reader, message)| { + println!("{}: {:?}", addr, message); + let mut conns = connections.borrow_mut(); + + if let Ok(msg) = message { + // For each open connection except the sender, send the + // string via the channel. + let iter = conns.iter_mut() + .filter(|&(&k, _)| k != addr) + .map(|(_, v)| v); + for tx in iter { + tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap(); + } + } else { + let tx = conns.get_mut(&addr).unwrap(); + tx.unbounded_send("You didn't send valid UTF-8.".to_string()).unwrap(); + } + + reader + }) + }); + + // Whenever we receive a string on the Receiver, we write it to + // `WriteHalf`. + let socket_writer = rx.fold(writer, |writer, msg| { + let amt = io::write_all(writer, msg.into_bytes()); + let amt = amt.map(|(writer, _)| writer); + amt.map_err(|_| ()) + }); + + // Now that we've got futures representing each half of the socket, we + // use the `select` combinator to wait for either half to be done to + // tear down the other. Then we spawn off the result. + let connections = connections.clone(); + let socket_reader = socket_reader.map_err(|_| ()); + let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ())); + + // Spawn locally a task to process the connection + TaskExecutor::current().spawn_local(Box::new(connection.then(move |_| { + let mut conns = connections.borrow_mut(); + conns.remove(&addr); + println!("Connection {} closed.", addr); + Ok(()) + }))).unwrap(); + + Ok(()) + }) + .map_err(|err| println!("error occurred: {:?}", err)); + + // Spawn srv itself + runtime.spawn(srv); + + // Execute server + runtime.run().unwrap(); + Ok(()) +} -- cgit v1.2.3