From 567887cc75170437f75f19f5966f2b32bf49ab72 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 5 Feb 2018 20:45:12 -0800 Subject: Add a chat example (#112) --- examples/chat-combinator.rs | 138 +++++++++++ examples/chat.rs | 541 +++++++++++++++++++++++++++++++++++--------- examples/hello_world.rs | 80 +++++++ 3 files changed, 652 insertions(+), 107 deletions(-) create mode 100644 examples/chat-combinator.rs create mode 100644 examples/hello_world.rs (limited to 'examples') diff --git a/examples/chat-combinator.rs b/examples/chat-combinator.rs new file mode 100644 index 00000000..667f0e9a --- /dev/null +++ b/examples/chat-combinator.rs @@ -0,0 +1,138 @@ +//! A chat server that broadcasts a message to all connections. +//! +//! This is a simple line-based server which accepts connections, reads lines +//! from those connections, and broadcasts the lines to all other connected +//! clients. In a sense this is a bit of a "poor man's chat server". +//! +//! You can test this out by running: +//! +//! cargo run --example chat +//! +//! And then in another window run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! You can run the second command in multiple windows and then chat between the +//! two, seeing the messages from the other client as they're received. For all +//! connected clients they'll all join the same room and see everyone else's +//! messages. + +extern crate futures; +extern crate futures_cpupool; +extern crate tokio; +extern crate tokio_io; + +use std::collections::HashMap; +use std::iter; +use std::env; +use std::io::{Error, ErrorKind, BufReader}; +use std::sync::{Arc, Mutex}; + +use futures::Future; +use futures::future::{self, Executor}; +use futures::stream::{self, Stream}; +use futures_cpupool::CpuPool; +use tokio::net::TcpListener; +use tokio_io::io; +use tokio_io::AsyncRead; + +fn main() { + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse().unwrap(); + + // Create the TCP listener we'll accept connections on. + let socket = TcpListener::bind(&addr).unwrap(); + println!("Listening on: {}", addr); + + // This is currently a multi threaded server. + // + // Once the same thread executor lands, transition to single threaded. + let connections = Arc::new(Mutex::new(HashMap::new())); + + let srv = socket.incoming().for_each(move |stream| { + let addr = stream.peer_addr().unwrap(); + + println!("New Connection: {}", addr); + let (reader, writer) = stream.split(); + + // Create a channel for our stream, which other sockets will use to + // send us messages. Then register our address with the stream to send + // data to us. + let (tx, rx) = futures::sync::mpsc::unbounded(); + connections.lock().unwrap().insert(addr, tx); + + // Define here what we do for the actual I/O. That is, read a bunch of + // lines from the socket and dispatch them while we also write any lines + // from other sockets. + let connections_inner = connections.clone(); + let reader = BufReader::new(reader); + + // Model the read portion of this socket by mapping an infinite + // iterator to each line off the socket. This "loop" is then + // terminated with an error once we hit EOF on the socket. + let iter = stream::iter_ok::<_, Error>(iter::repeat(())); + let socket_reader = iter.fold(reader, move |reader, _| { + // Read a line off the socket, failing if we're at EOF + let line = io::read_until(reader, b'\n', Vec::new()); + let line = line.and_then(|(reader, vec)| { + if vec.len() == 0 { + Err(Error::new(ErrorKind::BrokenPipe, "broken pipe")) + } else { + Ok((reader, vec)) + } + }); + + // Convert the bytes we read into a string, and then send that + // string to all other connected clients. + let line = line.map(|(reader, vec)| { + (reader, String::from_utf8(vec)) + }); + let connections = connections_inner.clone(); + line.map(move |(reader, message)| { + println!("{}: {:?}", addr, message); + let mut conns = connections.lock().unwrap(); + if let Ok(msg) = message { + // For each open connection except the sender, send the + // string via the channel. + let iter = conns.iter_mut() + .filter(|&(&k, _)| k != addr) + .map(|(_, v)| v); + for tx in iter { + tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap(); + } + } else { + let tx = conns.get_mut(&addr).unwrap(); + tx.unbounded_send("You didn't send valid UTF-8.".to_string()).unwrap(); + } + reader + }) + }); + + // Whenever we receive a string on the Receiver, we write it to + // `WriteHalf`. + let socket_writer = rx.fold(writer, |writer, msg| { + let amt = io::write_all(writer, msg.into_bytes()); + let amt = amt.map(|(writer, _)| writer); + amt.map_err(|_| ()) + }); + + let pool = CpuPool::new(1); + + // Now that we've got futures representing each half of the socket, we + // use the `select` combinator to wait for either half to be done to + // tear down the other. Then we spawn off the result. + let connections = connections.clone(); + let socket_reader = socket_reader.map_err(|_| ()); + let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ())); + pool.execute(connection.then(move |_| { + connections.lock().unwrap().remove(&addr); + println!("Connection {} closed.", addr); + Ok(()) + })).unwrap(); + + Ok(()) + }); + + // execute server + future::blocking(srv).wait().unwrap(); +} diff --git a/examples/chat.rs b/examples/chat.rs index 667f0e9a..da8889fd 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -1,138 +1,465 @@ //! A chat server that broadcasts a message to all connections. //! -//! This is a simple line-based server which accepts connections, reads lines -//! from those connections, and broadcasts the lines to all other connected -//! clients. In a sense this is a bit of a "poor man's chat server". +//! This example is explicitly more verbose than it has to be. This is to +//! illustrate more concepts. +//! +//! A chat server for telnet clients. After a telnet client connects, the first +//! line should contain the client's name. After that, all lines send by a +//! client are broadcasted to all other connected clients. +//! +//! Because the client is telnet, lines are delimited by "\r\n". //! //! You can test this out by running: //! //! cargo run --example chat //! -//! And then in another window run: +//! And then in another terminal run: +//! +//! telnet localhost 6142 //! -//! cargo run --example connect 127.0.0.1:8080 +//! You can run the `telnet` command in any number of additional windows. //! //! You can run the second command in multiple windows and then chat between the //! two, seeing the messages from the other client as they're received. For all //! connected clients they'll all join the same room and see everyone else's //! messages. +#![deny(warnings)] + +#[macro_use] extern crate futures; -extern crate futures_cpupool; extern crate tokio; +#[macro_use] extern crate tokio_io; +extern crate bytes; + +use tokio::net::{TcpListener, TcpStream}; +use tokio_io::{AsyncRead}; +use futures::prelude::*; +use futures::current_thread; +use futures::sync::mpsc; +use futures::future::{self, Either}; +use bytes::{BytesMut, Bytes, BufMut}; +use std::io::{self, Write}; +use std::cell::RefCell; use std::collections::HashMap; -use std::iter; -use std::env; -use std::io::{Error, ErrorKind, BufReader}; -use std::sync::{Arc, Mutex}; - -use futures::Future; -use futures::future::{self, Executor}; -use futures::stream::{self, Stream}; -use futures_cpupool::CpuPool; -use tokio::net::TcpListener; -use tokio_io::io; -use tokio_io::AsyncRead; - -fn main() { - let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); - let addr = addr.parse().unwrap(); - - // Create the TCP listener we'll accept connections on. - let socket = TcpListener::bind(&addr).unwrap(); - println!("Listening on: {}", addr); - - // This is currently a multi threaded server. - // - // Once the same thread executor lands, transition to single threaded. - let connections = Arc::new(Mutex::new(HashMap::new())); - - let srv = socket.incoming().for_each(move |stream| { - let addr = stream.peer_addr().unwrap(); - - println!("New Connection: {}", addr); - let (reader, writer) = stream.split(); - - // Create a channel for our stream, which other sockets will use to - // send us messages. Then register our address with the stream to send - // data to us. - let (tx, rx) = futures::sync::mpsc::unbounded(); - connections.lock().unwrap().insert(addr, tx); - - // Define here what we do for the actual I/O. That is, read a bunch of - // lines from the socket and dispatch them while we also write any lines - // from other sockets. - let connections_inner = connections.clone(); - let reader = BufReader::new(reader); - - // Model the read portion of this socket by mapping an infinite - // iterator to each line off the socket. This "loop" is then - // terminated with an error once we hit EOF on the socket. - let iter = stream::iter_ok::<_, Error>(iter::repeat(())); - let socket_reader = iter.fold(reader, move |reader, _| { - // Read a line off the socket, failing if we're at EOF - let line = io::read_until(reader, b'\n', Vec::new()); - let line = line.and_then(|(reader, vec)| { - if vec.len() == 0 { - Err(Error::new(ErrorKind::BrokenPipe, "broken pipe")) - } else { - Ok((reader, vec)) +use std::net::SocketAddr; +use std::rc::Rc; + +/// Shorthand for the transmit half of the message channel. +type Tx = mpsc::UnboundedSender; + +/// Shorthand for the receive half of the message channel. +type Rx = mpsc::UnboundedReceiver; + +/// Data that is shared between all peers in the chat server. +/// +/// This is the set of `Tx` handles for all connected clients. Whenever a +/// message is received from a client, it is broadcasted to all peers by +/// iterating over the `peers` entries and sending a copy of the message on each +/// `Tx`. +struct Shared { + peers: HashMap, +} + +/// The state for each connected client. +struct Peer { + /// Name of the peer. + /// + /// When a client connects, the first line sent is treated as the client's + /// name (like alice or bob). The name is used to preface all messages that + /// arrive from the client so that we can simulate a real chat server: + /// + /// ```text + /// alice: Hello everyone. + /// bob: Welcome to telnet chat! + /// ``` + name: BytesMut, + + /// The TCP socket wrapped with the `Lines` codec, defined below. + /// + /// This handles sending and receiving data on the socket. When using + /// `Lines`, we can work at the line level instead of having to manage the + /// raw byte operations. + lines: Lines, + + /// Handle to the shared chat state. + /// + /// This is used to broadcast messages read off the socket to all connected + /// peers. + state: Rc>, + + /// Receive half of the message channel. + /// + /// This is used to receive messages from peers. When a message is received + /// off of this `Rx`, it will be written to the socket. + rx: Rx, + + /// Client socket address. + /// + /// The socket address is used as the key in the `peers` HashMap. The + /// address is saved so that the `Peer` drop implementation can clean up its + /// entry. + addr: SocketAddr, +} + +/// Line based codec +/// +/// This decorates a socket and presents a line based read / write interface. +/// +/// As a user of `Lines`, we can focus on working at the line level. So, we send +/// and receive values that represent entire lines. The `Lines` codec will +/// handle the encoding and decoding as well as reading from and writing to the +/// socket. +#[derive(Debug)] +struct Lines { + /// The TCP socket. + socket: TcpStream, + + /// Buffer used when reading from the socket. Data is not returned from this + /// buffer until an entire line has been read. + rd: BytesMut, + + /// Buffer used to stage data before writing it to the socket. + wr: BytesMut, +} + +impl Shared { + /// Create a new, empty, instance of `Shared`. + fn new() -> Self { + Shared { + peers: HashMap::new(), + } + } +} + +impl Peer { + /// Create a new instance of `Peer`. + fn new(name: BytesMut, + state: Rc>, + lines: Lines) -> Peer + { + // Get the client socket address + let addr = lines.socket.peer_addr().unwrap(); + + // Create a channel for this peer + let (tx, rx) = mpsc::unbounded(); + + // Add an entry for this `Peer` in the shared state map. + state.borrow_mut().peers.insert(addr, tx); + + Peer { + name, + lines, + state, + rx, + addr, + } + } +} + +/// This is where a connected client is managed. +/// +/// A `Peer` is also a future representing completly processing the client. +/// +/// When a `Peer` is created, the first line (representing the client's name) +/// has already been read. When the socket closes, the `Peer` future completes. +/// +/// While processing, the peer future implementation will: +/// +/// 1) Receive messages on its message channel and write them to the socket. +/// 2) Receive messages from the socket and broadcast them to all peers. +/// +impl Future for Peer { + type Item = (); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(), io::Error> { + // Receive all messages from peers. + loop { + // Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is + // safe. + match self.rx.poll().unwrap() { + Async::Ready(Some(v)) => { + // Buffer the line. Once all lines are buffered, they will + // be flushed to the socket (right below). + self.lines.buffer(&v); } - }); - - // Convert the bytes we read into a string, and then send that - // string to all other connected clients. - let line = line.map(|(reader, vec)| { - (reader, String::from_utf8(vec)) - }); - let connections = connections_inner.clone(); - line.map(move |(reader, message)| { - println!("{}: {:?}", addr, message); - let mut conns = connections.lock().unwrap(); - if let Ok(msg) = message { - // For each open connection except the sender, send the - // string via the channel. - let iter = conns.iter_mut() - .filter(|&(&k, _)| k != addr) - .map(|(_, v)| v); - for tx in iter { - tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap(); + _ => break, + } + } + + // Flush the write buffer to the socket + let _ = self.lines.poll_flush()?; + + // Read new lines from the socket + while let Async::Ready(line) = self.lines.poll()? { + println!("Received line ({:?}) : {:?}", self.name, line); + + if let Some(message) = line { + // Append the peer's name to the front of the line: + let mut line = self.name.clone(); + line.put(": "); + line.put(&message); + line.put("\r\n"); + + // We're using `Bytes`, which allows zero-copy clones (by + // storing the data in an Arc internally). + // + // However, before cloning, we must freeze the data. This + // converts it from mutable -> immutable, allowing zero copy + // cloning. + let line = line.freeze(); + + // Now, send the line to all other peers + for (addr, tx) in &self.state.borrow().peers { + // Don't send the message to ourselves + if *addr != self.addr { + // The send only fails if the rx half has been dropped, + // however this is impossible as the `tx` half will be + // removed from the map before the `rx` is dropped. + tx.unbounded_send(line.clone()).unwrap(); } - } else { - let tx = conns.get_mut(&addr).unwrap(); - tx.unbounded_send("You didn't send valid UTF-8.".to_string()).unwrap(); } - reader - }) - }); + } else { + // EOF was reached. The remote client has disconnected. There is + // nothing more to do. + return Ok(Async::Ready(())); + } + } + + // As always, it is important to not just return `NotReady` without + // ensuring an inner future also returned `NotReady`. + // + // We know we got a `NotReady` from either `self.rx` or `self.lines`, so + // the contract is respected. + Ok(Async::NotReady) + } +} - // Whenever we receive a string on the Receiver, we write it to - // `WriteHalf`. - let socket_writer = rx.fold(writer, |writer, msg| { - let amt = io::write_all(writer, msg.into_bytes()); - let amt = amt.map(|(writer, _)| writer); - amt.map_err(|_| ()) +impl Drop for Peer { + fn drop(&mut self) { + self.state.borrow_mut().peers + .remove(&self.addr); + } +} + +impl Lines { + /// Create a new `Lines` codec backed by the socket + fn new(socket: TcpStream) -> Self { + Lines { + socket, + rd: BytesMut::new(), + wr: BytesMut::new(), + } + } + + /// Buffer a line. + /// + /// This writes the line to an internal buffer. Calls to `poll_flush` will + /// attempt to flush this buffer to the socket. + fn buffer(&mut self, line: &[u8]) { + // Push the line onto the end of the write buffer. + // + // The `put` function is from the `BufMut` trait. + self.wr.put(line); + } + + /// Flush the write buffer to the socket + fn poll_flush(&mut self) -> Poll<(), io::Error> { + // As long as there is buffered data to write, try to write it. + while !self.wr.is_empty() { + // `try_nb` is kind of like `try_ready`, but for operations that + // return `io::Result` instead of `Async`. + // + // In the case of `io::Result`, an error of `WouldBlock` is + // equivalent to `Async::NotReady. + let n = try_nb!(self.socket.write(&self.wr)); + + // As long as the wr is not empty, a successful write should + // never write 0 bytes. + assert!(n > 0); + + // This discards the first `n` bytes of the buffer. + let _ = self.wr.split_to(n); + } + + Ok(Async::Ready(())) + } + + /// Read data from the socket. + /// + /// This only returns `Ready` when the socket has closed. + fn fill_read_buf(&mut self) -> Poll<(), io::Error> { + loop { + // Ensure the read buffer has capacity. + // + // This might result in an internal allocation. + self.rd.reserve(1024); + + // Read data into the buffer. + let n = try_ready!(self.socket.read_buf(&mut self.rd)); + + if n == 0 { + return Ok(Async::Ready(())); + } + } + } +} + +impl Stream for Lines { + type Item = BytesMut; + type Error = io::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + // First, read any new data that might have been received off the socket + let sock_closed = self.fill_read_buf()?.is_ready(); + + // Now, try finding lines + let pos = self.rd.windows(2).enumerate() + .find(|&(_, bytes)| bytes == b"\r\n") + .map(|(i, _)| i); + + if let Some(pos) = pos { + // Remove the line from the read buffer and set it to `line`. + let mut line = self.rd.split_to(pos + 2); + + // Drop the trailing \r\n + line.split_off(pos); + + // Return the line + return Ok(Async::Ready(Some(line))); + } + + if sock_closed { + Ok(Async::Ready(None)) + } else { + Ok(Async::NotReady) + } + } +} + +/// Spawn a task to manage the socket. +/// +/// This will read the first line from the socket to identify the client, then +/// add the client to the set of connected peers in the chat service. +fn process(socket: TcpStream, state: Rc>) { + // Wrap the socket with the `Lines` codec that we wrote above. + // + // By doing this, we can operate at the line level instead of doing raw byte + // manipulation. + let lines = Lines::new(socket); + + // The first line is treated as the client's name. The client is not added + // to the set of connected peers until this line is received. + // + // We use the `into_future` combinator to extract the first item from the + // lines stream. `into_future` takes a `Stream` and converts it to a future + // of `(first, rest)` where `rest` is the original stream instance. + let connection = lines.into_future() + // `into_future` doesn't have the right error type, so map the error to + // make it work. + .map_err(|(e, _)| e) + // Process the first received line as the client's name. + .and_then(|(name, lines)| { + // If `name` is `None`, then the client disconnected without + // actually sending a line of data. + // + // Since the connection is closed, there is no further work that we + // need to do. So, we just terminate processing by returning + // `future::ok()`. + // + // The problem is that only a single future type can be returned + // from a combinator closure, but we want to return both + // `future::ok()` and `Peer` (below). + // + // This is a common problem, so the `futures` crate solves this by + // providing the `Either` helper enum that allows creating a single + // return type that covers two concrete future types. + let name = match name { + Some(name) => name, + None => { + // The remote client closed the connection without sending + // any data. + return Either::A(future::ok(())); + } + }; + + println!("`{:?}` is joining the chat", name); + + // Create the peer. + // + // This is also a future that processes the connection, only + // completing when the socket closes. + let peer = Peer::new( + name, + state, + lines); + + // Wrap `peer` with `Either::B` to make the return type fit. + Either::B(peer) + }) + // Task futures have an error of type `()`, this ensures we handle the + // error. We do this by printing the error to STDOUT. + .map_err(|e| { + println!("connection error = {:?}", e); }); - let pool = CpuPool::new(1); + // Spawn a new task that processes the socket: + current_thread::spawn(connection); +} + +pub fn main() { + // Create the shared state. This is how all the peers communicate. + // + // The server task will hold a handle to this. For every new client, the + // `state` handle is cloned and passed into the task that processes the + // client connection. + let state = Rc::new(RefCell::new(Shared::new())); - // Now that we've got futures representing each half of the socket, we - // use the `select` combinator to wait for either half to be done to - // tear down the other. Then we spawn off the result. - let connections = connections.clone(); - let socket_reader = socket_reader.map_err(|_| ()); - let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ())); - pool.execute(connection.then(move |_| { - connections.lock().unwrap().remove(&addr); - println!("Connection {} closed.", addr); - Ok(()) - })).unwrap(); + let addr = "127.0.0.1:6142".parse().unwrap(); + // Bind a TCP listener to the socket address. + // + // Note that this is the Tokio TcpListener, which is fully async. + let listener = TcpListener::bind(&addr).unwrap(); + + // The server task asynchronously iterates over and processes each + // incoming connection. + let server = listener.incoming().for_each(move |socket| { + process(socket, state.clone()); Ok(()) + }) + .map_err(|err| { + // All tasks must have an `Error` type of `()`. This forces error + // handling and helps avoid silencing failures. + // + // In our example, we are only going to log the error to STDOUT. + println!("accept error = {:?}", err); }); - // execute server - future::blocking(srv).wait().unwrap(); + // This starts the `current_thread` executor. + // + // Executors are responsible for scheduling many asynchronous tasks, driving + // them to completion. There are a number of different executor + // implementations, each providing different scheduling characteristics. + // + // The `current_thread` executor multiplexes all scheduled tasks on the + // current thread. This means that spawned tasks must not implement `Send`. + current_thread::run(|_| { + // Now, the server task must be spawned. + // + // It's important to note that all futures / tasks are lazy. No work + // will happen unless they are spawned onto an executor. + current_thread::spawn(server); + + println!("server running on localhost:6142"); + + // The `current_thread::run` function will now block until *all* spawned + // tasks complete. + // + // In our example, we have not defined a shutdown strategy, so + // this will block until `ctrl-c` is pressed at the terminal. + }); } diff --git a/examples/hello_world.rs b/examples/hello_world.rs new file mode 100644 index 00000000..5cac1259 --- /dev/null +++ b/examples/hello_world.rs @@ -0,0 +1,80 @@ +//! Hello world server. +//! +//! A simple server that accepts connections, writes "hello world\n", and closes +//! the connection. +//! +//! You can test this out by running: +//! +//! cargo run --example hello_world +//! +//! And then in another terminal run: +//! +//! telnet localhost 6142 +//! + +#![deny(warnings)] + +extern crate tokio; +extern crate tokio_io; +extern crate futures; + +use tokio::net::TcpListener; +use tokio_io::io; +use futures::{current_thread, Future, Stream}; + +pub fn main() { + let addr = "127.0.0.1:6142".parse().unwrap(); + + // Bind a TCP listener to the socket address. + // + // Note that this is the Tokio TcpListener, which is fully async. + let listener = TcpListener::bind(&addr).unwrap(); + + // The server task asynchronously iterates over and processes each + // incoming connection. + let server = listener.incoming().for_each(|socket| { + println!("accepted socket; addr={:?}", socket.peer_addr().unwrap()); + + let connection = io::write_all(socket, "hello world\n") + .then(|res| { + println!("wrote message; success={:?}", res.is_ok()); + Ok(()) + }); + + // Spawn a new task that processes the socket: + current_thread::spawn(connection); + + Ok(()) + }) + .map_err(|err| { + // All tasks must have an `Error` type of `()`. This forces error + // handling and helps avoid silencing failures. + // + // In our example, we are only going to log the error to STDOUT. + println!("accept error = {:?}", err); + }); + + // This starts the `current_thread` executor. + // + // Executors are responsible for scheduling many asynchronous tasks, driving + // them to completion. There are a number of different executor + // implementations, each providing different scheduling characteristics. + // + // The `current_thread` executor multiplexes all scheduled tasks on the + // current thread. This means that spawned tasks must not implement `Send`. + current_thread::run(|_| { + // Now, the server task must be spawned. + // + // It's important to note that all futures / tasks are lazy. No work + // will happen unless they are spawned onto an executor. + current_thread::spawn(server); + + println!("server running on localhost:6142"); + + // The `current_thread::run` function will now block until *all* spawned + // tasks complete. + // + // In our example, we have not defined a shutdown strategy, so + // this will block until `ctrl-c` is pressed at the terminal. + }); +} -- cgit v1.2.3