author    Eliza Weisman <eliza@buoyant.io>    2019-03-22 15:25:42 -0700
committer GitHub <noreply@github.com>         2019-03-22 15:25:42 -0700
commit    30330da11a56dfdd11bdbef50dba073a9edc36b2 (patch)
tree      bf4e8e90293a3c75a2bf5281572e1c01eceab3cb /examples
parent    6e4945025cdc6f2b71d9b30aaa23c5517cca1504 (diff)
chore: Fix examples not working with `cargo run` (#998)
* chore: Fix examples not working with `cargo run`

## Motivation

PR #991 moved the `tokio` crate to its own subdirectory, but did not move the
`examples` directory into `tokio/examples`. While attempting to use the
examples for testing another change, I noticed that #991 had broken the
ability to use `cargo run`, as the examples were no longer considered part of
a crate that cargo was aware of:

```
tokio on master [$] via 🦀 v1.33.0 at ☸️ aks-eliza-dev
➜ cargo run --example chat
error: no example target named `chat`

Did you mean `echo`?
```

## Solution

This branch moves the examples into the `tokio` directory, so cargo is now
once again aware of them:

```
tokio on eliza/fix-examples [$] via 🦀 v1.33.0 at ☸️ aks-eliza-dev
➜ cargo run --example chat
   Compiling tokio-executor v0.1.7 (/Users/eliza/Code/tokio/tokio-executor)
   Compiling tokio-reactor v0.1.9
   Compiling tokio-threadpool v0.1.13
   Compiling tokio-current-thread v0.1.6
   Compiling tokio-timer v0.2.10
   Compiling tokio-uds v0.2.5
   Compiling tokio-udp v0.1.3
   Compiling tokio-tcp v0.1.3
   Compiling tokio-fs v0.1.6
   Compiling tokio v0.1.18 (/Users/eliza/Code/tokio/tokio)
    Finished dev [unoptimized + debuginfo] target(s) in 7.04s
     Running `target/debug/examples/chat`
server running on localhost:6142
```

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
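Cargo auto-discovers example targets only from an `examples/` directory that
sits beside a crate's `Cargo.toml`; a top-level `examples/` directory in a
workspace with no root package belongs to no crate, which is why the targets
vanished. As a sketch of the same mechanism, an example target can also be
declared explicitly (illustrative entry, not part of this change):

```toml
# tokio/Cargo.toml -- explicit example target declaration (sketch).
# With auto-discovery, tokio/examples/chat.rs needs no entry at all.
[[example]]
name = "chat"
path = "examples/chat.rs"
```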
Diffstat (limited to 'examples')
-rw-r--r--  examples/README.md                            60
-rw-r--r--  examples/chat-combinator-current-thread.rs   172
-rw-r--r--  examples/chat-combinator.rs                  156
-rw-r--r--  examples/chat.rs                             473
-rw-r--r--  examples/connect.rs                          257
-rw-r--r--  examples/echo-udp.rs                          74
-rw-r--r--  examples/echo.rs                             115
-rw-r--r--  examples/hello_world.rs                       58
-rw-r--r--  examples/manual-runtime.rs                    87
-rw-r--r--  examples/print_each_packet.rs                150
-rw-r--r--  examples/proxy.rs                            130
-rw-r--r--  examples/tinydb.rs                           227
-rw-r--r--  examples/tinyhttp.rs                         325
-rw-r--r--  examples/udp-client.rs                        70
-rw-r--r--  examples/udp-codec.rs                         65
15 files changed, 0 insertions, 2419 deletions
diff --git a/examples/README.md b/examples/README.md
deleted file mode 100644
index 63634c82..00000000
--- a/examples/README.md
+++ /dev/null
@@ -1,60 +0,0 @@
-## Examples of how to use Tokio
-
-This directory contains a number of examples showcasing various capabilities of
-the `tokio` crate.
-
-All examples can be executed with:
-
-```
-cargo run --example $name
-```
-
-A high level description of each example is:
-
-* [`hello_world`](hello_world.rs) - a tiny server that writes "hello world" to
-  all connected clients and then terminates the connection; it should help
-  show how to create and initialize `tokio`.
-
-* [`echo`](echo.rs) - this is your standard TCP "echo server" which accepts
-  connections and then echoes back any contents that are read from each
-  connected client.
-
-* [`print_each_packet`](print_each_packet.rs) - this server will create a TCP
-  listener, accept connections in a loop, and write to stdout everything
-  that's read from each TCP connection.
-
-* [`echo-udp`](echo-udp.rs) - again your standard "echo server", except for UDP
- instead of TCP. This will echo back any packets received to the original
- sender.
-
-* [`connect`](connect.rs) - this is a `nc`-like clone which can be used to
-  interact with most other examples. The program creates a TCP connection or
-  UDP socket, sends all data read on stdin to the remote peer, and displays
-  any data received on stdout. Often quite useful when interacting with the
-  various other servers here!
-
-* [`chat`](chat.rs) - this spins up a local TCP server which will broadcast from
- any connected client to all other connected clients. You can connect to this
- in multiple terminals and use it to chat between the terminals.
-
-* [`chat-combinator`](chat-combinator.rs) - Similar to `chat`, but this uses a
- much more functional programming approach using combinators.
-
-* [`proxy`](proxy.rs) - an example proxy server that will forward all connected
- TCP clients to the remote address specified when starting the program.
-
-* [`tinyhttp`](tinyhttp.rs) - a tiny HTTP/1.1 server which doesn't support
-  HTTP request bodies, showcasing running on multiple cores, working with
-  futures and spawning tasks, and framing a TCP connection into discrete
-  request/response objects.
-
-* [`tinydb`](tinydb.rs) - an in-memory database which shows sharing state
- between all connected clients, notably the key/value store of this database.
-
-* [`udp-client`](udp-client.rs) - a simple `send_dgram`/`recv_dgram` example.
-
-* [`manual-runtime`](manual-runtime.rs) - manually composing a runtime.
-
-If you've got an example you'd like to see here, please feel free to open an
-issue. Otherwise if you've got an example you'd like to add, please feel free
-to make a PR!
diff --git a/examples/chat-combinator-current-thread.rs b/examples/chat-combinator-current-thread.rs
deleted file mode 100644
index ee147025..00000000
--- a/examples/chat-combinator-current-thread.rs
+++ /dev/null
@@ -1,172 +0,0 @@
-//! A chat server that broadcasts a message to all connections.
-//!
-//! This is a line-based server which accepts connections, reads lines from
-//! those connections, and broadcasts the lines to all other connected clients.
-//!
-//! This example is similar to chat.rs, but uses combinators and a much more
-//! functional style.
-//!
-//! Because we run the reactor and executor on the same thread here instead
-//! of a threadpool, we can avoid full synchronization with `Arc` + `Mutex`
-//! and use `Rc` + `RefCell` instead. Maximum performance is, however,
-//! limited to a single CPU hardware thread.
-//!
-//! You can test this out by running:
-//!
-//! cargo run --example chat-combinator-current-thread
-//!
-//! And then in another window run:
-//!
-//! cargo run --example connect 127.0.0.1:8080
-//!
-//! You can run the second command in multiple windows and then chat between
-//! them, seeing messages from the other clients as they're received. All
-//! connected clients join the same room and see everyone else's messages.
-
-#![deny(warnings)]
-
-extern crate futures;
-extern crate tokio;
-
-use tokio::io;
-use tokio::net::TcpListener;
-use tokio::prelude::*;
-use tokio::runtime::current_thread::{Runtime, TaskExecutor};
-
-use std::cell::RefCell;
-use std::collections::HashMap;
-use std::env;
-use std::io::BufReader;
-use std::iter;
-use std::rc::Rc;
-
-fn main() -> Result<(), Box<std::error::Error>> {
- let mut runtime = Runtime::new().unwrap();
-
- // Create the TCP listener we'll accept connections on.
- let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
- let addr = addr.parse()?;
-
- let socket = TcpListener::bind(&addr)?;
- println!("Listening on: {}", addr);
-
- // This is running on the Tokio current_thread runtime, so it will be single-
- // threaded. The `Rc<RefCell<...>>` allows state to be shared across the tasks.
- let connections = Rc::new(RefCell::new(HashMap::new()));
-
- // The server task asynchronously iterates over and processes each incoming
- // connection.
- let srv = socket
- .incoming()
- .map_err(|e| {
- println!("failed to accept socket; error = {:?}", e);
- e
- })
- .for_each(move |stream| {
- // The client's socket address
- let addr = stream.peer_addr()?;
-
- println!("New Connection: {}", addr);
-
- // Split the TcpStream into two separate handles. One handle for reading
- // and one handle for writing. This lets us use separate tasks for
- // reading and writing.
- let (reader, writer) = stream.split();
-
- // Create a channel for our stream, which other sockets will use to
- // send us messages. Then register our address with the stream to send
- // data to us.
- let (tx, rx) = futures::sync::mpsc::unbounded();
- let mut conns = connections.borrow_mut();
- conns.insert(addr, tx);
-
- // Define here what we do for the actual I/O. That is, read a bunch of
- // lines from the socket and dispatch them while we also write any lines
- // from other sockets.
- let connections_inner = connections.clone();
- let reader = BufReader::new(reader);
-
- // Model the read portion of this socket by mapping an infinite
- // iterator to each line off the socket. This "loop" is then
- // terminated with an error once we hit EOF on the socket.
- let iter = stream::iter_ok::<_, io::Error>(iter::repeat(()));
-
- let socket_reader = iter.fold(reader, move |reader, _| {
- // Read a line off the socket, failing if we're at EOF
- let line = io::read_until(reader, b'\n', Vec::new());
- let line = line.and_then(|(reader, vec)| {
- if vec.len() == 0 {
- Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe"))
- } else {
- Ok((reader, vec))
- }
- });
-
- // Convert the bytes we read into a string, and then send that
- // string to all other connected clients.
- let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec)));
-
- // Move the connection state into the closure below.
- let connections = connections_inner.clone();
-
- line.map(move |(reader, message)| {
- println!("{}: {:?}", addr, message);
- let mut conns = connections.borrow_mut();
-
- if let Ok(msg) = message {
- // For each open connection except the sender, send the
- // string via the channel.
- let iter = conns
- .iter_mut()
- .filter(|&(&k, _)| k != addr)
- .map(|(_, v)| v);
- for tx in iter {
- tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap();
- }
- } else {
- let tx = conns.get_mut(&addr).unwrap();
- tx.unbounded_send("You didn't send valid UTF-8.".to_string())
- .unwrap();
- }
-
- reader
- })
- });
-
- // Whenever we receive a string on the Receiver, we write it to
- // `WriteHalf<TcpStream>`.
- let socket_writer = rx.fold(writer, |writer, msg| {
- let amt = io::write_all(writer, msg.into_bytes());
- let amt = amt.map(|(writer, _)| writer);
- amt.map_err(|_| ())
- });
-
- // Now that we've got futures representing each half of the socket, we
- // use the `select` combinator to wait for either half to be done to
- // tear down the other. Then we spawn off the result.
- let connections = connections.clone();
- let socket_reader = socket_reader.map_err(|_| ());
- let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
-
- // Spawn locally a task to process the connection
- TaskExecutor::current()
- .spawn_local(Box::new(connection.then(move |_| {
- let mut conns = connections.borrow_mut();
- conns.remove(&addr);
- println!("Connection {} closed.", addr);
- Ok(())
- })))
- .unwrap();
-
- Ok(())
- })
- .map_err(|err| println!("error occurred: {:?}", err));
-
- // Spawn srv itself
- runtime.spawn(srv);
-
- // Execute server
- runtime.run().unwrap();
- Ok(())
-}
diff --git a/examples/chat-combinator.rs b/examples/chat-combinator.rs
deleted file mode 100644
index b81e8f7c..00000000
--- a/examples/chat-combinator.rs
+++ /dev/null
@@ -1,156 +0,0 @@
-//! A chat server that broadcasts a message to all connections.
-//!
-//! This is a line-based server which accepts connections, reads lines from
-//! those connections, and broadcasts the lines to all other connected clients.
-//!
-//! This example is similar to chat.rs, but uses combinators and a much more
-//! functional style.
-//!
-//! You can test this out by running:
-//!
-//! cargo run --example chat-combinator
-//!
-//! And then in another window run:
-//!
-//! cargo run --example connect 127.0.0.1:8080
-//!
-//! You can run the second command in multiple windows and then chat between
-//! them, seeing messages from the other clients as they're received. All
-//! connected clients join the same room and see everyone else's messages.
-
-#![deny(warnings)]
-
-extern crate futures;
-extern crate tokio;
-
-use tokio::io;
-use tokio::net::TcpListener;
-use tokio::prelude::*;
-
-use std::collections::HashMap;
-use std::env;
-use std::io::BufReader;
-use std::iter;
-use std::sync::{Arc, Mutex};
-
-fn main() -> Result<(), Box<std::error::Error>> {
- // Create the TCP listener we'll accept connections on.
- let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
- let addr = addr.parse()?;
-
- let socket = TcpListener::bind(&addr)?;
- println!("Listening on: {}", addr);
-
- // This is running on the Tokio runtime, so it will be multi-threaded. The
- // `Arc<Mutex<...>>` allows state to be shared across the threads.
- let connections = Arc::new(Mutex::new(HashMap::new()));
-
- // The server task asynchronously iterates over and processes each incoming
- // connection.
- let srv = socket
- .incoming()
- .map_err(|e| {
- println!("failed to accept socket; error = {:?}", e);
- e
- })
- .for_each(move |stream| {
- // The client's socket address
- let addr = stream.peer_addr()?;
-
- println!("New Connection: {}", addr);
-
- // Split the TcpStream into two separate handles. One handle for reading
- // and one handle for writing. This lets us use separate tasks for
- // reading and writing.
- let (reader, writer) = stream.split();
-
- // Create a channel for our stream, which other sockets will use to
- // send us messages. Then register our address with the stream to send
- // data to us.
- let (tx, rx) = futures::sync::mpsc::unbounded();
- connections.lock().unwrap().insert(addr, tx);
-
- // Define here what we do for the actual I/O. That is, read a bunch of
- // lines from the socket and dispatch them while we also write any lines
- // from other sockets.
- let connections_inner = connections.clone();
- let reader = BufReader::new(reader);
-
- // Model the read portion of this socket by mapping an infinite
- // iterator to each line off the socket. This "loop" is then
- // terminated with an error once we hit EOF on the socket.
- let iter = stream::iter_ok::<_, io::Error>(iter::repeat(()));
-
- let socket_reader = iter.fold(reader, move |reader, _| {
- // Read a line off the socket, failing if we're at EOF
- let line = io::read_until(reader, b'\n', Vec::new());
- let line = line.and_then(|(reader, vec)| {
- if vec.len() == 0 {
- Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe"))
- } else {
- Ok((reader, vec))
- }
- });
-
- // Convert the bytes we read into a string, and then send that
- // string to all other connected clients.
- let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec)));
-
- // Move the connection state into the closure below.
- let connections = connections_inner.clone();
-
- line.map(move |(reader, message)| {
- println!("{}: {:?}", addr, message);
- let mut conns = connections.lock().unwrap();
-
- if let Ok(msg) = message {
- // For each open connection except the sender, send the
- // string via the channel.
- let iter = conns
- .iter_mut()
- .filter(|&(&k, _)| k != addr)
- .map(|(_, v)| v);
- for tx in iter {
- tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap();
- }
- } else {
- let tx = conns.get_mut(&addr).unwrap();
- tx.unbounded_send("You didn't send valid UTF-8.".to_string())
- .unwrap();
- }
-
- reader
- })
- });
-
- // Whenever we receive a string on the Receiver, we write it to
- // `WriteHalf<TcpStream>`.
- let socket_writer = rx.fold(writer, |writer, msg| {
- let amt = io::write_all(writer, msg.into_bytes());
- let amt = amt.map(|(writer, _)| writer);
- amt.map_err(|_| ())
- });
-
- // Now that we've got futures representing each half of the socket, we
- // use the `select` combinator to wait for either half to be done to
- // tear down the other. Then we spawn off the result.
- let connections = connections.clone();
- let socket_reader = socket_reader.map_err(|_| ());
- let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
-
- // Spawn a task to process the connection
- tokio::spawn(connection.then(move |_| {
- connections.lock().unwrap().remove(&addr);
- println!("Connection {} closed.", addr);
- Ok(())
- }));
-
- Ok(())
- })
- .map_err(|err| println!("error occurred: {:?}", err));
-
- // execute server
- tokio::run(srv);
- Ok(())
-}
diff --git a/examples/chat.rs b/examples/chat.rs
deleted file mode 100644
index b21432af..00000000
--- a/examples/chat.rs
+++ /dev/null
@@ -1,473 +0,0 @@
-//! A chat server that broadcasts a message to all connections.
-//!
-//! This example is explicitly more verbose than it has to be. This is to
-//! illustrate more concepts.
-//!
-//! A chat server for telnet clients. After a telnet client connects, the first
-//! line should contain the client's name. After that, all lines sent by a
-//! client are broadcasted to all other connected clients.
-//!
-//! Because the client is telnet, lines are delimited by "\r\n".
-//!
-//! You can test this out by running:
-//!
-//! cargo run --example chat
-//!
-//! And then in another terminal run:
-//!
-//! telnet localhost 6142
-//!
-//! You can run the `telnet` command in any number of additional windows and
-//! then chat between them, seeing messages from the other clients as they're
-//! received. All connected clients join the same room and see everyone
-//! else's messages.
-
-#![deny(warnings)]
-
-extern crate tokio;
-#[macro_use]
-extern crate futures;
-extern crate bytes;
-
-use bytes::{BufMut, Bytes, BytesMut};
-use futures::future::{self, Either};
-use futures::sync::mpsc;
-use tokio::io;
-use tokio::net::{TcpListener, TcpStream};
-use tokio::prelude::*;
-
-use std::collections::HashMap;
-use std::net::SocketAddr;
-use std::sync::{Arc, Mutex};
-
-/// Shorthand for the transmit half of the message channel.
-type Tx = mpsc::UnboundedSender<Bytes>;
-
-/// Shorthand for the receive half of the message channel.
-type Rx = mpsc::UnboundedReceiver<Bytes>;
-
-/// Data that is shared between all peers in the chat server.
-///
-/// This is the set of `Tx` handles for all connected clients. Whenever a
-/// message is received from a client, it is broadcasted to all peers by
-/// iterating over the `peers` entries and sending a copy of the message on each
-/// `Tx`.
-struct Shared {
- peers: HashMap<SocketAddr, Tx>,
-}
-
-/// The state for each connected client.
-struct Peer {
- /// Name of the peer.
- ///
- /// When a client connects, the first line sent is treated as the client's
- /// name (like alice or bob). The name is used to preface all messages that
- /// arrive from the client so that we can simulate a real chat server:
- ///
- /// ```text
- /// alice: Hello everyone.
- /// bob: Welcome to telnet chat!
- /// ```
- name: BytesMut,
-
- /// The TCP socket wrapped with the `Lines` codec, defined below.
- ///
- /// This handles sending and receiving data on the socket. When using
- /// `Lines`, we can work at the line level instead of having to manage the
- /// raw byte operations.
- lines: Lines,
-
- /// Handle to the shared chat state.
- ///
- /// This is used to broadcast messages read off the socket to all connected
- /// peers.
- state: Arc<Mutex<Shared>>,
-
- /// Receive half of the message channel.
- ///
- /// This is used to receive messages from peers. When a message is received
- /// off of this `Rx`, it will be written to the socket.
- rx: Rx,
-
- /// Client socket address.
- ///
- /// The socket address is used as the key in the `peers` HashMap. The
- /// address is saved so that the `Peer` drop implementation can clean up its
- /// entry.
- addr: SocketAddr,
-}
-
-/// Line based codec
-///
-/// This decorates a socket and presents a line based read / write interface.
-///
-/// As a user of `Lines`, we can focus on working at the line level. So, we send
-/// and receive values that represent entire lines. The `Lines` codec will
-/// handle the encoding and decoding as well as reading from and writing to the
-/// socket.
-#[derive(Debug)]
-struct Lines {
- /// The TCP socket.
- socket: TcpStream,
-
- /// Buffer used when reading from the socket. Data is not returned from this
- /// buffer until an entire line has been read.
- rd: BytesMut,
-
- /// Buffer used to stage data before writing it to the socket.
- wr: BytesMut,
-}
-
-impl Shared {
- /// Create a new, empty, instance of `Shared`.
- fn new() -> Self {
- Shared {
- peers: HashMap::new(),
- }
- }
-}
-
-impl Peer {
- /// Create a new instance of `Peer`.
- fn new(name: BytesMut, state: Arc<Mutex<Shared>>, lines: Lines) -> Peer {
- // Get the client socket address
- let addr = lines.socket.peer_addr().unwrap();
-
- // Create a channel for this peer
- let (tx, rx) = mpsc::unbounded();
-
- // Add an entry for this `Peer` in the shared state map.
- state.lock().unwrap().peers.insert(addr, tx);
-
- Peer {
- name,
- lines,
- state,
- rx,
- addr,
- }
- }
-}
-
-/// This is where a connected client is managed.
-///
-/// A `Peer` is also a future representing completely processing the client.
-///
-/// When a `Peer` is created, the first line (representing the client's name)
-/// has already been read. When the socket closes, the `Peer` future completes.
-///
-/// While processing, the peer future implementation will:
-///
-/// 1) Receive messages on its message channel and write them to the socket.
-/// 2) Receive messages from the socket and broadcast them to all peers.
-///
-impl Future for Peer {
- type Item = ();
- type Error = io::Error;
-
- fn poll(&mut self) -> Poll<(), io::Error> {
- // Tokio (and futures) use cooperative scheduling without any
- // preemption. If a task never yields execution back to the executor,
- // then other tasks may be starved.
- //
- // To deal with this, robust applications should not have any unbounded
- // loops. In this example, we will read at most `LINES_PER_TICK` lines
- // from the client on each tick.
- //
- // If the limit is hit, the current task is notified, informing the
- // executor to schedule the task again asap.
- const LINES_PER_TICK: usize = 10;
-
- // Receive all messages from peers.
- for i in 0..LINES_PER_TICK {
- // Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is
- // safe.
- match self.rx.poll().unwrap() {
- Async::Ready(Some(v)) => {
- // Buffer the line. Once all lines are buffered, they will
- // be flushed to the socket (right below).
- self.lines.buffer(&v);
-
- // If this is the last iteration, the loop will break even
- // though there could still be lines to read. Because we did
- // not reach `Async::NotReady`, we have to notify ourselves
- // in order to tell the executor to schedule the task again.
- if i + 1 == LINES_PER_TICK {
- task::current().notify();
- }
- }
- _ => break,
- }
- }
-
- // Flush the write buffer to the socket
- let _ = self.lines.poll_flush()?;
-
- // Read new lines from the socket
- while let Async::Ready(line) = self.lines.poll()? {
- println!("Received line ({:?}) : {:?}", self.name, line);
-
- if let Some(message) = line {
- // Append the peer's name to the front of the line:
- let mut line = self.name.clone();
- line.extend_from_slice(b": ");
- line.extend_from_slice(&message);
- line.extend_from_slice(b"\r\n");
-
- // We're using `Bytes`, which allows zero-copy clones (by
- // storing the data in an Arc internally).
- //
- // However, before cloning, we must freeze the data. This
- // converts it from mutable -> immutable, allowing zero copy
- // cloning.
- let line = line.freeze();
-
- // Now, send the line to all other peers
- for (addr, tx) in &self.state.lock().unwrap().peers {
- // Don't send the message to ourselves
- if *addr != self.addr {
- // The send only fails if the rx half has been dropped,
- // however this is impossible as the `tx` half will be
- // removed from the map before the `rx` is dropped.
- tx.unbounded_send(line.clone()).unwrap();
- }
- }
- } else {
- // EOF was reached. The remote client has disconnected. There is
- // nothing more to do.
- return Ok(Async::Ready(()));
- }
- }
-
- // As always, it is important to not just return `NotReady` without
- // ensuring an inner future also returned `NotReady`.
- //
- // We know we got a `NotReady` from either `self.rx` or `self.lines`, so
- // the contract is respected.
- Ok(Async::NotReady)
- }
-}
-
-impl Drop for Peer {
- fn drop(&mut self) {
- self.state.lock().unwrap().peers.remove(&self.addr);
- }
-}
-
-impl Lines {
- /// Create a new `Lines` codec backed by the socket
- fn new(socket: TcpStream) -> Self {
- Lines {
- socket,
- rd: BytesMut::new(),
- wr: BytesMut::new(),
- }
- }
-
- /// Buffer a line.
- ///
- /// This writes the line to an internal buffer. Calls to `poll_flush` will
- /// attempt to flush this buffer to the socket.
- fn buffer(&mut self, line: &[u8]) {
- // Ensure the buffer has capacity. Ideally this would not be unbounded,
- // but to keep the example simple, we will not limit this.
- self.wr.reserve(line.len());
-
- // Push the line onto the end of the write buffer.
- //
- // The `put` function is from the `BufMut` trait.
- self.wr.put(line);
- }
-
- /// Flush the write buffer to the socket
- fn poll_flush(&mut self) -> Poll<(), io::Error> {
- // As long as there is buffered data to write, try to write it.
- while !self.wr.is_empty() {
- // Try to write some bytes to the socket
- let n = try_ready!(self.socket.poll_write(&self.wr));
-
- // As long as the wr is not empty, a successful write should
- // never write 0 bytes.
- assert!(n > 0);
-
- // This discards the first `n` bytes of the buffer.
- let _ = self.wr.split_to(n);
- }
-
- Ok(Async::Ready(()))
- }
-
- /// Read data from the socket.
- ///
- /// This only returns `Ready` when the socket has closed.
- fn fill_read_buf(&mut self) -> Poll<(), io::Error> {
- loop {
- // Ensure the read buffer has capacity.
- //
- // This might result in an internal allocation.
- self.rd.reserve(1024);
-
- // Read data into the buffer.
- let n = try_ready!(self.socket.read_buf(&mut self.rd));
-
- if n == 0 {
- return Ok(Async::Ready(()));
- }
- }
- }
-}
-
-impl Stream for Lines {
- type Item = BytesMut;
- type Error = io::Error;
-
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- // First, read any new data that might have been received off the socket
- let sock_closed = self.fill_read_buf()?.is_ready();
-
- // Now, try finding lines
- let pos = self
- .rd
- .windows(2)
- .enumerate()
- .find(|&(_, bytes)| bytes == b"\r\n")
- .map(|(i, _)| i);
-
- if let Some(pos) = pos {
- // Remove the line from the read buffer and set it to `line`.
- let mut line = self.rd.split_to(pos + 2);
-
- // Drop the trailing \r\n
- line.split_off(pos);
-
- // Return the line
- return Ok(Async::Ready(Some(line)));
- }
-
- if sock_closed {
- Ok(Async::Ready(None))
- } else {
- Ok(Async::NotReady)
- }
- }
-}
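The `\r\n` scan in `Lines::poll` above can be tried out on a plain byte
buffer without `BytesMut`. This sketch mirrors the `windows(2)` search and
the `split_to`/`split_off` pair using `Vec` operations (function name and
inputs are illustrative):

```rust
/// Split the first `\r\n`-terminated line off the front of `buf`.
fn pop_line(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    // Find the index of the first "\r\n" pair, as `windows(2)` does above.
    let pos = buf.windows(2).position(|bytes| bytes == b"\r\n")?;

    // Drain the line plus its two-byte terminator from the buffer...
    let mut line: Vec<u8> = buf.drain(..pos + 2).collect();

    // ...and drop the trailing "\r\n", as `split_off(pos)` does.
    line.truncate(pos);
    Some(line)
}

fn main() {
    let mut rd = b"alice: hi\r\nbob: yo\r\npartial".to_vec();
    assert_eq!(pop_line(&mut rd).as_deref(), Some(&b"alice: hi"[..]));
    assert_eq!(pop_line(&mut rd).as_deref(), Some(&b"bob: yo"[..]));
    // No complete line remains; the partial tail stays buffered, exactly
    // like data left in `rd` until more bytes arrive from the socket.
    assert_eq!(pop_line(&mut rd), None);
    assert_eq!(rd, b"partial".to_vec());
}
```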
-
-/// Spawn a task to manage the socket.
-///
-/// This will read the first line from the socket to identify the client, then
-/// add the client to the set of connected peers in the chat service.
-fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
- // Wrap the socket with the `Lines` codec that we wrote above.
- //
- // By doing this, we can operate at the line level instead of doing raw byte
- // manipulation.
- let lines = Lines::new(socket);
-
- // The first line is treated as the client's name. The client is not added
- // to the set of connected peers until this line is received.
- //
- // We use the `into_future` combinator to extract the first item from the
- // lines stream. `into_future` takes a `Stream` and converts it to a future
- // of `(first, rest)` where `rest` is the original stream instance.
- let connection = lines
- .into_future()
- // `into_future` doesn't have the right error type, so map the error to
- // make it work.
- .map_err(|(e, _)| e)
- // Process the first received line as the client's name.
- .and_then(|(name, lines)| {
- // If `name` is `None`, then the client disconnected without
- // actually sending a line of data.
- //
- // Since the connection is closed, there is no further work that we
- // need to do. So, we just terminate processing by returning
- // `future::ok()`.
- //
- // The problem is that only a single future type can be returned
- // from a combinator closure, but we want to return both
- // `future::ok()` and `Peer` (below).
- //
- // This is a common problem, so the `futures` crate solves this by
- // providing the `Either` helper enum that allows creating a single
- // return type that covers two concrete future types.
- let name = match name {
- Some(name) => name,
- None => {
- // The remote client closed the connection without sending
- // any data.
- return Either::A(future::ok(()));
- }
- };
-
- println!("`{:?}` is joining the chat", name);
-
- // Create the peer.
- //
- // This is also a future that processes the connection, only
- // completing when the socket closes.
- let peer = Peer::new(name, state, lines);
-
- // Wrap `peer` with `Either::B` to make the return type fit.
- Either::B(peer)
- })
- // Task futures have an error of type `()`, this ensures we handle the
- // error. We do this by printing the error to STDOUT.
- .map_err(|e| {
- println!("connection error = {:?}", e);
- });
-
- // Spawn the task. Internally, this submits the task to a thread pool.
- tokio::spawn(connection);
-}
-
-pub fn main() -> Result<(), Box<std::error::Error>> {
- // Create the shared state. This is how all the peers communicate.
- //
- // The server task will hold a handle to this. For every new client, the
- // `state` handle is cloned and passed into the task that processes the
- // client connection.
- let state = Arc::new(Mutex::new(Shared::new()));
-
- let addr = "127.0.0.1:6142".parse()?;
-
- // Bind a TCP listener to the socket address.
- //
- // Note that this is the Tokio TcpListener, which is fully async.
- let listener = TcpListener::bind(&addr)?;
-
- // The server task asynchronously iterates over and processes each
- // incoming connection.
- let server = listener
- .incoming()
- .for_each(move |socket| {
- // Spawn a task to process the connection
- process(socket, state.clone());
- Ok(())
- })
- .map_err(|err| {
- // All tasks must have an `Error`