Diffstat (limited to 'tokio/examples/chat.rs')
-rw-r--r--  tokio/examples/chat.rs  473
1 file changed, 473 insertions, 0 deletions
diff --git a/tokio/examples/chat.rs b/tokio/examples/chat.rs
new file mode 100644
index 00000000..b21432af
--- /dev/null
+++ b/tokio/examples/chat.rs
@@ -0,0 +1,473 @@
+//! A chat server that broadcasts a message to all connections.
+//!
+//! This example is deliberately more verbose than it needs to be, in order
+//! to illustrate more concepts.
+//!
+//! A chat server for telnet clients. After a telnet client connects, the first
+//! line should contain the client's name. After that, all lines sent by a
+//! client are broadcast to all other connected clients.
+//!
+//! Because the client is telnet, lines are delimited by "\r\n".
+//!
+//! You can test this out by running:
+//!
+//! cargo run --example chat
+//!
+//! And then in another terminal run:
+//!
+//! telnet localhost 6142
+//!
+//! You can run the `telnet` command in any number of additional windows and
+//! then chat between them, with each client seeing the others' messages as
+//! they are received. All connected clients join the same room and see
+//! everyone else's messages.
+
+#![deny(warnings)]
+
+extern crate tokio;
+#[macro_use]
+extern crate futures;
+extern crate bytes;
+
+use bytes::{BufMut, Bytes, BytesMut};
+use futures::future::{self, Either};
+use futures::sync::mpsc;
+use tokio::io;
+use tokio::net::{TcpListener, TcpStream};
+use tokio::prelude::*;
+
+use std::collections::HashMap;
+use std::net::SocketAddr;
+use std::sync::{Arc, Mutex};
+
+/// Shorthand for the transmit half of the message channel.
+type Tx = mpsc::UnboundedSender<Bytes>;
+
+/// Shorthand for the receive half of the message channel.
+type Rx = mpsc::UnboundedReceiver<Bytes>;
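+
+// A minimal sketch (not used by the server below) of the channel pattern
+// behind these aliases: any number of `Tx` handles can feed a single `Rx`.
+#[allow(dead_code)]
+fn channel_demo() {
+ let (tx, rx): (Tx, Rx) = mpsc::unbounded();
+ // `unbounded_send` never blocks; it fails only once `rx` is dropped.
+ tx.unbounded_send(Bytes::from_static(b"hello")).unwrap();
+ drop(rx);
+ assert!(tx.unbounded_send(Bytes::from_static(b"bye")).is_err());
+}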
+
+/// Data that is shared between all peers in the chat server.
+///
+/// This is the set of `Tx` handles for all connected clients. Whenever a
+/// message is received from a client, it is broadcast to all peers by
+/// iterating over the `peers` entries and sending a copy of the message on each
+/// `Tx`.
+struct Shared {
+ peers: HashMap<SocketAddr, Tx>,
+}
+
+/// The state for each connected client.
+struct Peer {
+ /// Name of the peer.
+ ///
+ /// When a client connects, the first line sent is treated as the client's
+ /// name (like alice or bob). The name is used to preface all messages that
+ /// arrive from the client so that we can simulate a real chat server:
+ ///
+ /// ```text
+ /// alice: Hello everyone.
+ /// bob: Welcome to telnet chat!
+ /// ```
+ name: BytesMut,
+
+ /// The TCP socket wrapped with the `Lines` codec, defined below.
+ ///
+ /// This handles sending and receiving data on the socket. When using
+ /// `Lines`, we can work at the line level instead of having to manage the
+ /// raw byte operations.
+ lines: Lines,
+
+ /// Handle to the shared chat state.
+ ///
+ /// This is used to broadcast messages read off the socket to all connected
+ /// peers.
+ state: Arc<Mutex<Shared>>,
+
+ /// Receive half of the message channel.
+ ///
+ /// This is used to receive messages from peers. When a message is received
+ /// off of this `Rx`, it will be written to the socket.
+ rx: Rx,
+
+ /// Client socket address.
+ ///
+ /// The socket address is used as the key in the `peers` HashMap. The
+ /// address is saved so that the `Peer` drop implementation can clean up its
+ /// entry.
+ addr: SocketAddr,
+}
+
+/// Line based codec
+///
+/// This decorates a socket and presents a line based read / write interface.
+///
+/// As a user of `Lines`, we can focus on working at the line level. So, we send
+/// and receive values that represent entire lines. The `Lines` codec will
+/// handle the encoding and decoding as well as reading from and writing to the
+/// socket.
+#[derive(Debug)]
+struct Lines {
+ /// The TCP socket.
+ socket: TcpStream,
+
+ /// Buffer used when reading from the socket. Data is not returned from this
+ /// buffer until an entire line has been read.
+ rd: BytesMut,
+
+ /// Buffer used to stage data before writing it to the socket.
+ wr: BytesMut,
+}
+
+impl Shared {
+ /// Create a new, empty, instance of `Shared`.
+ fn new() -> Self {
+ Shared {
+ peers: HashMap::new(),
+ }
+ }
+}
+
+impl Peer {
+ /// Create a new instance of `Peer`.
+ fn new(name: BytesMut, state: Arc<Mutex<Shared>>, lines: Lines) -> Peer {
+ // Get the client socket address
+ let addr = lines.socket.peer_addr().unwrap();
+
+ // Create a channel for this peer
+ let (tx, rx) = mpsc::unbounded();
+
+ // Add an entry for this `Peer` in the shared state map.
+ state.lock().unwrap().peers.insert(addr, tx);
+
+ Peer {
+ name,
+ lines,
+ state,
+ rx,
+ addr,
+ }
+ }
+}
+
+/// This is where a connected client is managed.
+///
+/// A `Peer` is also a future that represents fully processing the client.
+///
+/// When a `Peer` is created, the first line (representing the client's name)
+/// has already been read. When the socket closes, the `Peer` future completes.
+///
+/// While processing, the peer future implementation will:
+///
+/// 1) Receive messages on its message channel and write them to the socket.
+/// 2) Receive messages from the socket and broadcast them to all peers.
+///
+impl Future for Peer {
+ type Item = ();
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(), io::Error> {
+ // Tokio (and futures) use cooperative scheduling without any
+ // preemption. If a task never yields execution back to the executor,
+ // then other tasks may be starved.
+ //
+ // To deal with this, robust applications should not have any unbounded
+ // loops. In this example, we will read at most `LINES_PER_TICK` lines
+ // from the client on each tick.
+ //
+ // If the limit is hit, the current task is notified, telling the
+ // executor to schedule the task again as soon as possible.
+ const LINES_PER_TICK: usize = 10;
+
+ // Receive all messages from peers.
+ for i in 0..LINES_PER_TICK {
+ // Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is
+ // safe.
+ match self.rx.poll().unwrap() {
+ Async::Ready(Some(v)) => {
+ // Buffer the line. Once all lines are buffered, they will
+ // be flushed to the socket (right below).
+ self.lines.buffer(&v);
+
+ // If this is the last iteration, the loop will break even
+ // though there could still be lines to read. Because we did
+ // not reach `Async::NotReady`, we have to notify ourselves
+ // in order to tell the executor to schedule the task again.
+ if i + 1 == LINES_PER_TICK {
+ task::current().notify();
+ }
+ }
+ _ => break,
+ }
+ }
+
+ // Flush the write buffer to the socket
+ let _ = self.lines.poll_flush()?;
+
+ // Read new lines from the socket
+ while let Async::Ready(line) = self.lines.poll()? {
+ println!("Received line ({:?}) : {:?}", self.name, line);
+
+ if let Some(message) = line {
+ // Append the peer's name to the front of the line:
+ let mut line = self.name.clone();
+ line.extend_from_slice(b": ");
+ line.extend_from_slice(&message);
+ line.extend_from_slice(b"\r\n");
+
+ // We're using `Bytes`, which allows zero-copy clones (by
+ // storing the data in an Arc internally).
+ //
+ // However, before cloning, we must freeze the data. This
+ // converts it from mutable -> immutable, allowing zero-copy
+ // cloning (a standalone sketch follows this `impl` block).
+ let line = line.freeze();
+
+ // Now, send the line to all other peers
+ for (addr, tx) in &self.state.lock().unwrap().peers {
+ // Don't send the message to ourselves
+ if *addr != self.addr {
+ // The send only fails if the rx half has been dropped,
+ // however this is impossible as the `tx` half will be
+ // removed from the map before the `rx` is dropped.
+ tx.unbounded_send(line.clone()).unwrap();
+ }
+ }
+ } else {
+ // EOF was reached. The remote client has disconnected. There is
+ // nothing more to do.
+ return Ok(Async::Ready(()));
+ }
+ }
+
+ // As always, it is important to not just return `NotReady` without
+ // ensuring an inner future also returned `NotReady`.
+ //
+ // We know we got a `NotReady` from either `self.rx` or `self.lines`, so
+ // the contract is respected.
+ Ok(Async::NotReady)
+ }
+}
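+
+// A minimal standalone sketch (not called anywhere) of the `freeze` step
+// used in `Peer::poll` above: a `BytesMut` becomes an immutable `Bytes`,
+// and clones of that `Bytes` share the same underlying storage.
+#[allow(dead_code)]
+fn freeze_demo() {
+ let mut line = BytesMut::from(&b"alice"[..]);
+ line.extend_from_slice(b": hi\r\n");
+ let line: Bytes = line.freeze();
+ // This clone is cheap: it bumps a reference count instead of copying
+ // the bytes themselves.
+ let copy = line.clone();
+ assert_eq!(line, copy);
+}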
+
+impl Drop for Peer {
+ fn drop(&mut self) {
+ self.state.lock().unwrap().peers.remove(&self.addr);
+ }
+}
+
+impl Lines {
+ /// Create a new `Lines` codec backed by the socket
+ fn new(socket: TcpStream) -> Self {
+ Lines {
+ socket,
+ rd: BytesMut::new(),
+ wr: BytesMut::new(),
+ }
+ }
+
+ /// Buffer a line.
+ ///
+ /// This writes the line to an internal buffer. Calls to `poll_flush` will
+ /// attempt to flush this buffer to the socket.
+ fn buffer(&mut self, line: &[u8]) {
+ // Ensure the buffer has capacity. Ideally this would not be unbounded,
+ // but to keep the example simple, we will not limit this.
+ self.wr.reserve(line.len());
+
+ // Push the line onto the end of the write buffer.
+ //
+ // The `put` function is from the `BufMut` trait.
+ self.wr.put(line);
+ }
+
+ /// Flush the write buffer to the socket
+ fn poll_flush(&mut self) -> Poll<(), io::Error> {
+ // As long as there is buffered data to write, try to write it.
+ while !self.wr.is_empty() {
+ // Try to write some bytes to the socket
+ let n = try_ready!(self.socket.poll_write(&self.wr));
+
+ // As long as `wr` is not empty, a successful write should
+ // never write 0 bytes.
+ assert!(n > 0);
+
+ // This discards the first `n` bytes of the buffer (see the
+ // `split_to` sketch after this `impl` block).
+ let _ = self.wr.split_to(n);
+ }
+
+ Ok(Async::Ready(()))
+ }
+
+ /// Read data from the socket.
+ ///
+ /// This only returns `Ready` when the socket has closed.
+ fn fill_read_buf(&mut self) -> Poll<(), io::Error> {
+ loop {
+ // Ensure the read buffer has capacity.
+ //
+ // This might result in an internal allocation.
+ self.rd.reserve(1024);
+
+ // Read data into the buffer.
+ let n = try_ready!(self.socket.read_buf(&mut self.rd));
+
+ if n == 0 {
+ return Ok(Async::Ready(()));
+ }
+ }
+ }
+}
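+
+// A minimal sketch of the buffer bookkeeping in `poll_flush` above:
+// `split_to(n)` removes the first `n` bytes, leaving the rest queued.
+#[allow(dead_code)]
+fn split_to_demo() {
+ let mut wr = BytesMut::from(&b"abcdef"[..]);
+ let n = 4; // pretend the socket accepted 4 bytes
+ let written = wr.split_to(n);
+ assert_eq!(&written[..], b"abcd");
+ assert_eq!(&wr[..], b"ef");
+}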
+
+impl Stream for Lines {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ // First, read any new data that might have been received off the socket
+ let sock_closed = self.fill_read_buf()?.is_ready();
+
+ // Now, try finding lines (this scan is exercised by the test
+ // following this `impl` block)
+ let pos = self
+ .rd
+ .windows(2)
+ .enumerate()
+ .find(|&(_, bytes)| bytes == b"\r\n")
+ .map(|(i, _)| i);
+
+ if let Some(pos) = pos {
+ // Remove the line from the read buffer and set it to `line`.
+ let mut line = self.rd.split_to(pos + 2);
+
+ // Drop the trailing \r\n
+ line.split_off(pos);
+
+ // Return the line
+ return Ok(Async::Ready(Some(line)));
+ }
+
+ if sock_closed {
+ Ok(Async::Ready(None))
+ } else {
+ Ok(Async::NotReady)
+ }
+ }
+}
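+
+// A small test of the "\r\n" scan used by `Stream::poll` above, run
+// against a plain `BytesMut` so that no socket is needed.
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn splits_a_line_and_keeps_the_rest() {
+ let mut rd = BytesMut::from(&b"hello\r\nworld"[..]);
+ let pos = rd
+ .windows(2)
+ .position(|bytes| bytes == b"\r\n")
+ .expect("terminator present");
+ let mut line = rd.split_to(pos + 2);
+ line.split_off(pos); // drop the trailing \r\n
+ assert_eq!(&line[..], b"hello");
+ assert_eq!(&rd[..], b"world");
+ }
+}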
+
+/// Spawn a task to manage the socket.
+///
+/// This will read the first line from the socket to identify the client, then
+/// add the client to the set of connected peers in the chat service.
+fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
+ // Wrap the socket with the `Lines` codec that we wrote above.
+ //
+ // By doing this, we can operate at the line level instead of doing raw byte
+ // manipulation.
+ let lines = Lines::new(socket);
+
+ // The first line is treated as the client's name. The client is not added
+ // to the set of connected peers until this line is received.
+ //
+ // We use the `into_future` combinator to extract the first item from the
+ // lines stream. `into_future` takes a `Stream` and converts it to a future
+ // of `(first, rest)` where `rest` is the original stream instance.
+ let connection = lines
+ .into_future()
+ // The future returned by `into_future` errors with a `(error, stream)`
+ // pair, so map it down to just the error.
+ .map_err(|(e, _)| e)
+ // Process the first received line as the client's name.
+ .and_then(|(name, lines)| {
+ // If `name` is `None`, then the client disconnected without
+ // actually sending a line of data.
+ //
+ // Since the connection is closed, there is no further work that we
+ // need to do. So, we just terminate processing by returning
+ // `future::ok()`.
+ //
+ // The problem is that only a single future type can be returned
+ // from a combinator closure, but we want to return both
+ // `future::ok()` and `Peer` (below).
+ //
+ // This is a common problem, so the `futures` crate solves it by
+ // providing the `Either` helper enum, which allows creating a single
+ // return type covering two concrete future types (a standalone
+ // sketch follows the `process` function).
+ let name = match name {
+ Some(name) => name,
+ None => {
+ // The remote client closed the connection without sending
+ // any data.
+ return Either::A(future::ok(()));
+ }
+ };
+
+ println!("`{:?}` is joining the chat", name);
+
+ // Create the peer.
+ //
+ // This is also a future that processes the connection, only
+ // completing when the socket closes.
+ let peer = Peer::new(name, state, lines);
+
+ // Wrap `peer` with `Either::B` to make the return type fit.
+ Either::B(peer)
+ })
+ // Task futures must have an error type of `()`; this forces us to
+ // handle the error, which we do here by printing it to STDOUT.
+ .map_err(|e| {
+ println!("connection error = {:?}", e);
+ });
+
+ // Spawn the task. Internally, this submits the task to a thread pool.
+ tokio::spawn(connection);
+}
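+
+// A minimal standalone sketch (not used by `process`) of the `Either`
+// pattern described above: two different concrete future types unified
+// behind a single return type.
+#[allow(dead_code)]
+fn either_demo(done: bool) -> impl Future<Item = (), Error = io::Error> {
+ if done {
+ // A future that is immediately ready.
+ Either::A(future::ok(()))
+ } else {
+ // A future built lazily; the closure runs when first polled.
+ Either::B(future::lazy(|| Ok::<_, io::Error>(())))
+ }
+}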
+
+pub fn main() -> Result<(), Box<std::error::Error>> {
+ // Create the shared state. This is how all the peers communicate.
+ //
+ // The server task will hold a handle to this. For every new client, the
+ // `state` handle is cloned and passed into the task that processes the
+ // client connection.
+ let state = Arc::new(Mutex::new(Shared::new()));
+
+ let addr = "127.0.0.1:6142".parse()?;
+
+ // Bind a TCP listener to the socket address.
+ //
+ // Note that this is the Tokio TcpListener, which is fully async.
+ let listener = TcpListener::bind(&addr)?;
+
+ // The server task asynchronously iterates over and processes each
+ // incoming connection.
+ let server = listener
+ .incoming()
+ .for_each(move |socket| {
+ // Spawn a task to process the connection
+ process(socket, state.clone());
+ Ok(())
+ })
+ .map_err(|err| {
+ // All tasks must have an `Error` type of `()`. This forces error
+ // handling and helps avoid silencing failures.
+ //
+ // In our example, we are only going to log the error to STDOUT.
+ println!("accept error = {:?}", err);
+ });
+
+ println!("server running on localhost:6142");
+
+ // Start the Tokio runtime.
+ //
+ // Tokio provides a pre-configured "out of the box" runtime for building
+ // asynchronous applications. It includes both a reactor and a task
+ // scheduler. This means applications are multithreaded by default.
+ //
+ // This function blocks until the runtime reaches an idle state. Idle is
+ // defined as all spawned tasks have completed and all I/O resources (TCP
+ // sockets in our case) have been dropped.
+ //
+ // In our example, we have not defined a shutdown strategy, so this will
+ // block until `ctrl-c` is pressed at the terminal.
+ tokio::run(server);
+ Ok(())
+}