From 30330da11a56dfdd11bdbef50dba073a9edc36b2 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 22 Mar 2019 15:25:42 -0700 Subject: chore: Fix examples not working with `cargo run` (#998) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore: Fix examples not working with `cargo run` ## Motivation PR #991 moved the `tokio` crate to its own subdirectory, but did not move the `examples` directory into `tokio/examples`. While attempting to use the examples for testing another change, I noticed that #991 had broken the ability to use `cargo run`, as the examples were no longer considered part of a crate that cargo was aware of: ``` tokio on master [$] via 🦀v1.33.0 at ☸️ aks-eliza-dev ➜ cargo run --example chat error: no example target named `chat` Did you mean `echo`? ``` ## Solution This branch moves the examples into the `tokio` directory, so cargo is now once again aware of them: ``` tokio on eliza/fix-examples [$] via 🦀v1.33.0 at ☸️ aks-eliza-dev ➜ cargo run --example chat Compiling tokio-executor v0.1.7 (/Users/eliza/Code/tokio/tokio-executor) Compiling tokio-reactor v0.1.9 Compiling tokio-threadpool v0.1.13 Compiling tokio-current-thread v0.1.6 Compiling tokio-timer v0.2.10 Compiling tokio-uds v0.2.5 Compiling tokio-udp v0.1.3 Compiling tokio-tcp v0.1.3 Compiling tokio-fs v0.1.6 Compiling tokio v0.1.18 (/Users/eliza/Code/tokio/tokio) Finished dev [unoptimized + debuginfo] target(s) in 7.04s Running `target/debug/examples/chat` server running on localhost:6142 ``` Signed-off-by: Eliza Weisman Signed-off-by: Eliza Weisman --- examples/README.md | 60 ---- examples/chat-combinator-current-thread.rs | 172 ----------- examples/chat-combinator.rs | 156 ---------- examples/chat.rs | 473 ----------------------------- examples/connect.rs | 257 ---------------- examples/echo-udp.rs | 74 ----- examples/echo.rs | 115 ------- examples/hello_world.rs | 58 ---- examples/manual-runtime.rs | 87 ------ examples/print_each_packet.rs | 150 --------- examples/proxy.rs | 130 -------- examples/tinydb.rs | 227 -------------- examples/tinyhttp.rs | 325 -------------------- examples/udp-client.rs | 70 ----- examples/udp-codec.rs | 65 ---- 15 files changed, 2419 deletions(-) delete mode 100644 examples/README.md delete mode 100644 examples/chat-combinator-current-thread.rs delete mode 100644 examples/chat-combinator.rs delete mode 100644 examples/chat.rs delete mode 100644 examples/connect.rs delete mode 100644 examples/echo-udp.rs delete mode 100644 examples/echo.rs delete mode 100644 examples/hello_world.rs delete mode 100644 examples/manual-runtime.rs delete mode 100644 examples/print_each_packet.rs delete mode 100644 examples/proxy.rs delete mode 100644 examples/tinydb.rs delete mode 100644 examples/tinyhttp.rs delete mode 100644 examples/udp-client.rs delete mode 100644 examples/udp-codec.rs (limited to 'examples') diff --git a/examples/README.md b/examples/README.md deleted file mode 100644 index 63634c82..00000000 --- a/examples/README.md +++ /dev/null @@ -1,60 +0,0 @@ -## Examples of how to use Tokio - -This directory contains a number of examples showcasing various capabilities of -the `tokio` crate. - -All examples can be executed with: - -``` -cargo run --example $name -``` - -A high level description of each example is: - -* [`hello_world`](hello_world.rs) - a tiny server that writes "hello world" to - all connected clients and then terminates the connection, should help see how - to create and initialize `tokio`. - -* [`echo`](echo.rs) - this is your standard TCP "echo server" which accepts - connections and then echos back any contents that are read from each connected - client. - -* [`print_each_packet`](print_each_packet.rs) - this server will create a TCP - listener, accept connections in a loop, and put down in the stdout everything - that's read off of each TCP connection. - -* [`echo-udp`](echo-udp.rs) - again your standard "echo server", except for UDP - instead of TCP. This will echo back any packets received to the original - sender. - -* [`connect`](connect.rs) - this is a `nc`-like clone which can be used to - interact with most other examples. The program creates a TCP connection or UDP - socket to sends all information read on stdin to the remote peer, displaying - any data received on stdout. Often quite useful when interacting with the - various other servers here! - -* [`chat`](chat.rs) - this spins up a local TCP server which will broadcast from - any connected client to all other connected clients. You can connect to this - in multiple terminals and use it to chat between the terminals. - -* [`chat-combinator`](chat-combinator.rs) - Similar to `chat`, but this uses a - much more functional programming approach using combinators. - -* [`proxy`](proxy.rs) - an example proxy server that will forward all connected - TCP clients to the remote address specified when starting the program. - -* [`tinyhttp`](tinyhttp.rs) - a tiny HTTP/1.1 server which doesn't support HTTP - request bodies showcasing running on multiple cores, working with futures and - spawning tasks, and finally framing a TCP connection to discrete - request/response objects. - -* [`tinydb`](tinydb.rs) - an in-memory database which shows sharing state - between all connected clients, notably the key/value store of this database. - -* [`udp-client`](udp-client.rs) - a simple `send_dgram`/`recv_dgram` example. - -* [`manual-runtime`](manual-runtime.rs) - manually composing a runtime. - -If you've got an example you'd like to see here, please feel free to open an -issue. Otherwise if you've got an example you'd like to add, please feel free -to make a PR! diff --git a/examples/chat-combinator-current-thread.rs b/examples/chat-combinator-current-thread.rs deleted file mode 100644 index ee147025..00000000 --- a/examples/chat-combinator-current-thread.rs +++ /dev/null @@ -1,172 +0,0 @@ -//! A chat server that broadcasts a message to all connections. -//! -//! This is a line-based server which accepts connections, reads lines from -//! those connections, and broadcasts the lines to all other connected clients. -//! -//! This example is similar to chat.rs, but uses combinators and a much more -//! functional style. -//! -//! Because we are here running the reactor/executor on the same thread instead -//! of a threadpool, we can avoid full synchronization with Arc + Mutex and use -//! Rc + RefCell instead. The max performance is however limited to a CPU HW -//! thread. -//! -//! You can test this out by running: -//! -//! cargo run --example chat-combinator-current-thread -//! -//! And then in another window run: -//! -//! cargo run --example connect 127.0.0.1:8080 -//! -//! You can run the second command in multiple windows and then chat between the -//! two, seeing the messages from the other client as they're received. For all -//! connected clients they'll all join the same room and see everyone else's -//! messages. - -#![deny(warnings)] - -extern crate futures; -extern crate tokio; - -use tokio::io; -use tokio::net::TcpListener; -use tokio::prelude::*; -use tokio::runtime::current_thread::{Runtime, TaskExecutor}; - -use std::cell::RefCell; -use std::collections::HashMap; -use std::env; -use std::io::BufReader; -use std::iter; -use std::rc::Rc; - -fn main() -> Result<(), Box> { - let mut runtime = Runtime::new().unwrap(); - - // Create the TCP listener we'll accept connections on. - let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); - let addr = addr.parse()?; - - let socket = TcpListener::bind(&addr)?; - println!("Listening on: {}", addr); - - // This is running on the Tokio current_thread runtime, so it will be single- - // threaded. The `Rc>` allows state to be shared across the tasks. - let connections = Rc::new(RefCell::new(HashMap::new())); - - // The server task asynchronously iterates over and processes each incoming - // connection. - let srv = socket - .incoming() - .map_err(|e| { - println!("failed to accept socket; error = {:?}", e); - e - }) - .for_each(move |stream| { - // The client's socket address - let addr = stream.peer_addr()?; - - println!("New Connection: {}", addr); - - // Split the TcpStream into two separate handles. One handle for reading - // and one handle for writing. This lets us use separate tasks for - // reading and writing. - let (reader, writer) = stream.split(); - - // Create a channel for our stream, which other sockets will use to - // send us messages. Then register our address with the stream to send - // data to us. - let (tx, rx) = futures::sync::mpsc::unbounded(); - let mut conns = connections.borrow_mut(); - conns.insert(addr, tx); - - // Define here what we do for the actual I/O. That is, read a bunch of - // lines from the socket and dispatch them while we also write any lines - // from other sockets. - let connections_inner = connections.clone(); - let reader = BufReader::new(reader); - - // Model the read portion of this socket by mapping an infinite - // iterator to each line off the socket. This "loop" is then - // terminated with an error once we hit EOF on the socket. - let iter = stream::iter_ok::<_, io::Error>(iter::repeat(())); - - let socket_reader = iter.fold(reader, move |reader, _| { - // Read a line off the socket, failing if we're at EOF - let line = io::read_until(reader, b'\n', Vec::new()); - let line = line.and_then(|(reader, vec)| { - if vec.len() == 0 { - Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")) - } else { - Ok((reader, vec)) - } - }); - - // Convert the bytes we read into a string, and then send that - // string to all other connected clients. - let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec))); - - // Move the connection state into the closure below. - let connections = connections_inner.clone(); - - line.map(move |(reader, message)| { - println!("{}: {:?}", addr, message); - let mut conns = connections.borrow_mut(); - - if let Ok(msg) = message { - // For each open connection except the sender, send the - // string via the channel. - let iter = conns - .iter_mut() - .filter(|&(&k, _)| k != addr) - .map(|(_, v)| v); - for tx in iter { - tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap(); - } - } else { - let tx = conns.get_mut(&addr).unwrap(); - tx.unbounded_send("You didn't send valid UTF-8.".to_string()) - .unwrap(); - } - - reader - }) - }); - - // Whenever we receive a string on the Receiver, we write it to - // `WriteHalf`. - let socket_writer = rx.fold(writer, |writer, msg| { - let amt = io::write_all(writer, msg.into_bytes()); - let amt = amt.map(|(writer, _)| writer); - amt.map_err(|_| ()) - }); - - // Now that we've got futures representing each half of the socket, we - // use the `select` combinator to wait for either half to be done to - // tear down the other. Then we spawn off the result. - let connections = connections.clone(); - let socket_reader = socket_reader.map_err(|_| ()); - let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ())); - - // Spawn locally a task to process the connection - TaskExecutor::current() - .spawn_local(Box::new(connection.then(move |_| { - let mut conns = connections.borrow_mut(); - conns.remove(&addr); - println!("Connection {} closed.", addr); - Ok(()) - }))) - .unwrap(); - - Ok(()) - }) - .map_err(|err| println!("error occurred: {:?}", err)); - - // Spawn srv itself - runtime.spawn(srv); - - // Execute server - runtime.run().unwrap(); - Ok(()) -} diff --git a/examples/chat-combinator.rs b/examples/chat-combinator.rs deleted file mode 100644 index b81e8f7c..00000000 --- a/examples/chat-combinator.rs +++ /dev/null @@ -1,156 +0,0 @@ -//! A chat server that broadcasts a message to all connections. -//! -//! This is a line-based server which accepts connections, reads lines from -//! those connections, and broadcasts the lines to all other connected clients. -//! -//! This example is similar to chat.rs, but uses combinators and a much more -//! functional style. -//! -//! You can test this out by running: -//! -//! cargo run --example chat -//! -//! And then in another window run: -//! -//! cargo run --example connect 127.0.0.1:8080 -//! -//! You can run the second command in multiple windows and then chat between the -//! two, seeing the messages from the other client as they're received. For all -//! connected clients they'll all join the same room and see everyone else's -//! messages. - -#![deny(warnings)] - -extern crate futures; -extern crate tokio; - -use tokio::io; -use tokio::net::TcpListener; -use tokio::prelude::*; - -use std::collections::HashMap; -use std::env; -use std::io::BufReader; -use std::iter; -use std::sync::{Arc, Mutex}; - -fn main() -> Result<(), Box> { - // Create the TCP listener we'll accept connections on. - let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); - let addr = addr.parse()?; - - let socket = TcpListener::bind(&addr)?; - println!("Listening on: {}", addr); - - // This is running on the Tokio runtime, so it will be multi-threaded. The - // `Arc>` allows state to be shared across the threads. - let connections = Arc::new(Mutex::new(HashMap::new())); - - // The server task asynchronously iterates over and processes each incoming - // connection. - let srv = socket - .incoming() - .map_err(|e| { - println!("failed to accept socket; error = {:?}", e); - e - }) - .for_each(move |stream| { - // The client's socket address - let addr = stream.peer_addr()?; - - println!("New Connection: {}", addr); - - // Split the TcpStream into two separate handles. One handle for reading - // and one handle for writing. This lets us use separate tasks for - // reading and writing. - let (reader, writer) = stream.split(); - - // Create a channel for our stream, which other sockets will use to - // send us messages. Then register our address with the stream to send - // data to us. - let (tx, rx) = futures::sync::mpsc::unbounded(); - connections.lock().unwrap().insert(addr, tx); - - // Define here what we do for the actual I/O. That is, read a bunch of - // lines from the socket and dispatch them while we also write any lines - // from other sockets. - let connections_inner = connections.clone(); - let reader = BufReader::new(reader); - - // Model the read portion of this socket by mapping an infinite - // iterator to each line off the socket. This "loop" is then - // terminated with an error once we hit EOF on the socket. - let iter = stream::iter_ok::<_, io::Error>(iter::repeat(())); - - let socket_reader = iter.fold(reader, move |reader, _| { - // Read a line off the socket, failing if we're at EOF - let line = io::read_until(reader, b'\n', Vec::new()); - let line = line.and_then(|(reader, vec)| { - if vec.len() == 0 { - Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")) - } else { - Ok((reader, vec)) - } - }); - - // Convert the bytes we read into a string, and then send that - // string to all other connected clients. - let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec))); - - // Move the connection state into the closure below. - let connections = connections_inner.clone(); - - line.map(move |(reader, message)| { - println!("{}: {:?}", addr, message); - let mut conns = connections.lock().unwrap(); - - if let Ok(msg) = message { - // For each open connection except the sender, send the - // string via the channel. - let iter = conns - .iter_mut() - .filter(|&(&k, _)| k != addr) - .map(|(_, v)| v); - for tx in iter { - tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap(); - } - } else { - let tx = conns.get_mut(&addr).unwrap(); - tx.unbounded_send("You didn't send valid UTF-8.".to_string()) - .unwrap(); - } - - reader - }) - }); - - // Whenever we receive a string on the Receiver, we write it to - // `WriteHalf`. - let socket_writer = rx.fold(writer, |writer, msg| { - let amt = io::write_all(writer, msg.into_bytes()); - let amt = amt.map(|(writer, _)| writer); - amt.map_err(|_| ()) - }); - - // Now that we've got futures representing each half of the socket, we - // use the `select` combinator to wait for either half to be done to - // tear down the other. Then we spawn off the result. - let connections = connections.clone(); - let socket_reader = socket_reader.map_err(|_| ()); - let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ())); - - // Spawn a task to process the connection - tokio::spawn(connection.then(move |_| { - connections.lock().unwrap().remove(&addr); - println!("Connection {} closed.", addr); - Ok(()) - })); - - Ok(()) - }) - .map_err(|err| println!("error occurred: {:?}", err)); - - // execute server - tokio::run(srv); - Ok(()) -} diff --git a/examples/chat.rs b/examples/chat.rs deleted file mode 100644 index b21432af..00000000 --- a/examples/chat.rs +++ /dev/null @@ -1,473 +0,0 @@ -//! A chat server that broadcasts a message to all connections. -//! -//! This example is explicitly more verbose than it has to be. This is to -//! illustrate more concepts. -//! -//! A chat server for telnet clients. After a telnet client connects, the first -//! line should contain the client's name. After that, all lines sent by a -//! client are broadcasted to all other connected clients. -//! -//! Because the client is telnet, lines are delimited by "\r\n". -//! -//! You can test this out by running: -//! -//! cargo run --example chat -//! -//! And then in another terminal run: -//! -//! telnet localhost 6142 -//! -//! You can run the `telnet` command in any number of additional windows. -//! -//! You can run the second command in multiple windows and then chat between the -//! two, seeing the messages from the other client as they're received. For all -//! connected clients they'll all join the same room and see everyone else's -//! messages. - -#![deny(warnings)] - -extern crate tokio; -#[macro_use] -extern crate futures; -extern crate bytes; - -use bytes::{BufMut, Bytes, BytesMut}; -use futures::future::{self, Either}; -use futures::sync::mpsc; -use tokio::io; -use tokio::net::{TcpListener, TcpStream}; -use tokio::prelude::*; - -use std::collections::HashMap; -use std::net::SocketAddr; -use std::sync::{Arc, Mutex}; - -/// Shorthand for the transmit half of the message channel. -type Tx = mpsc::UnboundedSender; - -/// Shorthand for the receive half of the message channel. -type Rx = mpsc::UnboundedReceiver; - -/// Data that is shared between all peers in the chat server. -/// -/// This is the set of `Tx` handles for all connected clients. Whenever a -/// message is received from a client, it is broadcasted to all peers by -/// iterating over the `peers` entries and sending a copy of the message on each -/// `Tx`. -struct Shared { - peers: HashMap, -} - -/// The state for each connected client. -struct Peer { - /// Name of the peer. - /// - /// When a client connects, the first line sent is treated as the client's - /// name (like alice or bob). The name is used to preface all messages that - /// arrive from the client so that we can simulate a real chat server: - /// - /// ```text - /// alice: Hello everyone. - /// bob: Welcome to telnet chat! - /// ``` - name: BytesMut, - - /// The TCP socket wrapped with the `Lines` codec, defined below. - /// - /// This handles sending and receiving data on the socket. When using - /// `Lines`, we can work at the line level instead of having to manage the - /// raw byte operations. - lines: Lines, - - /// Handle to the shared chat state. - /// - /// This is used to broadcast messages read off the socket to all connected - /// peers. - state: Arc>, - - /// Receive half of the message channel. - /// - /// This is used to receive messages from peers. When a message is received - /// off of this `Rx`, it will be written to the socket. - rx: Rx, - - /// Client socket address. - /// - /// The socket address is used as the key in the `peers` HashMap. The - /// address is saved so that the `Peer` drop implementation can clean up its - /// entry. - addr: SocketAddr, -} - -/// Line based codec -/// -/// This decorates a socket and presents a line based read / write interface. -/// -/// As a user of `Lines`, we can focus on working at the line level. So, we send -/// and receive values that represent entire lines. The `Lines` codec will -/// handle the encoding and decoding as well as reading from and writing to the -/// socket. -#[derive(Debug)] -struct Lines { - /// The TCP socket. - socket: TcpStream, - - /// Buffer used when reading from the socket. Data is not returned from this - /// buffer until an entire line has been read. - rd: BytesMut, - - /// Buffer used to stage data before writing it to the socket. - wr: BytesMut, -} - -impl Shared { - /// Create a new, empty, instance of `Shared`. - fn new() -> Self { - Shared { - peers: HashMap::new(), - } - } -} - -impl Peer { - /// Create a new instance of `Peer`. - fn new(name: BytesMut, state: Arc>, lines: Lines) -> Peer { - // Get the client socket address - let addr = lines.socket.peer_addr().unwrap(); - - // Create a channel for this peer - let (tx, rx) = mpsc::unbounded(); - - // Add an entry for this `Peer` in the shared state map. - state.lock().unwrap().peers.insert(addr, tx); - - Peer { - name, - lines, - state, - rx, - addr, - } - } -} - -/// This is where a connected client is managed. -/// -/// A `Peer` is also a future representing completely processing the client. -/// -/// When a `Peer` is created, the first line (representing the client's name) -/// has already been read. When the socket closes, the `Peer` future completes. -/// -/// While processing, the peer future implementation will: -/// -/// 1) Receive messages on its message channel and write them to the socket. -/// 2) Receive messages from the socket and broadcast them to all peers. -/// -impl Future for Peer { - type Item = (); - type Error = io::Error; - - fn poll(&mut self) -> Poll<(), io::Error> { - // Tokio (and futures) use cooperative scheduling without any - // preemption. If a task never yields execution back to the executor, - // then other tasks may be starved. - // - // To deal with this, robust applications should not have any unbounded - // loops. In this example, we will read at most `LINES_PER_TICK` lines - // from the client on each tick. - // - // If the limit is hit, the current task is notified, informing the - // executor to schedule the task again asap. - const LINES_PER_TICK: usize = 10; - - // Receive all messages from peers. - for i in 0..LINES_PER_TICK { - // Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is - // safe. - match self.rx.poll().unwrap() { - Async::Ready(Some(v)) => { - // Buffer the line. Once all lines are buffered, they will - // be flushed to the socket (right below). - self.lines.buffer(&v); - - // If this is the last iteration, the loop will break even - // though there could still be lines to read. Because we did - // not reach `Async::NotReady`, we have to notify ourselves - // in order to tell the executor to schedule the task again. - if i + 1 == LINES_PER_TICK { - task::current().notify(); - } - } - _ => break, - } - } - - // Flush the write buffer to the socket - let _ = self.lines.poll_flush()?; - - // Read new lines from the socket - while let Async::Ready(line) = self.lines.poll()? { - println!("Received line ({:?}) : {:?}", self.name, line); - - if let Some(message) = line { - // Append the peer's name to the front of the line: - let mut line = self.name.clone(); - line.extend_from_slice(b": "); - line.extend_from_slice(&message); - line.extend_from_slice(b"\r\n"); - - // We're using `Bytes`, which allows zero-copy clones (by - // storing the data in an Arc internally). - // - // However, before cloning, we must freeze the data. This - // converts it from mutable -> immutable, allowing zero copy - // cloning. - let line = line.freeze(); - - // Now, send the line to all other peers - for (addr, tx) in &self.state.lock().unwrap().peers { - // Don't send the message to ourselves - if *addr != self.addr { - // The send only fails if the rx half has been dropped, - // however this is impossible as the `tx` half will be - // removed from the map before the `rx` is dropped. - tx.unbounded_send(line.clone()).unwrap(); - } - } - } else { - // EOF was reached. The remote client has disconnected. There is - // nothing more to do. - return Ok(Async::Ready(())); - } - } - - // As always, it is important to not just return `NotReady` without - // ensuring an inner future also returned `NotReady`. - // - // We know we got a `NotReady` from either `self.rx` or `self.lines`, so - // the contract is respected. - Ok(Async::NotReady) - } -} - -impl Drop for Peer { - fn drop(&mut self) { - self.state.lock().unwrap().peers.remove(&self.addr); - } -} - -impl Lines { - /// Create a new `Lines` codec backed by the socket - fn new(socket: TcpStream) -> Self { - Lines { - socket, - rd: BytesMut::new(), - wr: BytesMut::new(), - } - } - - /// Buffer a line. - /// - /// This writes the line to an internal buffer. Calls to `poll_flush` will - /// attempt to flush this buffer to the socket. - fn buffer(&mut self, line: &[u8]) { - // Ensure the buffer has capacity. Ideally this would not be unbounded, - // but to keep the example simple, we will not limit this. - self.wr.reserve(line.len()); - - // Push the line onto the end of the write buffer. - // - // The `put` function is from the `BufMut` trait. - self.wr.put(line); - } - - /// Flush the write buffer to the socket - fn poll_flush(&mut self) -> Poll<(), io::Error> { - // As long as there is buffered data to write, try to write it. - while !self.wr.is_empty() { - // Try to write some bytes to the socket - let n = try_ready!(self.socket.poll_write(&self.wr)); - - // As long as the wr is not empty, a successful write should - // never write 0 bytes. - assert!(n > 0); - - // This discards the first `n` bytes of the buffer. - let _ = self.wr.split_to(n); - } - - Ok(Async::Ready(())) - } - - /// Read data from the socket. - /// - /// This only returns `Ready` when the socket has closed. - fn fill_read_buf(&mut self) -> Poll<(), io::Error> { - loop { - // Ensure the read buffer has capacity. - // - // This might result in an internal allocation. - self.rd.reserve(1024); - - // Read data into the buffer. - let n = try_ready!(self.socket.read_buf(&mut self.rd)); - - if n == 0 { - return Ok(Async::Ready(())); - } - } - } -} - -impl Stream for Lines { - type Item = BytesMut; - type Error = io::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - // First, read any new data that might have been received off the socket - let sock_closed = self.fill_read_buf()?.is_ready(); - - // Now, try finding lines - let pos = self - .rd - .windows(2) - .enumerate() - .find(|&(_, bytes)| bytes == b"\r\n") - .map(|(i, _)| i); - - if let Some(pos) = pos { - // Remove the line from the read buffer and set it to `line`. - let mut line = self.rd.split_to(pos + 2); - - // Drop the trailing \r\n - line.split_off(pos); - - // Return the line - return Ok(Async::Ready(Some(line))); - } - - if sock_closed { - Ok(Async::Ready(None)) - } else { - Ok(Async::NotReady) - } - } -} - -/// Spawn a task to manage the socket. -/// -/// This will read the first line from the socket to identify the client, then -/// add the client to the set of connected peers in the chat service. -fn process(socket: TcpStream, state: Arc>) { - // Wrap the socket with the `Lines` codec that we wrote above. - // - // By doing this, we can operate at the line level instead of doing raw byte - // manipulation. - let lines = Lines::new(socket); - - // The first line is treated as the client's name. The client is not added - // to the set of connected peers until this line is received. - // - // We use the `into_future` combinator to extract the first item from the - // lines stream. `into_future` takes a `Stream` and converts it to a future - // of `(first, rest)` where `rest` is the original stream instance. - let connection = lines - .into_future() - // `into_future` doesn't have the right error type, so map the error to - // make it work. - .map_err(|(e, _)| e) - // Process the first received line as the client's name. - .and_then(|(name, lines)| { - // If `name` is `None`, then the client disconnected without - // actually sending a line of data. - // - // Since the connection is closed, there is no further work that we - // need to do. So, we just terminate processing by returning - // `future::ok()`. - // - // The problem is that only a single future type can be returned - // from a combinator closure, but we want to return both - // `future::ok()` and `Peer` (below). - // - // This is a common problem, so the `futures` crate solves this by - // providing the `Either` helper enum that allows creating a single - // return type that covers two concrete future types. - let name = match name { - Some(name) => name, - None => { - // The remote client closed the connection without sending - // any data. - return Either::A(future::ok(())); - } - }; - - println!("`{:?}` is joining the chat", name); - - // Create the peer. - // - // This is also a future that processes the connection, only - // completing when the socket closes. - let peer = Peer::new(name, state, lines); - - // Wrap `peer` with `Either::B` to make the return type fit. - Either::B(peer) - }) - // Task futures have an error of type `()`, this ensures we handle the - // error. We do this by printing the error to STDOUT. - .map_err(|e| { - println!("connection error = {:?}", e); - }); - - // Spawn the task. Internally, this submits the task to a thread pool. - tokio::spawn(connection); -} - -pub fn main() -> Result<(), Box> { - // Create the shared state. This is how all the peers communicate. - // - // The server task will hold a handle to this. For every new client, the - // `state` handle is cloned and passed into the task that processes the - // client connection. - let state = Arc::new(Mutex::new(Shared::new())); - - let addr = "127.0.0.1:6142".parse()?; - - // Bind a TCP listener to the socket address. - // - // Note that this is the Tokio TcpListener, which is fully async. - let listener = TcpListener::bind(&addr)?; - - // The server task asynchronously iterates over and processes each - // incoming connection. - let server = listener - .incoming() - .for_each(move |socket| { - // Spawn a task to process the connection - process(socket, state.clone()); - Ok(()) - }) - .map_err(|err| { - // All tasks must have an `Error` type of `()`. This forces error - // handling and helps avoid silencing failures. - // - // In our example, we are only going to log the error to STDOUT. - println!("accept error = {:?}", err); - }); - - println!("server running on localhost:6142"); - - // Start the Tokio runtime. - // - // The Tokio is a pre-configured "out of the box" runtime for building - // asynchronous applications. It includes both a reactor and a task - // scheduler. This means applications are multithreaded by default. - // - // This function blocks until the runtime reaches an idle state. Idle is - // defined as all spawned tasks have completed and all I/O resources (TCP - // sockets in our case) have been dropped. - // - // In our example, we have not defined a shutdown strategy, so this will - // block until `ctrl-c` is pressed at the terminal. - tokio::run(server); - Ok(()) -} diff --git a/examples/connect.rs b/examples/connect.rs deleted file mode 100644 index 4dc0ea31..00000000 --- a/examples/connect.rs +++ /dev/null @@ -1,257 +0,0 @@ -//! An example of hooking up stdin/stdout to either a TCP or UDP stream. -//! -//! This example will connect to a socket address specified in the argument list -//! and then forward all data read on stdin to the server, printing out all data -//! received on stdout. An optional `--udp` argument can be passed to specify -//! that the connection should be made over UDP instead of TCP, translating each -//! line entered on stdin to a UDP packet to be sent to the remote address. -//! -//! Note that this is not currently optimized for performance, especially -//! around buffer management. Rather it's intended to show an example of -//! working with a client. -//! -//! This example can be quite useful when interacting with the other examples in -//! this repository! Many of them recommend running this as a simple "hook up -//! stdin/stdout to a server" to get up and running. - -#![deny(warnings)] - -extern crate bytes; -extern crate futures; -extern crate tokio; -extern crate tokio_io; - -use std::env; -use std::io::{self, Read, Write}; -use std::net::SocketAddr; -use std::thread; - -use futures::sync::mpsc; -use tokio::prelude::*; - -fn main() -> Result<(), Box> { - // Determine if we're going to run in TCP or UDP mode - let mut args = env::args().skip(1).collect::>(); - let tcp = match args.iter().position(|a| a == "--udp") { - Some(i) => { - args.remove(i); - false - } - None => true, - }; - - // Parse what address we're going to connect to - let addr = match args.first() { - Some(addr) => addr, - None => Err("this program requires at least one argument")?, - }; - let addr = addr.parse::()?; - - // Right now Tokio doesn't support a handle to stdin running on the event - // loop, so we farm out that work to a separate thread. This thread will - // read data (with blocking I/O) from stdin and then send it to the event - // loop over a standard futures channel. - let (stdin_tx, stdin_rx) = mpsc::channel(0); - thread::spawn(|| read_stdin(stdin_tx)); - let stdin_rx = stdin_rx.map_err(|_| panic!("errors not possible on rx")); - - // Now that we've got our stdin read we either set up our TCP connection or - // our UDP connection to get a stream of bytes we're going to emit to - // stdout. - let stdout = if tcp { - tcp::connect(&addr, Box::new(stdin_rx))? - } else { - udp::connect(&addr, Box::new(stdin_rx))? - }; - - // And now with our stream of bytes to write to stdout, we execute that in - // the event loop! Note that this is doing blocking I/O to emit data to - // stdout, and in general it's a no-no to do that sort of work on the event - // loop. In this case, though, we know it's ok as the event loop isn't - // otherwise running anything useful. - let mut out = io::stdout(); - - tokio::run({ - stdout - .for_each(move |chunk| out.write_all(&chunk)) - .map_err(|e| println!("error reading stdout; error = {:?}", e)) - }); - Ok(()) -} - -mod codec { - use bytes::{BufMut, BytesMut}; - use std::io; - use tokio::codec::{Decoder, Encoder}; - - /// A simple `Codec` implementation that just ships bytes around. - /// - /// This type is used for "framing" a TCP/UDP stream of bytes but it's really - /// just a convenient method for us to work with streams/sinks for now. - /// This'll just take any data read and interpret it as a "frame" and - /// conversely just shove data into the output location without looking at - /// it. - pub struct Bytes; - - impl Decoder for Bytes { - type Item = BytesMut; - type Error = io::Error; - - fn decode(&mut self, buf: &mut BytesMut) -> io::Result> { - if buf.len() > 0 { - let len = buf.len(); - Ok(Some(buf.split_to(len))) - } else { - Ok(None) - } - } - } - - impl Encoder for Bytes { - type Item = Vec; - type Error = io::Error; - - fn encode(&mut self, data: Vec, buf: &mut BytesMut) -> io::Result<()> { - buf.put(&data[..]); - Ok(()) - } - } -} - -mod tcp { - use tokio; - use tokio::codec::Decoder; - use tokio::net::TcpStream; - use tokio::prelude::*; - - use bytes::BytesMut; - use codec::Bytes; - - use std::error::Error; - use std::io; - use std::net::SocketAddr; - - pub fn connect( - addr: &SocketAddr, - stdin: Box, Error = io::Error> + Send>, - ) -> Result + Send>, Box> { - let tcp = TcpStream::connect(addr); - - // After the TCP connection has been established, we set up our client - // to start forwarding data. - // - // First we use the `Io::framed` method with a simple implementation of - // a `Codec` (listed below) that just ships bytes around. We then split - // that in two to work with the stream and sink separately. - // - // Half of the work we're going to do is to take all data we receive on - // `stdin` and send that along the TCP stream (`sink`). The second half - // is to take all the data we receive (`stream`) and then write that to - // stdout. We'll be passing this handle back out from this method. - // - // You'll also note that we *spawn* the work to read stdin and write it - // to the TCP stream. This is done to ensure that happens concurrently - // with us reading data from the stream. - let stream = Box::new( - tcp.map(move |stream| { - let (sink, stream) = Bytes.framed(stream).split(); - - tokio::spawn(stdin.forward(sink).then(|result| { - if let Err(e) = result { - println!("failed to write to socket: {}", e) - } - Ok(()) - })); - - stream - }) - .flatten_stream(), - ); - Ok(stream) - } -} - -mod udp { - use std::error::Error; - use std::io; - use std::net::SocketAddr; - - use bytes::BytesMut; - use tokio; - use tokio::net::{UdpFramed, UdpSocket}; - use tokio::prelude::*; - - use codec::Bytes; - - pub fn connect( - &addr: &SocketAddr, - stdin: Box, Error = io::Error> + Send>, - ) -> Result + Send>, Box> { - // We'll bind our UDP socket to a local IP/port, but for now we - // basically let the OS pick both of those. - let addr_to_bind = if addr.ip().is_ipv4() { - "0.0.0.0:0".parse()? - } else { - "[::]:0".parse()? - }; - let udp = match UdpSocket::bind(&addr_to_bind) { - Ok(udp) => udp, - Err(_) => Err("failed to bind socket")?, - }; - - // Like above with TCP we use an instance of `Bytes` codec to transform - // this UDP socket into a framed sink/stream which operates over - // discrete values. In this case we're working with *pairs* of socket - // addresses and byte buffers. - let (sink, stream) = UdpFramed::new(udp, Bytes).split(); - - // All bytes from `stdin` will go to the `addr` specified in our - // argument list. Like with TCP this is spawned concurrently - let forward_stdin = stdin - .map(move |chunk| (chunk, addr)) - .forward(sink) - .then(|result| { - if let Err(e) = result { - println!("failed to write to socket: {}", e) - } - Ok(()) - }); - - // With UDP we could receive data from any source, so filter out - // anything coming from a different address - let receive = stream.filter_map(move |(chunk, src)| { - if src == addr { - Some(chunk.into()) - } else { - None - } - }); - - let stream = Box::new( - future::lazy(|| { - tokio::spawn(forward_stdin); - future::ok(receive) - }) - .flatten_stream(), - ); - Ok(stream) - } -} - -// Our helper method which will read data from stdin and send it along the -// sender provided. -fn read_stdin(mut tx: mpsc::Sender>) { - let mut stdin = io::stdin(); - loop { - let mut buf = vec![0; 1024]; - let n = match stdin.read(&mut buf) { - Err(_) | Ok(0) => break, - Ok(n) => n, - }; - buf.truncate(n); - tx = match tx.send(buf).wait() { - Ok(tx) => tx, - Err(_) => break, - }; - } -} diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs deleted file mode 100644 index 93ebca79..00000000 --- a/examples/echo-udp.rs +++ /dev/null @@ -1,74 +0,0 @@ -//! An UDP echo server that just sends back everything that it receives. -//! -//! If you're on Unix you can test this out by in one terminal executing: -//! -//! cargo run --example echo-udp -//! -//! and in another terminal you can run: -//! -//! cargo run --example connect -- --udp 127.0.0.1:8080 -//! -//! Each line you type in to the `nc` terminal should be echo'd back to you! - -#![deny(warnings)] - -#[macro_use] -extern crate futures; -extern crate tokio; - -use std::net::SocketAddr; -use std::{env, io}; - -use tokio::net::UdpSocket; -use tokio::prelude::*; - -struct Server { - socket: UdpSocket, - buf: Vec, - to_send: Option<(usize, SocketAddr)>, -} - -impl Future for Server { - type Item = (); - type Error = io::Error; - - fn poll(&mut self) -> Poll<(), io::Error> { - loop { - // First we check to see if there's a message we need to echo back. - // If so then we try to send it back to the original source, waiting - // until it's writable and we're able to do so. - if let Some((size, peer)) = self.to_send { - let amt = try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer)); - println!("Echoed {}/{} bytes to {}", amt, size, peer); - self.to_send = None; - } - - // If we're here then `to_send` is `None`, so we take a look for the - // next message we're going to echo back. - self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf))); - } - } -} - -fn main() -> Result<(), Box> { - let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); - let addr = addr.parse::()?; - - let socket = UdpSocket::bind(&addr)?; - println!("Listening on: {}", socket.local_addr()?); - - let server = Server { - socket: socket, - buf: vec![0; 1024], - to_send: None, - }; - - // This starts the server task. - // - // `map_err` handles the error by logging it and maps the future to a type - // that can be spawned. - // - // `tokio::run` spawns the task on the Tokio runtime and starts running. - tokio::run(server.map_err(|e| println!("server error = {:?}", e))); - Ok(()) -} diff --git a/examples/echo.rs b/examples/echo.rs deleted file mode 100644 index 45f808f8..00000000 --- a/examples/echo.rs +++ /dev/null @@ -1,115 +0,0 @@ -//! A "hello world" echo server with Tokio -//! -//! This server will create a TCP listener, accept connections in a loop, and -//! write back everything that's read off of each TCP connection. -//! -//! Because the Tokio runtime uses a thread pool, each TCP connection is -//! processed concurrently with all other TCP connections across multiple -//! threads. -//! -//! To see this server in action, you can run this in one terminal: -//! -//! cargo run --example echo -//! -//! and in another terminal you can run: -//! -//! cargo run --example connect 127.0.0.1:8080 -//! -//! Each line you type in to the `connect` terminal should be echo'd back to -//! you! If you open up multiple terminals running the `connect` example you -//! should be able to see them all make progress simultaneously. - -#![deny(warnings)] - -extern crate tokio; - -use tokio::io; -use tokio::net::TcpListener; -use tokio::prelude::*; - -use std::env; -use std::net::SocketAddr; - -fn main() -> Result<(), Box> { - // Allow passing an address to listen on as the first argument of this - // program, but otherwise we'll just set up our TCP listener on - // 127.0.0.1:8080 for connections. - let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); - let addr = addr.parse::()?; - - // Next up we create a TCP listener which will listen for incoming - // connections. This TCP listener is bound to the address we determined - // above and must be associated with an event loop, so we pass in a handle - // to our event loop. After the socket's created we inform that we're ready - // to go and start accepting connections. - let socket = TcpListener::bind(&addr)?; - println!("Listening on: {}", addr); - - // Here we convert the `TcpListener` to a stream of incoming connections - // with the `incoming` method. We then define how to process each element in - // the stream with the `for_each` method. - // - // This combinator, defined on the `Stream` trait, will allow us to define a - // computation to happen for all items on the stream (in this case TCP - // connections made to the server). The return value of the `for_each` - // method is itself a future representing processing the entire stream of - // connections, and ends up being our server. - let done = socket - .incoming() - .map_err(|e| println!("failed to accept socket; error = {:?}", e)) - .for_each(move |socket| { - // Once we're inside this closure this represents an accepted client - // from our server. The `socket` is the client connection (similar to - // how the standard library operates). - // - // We just want to copy all data read from the socket back onto the - // socket itself (e.g. "echo"). We can use the standard `io::copy` - // combinator in the `tokio-core` crate to do precisely this! - // - // The `copy` function takes two arguments, where to read from and where - // to write to. We only have one argument, though, with `socket`. - // Luckily there's a method, `Io::split`, which will split an Read/Write - // stream into its two halves. This operation allows us to work with - // each stream independently, such as pass them as two arguments to the - // `copy` function. - // - // The `copy` function then returns a future, and this future will be - // resolved when the copying operation is complete, resolving to the - // amount of data that was copied. - let (reader, writer) = socket.split(); - let amt = io::copy(reader, writer); - - // After our copy operation is complete we just print out some helpful - // information. - let msg = amt.then(move |result| { - match result { - Ok((amt, _, _)) => println!("wrote {} bytes", amt), - Err(e) => println!("error: {}", e), - } - - Ok(()) - }); - - // And this is where much of the magic of this server happens. We - // crucially want all clients to make progress concurrently, rather than - // blocking one on completion of another. To achieve this we use the - // `tokio::spawn` function to execute the work in the background. - // - // This function will transfer ownership of the future (`msg` in this - // case) to the Tokio runtime thread pool that. The thread pool will - // drive the future to completion. - // - // Essentially here we're executing a new task to run concurrently, - // which will allow all of our clients to be processed concurrently. - tokio::spawn(msg) - }); - - // And finally now that we've define what our server is, we run it! - // - // This starts the Tokio runtime, spawns the server task, and blocks the - // current thread until all tasks complete execution. Since the `done` task - // never completes (it just keeps accepting sockets), `tokio::run` blocks - // forever (until ctrl-c is pressed). - tokio::run(done); - Ok(()) -} diff --git a/examples/hello_world.rs b/examples/hello_world.rs deleted file mode 100644 index c8276269..00000000 --- a/examples/hello_world.rs +++ /dev/null @@ -1,58 +0,0 @@ -//! Hello world server. -//! -//! A simple client that opens a TCP stream, writes "hello world\n", and closes -//! the connection. -//! -//! You can test this out by running: -//! -//! ncat -l 6142 -//! -//! And then in another terminal run: -//! -//! cargo run --example hello_world - -#![deny(warnings)] - -extern crate tokio; - -use tokio::io; -use tokio::net::TcpStream; -use tokio::prelude::*; - -pub fn main() -> Result<(), Box> { - let addr = "127.0.0.1:6142".parse()?; - - // Open a TCP stream to the socket address. - // - // Note that this is the Tokio TcpStream, which is fully async. - let client = TcpStream::connect(&addr) - .and_then(|stream| { - println!("created stream"); - io::write_all(stream, "hello world\n").then(|result| { - println!("wrote to stream; success={:?}", result.is_ok()); - Ok(()) - }) - }) - .map_err(|err| { - // All tasks must have an `Error` type of `()`. This forces error - // handling and helps avoid silencing failures. - // - // In our example, we are only going to log the error to STDOUT. - println!("connection error = {:?}", err); - }); - - // Start the Tokio runtime. - // - // The Tokio is a pre-configured "out of the box" runtime for building - // asynchronous applications. It includes both a reactor and a task - // scheduler. This means applications are multithreaded by default. - // - // This function blocks until the runtime reaches an idle state. Idle is - // defined as all spawned tasks have completed and all I/O resources (TCP - // sockets in our case) have been dropped. - println!("About to create the stream and write to it..."); - tokio::run(client); - println!("Stream has been created and written to."); - - Ok(()) -} diff --git a/examples/manual-runtime.rs b/examples/manual-runtime.rs deleted file mode 100644 index 8e3e1299..00000000 --- a/examples/manual-runtime.rs +++ /dev/null @@ -1,87 +0,0 @@ -//! An example how to manually assemble a runtime and run some tasks on it. -//! -//! This is closer to the single-threaded runtime than the default tokio one, as it is simpler to -//! grasp. There are conceptually similar, but the multi-threaded one would be more code. If you -//! just want to *use* a single-threaded runtime, use the one provided by tokio directly -//! (`tokio::runtime::current_thread::Runtime::new()`. This is a demonstration only. -//! -//! Note that the error handling is a bit left out. Also, the `run` could be modified to return the -//! result of the provided future. - -extern crate futures; -extern crate tokio; -extern crate tokio_current_thread; -extern crate tokio_executor; -extern crate tokio_reactor; -extern crate tokio_timer; - -use std::io::Error as IoError; -use std::time::{Duration, Instant}; - -use futures::{future, Future}; -use tokio_current_thread::CurrentThread; -use tokio_reactor::Reactor; -use tokio_timer::timer::{self, Timer}; - -/// Creates a "runtime". -/// -/// This is similar to running `tokio::runtime::current_thread::Runtime::new()`. -fn run>(f: F) -> Result<(), IoError> { - // We need a reactor to receive events about IO objects from kernel - let reactor = Reactor::new()?; - let reactor_handle = reactor.handle(); - // Place a timer wheel on top of the reactor. If there are no timeouts to fire, it'll let the - // reactor pick up some new external events. - let timer = Timer::new(reactor); - let timer_handle = timer.handle(); - // And now put a single-threaded executor on top of the timer. When there are no futures ready - // to do something, it'll let the timer or the reactor generate some new stimuli for the - // futures to continue in their life. - let mut executor = CurrentThread::new_with_park(timer); - // Binds an executor to this thread - let mut enter = tokio_executor::enter().expect("Multiple executors at once"); - // This will set the default handle and timer to use inside the closure and run the future. - tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| { - timer::with_default(&timer_handle, enter, |enter| { - // The TaskExecutor is a fake executor that looks into the current single-threaded - // executor when used. This is a trick, because we need two mutable references to the - // executor (one to run the provided future, another to install as the default one). We - // use the fake one here as the default one. - let mut default_executor = tokio_current_thread::TaskExecutor::current(); - tokio_executor::with_default(&mut default_executor, enter, |enter| { - let mut executor = executor.enter(enter); - // Run the provided future - executor.block_on(f).unwrap(); - // Run all the other futures that are still left in the executor - executor.run().unwrap(); - }); - }); - }); - Ok(()) -} - -fn main() -> Result<(), Box> { - run(future::lazy(|| { - // Here comes the application logic. It can spawn further tasks by tokio_current_thread::spawn(). - // It also can use the default reactor and create timeouts. - - // Connect somewhere. And then do nothing with it. Yes, useless. - // - // This will use the default reactor which runs in the current thread. - let connect = tokio::net::TcpStream::connect(&"127.0.0.1:53".parse().unwrap()) - .map(|_| println!("Connected")) - .map_err(|e| println!("Failed to connect: {}", e)); - // We can spawn it without requiring Send. This would panic if we run it outside of the - // `run` (or outside of anything else) - tokio_current_thread::spawn(connect); - - // We can also create timeouts. - let deadline = tokio::timer::Delay::new(Instant::now() + Duration::from_secs(5)) - .map(|()| println!("5 seconds are over")) - .map_err(|e| println!("Failed to wait: {}", e)); - // We can spawn on the default executor, which is also the local one. - tokio::executor::spawn(deadline); - Ok(()) - }))?; - Ok(()) -} diff --git a/examples/print_each_packet.rs b/examples/print_each_packet.rs deleted file mode 100644 index 94a60648..00000000 --- a/examples/print_each_packet.rs +++ /dev/null @@ -1,150 +0,0 @@ -//! A "print-each-packet" server with Tokio -//! -//! This server will create a TCP listener, accept connections in a loop, and -//! put down in the stdout everything that's read off of each TCP connection. -//! -//! Because the Tokio runtime uses a thread pool, each TCP connection is -//! processed concurrently with all other TCP connections across multiple -//! threads. -//! -//! To see this server in action, you can run this in one terminal: -//! -//! cargo run --example print\_each\_packet -//! -//! and in another terminal you can run: -//! -//! cargo run --example connect 127.0.0.1:8080 -//! -//! Each line you type in to the `connect` terminal should be written to terminal! -//! -//! Minimal js example: -//! -//! ```js -//! var net = require("net"); -//! -//! var listenPort = 8080; -//! -//! var server = net.createServer(function (socket) { -//! socket.on("data", function (bytes) { -//! console.log("bytes", bytes); -//! }); -//! -//! socket.on("end", function() { -//! console.log("Socket received FIN packet and closed connection"); -//! }); -//! socket.on("error", function (error) { -//! console.log("Socket closed with error", error); -//! }); -//! -//! socket.on("close", function (with_error) { -//! if (with_error) { -//! console.log("Socket closed with result: Err(SomeError)"); -//! } else { -//! console.log("Socket closed with result: Ok(())"); -//! } -//! }); -//! -//! }); -//! -//! server.listen(listenPort); -//! -//! console.log("Listening on:", listenPort); -//! ``` -//! - -#![deny(warnings)] - -extern crate tokio; -extern crate tokio_codec; - -use tokio::codec::Decoder; -use tokio::net::TcpListener; -use tokio::prelude::*; -use tokio_codec::BytesCodec; - -use std::env; -use std::net::SocketAddr; - -fn main() -> Result<(), Box> { - // Allow passing an address to listen on as the first argument of this - // program, but otherwise we'll just set up our TCP listener on - // 127.0.0.1:8080 for connections. - let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); - let addr = addr.parse::()?; - - // Next up we create a TCP listener which will listen for incoming - // connections. This TCP listener is bound to the address we determined - // above and must be associated with an event loop, so we pass in a handle - // to our event loop. After the socket's created we inform that we're ready - // to go and start accepting connections. - let socket = TcpListener::bind(&addr)?; - println!("Listening on: {}", addr); - - // Here we convert the `TcpListener` to a stream of incoming connections - // with the `incoming` method. We then define how to process each element in - // the stream with the `for_each` method. - // - // This combinator, defined on the `Stream` trait, will allow us to define a - // computation to happen for all items on the stream (in this case TCP - // connections made to the server). The return value of the `for_each` - // method is itself a future representing processing the entire stream of - // connections, and ends up being our server. - let done = socket - .incoming() - .map_err(|e| println!("failed to accept socket; error = {:?}", e)) - .for_each(move |socket| { - // Once we're inside this closure this represents an accepted client - // from our server. The `socket` is the client connection (similar to - // how the standard library operates). - // - // We're parsing each socket with the `BytesCodec` included in `tokio_io`, - // and then we `split` each codec into the reader/writer halves. - // - // See https://docs.rs/tokio-codec/0.1/src/tokio_codec/bytes_codec.rs.html - let framed = BytesCodec::new().framed(socket); - let (_writer, reader) = framed.split(); - - let processor = reader - .for_each(|bytes| { - println!("bytes: {:?}", bytes); - Ok(()) - }) - // After our c