summaryrefslogtreecommitdiffstats
path: root/examples
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-10-22 10:13:49 -0700
committerGitHub <noreply@github.com>2019-10-22 10:13:49 -0700
commitcfc15617a5247ea780c32c85b7134b88b6de5845 (patch)
treeef0a46c61c51505a60f386c9760acac9d1f9b7b1 /examples
parentb8cee1a60ad99ef28ec494ae4230e2ef4399fcf9 (diff)
codec: move into tokio-util (#1675)
Related to #1318, Tokio APIs that are "less stable" are moved into a new `tokio-util` crate. This crate will mirror `tokio` and provide additional APIs that may require a greater rate of breaking changes. As examples require `tokio-util`, they are moved into a separate crate (`examples`). This has the added advantage of being able to avoid example only dependencies in the `tokio` crate.
Diffstat (limited to 'examples')
-rw-r--r--examples/Cargo.toml52
-rw-r--r--examples/README.md6
-rw-r--r--examples/chat.rs261
-rw-r--r--examples/connect.rs207
-rw-r--r--examples/echo-udp.rs69
-rw-r--r--examples/echo.rs77
-rw-r--r--examples/hello_world.rs33
-rw-r--r--examples/print_each_packet.rs104
-rw-r--r--examples/proxy.rs67
-rw-r--r--examples/tinydb.rs224
-rw-r--r--examples/tinyhttp.rs299
-rw-r--r--examples/udp-client.rs72
-rw-r--r--examples/udp-codec.rs78
13 files changed, 1549 insertions, 0 deletions
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
new file mode 100644
index 00000000..84a546f7
--- /dev/null
+++ b/examples/Cargo.toml
@@ -0,0 +1,52 @@
+[package]
+name = "examples"
+version = "0.0.0"
+publish = false
+edition = "2018"
+
+[dev-dependencies]
+tokio = { version = "=0.2.0-alpha.6", path = "../tokio" }
+tokio-util = { version = "=0.2.0-alpha.6", path = "../tokio-util" }
+
+bytes = "0.4.12"
+futures-preview = "=0.3.0-alpha.19"
+
+[[example]]
+name = "chat"
+path = "chat.rs"
+
+[[example]]
+name = "connect"
+path = "connect.rs"
+
+[[example]]
+name = "echo-udp"
+path = "echo-udp.rs"
+
+[[example]]
+name = "echo"
+path = "echo.rs"
+
+[[example]]
+name = "hello_world"
+path = "hello_world.rs"
+
+[[example]]
+name = "print_each_packet"
+path = "print_each_packet.rs"
+
+[[example]]
+name = "proxy"
+path = "proxy.rs"
+
+[[example]]
+name = "tinydb"
+path = "tinydb.rs"
+
+[[example]]
+name = "udp-client"
+path = "udp-client.rs"
+
+[[example]]
+name = "udp-codec"
+path = "udp-codec.rs"
diff --git a/examples/README.md b/examples/README.md
new file mode 100644
index 00000000..802d0aa4
--- /dev/null
+++ b/examples/README.md
@@ -0,0 +1,6 @@
+## Examples of how to use Tokio
+
+The `master` branch is currently being updated to use `async` / `await`.
+The examples are not fully ported. Examples for stable Tokio can be
+found
+[here](https://github.com/tokio-rs/tokio/tree/v0.1.x/tokio/examples).
diff --git a/examples/chat.rs b/examples/chat.rs
new file mode 100644
index 00000000..0a3976d5
--- /dev/null
+++ b/examples/chat.rs
@@ -0,0 +1,261 @@
+//! A chat server that broadcasts a message to all connections.
+//!
+//! This example is explicitly more verbose than it has to be. This is to
+//! illustrate more concepts.
+//!
+//! A chat server for telnet clients. After a telnet client connects, the first
+//! line should contain the client's name. After that, all lines sent by a
+//! client are broadcasted to all other connected clients.
+//!
+//! Because the client is telnet, lines are delimited by "\r\n".
+//!
+//! You can test this out by running:
+//!
+//! cargo run --example chat
+//!
+//! And then in another terminal run:
+//!
+//! telnet localhost 6142
+//!
+//! You can run the `telnet` command in any number of additional windows.
+//!
+//! You can run the second command in multiple windows and then chat between the
+//! two, seeing the messages from the other client as they're received. For all
+//! connected clients they'll all join the same room and see everyone else's
+//! messages.
+
+#![warn(rust_2018_idioms)]
+
+use tokio::net::{TcpListener, TcpStream};
+use tokio::sync::{mpsc, Mutex};
+use tokio_util::codec::{Framed, LinesCodec, LinesCodecError};
+
+use futures::{Poll, SinkExt, Stream, StreamExt};
+use std::collections::HashMap;
+use std::env;
+use std::error::Error;
+use std::io;
+use std::net::SocketAddr;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::Context;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+ // Create the shared state. This is how all the peers communicate.
+ //
+ // The server task will hold a handle to this. For every new client, the
+ // `state` handle is cloned and passed into the task that processes the
+ // client connection.
+ let state = Arc::new(Mutex::new(Shared::new()));
+
+ let addr = env::args().nth(1).unwrap_or("127.0.0.1:6142".to_string());
+
+ // Bind a TCP listener to the socket address.
+ //
+ // Note that this is the Tokio TcpListener, which is fully async.
+ let mut listener = TcpListener::bind(&addr).await?;
+
+ println!("server running on {}", addr);
+
+ loop {
+ // Asynchronously wait for an inbound TcpStream.
+ let (stream, addr) = listener.accept().await?;
+
+ // Clone a handle to the `Shared` state for the new connection.
+ let state = Arc::clone(&state);
+
+ // Spawn our handler to be run asynchronously.
+ tokio::spawn(async move {
+ if let Err(e) = process(state, stream, addr).await {
+ println!("an error occured; error = {:?}", e);
+ }
+ });
+ }
+}
+
+/// Shorthand for the transmit half of the message channel.
+type Tx = mpsc::UnboundedSender<String>;
+
+/// Shorthand for the receive half of the message channel.
+type Rx = mpsc::UnboundedReceiver<String>;
+
+/// Data that is shared between all peers in the chat server.
+///
+/// This is the set of `Tx` handles for all connected clients. Whenever a
+/// message is received from a client, it is broadcasted to all peers by
+/// iterating over the `peers` entries and sending a copy of the message on each
+/// `Tx`.
+struct Shared {
+ peers: HashMap<SocketAddr, Tx>,
+}
+
+/// The state for each connected client.
+struct Peer {
+ /// The TCP socket wrapped with the `Lines` codec, defined below.
+ ///
+ /// This handles sending and receiving data on the socket. When using
+ /// `Lines`, we can work at the line level instead of having to manage the
+ /// raw byte operations.
+ lines: Framed<TcpStream, LinesCodec>,
+
+ /// Receive half of the message channel.
+ ///
+ /// This is used to receive messages from peers. When a message is received
+ /// off of this `Rx`, it will be written to the socket.
+ rx: Rx,
+}
+
+impl Shared {
+ /// Create a new, empty, instance of `Shared`.
+ fn new() -> Self {
+ Shared {
+ peers: HashMap::new(),
+ }
+ }
+
+ /// Send a `LineCodec` encoded message to every peer, except
+ /// for the sender.
+ async fn broadcast(
+ &mut self,
+ sender: SocketAddr,
+ message: &str,
+ ) -> Result<(), mpsc::error::UnboundedSendError> {
+ for peer in self.peers.iter_mut() {
+ if *peer.0 != sender {
+ peer.1.send(message.into()).await?;
+ }
+ }
+
+ Ok(())
+ }
+}
+
+impl Peer {
+ /// Create a new instance of `Peer`.
+ async fn new(
+ state: Arc<Mutex<Shared>>,
+ lines: Framed<TcpStream, LinesCodec>,
+ ) -> io::Result<Peer> {
+ // Get the client socket address
+ let addr = lines.get_ref().peer_addr()?;
+
+ // Create a channel for this peer
+ let (tx, rx) = mpsc::unbounded_channel();
+
+ // Add an entry for this `Peer` in the shared state map.
+ state.lock().await.peers.insert(addr, tx);
+
+ Ok(Peer { lines, rx })
+ }
+}
+
+#[derive(Debug)]
+enum Message {
+ /// A message that should be broadcasted to others.
+ Broadcast(String),
+
+ /// A message that should be received by a client
+ Received(String),
+}
+
+// Peer implements `Stream` in a way that polls both the `Rx`, and `Framed` types.
+// A message is produced whenever an event is ready until the `Framed` stream returns `None`.
+impl Stream for Peer {
+ type Item = Result<Message, LinesCodecError>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ // First poll the `UnboundedReceiver`.
+
+ if let Poll::Ready(Some(v)) = self.rx.poll_next_unpin(cx) {
+ return Poll::Ready(Some(Ok(Message::Received(v))));
+ }
+
+ // Secondly poll the `Framed` stream.
+ let result: Option<_> = futures::ready!(self.lines.poll_next_unpin(cx));
+
+ Poll::Ready(match result {
+ // We've received a message we should broadcast to others.
+ Some(Ok(message)) => Some(Ok(Message::Broadcast(message))),
+
+ // An error occured.
+ Some(Err(e)) => Some(Err(e)),
+
+ // The stream has been exhausted.
+ None => None,
+ })
+ }
+}
+
+/// Process an individual chat client
+async fn process(
+ state: Arc<Mutex<Shared>>,
+ stream: TcpStream,
+ addr: SocketAddr,
+) -> Result<(), Box<dyn Error>> {
+ let mut lines = Framed::new(stream, LinesCodec::new());
+
+ // Send a prompt to the client to enter their username.
+ lines
+ .send(String::from("Please enter your username:"))
+ .await?;
+
+ // Read the first line from the `LineCodec` stream to get the username.
+ let username = match lines.next().await {
+ Some(Ok(line)) => line,
+ // We didn't get a line so we return early here.
+ _ => {
+ println!("Failed to get username from {}. Client disconnected.", addr);
+ return Ok(());
+ }
+ };
+
+ // Register our peer with state which internally sets up some channels.
+ let mut peer = Peer::new(state.clone(), lines).await?;
+
+ // A client has connected, let's let everyone know.
+ {
+ let mut state = state.lock().await;
+ let msg = format!("{} has joined the chat", username);
+ println!("{}", msg);
+ state.broadcast(addr, &msg).await?;
+ }
+
+ // Process incoming messages until our stream is exhausted by a disconnect.
+ while let Some(result) = peer.next().await {
+ match result {
+ // A message was received from the current user, we should
+ // broadcast this message to the other users.
+ Ok(Message::Broadcast(msg)) => {
+ let mut state = state.lock().await;
+ let msg = format!("{}: {}", username, msg);
+
+ state.broadcast(addr, &msg).await?;
+ }
+ // A message was received from a peer. Send it to the
+ // current user.
+ Ok(Message::Received(msg)) => {
+ peer.lines.send(msg).await?;
+ }
+ Err(e) => {
+ println!(
+ "an error occured while processing messages for {}; error = {:?}",
+ username, e
+ );
+ }
+ }
+ }
+
+ // If this section is reached it means that the client was disconnected!
+ // Let's let everyone still connected know about it.
+ {
+ let mut state = state.lock().await;
+ state.peers.remove(&addr);
+
+ let msg = format!("{} has left the chat", username);
+ println!("{}", msg);
+ state.broadcast(addr, &msg).await?;
+ }
+
+ Ok(())
+}
diff --git a/examples/connect.rs b/examples/connect.rs
new file mode 100644
index 00000000..0dd14ef2
--- /dev/null
+++ b/examples/connect.rs
@@ -0,0 +1,207 @@
+//! An example of hooking up stdin/stdout to either a TCP or UDP stream.
+//!
+//! This example will connect to a socket address specified in the argument list
+//! and then forward all data read on stdin to the server, printing out all data
+//! received on stdout. An optional `--udp` argument can be passed to specify
+//! that the connection should be made over UDP instead of TCP, translating each
+//! line entered on stdin to a UDP packet to be sent to the remote address.
+//!
+//! Note that this is not currently optimized for performance, especially
+//! around buffer management. Rather it's intended to show an example of
+//! working with a client.
+//!
+//! This example can be quite useful when interacting with the other examples in
+//! this repository! Many of them recommend running this as a simple "hook up
+//! stdin/stdout to a server" to get up and running.
+
+#![warn(rust_2018_idioms)]
+
+use tokio::io;
+use tokio::sync::{mpsc, oneshot};
+use tokio_util::codec::{FramedRead, FramedWrite};
+
+use futures::{SinkExt, Stream};
+use std::env;
+use std::error::Error;
+use std::net::SocketAddr;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+ let (tx, rx) = oneshot::channel();
+ tokio::spawn(async move {
+ run().await.unwrap();
+ tx.send(()).unwrap();
+ });
+
+ rx.await.map_err(Into::into)
+}
+
+// Currently, we need to spawn the initial future due to https://github.com/tokio-rs/tokio/issues/1356
+async fn run() -> Result<(), Box<dyn Error>> {
+ // Determine if we're going to run in TCP or UDP mode
+ let mut args = env::args().skip(1).collect::<Vec<_>>();
+ let tcp = match args.iter().position(|a| a == "--udp") {
+ Some(i) => {
+ args.remove(i);
+ false
+ }
+ None => true,
+ };
+
+ // Parse what address we're going to connect to
+ let addr = match args.first() {
+ Some(addr) => addr,
+ None => Err("this program requires at least one argument")?,
+ };
+ let addr = addr.parse::<SocketAddr>()?;
+
+ let stdin = stdin();
+ let stdout = FramedWrite::new(io::stdout(), codec::Bytes);
+
+ if tcp {
+ tcp::connect(&addr, stdin, stdout).await?;
+ } else {
+ udp::connect(&addr, stdin, stdout).await?;
+ }
+
+ Ok(())
+}
+
+// Temporary work around for stdin blocking the stream
+fn stdin() -> impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin {
+ let mut stdin = FramedRead::new(io::stdin(), codec::Bytes);
+
+ let (mut tx, rx) = mpsc::unbounded_channel();
+
+ tokio::spawn(async move {
+ tx.send_all(&mut stdin).await.unwrap();
+ });
+
+ rx
+}
+
+mod tcp {
+ use super::codec;
+ use futures::{future, Sink, SinkExt, Stream, StreamExt};
+ use std::{error::Error, io, net::SocketAddr};
+ use tokio::net::TcpStream;
+ use tokio_util::codec::{FramedRead, FramedWrite};
+
+ pub async fn connect(
+ addr: &SocketAddr,
+ stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
+ mut stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin,
+ ) -> Result<(), Box<dyn Error>> {
+ let mut stream = TcpStream::connect(addr).await?;
+ let (r, w) = stream.split();
+ let sink = FramedWrite::new(w, codec::Bytes);
+ let mut stream = FramedRead::new(r, codec::Bytes).filter_map(|i| match i {
+ Ok(i) => future::ready(Some(i)),
+ Err(e) => {
+ println!("failed to read from socket; error={}", e);
+ future::ready(None)
+ }
+ });
+
+ match future::join(stdin.forward(sink), stdout.send_all(&mut stream)).await {
+ (Err(e), _) | (_, Err(e)) => Err(e.into()),
+ _ => Ok(()),
+ }
+ }
+}
+
+mod udp {
+ use futures::{future, Sink, SinkExt, Stream, StreamExt};
+ use std::{error::Error, io, net::SocketAddr};
+ use tokio::net::udp::{
+ split::{UdpSocketRecvHalf, UdpSocketSendHalf},
+ UdpSocket,
+ };
+
+ pub async fn connect(
+ addr: &SocketAddr,
+ stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
+ stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin,
+ ) -> Result<(), Box<dyn Error>> {
+ // We'll bind our UDP socket to a local IP/port, but for now we
+ // basically let the OS pick both of those.
+ let bind_addr = if addr.ip().is_ipv4() {
+ "0.0.0.0:0"
+ } else {
+ "[::]:0"
+ };
+
+ let socket = UdpSocket::bind(&bind_addr).await?;
+ socket.connect(addr).await?;
+ let (mut r, mut w) = socket.split();
+
+ future::try_join(send(stdin, &mut w), recv(stdout, &mut r)).await?;
+
+ Ok(())
+ }
+
+ async fn send(
+ mut stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
+ writer: &mut UdpSocketSendHalf,
+ ) -> Result<(), io::Error> {
+ while let Some(item) = stdin.next().await {
+ let buf = item?;
+ writer.send(&buf[..]).await?;
+ }
+
+ Ok(())
+ }
+
+ async fn recv(
+ mut stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin,
+ reader: &mut UdpSocketRecvHalf,
+ ) -> Result<(), io::Error> {
+ loop {
+ let mut buf = vec![0; 1024];
+ let n = reader.recv(&mut buf[..]).await?;
+
+ if n > 0 {
+ stdout.send(buf).await?;
+ }
+ }
+ }
+}
+
+mod codec {
+ use bytes::{BufMut, BytesMut};
+ use std::io;
+ use tokio_util::codec::{Decoder, Encoder};
+
+ /// A simple `Codec` implementation that just ships bytes around.
+ ///
+ /// This type is used for "framing" a TCP/UDP stream of bytes but it's really
+ /// just a convenient method for us to work with streams/sinks for now.
+ /// This'll just take any data read and interpret it as a "frame" and
+ /// conversely just shove data into the output location without looking at
+ /// it.
+ pub struct Bytes;
+
+ impl Decoder for Bytes {
+ type Item = Vec<u8>;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Vec<u8>>> {
+ if buf.len() > 0 {
+ let len = buf.len();
+ Ok(Some(buf.split_to(len).into_iter().collect()))
+ } else {
+ Ok(None)
+ }
+ }
+ }
+
+ impl Encoder for Bytes {
+ type Item = Vec<u8>;
+ type Error = io::Error;
+
+ fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> {
+ buf.put(&data[..]);
+ Ok(())
+ }
+ }
+}
diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs
new file mode 100644
index 00000000..f1e8134d
--- /dev/null
+++ b/examples/echo-udp.rs
@@ -0,0 +1,69 @@
+//! An UDP echo server that just sends back everything that it receives.
+//!
+//! If you're on Unix you can test this out by in one terminal executing:
+//!
+//! cargo run --example echo-udp
+//!
+//! and in another terminal you can run:
+//!
+//! cargo run --example connect -- --udp 127.0.0.1:8080
+//!
+//! Each line you type in to the `nc` terminal should be echo'd back to you!
+
+#![warn(rust_2018_idioms)]
+
+use std::error::Error;
+use std::net::SocketAddr;
+use std::{env, io};
+use tokio;
+use tokio::net::UdpSocket;
+
+struct Server {
+ socket: UdpSocket,
+ buf: Vec<u8>,
+ to_send: Option<(usize, SocketAddr)>,
+}
+
+impl Server {
+ async fn run(self) -> Result<(), io::Error> {
+ let Server {
+ mut socket,
+ mut buf,
+ mut to_send,
+ } = self;
+
+ loop {
+ // First we check to see if there's a message we need to echo back.
+ // If so then we try to send it back to the original source, waiting
+ // until it's writable and we're able to do so.
+ if let Some((size, peer)) = to_send {
+ let amt = socket.send_to(&buf[..size], &peer).await?;
+
+ println!("Echoed {}/{} bytes to {}", amt, size, peer);
+ }
+
+ // If we're here then `to_send` is `None`, so we take a look for the
+ // next message we're going to echo back.
+ to_send = Some(socket.recv_from(&mut buf).await?);
+ }
+ }
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+ let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+
+ let socket = UdpSocket::bind(&addr).await?;
+ println!("Listening on: {}", socket.local_addr()?);
+
+ let server = Server {
+ socket,
+ buf: vec![0; 1024],
+ to_send: None,
+ };
+
+ // This starts the server task.
+ server.run().await?;
+
+ Ok(())
+}
diff --git a/examples/echo.rs b/examples/echo.rs
new file mode 100644
index 00000000..455aebde
--- /dev/null
+++ b/examples/echo.rs
@@ -0,0 +1,77 @@
+//! A "hello world" echo server with Tokio
+//!
+//! This server will create a TCP listener, accept connections in a loop, and
+//! write back everything that's read off of each TCP connection.
+//!
+//! Because the Tokio runtime uses a thread pool, each TCP connection is
+//! processed concurrently with all other TCP connections across multiple
+//! threads.
+//!
+//! To see this server in action, you can run this in one terminal:
+//!
+//! cargo run --example echo
+//!
+//! and in another terminal you can run:
+//!
+//! cargo run --example connect 127.0.0.1:8080
+//!
+//! Each line you type in to the `connect` terminal should be echo'd back to
+//! you! If you open up multiple terminals running the `connect` example you
+//! should be able to see them all make progress simultaneously.
+
+#![warn(rust_2018_idioms)]
+
+use tokio;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::net::TcpListener;
+
+use std::env;
+use std::error::Error;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+ // Allow passing an address to listen on as the first argument of this
+ // program, but otherwise we'll just set up our TCP listener on
+ // 127.0.0.1:8080 for connections.
+ let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+
+ // Next up we create a TCP listener which will listen for incoming
+ // connections. This TCP listener is bound to the address we determined
+ // above and must be associated with an event loop.
+ let mut listener = TcpListener::bind(&addr).await?;
+ println!("Listening on: {}", addr);
+
+ loop {
+ // Asynchronously wait for an inbound socket.
+ let (mut socket, _) = listener.accept().await?;
+
+ // And this is where much of the magic of this server happens. We
+ // crucially want all clients to make progress concurrently, rather than
+ // blocking one on completion of another. To achieve this we use the
+ // `tokio::spawn` function to execute the work in the background.
+ //
+ // Essentially here we're executing a new task to run concurrently,
+ // which will allow all of our clients to be processed concurrently.
+
+ tokio::spawn(async move {
+ let mut buf = [0; 1024];
+
+ // In a loop, read data from the socket and write the data back.
+ loop {
+ let n = socket
+ .read(&mut buf)
+ .await
+ .expect("failed to read data from socket");
+
+ if n == 0 {
+ return;
+ }
+
+ socket
+ .write_all(&buf[0..n])
+ .await
+ .expect("failed to write data to socket");
+ }
+ });
+ }
+}
diff --git a/examples/hello_world.rs b/examples/hello_world.rs
new file mode 100644
index 00000000..8ff40902
--- /dev/null
+++ b/examples/hello_world.rs
@@ -0,0 +1,33 @@
+//! Hello world server.
+//!
+//! A simple client that opens a TCP stream, writes "hello world\n", and closes
+//! the connection.
+//!
+//! You can test this out by running:
+//!
+//! ncat -l 6142
+//!
+//! And then in another terminal run:
+//!
+//! cargo run --example hello_world
+
+#![warn(rust_2018_idioms)]
+
+use tokio::io::AsyncWriteExt;
+use tokio::net::TcpStream;
+
+use std::error::Error;
+
+#[tokio::main]
+pub async fn main() -> Result<(), Box<dyn Error>> {
+ // Open a TCP stream to the socket address.
+ //
+ // Note that this is the Tokio TcpStream, which is fully async.
+ let mut stream = TcpStream::connect("127.0.0.1:6142").await?;
+ println!("created stream");
+
+ let result = stream.write(b"hello world\n").await;
+ println!("wrote to stream; success={:?}", result.is_ok());
+
+ Ok(())
+}
diff --git a/examples/print_each_packet.rs b/examples/print_each_packet.rs
new file mode 100644
index 00000000..0a275545
--- /dev/null
+++ b/examples/print_each_packet.rs
@@ -0,0 +1,104 @@
+//! A "print-each-packet" server with Tokio
+//!
+//! This server will create a TCP listener, accept connections in a loop, and
+//! put down in the stdout everything that's read off of each TCP connection.
+//!
+//! Because the Tokio runtime uses a thread pool, each TCP connection is
+//! processed concurrently with all other TCP connections across multiple
+//! threads.
+//!
+//! To see this server in action, you can run this in one terminal:
+//!
+//! cargo run --example print\_each\_packet
+//!
+//! and in another terminal you can run:
+//!
+//! cargo run --example connect 127.0.0.1:8080
+//!
+//! Each line you type in to the `connect` terminal should be written to terminal!
+//!
+//! Minimal js example:
+//!
+//! ```js
+//! var net = require("net");
+//!
+//! var listenPort = 8080;
+//!
+//! var server = net.createServer(function (socket) {
+//! socket.on("data", function (bytes) {
+//! console.log("bytes", bytes);
+//! });
+//!
+//! socket.on("end", function() {
+//! console.log("Socket received FIN packet and closed connection");
+//! });
+//! socket.on("error", function (error) {
+//! console.log("Socket closed with error", error);
+//! });
+//!
+//! socket.on("close", function (with_error) {
+//! if (with_error) {
+//! console.log("Socket closed with result: Err(SomeError)");
+//! } else {
+//! console.log("Socket closed with result: Ok(())");
+//! }
+//! });
+//!
+//! });
+//!
+//! server.listen(listenPort);
+//!
+//! console.log("Listening on:", listenPort);
+//! ```
+//!
+
+#![warn(rust_2018_idioms)]
+
+use tokio::net::TcpListener;
+use tokio::prelude::*;
+use tokio_util::codec::{BytesCodec, Decoder};
+
+use std::env;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+ // Allow passing an address to listen on as the first argument of this
+ // program, but otherwise we'll just set up our TCP listener on
+ // 127.0.0.1:8080 for connections.
+ let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+
+ // Next up we create a TCP listener which will listen for incoming
+ // connections. This TCP listener is bound to the address we determined
+ // above and must be associated with an event loop, so we pass in a handle
+ // to our event loop. After the socket's created we inform that we're ready
+ // to go and start accepting connections.
+ let mut listener = TcpListener::bind(&addr).await?;
+ println!("Listening on: {}", addr);
+
+ loop {
+ // Asynchronously wait for an inbound socket.
+ let (socket, _) = listener.accept().await?;
+
+ // And this is where much of the magic of this server happens. We
+ // crucially want all clients to make progress concurrently, rather than
+ // blocking one on completion of another. To achieve this we use the
+ // `tokio::spawn` function to execute the work in the background.
+ //
+ // Essentially here we're executing a new task to run concurrently,
+ // which will allow all of our clients to be processed concurrently.
+ tokio::spawn(async move {
+ // We're parsing each socket with the `BytesCodec` included in `tokio::codec`.
+ let mut framed = BytesCodec::new().framed(socket);
+
+ // We loop while there are messages coming from the Stream `framed`.
+ // The stream will return None once the client disconnects.
+ while let Some(message) = framed.next().await {
+ match message {
+ Ok(bytes) => println!("bytes: {:?}", bytes),
+ Err(err) => println!("Socket closed with error: {:?}", err),
+ }
+ }
+ println!("Socket received FIN packet and closed connection");
+ });
+ }
+}
diff --git a/examples/proxy.rs b/examples/proxy.rs
new file mode 100644
index 00000000..6886a813
--- /dev/null
+++ b/examples/proxy.rs
@@ -0,0 +1,67 @@
+//! A proxy that forwards data to another server and forwards that server's
+//! responses back to clients.
+//!
+//! Because the Tokio runtime uses a thread pool, each TCP connection is
+//! processed concurrently with all other TCP connections across multiple
+//! threads.
+//!
+//! You can showcase this by running this in one terminal:
+//!
+//! cargo run --example proxy
+//!
+//! This in another terminal
+//!
+//! cargo run --example echo
+//!
+//! And finally this in another terminal
+//!
+//! cargo run --example connect 127.0.0.1:8081
+//!
+//! This final terminal will connect to our proxy, which will in turn connect to
+//! the echo server, and you'll be able to see data flowing between them.
+
+#![warn(rust_2018_idioms)]
+
+use futures::{future::try_join, FutureExt, StreamExt};
+use std::{env, error::Error};
+use tokio::{
+ io::AsyncReadExt,
+ net::{TcpListener, TcpStream},
+};
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+ let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string());
+ let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string());
+
+ println!("Listening on: {}", listen_addr);
+ println!("Proxying to: {}", server_addr);
+
+ let mut incoming = TcpListener::bind(listen_addr).await?.incoming();
+
+ while let Some(Ok(inbound)) = incoming.next().await {
+ let transfer = transfer(inbound, server_addr.clone()).map(|r| {
+ if let Err(e) = r {
+ println!("Failed to transfer; error={}", e);
+ }
+ });
+
+ tokio::spawn(transfer);
+ }
+
+ Ok(())
+}
+
+async fn transfer(mut inbound: TcpStream, proxy_addr: String) -> Result<(), Box<dyn Error>> {
+ let mut outbound = TcpStream::connect(proxy_addr).await?;
+
+ let (mut ri, mut wi) = inbound.split();
+ let (mut ro, mut wo) = outbound.split();
+
+ let client_to_server = ri.copy(&mut wo);
+ let server_to_client = ro.copy(&mut wi);
+
+ try_join(client_to_server, server_to_client).await?;
+
+ Ok(())
+}
diff --git a/examples/tinydb.rs b/examples/tinydb.rs
new file mode 100644
index 00000000..3fc88f6b
--- /dev/null
+++ b/examples/tinydb.rs
@@ -0,0 +1,224 @@
+//! A "tiny database" and accompanying protocol
+//!
+//! This example shows the usage of shared state amongst all connected clients,
+//! namely a database of key/value pairs. Each connected client can send a
+//! series of GET/SET commands to query the current value of a key or set the
+//! value of a key.
+//!
+//! This example has a simple protocol you can use to interact with the server.
+//! To run, first run this in one terminal window:
+//!
+//! cargo run --example tinydb
+//!
+//! and next in another windows run:
+//!
+//! cargo run --example connect 127.0.0.1:8080
+//!
+//! In the `connect` window you can type in commands where when you hit enter
+//! you'll get a response from the server for that command. An example session
+//! is:
+//!
+//!
+//! $ cargo run --example connect 127.0.0.1:8080
+//! GET foo
+//! foo = bar
+//! GET FOOBAR
+//! error: no key FOOBAR
+//! SET FOOBAR my awesome string
+//! set FOOBAR = `my awesome string`, previous: None
+//! SET foo tokio
+//! set foo = `tokio`, previous: Some("bar")
+//! GET foo
+//! foo = tokio
+//!
+//! Namely you can issue two forms of commands:
+//!
+//! * `GET $key` - this will fetch the value of `$key` from the database and
+//! return it. The server's database is initially populated with the key `foo`
+//! set to the value `bar`
+//! * `SET $key $value` - this will set the value of `$key` to `$value`,
+//! returning the previous value, if any.
+
+#![warn(rust_2018_idioms)]
+
+use tokio::net::TcpListener;
+use tokio_util::codec::{Framed, LinesCodec};
+
+use futures::{SinkExt, StreamExt};
+use std::collections::HashMap;
+use std::env;
+use std::error::Error;
+use std::sync::{Arc, Mutex};
+
+/// The in-memory database shared amongst all clients.
+///
+/// This database will be shared via `Arc`, so to mutate the internal map we're
+/// going to use a `Mutex` for interior mutability.