author    Carl Lerche <me@carllerche.com>    2018-02-05 20:45:12 -0800
committer GitHub <noreply@github.com>        2018-02-05 20:45:12 -0800
commit    567887cc75170437f75f19f5966f2b32bf49ab72 (patch)
tree      78df1b14110e9c1aec9fc7006979fb5e907bbc97 /examples/chat.rs
parent    3840ceafeeb43e9489ae7df2b0b1f54c9dd1d401 (diff)
Add a chat example (#112)
Diffstat (limited to 'examples/chat.rs')
-rw-r--r--    examples/chat.rs    541
1 file changed, 434 insertions, 107 deletions
diff --git a/examples/chat.rs b/examples/chat.rs
index 667f0e9a..da8889fd 100644
--- a/examples/chat.rs
+++ b/examples/chat.rs
@@ -1,138 +1,465 @@
//! A chat server that broadcasts a message to all connections.
//!
-//! This is a simple line-based server which accepts connections, reads lines
-//! from those connections, and broadcasts the lines to all other connected
-//! clients. In a sense this is a bit of a "poor man's chat server".
+//! This example is deliberately more verbose than it needs to be, in order
+//! to illustrate more of the concepts involved.
+//!
+//! A chat server for telnet clients. After a telnet client connects, the first
+//! line should contain the client's name. After that, all lines sent by a
+//! client are broadcast to all other connected clients.
+//!
+//! Because the client is telnet, lines are delimited by "\r\n".
//!
//! You can test this out by running:
//!
//! cargo run --example chat
//!
-//! And then in another window run:
+//! And then in another terminal run:
+//!
+//! telnet localhost 6142
//!
-//! cargo run --example connect 127.0.0.1:8080
+//! You can run the `telnet` command in any number of additional windows.
//!
//! You can then chat between the windows, seeing each client's messages
//! echoed to the others as they're received. All connected clients join the
//! same room and see everyone else's messages.
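+//!
+//! For example, a session between two clients might look like this (an
+//! illustrative transcript; the names and messages are made up):
+//!
+//! ```text
+//! $ telnet localhost 6142
+//! alice                <- the first line sent is the client's name
+//! Hello everyone.      <- broadcast to others as "alice: Hello everyone."
+//! bob: hi alice!       <- a line received from another connected client
+//! ```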
+#![deny(warnings)]
+
+#[macro_use]
extern crate futures;
-extern crate futures_cpupool;
extern crate tokio;
+#[macro_use]
extern crate tokio_io;
+extern crate bytes;
+
+use tokio::net::{TcpListener, TcpStream};
+use tokio_io::AsyncRead;
+use futures::prelude::*;
+use futures::current_thread;
+use futures::sync::mpsc;
+use futures::future::{self, Either};
+use bytes::{BytesMut, Bytes, BufMut};
+use std::io::{self, Write};
+use std::cell::RefCell;
use std::collections::HashMap;
-use std::iter;
-use std::env;
-use std::io::{Error, ErrorKind, BufReader};
-use std::sync::{Arc, Mutex};
-
-use futures::Future;
-use futures::future::{self, Executor};
-use futures::stream::{self, Stream};
-use futures_cpupool::CpuPool;
-use tokio::net::TcpListener;
-use tokio_io::io;
-use tokio_io::AsyncRead;
-
-fn main() {
- let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
- let addr = addr.parse().unwrap();
-
- // Create the TCP listener we'll accept connections on.
- let socket = TcpListener::bind(&addr).unwrap();
- println!("Listening on: {}", addr);
-
- // This is currently a multi threaded server.
- //
- // Once the same thread executor lands, transition to single threaded.
- let connections = Arc::new(Mutex::new(HashMap::new()));
-
- let srv = socket.incoming().for_each(move |stream| {
- let addr = stream.peer_addr().unwrap();
-
- println!("New Connection: {}", addr);
- let (reader, writer) = stream.split();
-
- // Create a channel for our stream, which other sockets will use to
- // send us messages. Then register our address with the stream to send
- // data to us.
- let (tx, rx) = futures::sync::mpsc::unbounded();
- connections.lock().unwrap().insert(addr, tx);
-
- // Define here what we do for the actual I/O. That is, read a bunch of
- // lines from the socket and dispatch them while we also write any lines
- // from other sockets.
- let connections_inner = connections.clone();
- let reader = BufReader::new(reader);
-
- // Model the read portion of this socket by mapping an infinite
- // iterator to each line off the socket. This "loop" is then
- // terminated with an error once we hit EOF on the socket.
- let iter = stream::iter_ok::<_, Error>(iter::repeat(()));
- let socket_reader = iter.fold(reader, move |reader, _| {
- // Read a line off the socket, failing if we're at EOF
- let line = io::read_until(reader, b'\n', Vec::new());
- let line = line.and_then(|(reader, vec)| {
- if vec.len() == 0 {
- Err(Error::new(ErrorKind::BrokenPipe, "broken pipe"))
- } else {
- Ok((reader, vec))
+use std::net::SocketAddr;
+use std::rc::Rc;
+
+/// Shorthand for the transmit half of the message channel.
+type Tx = mpsc::UnboundedSender<Bytes>;
+
+/// Shorthand for the receive half of the message channel.
+type Rx = mpsc::UnboundedReceiver<Bytes>;
+
+/// Data that is shared between all peers in the chat server.
+///
+/// This is the set of `Tx` handles for all connected clients. Whenever a
+/// message is received from a client, it is broadcast to all peers by
+/// iterating over the `peers` entries and sending a copy of the message on each
+/// `Tx`.
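+///
+/// A minimal sketch of that broadcast step, using the `Tx` / `Rx` aliases
+/// above (illustration only; it mirrors the code in `Peer::poll` below):
+///
+/// ```text
+/// let (tx, rx) = mpsc::unbounded();   // one channel per connected peer
+/// shared.peers.insert(addr, tx);      // registered when the peer connects
+///
+/// // Broadcasting a frozen `Bytes` line to everyone but the sender:
+/// for (peer_addr, tx) in &shared.peers {
+///     if *peer_addr != addr {
+///         tx.unbounded_send(line.clone()).unwrap();
+///     }
+/// }
+/// ```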
+struct Shared {
+ peers: HashMap<SocketAddr, Tx>,
+}
+
+/// The state for each connected client.
+struct Peer {
+ /// Name of the peer.
+ ///
+ /// When a client connects, the first line sent is treated as the client's
+ /// name (like alice or bob). The name is used to preface all messages that
+ /// arrive from the client so that we can simulate a real chat server:
+ ///
+ /// ```text
+ /// alice: Hello everyone.
+ /// bob: Welcome to telnet chat!
+ /// ```
+ name: BytesMut,
+
+ /// The TCP socket wrapped with the `Lines` codec, defined below.
+ ///
+ /// This handles sending and receiving data on the socket. When using
+ /// `Lines`, we can work at the line level instead of having to manage the
+ /// raw byte operations.
+ lines: Lines,
+
+ /// Handle to the shared chat state.
+ ///
+ /// This is used to broadcast messages read off the socket to all connected
+ /// peers.
+ state: Rc<RefCell<Shared>>,
+
+ /// Receive half of the message channel.
+ ///
+ /// This is used to receive messages from peers. When a message is received
+ /// off of this `Rx`, it will be written to the socket.
+ rx: Rx,
+
+ /// Client socket address.
+ ///
+ /// The socket address is used as the key in the `peers` HashMap. The
+ /// address is saved so that the `Peer` drop implementation can clean up its
+ /// entry.
+ addr: SocketAddr,
+}
+
+/// Line-based codec
+///
+/// This decorates a socket and presents a line-based read / write interface.
+///
+/// As a user of `Lines`, we can focus on working at the line level. So, we send
+/// and receive values that represent entire lines. The `Lines` codec will
+/// handle the encoding and decoding as well as reading from and writing to the
+/// socket.
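+///
+/// A sketch of how `Lines` is driven (illustration only; error handling
+/// elided):
+///
+/// ```text
+/// let mut lines = Lines::new(socket);
+///
+/// // Reading: `Lines` is a `Stream` of complete lines, without the "\r\n".
+/// match lines.poll()? {
+///     Async::Ready(Some(line)) => { /* a full line, as `BytesMut` */ }
+///     Async::Ready(None) => { /* the socket was closed */ }
+///     Async::NotReady => { /* no complete line buffered yet */ }
+/// }
+///
+/// // Writing: stage bytes first, then flush them to the socket.
+/// lines.buffer(b"hello\r\n");
+/// lines.poll_flush()?;
+/// ```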
+#[derive(Debug)]
+struct Lines {
+ /// The TCP socket.
+ socket: TcpStream,
+
+ /// Buffer used when reading from the socket. Data is not returned from this
+ /// buffer until an entire line has been read.
+ rd: BytesMut,
+
+ /// Buffer used to stage data before writing it to the socket.
+ wr: BytesMut,
+}
+
+impl Shared {
+ /// Create a new, empty, instance of `Shared`.
+ fn new() -> Self {
+ Shared {
+ peers: HashMap::new(),
+ }
+ }
+}
+
+impl Peer {
+ /// Create a new instance of `Peer`.
+ fn new(name: BytesMut,
+ state: Rc<RefCell<Shared>>,
+ lines: Lines) -> Peer
+ {
+ // Get the client socket address
+ let addr = lines.socket.peer_addr().unwrap();
+
+ // Create a channel for this peer
+ let (tx, rx) = mpsc::unbounded();
+
+ // Add an entry for this `Peer` in the shared state map.
+ state.borrow_mut().peers.insert(addr, tx);
+
+ Peer {
+ name,
+ lines,
+ state,
+ rx,
+ addr,
+ }
+ }
+}
+
+/// This is where a connected client is managed.
+///
+/// A `Peer` is also a future representing completely processing the client.
+///
+/// When a `Peer` is created, the first line (representing the client's name)
+/// has already been read. When the socket closes, the `Peer` future completes.
+///
+/// While processing, the peer future implementation will:
+///
+/// 1) Receive messages on its message channel and write them to the socket.
+/// 2) Receive messages from the socket and broadcast them to all peers.
+///
+impl Future for Peer {
+ type Item = ();
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(), io::Error> {
+ // Receive all messages from peers.
+ loop {
+ // Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is
+ // safe.
+ match self.rx.poll().unwrap() {
+ Async::Ready(Some(v)) => {
+ // Buffer the line. Once all lines are buffered, they will
+ // be flushed to the socket (right below).
+ self.lines.buffer(&v);
}
- });
-
- // Convert the bytes we read into a string, and then send that
- // string to all other connected clients.
- let line = line.map(|(reader, vec)| {
- (reader, String::from_utf8(vec))
- });
- let connections = connections_inner.clone();
- line.map(move |(reader, message)| {
- println!("{}: {:?}", addr, message);
- let mut conns = connections.lock().unwrap();
- if let Ok(msg) = message {
- // For each open connection except the sender, send the
- // string via the channel.
- let iter = conns.iter_mut()
- .filter(|&(&k, _)| k != addr)
- .map(|(_, v)| v);
- for tx in iter {
- tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap();
+ _ => break,
+ }
+ }
+
+ // Flush the write buffer to the socket
+ let _ = self.lines.poll_flush()?;
+
+ // Read new lines from the socket
+ while let Async::Ready(line) = self.lines.poll()? {
+ println!("Received line ({:?}) : {:?}", self.name, line);
+
+ if let Some(message) = line {
+ // Append the peer's name to the front of the line:
+ let mut line = self.name.clone();
+ line.put(": ");
+ line.put(&message);
+ line.put("\r\n");
+
+ // We're using `Bytes`, which allows zero-copy clones (by
+ // storing the data in an Arc internally).
+ //
+ // However, before cloning, we must freeze the data. This
+ // converts it from mutable -> immutable, allowing zero copy
+ // cloning.
+ let line = line.freeze();
+
+ // Now, send the line to all other peers
+ for (addr, tx) in &self.state.borrow().peers {
+ // Don't send the message to ourselves
+ if *addr != self.addr {
+ // The send only fails if the rx half has been dropped,
+ // however this is impossible as the `tx` half will be
+ // removed from the map before the `rx` is dropped.
+ tx.unbounded_send(line.clone()).unwrap();
}
- } else {
- let tx = conns.get_mut(&addr).unwrap();
- tx.unbounded_send("You didn't send valid UTF-8.".to_string()).unwrap();
}
- reader
- })
- });
+ } else {
+ // EOF was reached. The remote client has disconnected. There is
+ // nothing more to do.
+ return Ok(Async::Ready(()));
+ }
+ }
+
+ // As always, it is important not to return `NotReady` without first
+ // ensuring that an inner future also returned `NotReady`.
+ //
+ // We know we got a `NotReady` from either `self.rx` or `self.lines`, so
+ // the contract is respected.
+ Ok(Async::NotReady)
+ }
+}
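+
+// A note on the contract above: returning `NotReady` is only safe when an
+// inner `poll` call also returned `NotReady`, because that inner call is what
+// registers the current task to be woken again. The shape to follow is
+// roughly this (a sketch, with a hypothetical `inner` future):
+//
+//     match inner.poll()? {
+//         Async::Ready(v) => { /* make progress with `v` */ }
+//         // `inner` has registered this task, so yielding is safe:
+//         Async::NotReady => return Ok(Async::NotReady),
+//     }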
- // Whenever we receive a string on the Receiver, we write it to
- // `WriteHalf<TcpStream>`.
- let socket_writer = rx.fold(writer, |writer, msg| {
- let amt = io::write_all(writer, msg.into_bytes());
- let amt = amt.map(|(writer, _)| writer);
- amt.map_err(|_| ())
+impl Drop for Peer {
+ fn drop(&mut self) {
+ self.state.borrow_mut().peers
+ .remove(&self.addr);
+ }
+}
+
+impl Lines {
+ /// Create a new `Lines` codec backed by the socket
+ fn new(socket: TcpStream) -> Self {
+ Lines {
+ socket,
+ rd: BytesMut::new(),
+ wr: BytesMut::new(),
+ }
+ }
+
+ /// Buffer a line.
+ ///
+ /// This writes the line to an internal buffer. Calls to `poll_flush` will
+ /// attempt to flush this buffer to the socket.
+ fn buffer(&mut self, line: &[u8]) {
+ // Push the line onto the end of the write buffer.
+ //
+ // The `put` function is from the `BufMut` trait.
+ self.wr.put(line);
+ }
+
+ /// Flush the write buffer to the socket
+ fn poll_flush(&mut self) -> Poll<(), io::Error> {
+ // As long as there is buffered data to write, try to write it.
+ while !self.wr.is_empty() {
+ // `try_nb` is kind of like `try_ready`, but for operations that
+ // return `io::Result` instead of `Async`.
+ //
+ // In the case of `io::Result`, an error of `WouldBlock` is
+ // equivalent to `Async::NotReady`.
+ let n = try_nb!(self.socket.write(&self.wr));
+
+ // As long as `wr` is not empty, a successful write should
+ // never write 0 bytes.
+ assert!(n > 0);
+
+ // This discards the first `n` bytes of the buffer.
+ let _ = self.wr.split_to(n);
+ }
+
+ Ok(Async::Ready(()))
+ }
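+
+ // Roughly, `try_nb!(expr)` behaves like the following (a sketch of the
+ // macro from `tokio_io`, not necessarily its exact definition):
+ //
+ //     match expr {
+ //         Ok(t) => t,
+ //         Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ //             return Ok(Async::NotReady);
+ //         }
+ //         Err(e) => return Err(e.into()),
+ //     }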
+
+ /// Read data from the socket.
+ ///
+ /// This only returns `Ready` when the socket has closed.
+ fn fill_read_buf(&mut self) -> Poll<(), io::Error> {
+ loop {
+ // Ensure the read buffer has capacity.
+ //
+ // This might result in an internal allocation.
+ self.rd.reserve(1024);
+
+ // Read data into the buffer.
+ let n = try_ready!(self.socket.read_buf(&mut self.rd));
+
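+ // `read_buf` reads into the capacity reserved above, advancing the
+ // length of `rd` by the number of bytes read. `Ready(0)` means the
+ // remote half of the connection was closed (EOF).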
+ if n == 0 {
+ return Ok(Async::Ready(()));
+ }
+ }
+ }
+}
+
+impl Stream for Lines {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ // First, read any new data that might have been received off the socket
+ let sock_closed = self.fill_read_buf()?.is_ready();
+
+ // Now, try finding lines
+ let pos = self.rd.windows(2).enumerate()
+ .find(|&(_, bytes)| bytes == b"\r\n")
+ .map(|(i, _)| i);
+
+ if let Some(pos) = pos {
+ // Remove the line from the read buffer and set it to `line`.
+ let mut line = self.rd.split_to(pos + 2);
+
+ // Drop the trailing \r\n
+ line.split_off(pos);
+
+ // Return the line
+ return Ok(Async::Ready(Some(line)));
+ }
+
+ if sock_closed {
+ Ok(Async::Ready(None))
+ } else {
+ Ok(Async::NotReady)
+ }
+ }
+}
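+
+// To make the `split_to` / `split_off` dance above concrete, a worked example
+// (assuming `bytes` 0.4 semantics):
+//
+//     rd = b"hi\r\nrest"                    pos = 2
+//     let mut line = rd.split_to(pos + 2);  // line = b"hi\r\n", rd = b"rest"
+//     line.split_off(pos);                  // line = b"hi"; "\r\n" is dropped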
+
+/// Spawn a task to manage the socket.
+///
+/// This will read the first line from the socket to identify the client, then
+/// add the client to the set of connected peers in the chat service.
+fn process(socket: TcpStream, state: Rc<RefCell<Shared>>) {
+ // Wrap the socket with the `Lines` codec that we wrote above.
+ //
+ // By doing this, we can operate at the line level instead of doing raw byte
+ // manipulation.
+ let lines = Lines::new(socket);
+
+ // The first line is treated as the client's name. The client is not added
+ // to the set of connected peers until this line is received.
+ //
+ // We use the `into_future` combinator to extract the first item from the
+ // lines stream. `into_future` takes a `Stream` and converts it to a future
+ // of `(first, rest)` where `rest` is the original stream instance.
+ let connection = lines.into_future()
+ // `into_future` doesn't have the right error type, so map the error to
+ // make it work.
+ .map_err(|(e, _)| e)
+ // Process the first received line as the client's name.
+ .and_then(|(name, lines)| {
+ // If `name` is `None`, then the client disconnected without
+ // actually sending a line of data.
+ //
+ // Since the connection is closed, there is no further work that we
+ // need to do. So, we just terminate processing by returning
+ // `future::ok()`.
+ //
+ // The problem is that only a single future type can be returned
+ // from a combinator closure, but we want to return both
+ // `future::ok()` and `Peer` (below).
+ //
+ // This is a common problem, so the `futures` crate solves this by
+ // providing the `Either` helper enum that allows creating a single
+ // return type that covers two concrete future types.
+ let name = match name {
+ Some(name) => name,
+ None => {
+ // The remote client closed the connection without sending
+ // any data.
+ return Either::A(future::ok(()));
+ }
+ };
+
+ println!("`{:?}` is joining the chat", name);
+
+ // Create the peer.
+ //
+ // This is also a future that processes the connection, only
+ // completing when the socket closes.
+ let peer = Peer::new(
+ name,
+ state,
+ lines);
+
+ // Wrap `peer` with `Either::B` to make the return type fit.
+ Either::B(peer)
+ })
+ // Task futures have an error of type `()`, which forces us to handle
+ // errors here rather than let them pass by silently. We do so by
+ // printing the error to STDOUT.
+ .map_err(|e| {
+ println!("connection error = {:?}", e);
});
- let pool = CpuPool::new(1);
+ // Spawn a new task that processes the socket:
+ current_thread::spawn(connection);
+}
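+
+// A sketch of what `into_future` yields, using a hypothetical stream of
+// numbers (illustration only):
+//
+//     let stream = stream::iter_ok::<_, ()>(vec![1, 2, 3]);
+//     stream.into_future()
+//     // resolves to Ok((Some(1), rest)), where `rest` yields 2 and 3;
+//     // on failure it resolves to Err((err, rest)), which is why `process`
+//     // recovers the plain error with `.map_err(|(e, _)| e)`.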
+
+pub fn main() {
+ // Create the shared state. This is how all the peers communicate.
+ //
+ // The server task will hold a handle to this. For every new client, the
+ // `state` handle is cloned and passed into the task that processes the
+ // client connection.
+ let state = Rc::new(RefCell::new(Shared::new()));
- // Now that we've got futures representing each half of the socket, we
- // use the `select` combinator to wait for either half to be done to
- // tear down the other. Then we spawn off the result.
- let connections = connections.clone();
- let socket_reader = socket_reader.map_err(|_| ());
- let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
- pool.execute(connection.then(move |_| {
- connections.lock().unwrap().remove(&addr);
- println!("Connection {} closed.", addr);
- Ok(())
- })).unwrap();
+ let addr = "127.0.0.1:6142".parse().unwrap();
+ // Bind a TCP listener to the socket address.
+ //
+ // Note that this is the Tokio TcpListener, which is fully async.
+ let listener = TcpListener::bind(&addr).unwrap();
+
+ // The server task asynchronously iterates over and processes each
+ // incoming connection.
+ let server = listener.incoming().for_each(move |socket| {
+ process(socket, state.clone());
Ok(())
+ })
+ .map_err(|err| {
+ // All tasks must have an `Error` type of `()`. This forces error
+ // handling and helps avoid silencing failures.
+ //
+ // In our example, we are only going to log the error to STDOUT.
+ println!("accept error = {:?}", err);
});
- // execute server
- future::blocking(srv).wait().unwrap();
+ // This starts the `current_thread` executor.
+ //
+ // Executors are responsible for scheduling many asynchronous tasks, driving
+ // them to completion. There are a number of different executor
+ // implementations, each providing different scheduling characteristics.
+ //
+ // The `current_thread` executor multiplexes all scheduled tasks on the
+ // current thread. This means that spawned tasks are not required to
+ // implement `Send`.
+ current_thread::run(|_| {
+ // Now, the server task must be spawned.
+ //
+ // It's important to note that all futures / tasks are lazy. No work
+ // will happen unless they are spawned onto an executor.
+ current_thread::spawn(server);
+
+ println!("server running on localhost:6142");
+
+ // The `current_thread::run` function will now block until *all* spawned
+ // tasks complete.
+ //
+ // In our example, we have not defined a shutdown strategy, so
+ // this will block until `ctrl-c` is pressed at the terminal.
+ });
}
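+
+// A minimal, self-contained sketch of the `current_thread` pattern used in
+// `main` above (assuming the same `current_thread` API; illustration only):
+//
+//     current_thread::run(|_| {
+//         // Futures are lazy: nothing executes until spawned.
+//         current_thread::spawn(future::lazy(|| {
+//             println!("hello from a spawned task");
+//             Ok::<(), ()>(())
+//         }));
+//         // `run` blocks here until every spawned task completes.
+//     });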