diff options
author | Carl Lerche <me@carllerche.com> | 2019-10-22 10:13:49 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-22 10:13:49 -0700 |
commit | cfc15617a5247ea780c32c85b7134b88b6de5845 (patch) | |
tree | ef0a46c61c51505a60f386c9760acac9d1f9b7b1 /examples | |
parent | b8cee1a60ad99ef28ec494ae4230e2ef4399fcf9 (diff) |
codec: move into tokio-util (#1675)
Related to #1318, Tokio APIs that are "less stable" are moved into a new
`tokio-util` crate. This crate will mirror `tokio` and provide
additional APIs that may require a greater rate of breaking changes.
As examples require `tokio-util`, they are moved into a separate
crate (`examples`). This has the added advantage of being able to avoid
example only dependencies in the `tokio` crate.
Diffstat (limited to 'examples')
-rw-r--r-- | examples/Cargo.toml | 52 | ||||
-rw-r--r-- | examples/README.md | 6 | ||||
-rw-r--r-- | examples/chat.rs | 261 | ||||
-rw-r--r-- | examples/connect.rs | 207 | ||||
-rw-r--r-- | examples/echo-udp.rs | 69 | ||||
-rw-r--r-- | examples/echo.rs | 77 | ||||
-rw-r--r-- | examples/hello_world.rs | 33 | ||||
-rw-r--r-- | examples/print_each_packet.rs | 104 | ||||
-rw-r--r-- | examples/proxy.rs | 67 | ||||
-rw-r--r-- | examples/tinydb.rs | 224 | ||||
-rw-r--r-- | examples/tinyhttp.rs | 299 | ||||
-rw-r--r-- | examples/udp-client.rs | 72 | ||||
-rw-r--r-- | examples/udp-codec.rs | 78 |
13 files changed, 1549 insertions, 0 deletions
diff --git a/examples/Cargo.toml b/examples/Cargo.toml new file mode 100644 index 00000000..84a546f7 --- /dev/null +++ b/examples/Cargo.toml @@ -0,0 +1,52 @@ +[package] +name = "examples" +version = "0.0.0" +publish = false +edition = "2018" + +[dev-dependencies] +tokio = { version = "=0.2.0-alpha.6", path = "../tokio" } +tokio-util = { version = "=0.2.0-alpha.6", path = "../tokio-util" } + +bytes = "0.4.12" +futures-preview = "=0.3.0-alpha.19" + +[[example]] +name = "chat" +path = "chat.rs" + +[[example]] +name = "connect" +path = "connect.rs" + +[[example]] +name = "echo-udp" +path = "echo-udp.rs" + +[[example]] +name = "echo" +path = "echo.rs" + +[[example]] +name = "hello_world" +path = "hello_world.rs" + +[[example]] +name = "print_each_packet" +path = "print_each_packet.rs" + +[[example]] +name = "proxy" +path = "proxy.rs" + +[[example]] +name = "tinydb" +path = "tinydb.rs" + +[[example]] +name = "udp-client" +path = "udp-client.rs" + +[[example]] +name = "udp-codec" +path = "udp-codec.rs" diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 00000000..802d0aa4 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,6 @@ +## Examples of how to use Tokio + +The `master` branch is currently being updated to use `async` / `await`. +The examples are not fully ported. Examples for stable Tokio can be +found +[here](https://github.com/tokio-rs/tokio/tree/v0.1.x/tokio/examples). diff --git a/examples/chat.rs b/examples/chat.rs new file mode 100644 index 00000000..0a3976d5 --- /dev/null +++ b/examples/chat.rs @@ -0,0 +1,261 @@ +//! A chat server that broadcasts a message to all connections. +//! +//! This example is explicitly more verbose than it has to be. This is to +//! illustrate more concepts. +//! +//! A chat server for telnet clients. After a telnet client connects, the first +//! line should contain the client's name. After that, all lines sent by a +//! client are broadcasted to all other connected clients. +//! +//! Because the client is telnet, lines are delimited by "\r\n". +//! +//! You can test this out by running: +//! +//! cargo run --example chat +//! +//! And then in another terminal run: +//! +//! telnet localhost 6142 +//! +//! You can run the `telnet` command in any number of additional windows. +//! +//! You can run the second command in multiple windows and then chat between the +//! two, seeing the messages from the other client as they're received. For all +//! connected clients they'll all join the same room and see everyone else's +//! messages. + +#![warn(rust_2018_idioms)] + +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::{mpsc, Mutex}; +use tokio_util::codec::{Framed, LinesCodec, LinesCodecError}; + +use futures::{Poll, SinkExt, Stream, StreamExt}; +use std::collections::HashMap; +use std::env; +use std::error::Error; +use std::io; +use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { + // Create the shared state. This is how all the peers communicate. + // + // The server task will hold a handle to this. For every new client, the + // `state` handle is cloned and passed into the task that processes the + // client connection. + let state = Arc::new(Mutex::new(Shared::new())); + + let addr = env::args().nth(1).unwrap_or("127.0.0.1:6142".to_string()); + + // Bind a TCP listener to the socket address. + // + // Note that this is the Tokio TcpListener, which is fully async. + let mut listener = TcpListener::bind(&addr).await?; + + println!("server running on {}", addr); + + loop { + // Asynchronously wait for an inbound TcpStream. + let (stream, addr) = listener.accept().await?; + + // Clone a handle to the `Shared` state for the new connection. + let state = Arc::clone(&state); + + // Spawn our handler to be run asynchronously. + tokio::spawn(async move { + if let Err(e) = process(state, stream, addr).await { + println!("an error occured; error = {:?}", e); + } + }); + } +} + +/// Shorthand for the transmit half of the message channel. +type Tx = mpsc::UnboundedSender<String>; + +/// Shorthand for the receive half of the message channel. +type Rx = mpsc::UnboundedReceiver<String>; + +/// Data that is shared between all peers in the chat server. +/// +/// This is the set of `Tx` handles for all connected clients. Whenever a +/// message is received from a client, it is broadcasted to all peers by +/// iterating over the `peers` entries and sending a copy of the message on each +/// `Tx`. +struct Shared { + peers: HashMap<SocketAddr, Tx>, +} + +/// The state for each connected client. +struct Peer { + /// The TCP socket wrapped with the `Lines` codec, defined below. + /// + /// This handles sending and receiving data on the socket. When using + /// `Lines`, we can work at the line level instead of having to manage the + /// raw byte operations. + lines: Framed<TcpStream, LinesCodec>, + + /// Receive half of the message channel. + /// + /// This is used to receive messages from peers. When a message is received + /// off of this `Rx`, it will be written to the socket. + rx: Rx, +} + +impl Shared { + /// Create a new, empty, instance of `Shared`. + fn new() -> Self { + Shared { + peers: HashMap::new(), + } + } + + /// Send a `LineCodec` encoded message to every peer, except + /// for the sender. + async fn broadcast( + &mut self, + sender: SocketAddr, + message: &str, + ) -> Result<(), mpsc::error::UnboundedSendError> { + for peer in self.peers.iter_mut() { + if *peer.0 != sender { + peer.1.send(message.into()).await?; + } + } + + Ok(()) + } +} + +impl Peer { + /// Create a new instance of `Peer`. + async fn new( + state: Arc<Mutex<Shared>>, + lines: Framed<TcpStream, LinesCodec>, + ) -> io::Result<Peer> { + // Get the client socket address + let addr = lines.get_ref().peer_addr()?; + + // Create a channel for this peer + let (tx, rx) = mpsc::unbounded_channel(); + + // Add an entry for this `Peer` in the shared state map. + state.lock().await.peers.insert(addr, tx); + + Ok(Peer { lines, rx }) + } +} + +#[derive(Debug)] +enum Message { + /// A message that should be broadcasted to others. + Broadcast(String), + + /// A message that should be received by a client + Received(String), +} + +// Peer implements `Stream` in a way that polls both the `Rx`, and `Framed` types. +// A message is produced whenever an event is ready until the `Framed` stream returns `None`. +impl Stream for Peer { + type Item = Result<Message, LinesCodecError>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + // First poll the `UnboundedReceiver`. + + if let Poll::Ready(Some(v)) = self.rx.poll_next_unpin(cx) { + return Poll::Ready(Some(Ok(Message::Received(v)))); + } + + // Secondly poll the `Framed` stream. + let result: Option<_> = futures::ready!(self.lines.poll_next_unpin(cx)); + + Poll::Ready(match result { + // We've received a message we should broadcast to others. + Some(Ok(message)) => Some(Ok(Message::Broadcast(message))), + + // An error occured. + Some(Err(e)) => Some(Err(e)), + + // The stream has been exhausted. + None => None, + }) + } +} + +/// Process an individual chat client +async fn process( + state: Arc<Mutex<Shared>>, + stream: TcpStream, + addr: SocketAddr, +) -> Result<(), Box<dyn Error>> { + let mut lines = Framed::new(stream, LinesCodec::new()); + + // Send a prompt to the client to enter their username. + lines + .send(String::from("Please enter your username:")) + .await?; + + // Read the first line from the `LineCodec` stream to get the username. + let username = match lines.next().await { + Some(Ok(line)) => line, + // We didn't get a line so we return early here. + _ => { + println!("Failed to get username from {}. Client disconnected.", addr); + return Ok(()); + } + }; + + // Register our peer with state which internally sets up some channels. + let mut peer = Peer::new(state.clone(), lines).await?; + + // A client has connected, let's let everyone know. + { + let mut state = state.lock().await; + let msg = format!("{} has joined the chat", username); + println!("{}", msg); + state.broadcast(addr, &msg).await?; + } + + // Process incoming messages until our stream is exhausted by a disconnect. + while let Some(result) = peer.next().await { + match result { + // A message was received from the current user, we should + // broadcast this message to the other users. + Ok(Message::Broadcast(msg)) => { + let mut state = state.lock().await; + let msg = format!("{}: {}", username, msg); + + state.broadcast(addr, &msg).await?; + } + // A message was received from a peer. Send it to the + // current user. + Ok(Message::Received(msg)) => { + peer.lines.send(msg).await?; + } + Err(e) => { + println!( + "an error occured while processing messages for {}; error = {:?}", + username, e + ); + } + } + } + + // If this section is reached it means that the client was disconnected! + // Let's let everyone still connected know about it. + { + let mut state = state.lock().await; + state.peers.remove(&addr); + + let msg = format!("{} has left the chat", username); + println!("{}", msg); + state.broadcast(addr, &msg).await?; + } + + Ok(()) +} diff --git a/examples/connect.rs b/examples/connect.rs new file mode 100644 index 00000000..0dd14ef2 --- /dev/null +++ b/examples/connect.rs @@ -0,0 +1,207 @@ +//! An example of hooking up stdin/stdout to either a TCP or UDP stream. +//! +//! This example will connect to a socket address specified in the argument list +//! and then forward all data read on stdin to the server, printing out all data +//! received on stdout. An optional `--udp` argument can be passed to specify +//! that the connection should be made over UDP instead of TCP, translating each +//! line entered on stdin to a UDP packet to be sent to the remote address. +//! +//! Note that this is not currently optimized for performance, especially +//! around buffer management. Rather it's intended to show an example of +//! working with a client. +//! +//! This example can be quite useful when interacting with the other examples in +//! this repository! Many of them recommend running this as a simple "hook up +//! stdin/stdout to a server" to get up and running. + +#![warn(rust_2018_idioms)] + +use tokio::io; +use tokio::sync::{mpsc, oneshot}; +use tokio_util::codec::{FramedRead, FramedWrite}; + +use futures::{SinkExt, Stream}; +use std::env; +use std::error::Error; +use std::net::SocketAddr; + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { + let (tx, rx) = oneshot::channel(); + tokio::spawn(async move { + run().await.unwrap(); + tx.send(()).unwrap(); + }); + + rx.await.map_err(Into::into) +} + +// Currently, we need to spawn the initial future due to https://github.com/tokio-rs/tokio/issues/1356 +async fn run() -> Result<(), Box<dyn Error>> { + // Determine if we're going to run in TCP or UDP mode + let mut args = env::args().skip(1).collect::<Vec<_>>(); + let tcp = match args.iter().position(|a| a == "--udp") { + Some(i) => { + args.remove(i); + false + } + None => true, + }; + + // Parse what address we're going to connect to + let addr = match args.first() { + Some(addr) => addr, + None => Err("this program requires at least one argument")?, + }; + let addr = addr.parse::<SocketAddr>()?; + + let stdin = stdin(); + let stdout = FramedWrite::new(io::stdout(), codec::Bytes); + + if tcp { + tcp::connect(&addr, stdin, stdout).await?; + } else { + udp::connect(&addr, stdin, stdout).await?; + } + + Ok(()) +} + +// Temporary work around for stdin blocking the stream +fn stdin() -> impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin { + let mut stdin = FramedRead::new(io::stdin(), codec::Bytes); + + let (mut tx, rx) = mpsc::unbounded_channel(); + + tokio::spawn(async move { + tx.send_all(&mut stdin).await.unwrap(); + }); + + rx +} + +mod tcp { + use super::codec; + use futures::{future, Sink, SinkExt, Stream, StreamExt}; + use std::{error::Error, io, net::SocketAddr}; + use tokio::net::TcpStream; + use tokio_util::codec::{FramedRead, FramedWrite}; + + pub async fn connect( + addr: &SocketAddr, + stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin, + mut stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin, + ) -> Result<(), Box<dyn Error>> { + let mut stream = TcpStream::connect(addr).await?; + let (r, w) = stream.split(); + let sink = FramedWrite::new(w, codec::Bytes); + let mut stream = FramedRead::new(r, codec::Bytes).filter_map(|i| match i { + Ok(i) => future::ready(Some(i)), + Err(e) => { + println!("failed to read from socket; error={}", e); + future::ready(None) + } + }); + + match future::join(stdin.forward(sink), stdout.send_all(&mut stream)).await { + (Err(e), _) | (_, Err(e)) => Err(e.into()), + _ => Ok(()), + } + } +} + +mod udp { + use futures::{future, Sink, SinkExt, Stream, StreamExt}; + use std::{error::Error, io, net::SocketAddr}; + use tokio::net::udp::{ + split::{UdpSocketRecvHalf, UdpSocketSendHalf}, + UdpSocket, + }; + + pub async fn connect( + addr: &SocketAddr, + stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin, + stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin, + ) -> Result<(), Box<dyn Error>> { + // We'll bind our UDP socket to a local IP/port, but for now we + // basically let the OS pick both of those. + let bind_addr = if addr.ip().is_ipv4() { + "0.0.0.0:0" + } else { + "[::]:0" + }; + + let socket = UdpSocket::bind(&bind_addr).await?; + socket.connect(addr).await?; + let (mut r, mut w) = socket.split(); + + future::try_join(send(stdin, &mut w), recv(stdout, &mut r)).await?; + + Ok(()) + } + + async fn send( + mut stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin, + writer: &mut UdpSocketSendHalf, + ) -> Result<(), io::Error> { + while let Some(item) = stdin.next().await { + let buf = item?; + writer.send(&buf[..]).await?; + } + + Ok(()) + } + + async fn recv( + mut stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin, + reader: &mut UdpSocketRecvHalf, + ) -> Result<(), io::Error> { + loop { + let mut buf = vec![0; 1024]; + let n = reader.recv(&mut buf[..]).await?; + + if n > 0 { + stdout.send(buf).await?; + } + } + } +} + +mod codec { + use bytes::{BufMut, BytesMut}; + use std::io; + use tokio_util::codec::{Decoder, Encoder}; + + /// A simple `Codec` implementation that just ships bytes around. + /// + /// This type is used for "framing" a TCP/UDP stream of bytes but it's really + /// just a convenient method for us to work with streams/sinks for now. + /// This'll just take any data read and interpret it as a "frame" and + /// conversely just shove data into the output location without looking at + /// it. + pub struct Bytes; + + impl Decoder for Bytes { + type Item = Vec<u8>; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Vec<u8>>> { + if buf.len() > 0 { + let len = buf.len(); + Ok(Some(buf.split_to(len).into_iter().collect())) + } else { + Ok(None) + } + } + } + + impl Encoder for Bytes { + type Item = Vec<u8>; + type Error = io::Error; + + fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> { + buf.put(&data[..]); + Ok(()) + } + } +} diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs new file mode 100644 index 00000000..f1e8134d --- /dev/null +++ b/examples/echo-udp.rs @@ -0,0 +1,69 @@ +//! An UDP echo server that just sends back everything that it receives. +//! +//! If you're on Unix you can test this out by in one terminal executing: +//! +//! cargo run --example echo-udp +//! +//! and in another terminal you can run: +//! +//! cargo run --example connect -- --udp 127.0.0.1:8080 +//! +//! Each line you type in to the `nc` terminal should be echo'd back to you! + +#![warn(rust_2018_idioms)] + +use std::error::Error; +use std::net::SocketAddr; +use std::{env, io}; +use tokio; +use tokio::net::UdpSocket; + +struct Server { + socket: UdpSocket, + buf: Vec<u8>, + to_send: Option<(usize, SocketAddr)>, +} + +impl Server { + async fn run(self) -> Result<(), io::Error> { + let Server { + mut socket, + mut buf, + mut to_send, + } = self; + + loop { + // First we check to see if there's a message we need to echo back. + // If so then we try to send it back to the original source, waiting + // until it's writable and we're able to do so. + if let Some((size, peer)) = to_send { + let amt = socket.send_to(&buf[..size], &peer).await?; + + println!("Echoed {}/{} bytes to {}", amt, size, peer); + } + + // If we're here then `to_send` is `None`, so we take a look for the + // next message we're going to echo back. + to_send = Some(socket.recv_from(&mut buf).await?); + } + } +} + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + + let socket = UdpSocket::bind(&addr).await?; + println!("Listening on: {}", socket.local_addr()?); + + let server = Server { + socket, + buf: vec![0; 1024], + to_send: None, + }; + + // This starts the server task. + server.run().await?; + + Ok(()) +} diff --git a/examples/echo.rs b/examples/echo.rs new file mode 100644 index 00000000..455aebde --- /dev/null +++ b/examples/echo.rs @@ -0,0 +1,77 @@ +//! A "hello world" echo server with Tokio +//! +//! This server will create a TCP listener, accept connections in a loop, and +//! write back everything that's read off of each TCP connection. +//! +//! Because the Tokio runtime uses a thread pool, each TCP connection is +//! processed concurrently with all other TCP connections across multiple +//! threads. +//! +//! To see this server in action, you can run this in one terminal: +//! +//! cargo run --example echo +//! +//! and in another terminal you can run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! Each line you type in to the `connect` terminal should be echo'd back to +//! you! If you open up multiple terminals running the `connect` example you +//! should be able to see them all make progress simultaneously. + +#![warn(rust_2018_idioms)] + +use tokio; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; + +use std::env; +use std::error::Error; + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { + // Allow passing an address to listen on as the first argument of this + // program, but otherwise we'll just set up our TCP listener on + // 127.0.0.1:8080 for connections. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + + // Next up we create a TCP listener which will listen for incoming + // connections. This TCP listener is bound to the address we determined + // above and must be associated with an event loop. + let mut listener = TcpListener::bind(&addr).await?; + println!("Listening on: {}", addr); + + loop { + // Asynchronously wait for an inbound socket. + let (mut socket, _) = listener.accept().await?; + + // And this is where much of the magic of this server happens. We + // crucially want all clients to make progress concurrently, rather than + // blocking one on completion of another. To achieve this we use the + // `tokio::spawn` function to execute the work in the background. + // + // Essentially here we're executing a new task to run concurrently, + // which will allow all of our clients to be processed concurrently. + + tokio::spawn(async move { + let mut buf = [0; 1024]; + + // In a loop, read data from the socket and write the data back. + loop { + let n = socket + .read(&mut buf) + .await + .expect("failed to read data from socket"); + + if n == 0 { + return; + } + + socket + .write_all(&buf[0..n]) + .await + .expect("failed to write data to socket"); + } + }); + } +} diff --git a/examples/hello_world.rs b/examples/hello_world.rs new file mode 100644 index 00000000..8ff40902 --- /dev/null +++ b/examples/hello_world.rs @@ -0,0 +1,33 @@ +//! Hello world server. +//! +//! A simple client that opens a TCP stream, writes "hello world\n", and closes +//! the connection. +//! +//! You can test this out by running: +//! +//! ncat -l 6142 +//! +//! And then in another terminal run: +//! +//! cargo run --example hello_world + +#![warn(rust_2018_idioms)] + +use tokio::io::AsyncWriteExt; +use tokio::net::TcpStream; + +use std::error::Error; + +#[tokio::main] +pub async fn main() -> Result<(), Box<dyn Error>> { + // Open a TCP stream to the socket address. + // + // Note that this is the Tokio TcpStream, which is fully async. + let mut stream = TcpStream::connect("127.0.0.1:6142").await?; + println!("created stream"); + + let result = stream.write(b"hello world\n").await; + println!("wrote to stream; success={:?}", result.is_ok()); + + Ok(()) +} diff --git a/examples/print_each_packet.rs b/examples/print_each_packet.rs new file mode 100644 index 00000000..0a275545 --- /dev/null +++ b/examples/print_each_packet.rs @@ -0,0 +1,104 @@ +//! A "print-each-packet" server with Tokio +//! +//! This server will create a TCP listener, accept connections in a loop, and +//! put down in the stdout everything that's read off of each TCP connection. +//! +//! Because the Tokio runtime uses a thread pool, each TCP connection is +//! processed concurrently with all other TCP connections across multiple +//! threads. +//! +//! To see this server in action, you can run this in one terminal: +//! +//! cargo run --example print\_each\_packet +//! +//! and in another terminal you can run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! Each line you type in to the `connect` terminal should be written to terminal! +//! +//! Minimal js example: +//! +//! ```js +//! var net = require("net"); +//! +//! var listenPort = 8080; +//! +//! var server = net.createServer(function (socket) { +//! socket.on("data", function (bytes) { +//! console.log("bytes", bytes); +//! }); +//! +//! socket.on("end", function() { +//! console.log("Socket received FIN packet and closed connection"); +//! }); +//! socket.on("error", function (error) { +//! console.log("Socket closed with error", error); +//! }); +//! +//! socket.on("close", function (with_error) { +//! if (with_error) { +//! console.log("Socket closed with result: Err(SomeError)"); +//! } else { +//! console.log("Socket closed with result: Ok(())"); +//! } +//! }); +//! +//! }); +//! +//! server.listen(listenPort); +//! +//! console.log("Listening on:", listenPort); +//! ``` +//! + +#![warn(rust_2018_idioms)] + +use tokio::net::TcpListener; +use tokio::prelude::*; +use tokio_util::codec::{BytesCodec, Decoder}; + +use std::env; + +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + // Allow passing an address to listen on as the first argument of this + // program, but otherwise we'll just set up our TCP listener on + // 127.0.0.1:8080 for connections. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + + // Next up we create a TCP listener which will listen for incoming + // connections. This TCP listener is bound to the address we determined + // above and must be associated with an event loop, so we pass in a handle + // to our event loop. After the socket's created we inform that we're ready + // to go and start accepting connections. + let mut listener = TcpListener::bind(&addr).await?; + println!("Listening on: {}", addr); + + loop { + // Asynchronously wait for an inbound socket. + let (socket, _) = listener.accept().await?; + + // And this is where much of the magic of this server happens. We + // crucially want all clients to make progress concurrently, rather than + // blocking one on completion of another. To achieve this we use the + // `tokio::spawn` function to execute the work in the background. + // + // Essentially here we're executing a new task to run concurrently, + // which will allow all of our clients to be processed concurrently. + tokio::spawn(async move { + // We're parsing each socket with the `BytesCodec` included in `tokio::codec`. + let mut framed = BytesCodec::new().framed(socket); + + // We loop while there are messages coming from the Stream `framed`. + // The stream will return None once the client disconnects. + while let Some(message) = framed.next().await { + match message { + Ok(bytes) => println!("bytes: {:?}", bytes), + Err(err) => println!("Socket closed with error: {:?}", err), + } + } + println!("Socket received FIN packet and closed connection"); + }); + } +} diff --git a/examples/proxy.rs b/examples/proxy.rs new file mode 100644 index 00000000..6886a813 --- /dev/null +++ b/examples/proxy.rs @@ -0,0 +1,67 @@ +//! A proxy that forwards data to another server and forwards that server's +//! responses back to clients. +//! +//! Because the Tokio runtime uses a thread pool, each TCP connection is +//! processed concurrently with all other TCP connections across multiple +//! threads. +//! +//! You can showcase this by running this in one terminal: +//! +//! cargo run --example proxy +//! +//! This in another terminal +//! +//! cargo run --example echo +//! +//! And finally this in another terminal +//! +//! cargo run --example connect 127.0.0.1:8081 +//! +//! This final terminal will connect to our proxy, which will in turn connect to +//! the echo server, and you'll be able to see data flowing between them. + +#![warn(rust_2018_idioms)] + +use futures::{future::try_join, FutureExt, StreamExt}; +use std::{env, error::Error}; +use tokio::{ + io::AsyncReadExt, + net::{TcpListener, TcpStream}, +}; + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { + let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string()); + let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string()); + + println!("Listening on: {}", listen_addr); + println!("Proxying to: {}", server_addr); + + let mut incoming = TcpListener::bind(listen_addr).await?.incoming(); + + while let Some(Ok(inbound)) = incoming.next().await { + let transfer = transfer(inbound, server_addr.clone()).map(|r| { + if let Err(e) = r { + println!("Failed to transfer; error={}", e); + } + }); + + tokio::spawn(transfer); + } + + Ok(()) +} + +async fn transfer(mut inbound: TcpStream, proxy_addr: String) -> Result<(), Box<dyn Error>> { + let mut outbound = TcpStream::connect(proxy_addr).await?; + + let (mut ri, mut wi) = inbound.split(); + let (mut ro, mut wo) = outbound.split(); + + let client_to_server = ri.copy(&mut wo); + let server_to_client = ro.copy(&mut wi); + + try_join(client_to_server, server_to_client).await?; + + Ok(()) +} diff --git a/examples/tinydb.rs b/examples/tinydb.rs new file mode 100644 index 00000000..3fc88f6b --- /dev/null +++ b/examples/tinydb.rs @@ -0,0 +1,224 @@ +//! A "tiny database" and accompanying protocol +//! +//! This example shows the usage of shared state amongst all connected clients, +//! namely a database of key/value pairs. Each connected client can send a +//! series of GET/SET commands to query the current value of a key or set the +//! value of a key. +//! +//! This example has a simple protocol you can use to interact with the server. +//! To run, first run this in one terminal window: +//! +//! cargo run --example tinydb +//! +//! and next in another windows run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! In the `connect` window you can type in commands where when you hit enter +//! you'll get a response from the server for that command. An example session +//! is: +//! +//! +//! $ cargo run --example connect 127.0.0.1:8080 +//! GET foo +//! foo = bar +//! GET FOOBAR +//! error: no key FOOBAR +//! SET FOOBAR my awesome string +//! set FOOBAR = `my awesome string`, previous: None +//! SET foo tokio +//! set foo = `tokio`, previous: Some("bar") +//! GET foo +//! foo = tokio +//! +//! Namely you can issue two forms of commands: +//! +//! * `GET $key` - this will fetch the value of `$key` from the database and +//! return it. The server's database is initially populated with the key `foo` +//! set to the value `bar` +//! * `SET $key $value` - this will set the value of `$key` to `$value`, +//! returning the previous value, if any. + +#![warn(rust_2018_idioms)] + +use tokio::net::TcpListener; +use tokio_util::codec::{Framed, LinesCodec}; + +use futures::{SinkExt, StreamExt}; +use std::collections::HashMap; +use std::env; +use std::error::Error; +use std::sync::{Arc, Mutex}; + +/// The in-memory database shared amongst all clients. +/// +/// This database will be shared via `Arc`, so to mutate the internal map we're +/// going to use a `Mutex` for interior mutability. |