diff options
author | Eliza Weisman <eliza@buoyant.io> | 2019-03-22 15:25:42 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-22 15:25:42 -0700 |
commit | 30330da11a56dfdd11bdbef50dba073a9edc36b2 (patch) | |
tree | bf4e8e90293a3c75a2bf5281572e1c01eceab3cb /examples | |
parent | 6e4945025cdc6f2b71d9b30aaa23c5517cca1504 (diff) |
chore: Fix examples not working with `cargo run` (#998)
* chore: Fix examples not working with `cargo run`
## Motivation
PR #991 moved the `tokio` crate to its own subdirectory, but did not
move the `examples` directory into `tokio/examples`. While attempting to
use the examples for testing another change, I noticed that #991 had
broken the ability to use `cargo run`, as the examples were no longer
considered part of a crate that cargo was aware of:
```
tokio on master [$] via 🦀v1.33.0 at ☸️ aks-eliza-dev
➜ cargo run --example chat
error: no example target named `chat`
Did you mean `echo`?
```
## Solution
This branch moves the examples into the `tokio` directory, so cargo is
now once again aware of them:
```
tokio on eliza/fix-examples [$] via 🦀v1.33.0 at ☸️ aks-eliza-dev
➜ cargo run --example chat
Compiling tokio-executor v0.1.7 (/Users/eliza/Code/tokio/tokio-executor)
Compiling tokio-reactor v0.1.9
Compiling tokio-threadpool v0.1.13
Compiling tokio-current-thread v0.1.6
Compiling tokio-timer v0.2.10
Compiling tokio-uds v0.2.5
Compiling tokio-udp v0.1.3
Compiling tokio-tcp v0.1.3
Compiling tokio-fs v0.1.6
Compiling tokio v0.1.18 (/Users/eliza/Code/tokio/tokio)
Finished dev [unoptimized + debuginfo] target(s) in 7.04s
Running `target/debug/examples/chat`
server running on localhost:6142
```
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Diffstat (limited to 'examples')
-rw-r--r-- | examples/README.md | 60 | ||||
-rw-r--r-- | examples/chat-combinator-current-thread.rs | 172 | ||||
-rw-r--r-- | examples/chat-combinator.rs | 156 | ||||
-rw-r--r-- | examples/chat.rs | 473 | ||||
-rw-r--r-- | examples/connect.rs | 257 | ||||
-rw-r--r-- | examples/echo-udp.rs | 74 | ||||
-rw-r--r-- | examples/echo.rs | 115 | ||||
-rw-r--r-- | examples/hello_world.rs | 58 | ||||
-rw-r--r-- | examples/manual-runtime.rs | 87 | ||||
-rw-r--r-- | examples/print_each_packet.rs | 150 | ||||
-rw-r--r-- | examples/proxy.rs | 130 | ||||
-rw-r--r-- | examples/tinydb.rs | 227 | ||||
-rw-r--r-- | examples/tinyhttp.rs | 325 | ||||
-rw-r--r-- | examples/udp-client.rs | 70 | ||||
-rw-r--r-- | examples/udp-codec.rs | 65 |
15 files changed, 0 insertions, 2419 deletions
diff --git a/examples/README.md b/examples/README.md deleted file mode 100644 index 63634c82..00000000 --- a/examples/README.md +++ /dev/null @@ -1,60 +0,0 @@ -## Examples of how to use Tokio - -This directory contains a number of examples showcasing various capabilities of -the `tokio` crate. - -All examples can be executed with: - -``` -cargo run --example $name -``` - -A high level description of each example is: - -* [`hello_world`](hello_world.rs) - a tiny server that writes "hello world" to - all connected clients and then terminates the connection, should help see how - to create and initialize `tokio`. - -* [`echo`](echo.rs) - this is your standard TCP "echo server" which accepts - connections and then echos back any contents that are read from each connected - client. - -* [`print_each_packet`](print_each_packet.rs) - this server will create a TCP - listener, accept connections in a loop, and put down in the stdout everything - that's read off of each TCP connection. - -* [`echo-udp`](echo-udp.rs) - again your standard "echo server", except for UDP - instead of TCP. This will echo back any packets received to the original - sender. - -* [`connect`](connect.rs) - this is a `nc`-like clone which can be used to - interact with most other examples. The program creates a TCP connection or UDP - socket to sends all information read on stdin to the remote peer, displaying - any data received on stdout. Often quite useful when interacting with the - various other servers here! - -* [`chat`](chat.rs) - this spins up a local TCP server which will broadcast from - any connected client to all other connected clients. You can connect to this - in multiple terminals and use it to chat between the terminals. - -* [`chat-combinator`](chat-combinator.rs) - Similar to `chat`, but this uses a - much more functional programming approach using combinators. - -* [`proxy`](proxy.rs) - an example proxy server that will forward all connected - TCP clients to the remote address specified when starting the program. - -* [`tinyhttp`](tinyhttp.rs) - a tiny HTTP/1.1 server which doesn't support HTTP - request bodies showcasing running on multiple cores, working with futures and - spawning tasks, and finally framing a TCP connection to discrete - request/response objects. - -* [`tinydb`](tinydb.rs) - an in-memory database which shows sharing state - between all connected clients, notably the key/value store of this database. - -* [`udp-client`](udp-client.rs) - a simple `send_dgram`/`recv_dgram` example. - -* [`manual-runtime`](manual-runtime.rs) - manually composing a runtime. - -If you've got an example you'd like to see here, please feel free to open an -issue. Otherwise if you've got an example you'd like to add, please feel free -to make a PR! diff --git a/examples/chat-combinator-current-thread.rs b/examples/chat-combinator-current-thread.rs deleted file mode 100644 index ee147025..00000000 --- a/examples/chat-combinator-current-thread.rs +++ /dev/null @@ -1,172 +0,0 @@ -//! A chat server that broadcasts a message to all connections. -//! -//! This is a line-based server which accepts connections, reads lines from -//! those connections, and broadcasts the lines to all other connected clients. -//! -//! This example is similar to chat.rs, but uses combinators and a much more -//! functional style. -//! -//! Because we are here running the reactor/executor on the same thread instead -//! of a threadpool, we can avoid full synchronization with Arc + Mutex and use -//! Rc + RefCell instead. The max performance is however limited to a CPU HW -//! thread. -//! -//! You can test this out by running: -//! -//! cargo run --example chat-combinator-current-thread -//! -//! And then in another window run: -//! -//! cargo run --example connect 127.0.0.1:8080 -//! -//! You can run the second command in multiple windows and then chat between the -//! two, seeing the messages from the other client as they're received. For all -//! connected clients they'll all join the same room and see everyone else's -//! messages. - -#![deny(warnings)] - -extern crate futures; -extern crate tokio; - -use tokio::io; -use tokio::net::TcpListener; -use tokio::prelude::*; -use tokio::runtime::current_thread::{Runtime, TaskExecutor}; - -use std::cell::RefCell; -use std::collections::HashMap; -use std::env; -use std::io::BufReader; -use std::iter; -use std::rc::Rc; - -fn main() -> Result<(), Box<std::error::Error>> { - let mut runtime = Runtime::new().unwrap(); - - // Create the TCP listener we'll accept connections on. - let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); - let addr = addr.parse()?; - - let socket = TcpListener::bind(&addr)?; - println!("Listening on: {}", addr); - - // This is running on the Tokio current_thread runtime, so it will be single- - // threaded. The `Rc<RefCell<...>>` allows state to be shared across the tasks. - let connections = Rc::new(RefCell::new(HashMap::new())); - - // The server task asynchronously iterates over and processes each incoming - // connection. - let srv = socket - .incoming() - .map_err(|e| { - println!("failed to accept socket; error = {:?}", e); - e - }) - .for_each(move |stream| { - // The client's socket address - let addr = stream.peer_addr()?; - - println!("New Connection: {}", addr); - - // Split the TcpStream into two separate handles. One handle for reading - // and one handle for writing. This lets us use separate tasks for - // reading and writing. - let (reader, writer) = stream.split(); - - // Create a channel for our stream, which other sockets will use to - // send us messages. Then register our address with the stream to send - // data to us. - let (tx, rx) = futures::sync::mpsc::unbounded(); - let mut conns = connections.borrow_mut(); - conns.insert(addr, tx); - - // Define here what we do for the actual I/O. That is, read a bunch of - // lines from the socket and dispatch them while we also write any lines - // from other sockets. - let connections_inner = connections.clone(); - let reader = BufReader::new(reader); - - // Model the read portion of this socket by mapping an infinite - // iterator to each line off the socket. This "loop" is then - // terminated with an error once we hit EOF on the socket. - let iter = stream::iter_ok::<_, io::Error>(iter::repeat(())); - - let socket_reader = iter.fold(reader, move |reader, _| { - // Read a line off the socket, failing if we're at EOF - let line = io::read_until(reader, b'\n', Vec::new()); - let line = line.and_then(|(reader, vec)| { - if vec.len() == 0 { - Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")) - } else { - Ok((reader, vec)) - } - }); - - // Convert the bytes we read into a string, and then send that - // string to all other connected clients. - let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec))); - - // Move the connection state into the closure below. - let connections = connections_inner.clone(); - - line.map(move |(reader, message)| { - println!("{}: {:?}", addr, message); - let mut conns = connections.borrow_mut(); - - if let Ok(msg) = message { - // For each open connection except the sender, send the - // string via the channel. - let iter = conns - .iter_mut() - .filter(|&(&k, _)| k != addr) - .map(|(_, v)| v); - for tx in iter { - tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap(); - } - } else { - let tx = conns.get_mut(&addr).unwrap(); - tx.unbounded_send("You didn't send valid UTF-8.".to_string()) - .unwrap(); - } - - reader - }) - }); - - // Whenever we receive a string on the Receiver, we write it to - // `WriteHalf<TcpStream>`. - let socket_writer = rx.fold(writer, |writer, msg| { - let amt = io::write_all(writer, msg.into_bytes()); - let amt = amt.map(|(writer, _)| writer); - amt.map_err(|_| ()) - }); - - // Now that we've got futures representing each half of the socket, we - // use the `select` combinator to wait for either half to be done to - // tear down the other. Then we spawn off the result. - let connections = connections.clone(); - let socket_reader = socket_reader.map_err(|_| ()); - let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ())); - - // Spawn locally a task to process the connection - TaskExecutor::current() - .spawn_local(Box::new(connection.then(move |_| { - let mut conns = connections.borrow_mut(); - conns.remove(&addr); - println!("Connection {} closed.", addr); - Ok(()) - }))) - .unwrap(); - - Ok(()) - }) - .map_err(|err| println!("error occurred: {:?}", err)); - - // Spawn srv itself - runtime.spawn(srv); - - // Execute server - runtime.run().unwrap(); - Ok(()) -} diff --git a/examples/chat-combinator.rs b/examples/chat-combinator.rs deleted file mode 100644 index b81e8f7c..00000000 --- a/examples/chat-combinator.rs +++ /dev/null @@ -1,156 +0,0 @@ -//! A chat server that broadcasts a message to all connections. -//! -//! This is a line-based server which accepts connections, reads lines from -//! those connections, and broadcasts the lines to all other connected clients. -//! -//! This example is similar to chat.rs, but uses combinators and a much more -//! functional style. -//! -//! You can test this out by running: -//! -//! cargo run --example chat -//! -//! And then in another window run: -//! -//! cargo run --example connect 127.0.0.1:8080 -//! -//! You can run the second command in multiple windows and then chat between the -//! two, seeing the messages from the other client as they're received. For all -//! connected clients they'll all join the same room and see everyone else's -//! messages. - -#![deny(warnings)] - -extern crate futures; -extern crate tokio; - -use tokio::io; -use tokio::net::TcpListener; -use tokio::prelude::*; - -use std::collections::HashMap; -use std::env; -use std::io::BufReader; -use std::iter; -use std::sync::{Arc, Mutex}; - -fn main() -> Result<(), Box<std::error::Error>> { - // Create the TCP listener we'll accept connections on. - let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); - let addr = addr.parse()?; - - let socket = TcpListener::bind(&addr)?; - println!("Listening on: {}", addr); - - // This is running on the Tokio runtime, so it will be multi-threaded. The - // `Arc<Mutex<...>>` allows state to be shared across the threads. - let connections = Arc::new(Mutex::new(HashMap::new())); - - // The server task asynchronously iterates over and processes each incoming - // connection. - let srv = socket - .incoming() - .map_err(|e| { - println!("failed to accept socket; error = {:?}", e); - e - }) - .for_each(move |stream| { - // The client's socket address - let addr = stream.peer_addr()?; - - println!("New Connection: {}", addr); - - // Split the TcpStream into two separate handles. One handle for reading - // and one handle for writing. This lets us use separate tasks for - // reading and writing. - let (reader, writer) = stream.split(); - - // Create a channel for our stream, which other sockets will use to - // send us messages. Then register our address with the stream to send - // data to us. - let (tx, rx) = futures::sync::mpsc::unbounded(); - connections.lock().unwrap().insert(addr, tx); - - // Define here what we do for the actual I/O. That is, read a bunch of - // lines from the socket and dispatch them while we also write any lines - // from other sockets. - let connections_inner = connections.clone(); - let reader = BufReader::new(reader); - - // Model the read portion of this socket by mapping an infinite - // iterator to each line off the socket. This "loop" is then - // terminated with an error once we hit EOF on the socket. - let iter = stream::iter_ok::<_, io::Error>(iter::repeat(())); - - let socket_reader = iter.fold(reader, move |reader, _| { - // Read a line off the socket, failing if we're at EOF - let line = io::read_until(reader, b'\n', Vec::new()); - let line = line.and_then(|(reader, vec)| { - if vec.len() == 0 { - Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")) - } else { - Ok((reader, vec)) - } - }); - - // Convert the bytes we read into a string, and then send that - // string to all other connected clients. - let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec))); - - // Move the connection state into the closure below. - let connections = connections_inner.clone(); - - line.map(move |(reader, message)| { - println!("{}: {:?}", addr, message); - let mut conns = connections.lock().unwrap(); - - if let Ok(msg) = message { - // For each open connection except the sender, send the - // string via the channel. - let iter = conns - .iter_mut() - .filter(|&(&k, _)| k != addr) - .map(|(_, v)| v); - for tx in iter { - tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap(); - } - } else { - let tx = conns.get_mut(&addr).unwrap(); - tx.unbounded_send("You didn't send valid UTF-8.".to_string()) - .unwrap(); - } - - reader - }) - }); - - // Whenever we receive a string on the Receiver, we write it to - // `WriteHalf<TcpStream>`. - let socket_writer = rx.fold(writer, |writer, msg| { - let amt = io::write_all(writer, msg.into_bytes()); - let amt = amt.map(|(writer, _)| writer); - amt.map_err(|_| ()) - }); - - // Now that we've got futures representing each half of the socket, we - // use the `select` combinator to wait for either half to be done to - // tear down the other. Then we spawn off the result. - let connections = connections.clone(); - let socket_reader = socket_reader.map_err(|_| ()); - let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ())); - - // Spawn a task to process the connection - tokio::spawn(connection.then(move |_| { - connections.lock().unwrap().remove(&addr); - println!("Connection {} closed.", addr); - Ok(()) - })); - - Ok(()) - }) - .map_err(|err| println!("error occurred: {:?}", err)); - - // execute server - tokio::run(srv); - Ok(()) -} diff --git a/examples/chat.rs b/examples/chat.rs deleted file mode 100644 index b21432af..00000000 --- a/examples/chat.rs +++ /dev/null @@ -1,473 +0,0 @@ -//! A chat server that broadcasts a message to all connections. -//! -//! This example is explicitly more verbose than it has to be. This is to -//! illustrate more concepts. -//! -//! A chat server for telnet clients. After a telnet client connects, the first -//! line should contain the client's name. After that, all lines sent by a -//! client are broadcasted to all other connected clients. -//! -//! Because the client is telnet, lines are delimited by "\r\n". -//! -//! You can test this out by running: -//! -//! cargo run --example chat -//! -//! And then in another terminal run: -//! -//! telnet localhost 6142 -//! -//! You can run the `telnet` command in any number of additional windows. -//! -//! You can run the second command in multiple windows and then chat between the -//! two, seeing the messages from the other client as they're received. For all -//! connected clients they'll all join the same room and see everyone else's -//! messages. - -#![deny(warnings)] - -extern crate tokio; -#[macro_use] -extern crate futures; -extern crate bytes; - -use bytes::{BufMut, Bytes, BytesMut}; -use futures::future::{self, Either}; -use futures::sync::mpsc; -use tokio::io; -use tokio::net::{TcpListener, TcpStream}; -use tokio::prelude::*; - -use std::collections::HashMap; -use std::net::SocketAddr; -use std::sync::{Arc, Mutex}; - -/// Shorthand for the transmit half of the message channel. -type Tx = mpsc::UnboundedSender<Bytes>; - -/// Shorthand for the receive half of the message channel. -type Rx = mpsc::UnboundedReceiver<Bytes>; - -/// Data that is shared between all peers in the chat server. -/// -/// This is the set of `Tx` handles for all connected clients. Whenever a -/// message is received from a client, it is broadcasted to all peers by -/// iterating over the `peers` entries and sending a copy of the message on each -/// `Tx`. -struct Shared { - peers: HashMap<SocketAddr, Tx>, -} - -/// The state for each connected client. -struct Peer { - /// Name of the peer. - /// - /// When a client connects, the first line sent is treated as the client's - /// name (like alice or bob). The name is used to preface all messages that - /// arrive from the client so that we can simulate a real chat server: - /// - /// ```text - /// alice: Hello everyone. - /// bob: Welcome to telnet chat! - /// ``` - name: BytesMut, - - /// The TCP socket wrapped with the `Lines` codec, defined below. - /// - /// This handles sending and receiving data on the socket. When using - /// `Lines`, we can work at the line level instead of having to manage the - /// raw byte operations. - lines: Lines, - - /// Handle to the shared chat state. - /// - /// This is used to broadcast messages read off the socket to all connected - /// peers. - state: Arc<Mutex<Shared>>, - - /// Receive half of the message channel. - /// - /// This is used to receive messages from peers. When a message is received - /// off of this `Rx`, it will be written to the socket. - rx: Rx, - - /// Client socket address. - /// - /// The socket address is used as the key in the `peers` HashMap. The - /// address is saved so that the `Peer` drop implementation can clean up its - /// entry. - addr: SocketAddr, -} - -/// Line based codec -/// -/// This decorates a socket and presents a line based read / write interface. -/// -/// As a user of `Lines`, we can focus on working at the line level. So, we send -/// and receive values that represent entire lines. The `Lines` codec will -/// handle the encoding and decoding as well as reading from and writing to the -/// socket. -#[derive(Debug)] -struct Lines { - /// The TCP socket. - socket: TcpStream, - - /// Buffer used when reading from the socket. Data is not returned from this - /// buffer until an entire line has been read. - rd: BytesMut, - - /// Buffer used to stage data before writing it to the socket. - wr: BytesMut, -} - -impl Shared { - /// Create a new, empty, instance of `Shared`. - fn new() -> Self { - Shared { - peers: HashMap::new(), - } - } -} - -impl Peer { - /// Create a new instance of `Peer`. - fn new(name: BytesMut, state: Arc<Mutex<Shared>>, lines: Lines) -> Peer { - // Get the client socket address - let addr = lines.socket.peer_addr().unwrap(); - - // Create a channel for this peer - let (tx, rx) = mpsc::unbounded(); - - // Add an entry for this `Peer` in the shared state map. - state.lock().unwrap().peers.insert(addr, tx); - - Peer { - name, - lines, - state, - rx, - addr, - } - } -} - -/// This is where a connected client is managed. -/// -/// A `Peer` is also a future representing completely processing the client. -/// -/// When a `Peer` is created, the first line (representing the client's name) -/// has already been read. When the socket closes, the `Peer` future completes. -/// -/// While processing, the peer future implementation will: -/// -/// 1) Receive messages on its message channel and write them to the socket. -/// 2) Receive messages from the socket and broadcast them to all peers. -/// -impl Future for Peer { - type Item = (); - type Error = io::Error; - - fn poll(&mut self) -> Poll<(), io::Error> { - // Tokio (and futures) use cooperative scheduling without any - // preemption. If a task never yields execution back to the executor, - // then other tasks may be starved. - // - // To deal with this, robust applications should not have any unbounded - // loops. In this example, we will read at most `LINES_PER_TICK` lines - // from the client on each tick. - // - // If the limit is hit, the current task is notified, informing the - // executor to schedule the task again asap. - const LINES_PER_TICK: usize = 10; - - // Receive all messages from peers. - for i in 0..LINES_PER_TICK { - // Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is - // safe. - match self.rx.poll().unwrap() { - Async::Ready(Some(v)) => { - // Buffer the line. Once all lines are buffered, they will - // be flushed to the socket (right below). - self.lines.buffer(&v); - - // If this is the last iteration, the loop will break even - // though there could still be lines to read. Because we did - // not reach `Async::NotReady`, we have to notify ourselves - // in order to tell the executor to schedule the task again. - if i + 1 == LINES_PER_TICK { - task::current().notify(); - } - } - _ => break, - } - } - - // Flush the write buffer to the socket - let _ = self.lines.poll_flush()?; - - // Read new lines from the socket - while let Async::Ready(line) = self.lines.poll()? { - println!("Received line ({:?}) : {:?}", self.name, line); - - if let Some(message) = line { - // Append the peer's name to the front of the line: - let mut line = self.name.clone(); - line.extend_from_slice(b": "); - line.extend_from_slice(&message); - line.extend_from_slice(b"\r\n"); - - // We're using `Bytes`, which allows zero-copy clones (by - // storing the data in an Arc internally). - // - // However, before cloning, we must freeze the data. This - // converts it from mutable -> immutable, allowing zero copy - // cloning. - let line = line.freeze(); - - // Now, send the line to all other peers - for (addr, tx) in &self.state.lock().unwrap().peers { - // Don't send the message to ourselves - if *addr != self.addr { - // The send only fails if the rx half has been dropped, - // however this is impossible as the `tx` half will be - // removed from the map before the `rx` is dropped. - tx.unbounded_send(line.clone()).unwrap(); - } - } - } else { - // EOF was reached. The remote client has disconnected. There is - // nothing more to do. - return Ok(Async::Ready(())); - } - } - - // As always, it is important to not just return `NotReady` without - // ensuring an inner future also returned `NotReady`. - // - // We know we got a `NotReady` from either `self.rx` or `self.lines`, so - // the contract is respected. - Ok(Async::NotReady) - } -} - -impl Drop for Peer { - fn drop(&mut self) { - self.state.lock().unwrap().peers.remove(&self.addr); - } -} - -impl Lines { - /// Create a new `Lines` codec backed by the socket - fn new(socket: TcpStream) -> Self { - Lines { - socket, - rd: BytesMut::new(), - wr: BytesMut::new(), - } - } - - /// Buffer a line. - /// - /// This writes the line to an internal buffer. Calls to `poll_flush` will - /// attempt to flush this buffer to the socket. - fn buffer(&mut self, line: &[u8]) { - // Ensure the buffer has capacity. Ideally this would not be unbounded, - // but to keep the example simple, we will not limit this. - self.wr.reserve(line.len()); - - // Push the line onto the end of the write buffer. - // - // The `put` function is from the `BufMut` trait. - self.wr.put(line); - } - - /// Flush the write buffer to the socket - fn poll_flush(&mut self) -> Poll<(), io::Error> { - // As long as there is buffered data to write, try to write it. - while !self.wr.is_empty() { - // Try to write some bytes to the socket - let n = try_ready!(self.socket.poll_write(&self.wr)); - - // As long as the wr is not empty, a successful write should - // never write 0 bytes. - assert!(n > 0); - - // This discards the first `n` bytes of the buffer. - let _ = self.wr.split_to(n); - } - - Ok(Async::Ready(())) - } - - /// Read data from the socket. - /// - /// This only returns `Ready` when the socket has closed. - fn fill_read_buf(&mut self) -> Poll<(), io::Error> { - loop { - // Ensure the read buffer has capacity. - // - // This might result in an internal allocation. - self.rd.reserve(1024); - - // Read data into the buffer. - let n = try_ready!(self.socket.read_buf(&mut self.rd)); - - if n == 0 { - return Ok(Async::Ready(())); - } - } - } -} - -impl Stream for Lines { - type Item = BytesMut; - type Error = io::Error; - - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - // First, read any new data that might have been received off the socket - let sock_closed = self.fill_read_buf()?.is_ready(); - - // Now, try finding lines - let pos = self - .rd - .windows(2) - .enumerate() - .find(|&(_, bytes)| bytes == b"\r\n") - .map(|(i, _)| i); - - if let Some(pos) = pos { - // Remove the line from the read buffer and set it to `line`. - let mut line = self.rd.split_to(pos + 2); - - // Drop the trailing \r\n - line.split_off(pos); - - // Return the line - return Ok(Async::Ready(Some(line))); - } - - if sock_closed { - Ok(Async::Ready(None)) - } else { - Ok(Async::NotReady) - } - } -} - -/// Spawn a task to manage the socket. -/// -/// This will read the first line from the socket to identify the client, then -/// add the client to the set of connected peers in the chat service. -fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) { - // Wrap the socket with the `Lines` codec that we wrote above. - // - // By doing this, we can operate at the line level instead of doing raw byte - // manipulation. - let lines = Lines::new(socket); - - // The first line is treated as the client's name. The client is not added - // to the set of connected peers until this line is received. - // - // We use the `into_future` combinator to extract the first item from the - // lines stream. `into_future` takes a `Stream` and converts it to a future - // of `(first, rest)` where `rest` is the original stream instance. - let connection = lines - .into_future() - // `into_future` doesn't have the right error type, so map the error to - // make it work. - .map_err(|(e, _)| e) - // Process the first received line as the client's name. - .and_then(|(name, lines)| { - // If `name` is `None`, then the client disconnected without - // actually sending a line of data. - // - // Since the connection is closed, there is no further work that we - // need to do. So, we just terminate processing by returning - // `future::ok()`. - // - // The problem is that only a single future type can be returned - // from a combinator closure, but we want to return both - // `future::ok()` and `Peer` (below). - // - // This is a common problem, so the `futures` crate solves this by - // providing the `Either` helper enum that allows creating a single - // return type that covers two concrete future types. - let name = match name { - Some(name) => name, - None => { - // The remote client closed the connection without sending - // any data. - return Either::A(future::ok(())); - } - }; - - println!("`{:?}` is joining the chat", name); - - // Create the peer. - // - // This is also a future that processes the connection, only - // completing when the socket closes. - let peer = Peer::new(name, state, lines); - - // Wrap `peer` with `Either::B` to make the return type fit. - Either::B(peer) - }) - // Task futures have an error of type `()`, this ensures we handle the - // error. We do this by printing the error to STDOUT. - .map_err(|e| { - println!("connection error = {:?}", e); - }); - - // Spawn the task. Internally, this submits the task to a thread pool. - tokio::spawn(connection); -} - -pub fn main() -> Result<(), Box<std::error::Error>> { - // Create the shared state. This is how all the peers communicate. - // - // The server task will hold a handle to this. For every new client, the - // `state` handle is cloned and passed into the task that processes the - // client connection. - let state = Arc::new(Mutex::new(Shared::new())); - - let addr = "127.0.0.1:6142".parse()?; - - // Bind a TCP listener to the socket address. - // - // Note that this is the Tokio TcpListener, which is fully async. - let listener = TcpListener::bind(&addr)?; - - // The server task asynchronously iterates over and processes each - // incoming connection. - let server = listener - .incoming() - .for_each(move |socket| { - // Spawn a task to process the connection - process(socket, state.clone()); - Ok(()) - }) - .map_err(|err| { - // All tasks must have an `Error` |