From cfc15617a5247ea780c32c85b7134b88b6de5845 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 22 Oct 2019 10:13:49 -0700 Subject: codec: move into tokio-util (#1675) Related to #1318, Tokio APIs that are "less stable" are moved into a new `tokio-util` crate. This crate will mirror `tokio` and provide additional APIs that may require a greater rate of breaking changes. As examples require `tokio-util`, they are moved into a separate crate (`examples`). This has the added advantage of being able to avoid example only dependencies in the `tokio` crate. --- Cargo.toml | 5 +- README.md | 2 +- azure-pipelines.yml | 4 +- ci/patch.toml | 2 +- examples/Cargo.toml | 52 ++ examples/README.md | 6 + examples/chat.rs | 261 +++++++++ examples/connect.rs | 207 +++++++ examples/echo-udp.rs | 69 +++ examples/echo.rs | 77 +++ examples/hello_world.rs | 33 ++ examples/print_each_packet.rs | 104 ++++ examples/proxy.rs | 67 +++ examples/tinydb.rs | 224 +++++++ examples/tinyhttp.rs | 299 ++++++++++ examples/udp-client.rs | 72 +++ examples/udp-codec.rs | 78 +++ tokio-codec/CHANGELOG.md | 35 -- tokio-codec/Cargo.toml | 37 -- tokio-codec/LICENSE | 25 - tokio-codec/README.md | 13 - tokio-codec/src/bytes_codec.rs | 40 -- tokio-codec/src/decoder.rs | 154 ----- tokio-codec/src/encoder.rs | 22 - tokio-codec/src/framed.rs | 329 ----------- tokio-codec/src/framed_read.rs | 226 -------- tokio-codec/src/framed_write.rs | 290 ---------- tokio-codec/src/length_delimited.rs | 960 ------------------------------ tokio-codec/src/lib.rs | 44 -- tokio-codec/src/lines_codec.rs | 223 ------- tokio-codec/src/macros.rs | 7 - tokio-codec/tests/codecs.rs | 216 ------- tokio-codec/tests/framed.rs | 96 --- tokio-codec/tests/framed_read.rs | 294 ---------- tokio-codec/tests/framed_write.rs | 174 ------ tokio-codec/tests/length_delimited.rs | 761 ------------------------ tokio-net/Cargo.toml | 1 - tokio-net/src/process/mod.rs | 9 +- tokio-net/src/udp/frame.rs | 189 ------ tokio-net/src/udp/mod.rs | 2 - tokio-net/src/udp/socket.rs | 29 +- tokio-net/src/udp/split.rs | 8 +- tokio-net/src/uds/frame.rs | 175 ------ tokio-net/src/uds/mod.rs | 17 +- tokio-net/tests/process_stdio.rs | 13 +- tokio-net/tests/udp.rs | 76 +-- tokio-util/CHANGELOG.md | 0 tokio-util/Cargo.toml | 38 ++ tokio-util/LICENSE | 25 + tokio-util/README.md | 13 + tokio-util/src/codec/bytes_codec.rs | 41 ++ tokio-util/src/codec/decoder.rs | 154 +++++ tokio-util/src/codec/encoder.rs | 22 + tokio-util/src/codec/framed.rs | 327 +++++++++++ tokio-util/src/codec/framed_read.rs | 226 ++++++++ tokio-util/src/codec/framed_write.rs | 288 +++++++++ tokio-util/src/codec/length_delimited.rs | 962 +++++++++++++++++++++++++++++++ tokio-util/src/codec/lines_codec.rs | 224 +++++++ tokio-util/src/codec/macros.rs | 7 + tokio-util/src/codec/mod.rs | 37 ++ tokio-util/src/lib.rs | 17 + tokio-util/src/udp/frame.rs | 173 ++++++ tokio-util/src/udp/mod.rs | 4 + tokio-util/tests/codecs.rs | 217 +++++++ tokio-util/tests/framed.rs | 96 +++ tokio-util/tests/framed_read.rs | 294 ++++++++++ tokio-util/tests/framed_write.rs | 174 ++++++ tokio-util/tests/length_delimited.rs | 761 ++++++++++++++++++++++++ tokio-util/tests/udp.rs | 79 +++ tokio/Cargo.toml | 5 +- tokio/README.md | 2 +- tokio/examples/README.md | 6 - tokio/examples/chat.rs | 259 --------- tokio/examples/connect.rs | 208 ------- tokio/examples/echo-udp.rs | 69 --- tokio/examples/echo.rs | 77 --- tokio/examples/hello_world.rs | 33 -- tokio/examples/print_each_packet.rs | 105 ---- tokio/examples/proxy.rs | 67 --- tokio/examples/tinydb.rs | 226 -------- tokio/examples/tinyhttp.rs | 299 ---------- tokio/examples/udp-client.rs | 72 --- tokio/examples/udp-codec.rs | 81 --- tokio/src/net.rs | 4 +- 84 files changed, 5779 insertions(+), 5940 deletions(-) create mode 100644 examples/Cargo.toml create mode 100644 examples/README.md create mode 100644 examples/chat.rs create mode 100644 examples/connect.rs create mode 100644 examples/echo-udp.rs create mode 100644 examples/echo.rs create mode 100644 examples/hello_world.rs create mode 100644 examples/print_each_packet.rs create mode 100644 examples/proxy.rs create mode 100644 examples/tinydb.rs create mode 100644 examples/tinyhttp.rs create mode 100644 examples/udp-client.rs create mode 100644 examples/udp-codec.rs delete mode 100644 tokio-codec/CHANGELOG.md delete mode 100644 tokio-codec/Cargo.toml delete mode 100644 tokio-codec/LICENSE delete mode 100644 tokio-codec/README.md delete mode 100644 tokio-codec/src/bytes_codec.rs delete mode 100644 tokio-codec/src/decoder.rs delete mode 100644 tokio-codec/src/encoder.rs delete mode 100644 tokio-codec/src/framed.rs delete mode 100644 tokio-codec/src/framed_read.rs delete mode 100644 tokio-codec/src/framed_write.rs delete mode 100644 tokio-codec/src/length_delimited.rs delete mode 100644 tokio-codec/src/lib.rs delete mode 100644 tokio-codec/src/lines_codec.rs delete mode 100644 tokio-codec/src/macros.rs delete mode 100644 tokio-codec/tests/codecs.rs delete mode 100644 tokio-codec/tests/framed.rs delete mode 100644 tokio-codec/tests/framed_read.rs delete mode 100644 tokio-codec/tests/framed_write.rs delete mode 100644 tokio-codec/tests/length_delimited.rs delete mode 100644 tokio-net/src/udp/frame.rs delete mode 100644 tokio-net/src/uds/frame.rs create mode 100644 tokio-util/CHANGELOG.md create mode 100644 tokio-util/Cargo.toml create mode 100644 tokio-util/LICENSE create mode 100644 tokio-util/README.md create mode 100644 tokio-util/src/codec/bytes_codec.rs create mode 100644 tokio-util/src/codec/decoder.rs create mode 100644 tokio-util/src/codec/encoder.rs create mode 100644 tokio-util/src/codec/framed.rs create mode 100644 tokio-util/src/codec/framed_read.rs create mode 100644 tokio-util/src/codec/framed_write.rs create mode 100644 tokio-util/src/codec/length_delimited.rs create mode 100644 tokio-util/src/codec/lines_codec.rs create mode 100644 tokio-util/src/codec/macros.rs create mode 100644 tokio-util/src/codec/mod.rs create mode 100644 tokio-util/src/lib.rs create mode 100644 tokio-util/src/udp/frame.rs create mode 100644 tokio-util/src/udp/mod.rs create mode 100644 tokio-util/tests/codecs.rs create mode 100644 tokio-util/tests/framed.rs create mode 100644 tokio-util/tests/framed_read.rs create mode 100644 tokio-util/tests/framed_write.rs create mode 100644 tokio-util/tests/length_delimited.rs create mode 100644 tokio-util/tests/udp.rs delete mode 100644 tokio/examples/README.md delete mode 100644 tokio/examples/chat.rs delete mode 100644 tokio/examples/connect.rs delete mode 100644 tokio/examples/echo-udp.rs delete mode 100644 tokio/examples/echo.rs delete mode 100644 tokio/examples/hello_world.rs delete mode 100644 tokio/examples/print_each_packet.rs delete mode 100644 tokio/examples/proxy.rs delete mode 100644 tokio/examples/tinydb.rs delete mode 100644 tokio/examples/tinyhttp.rs delete mode 100644 tokio/examples/udp-client.rs delete mode 100644 tokio/examples/udp-codec.rs diff --git a/Cargo.toml b/Cargo.toml index 5883131b..da766769 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,6 @@ members = [ "tokio", - "tokio-codec", "tokio-executor", "tokio-io", "tokio-macros", @@ -10,5 +9,9 @@ members = [ "tokio-sync", "tokio-test", "tokio-tls", + "tokio-util", + + # Internal + "examples", "build-tests", ] diff --git a/README.md b/README.md index 32e160b7..c2aa98af 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,7 @@ async fn main() -> Result<(), Box> { ``` -More examples can be found [here](tokio/examples). Note that the `master` branch +More examples can be found [here](examples). Note that the `master` branch is currently being updated to use `async` / `await`. The examples are not fully ported. Examples for stable Tokio can be found [here](https://github.com/tokio-rs/tokio/tree/v0.1.x/tokio/examples). diff --git a/azure-pipelines.yml b/azure-pipelines.yml index b943bfe6..2acdb522 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -26,7 +26,6 @@ jobs: cross: true crates: tokio: - - codec - fs - io - net @@ -61,7 +60,6 @@ jobs: displayName: Test sub crates - rust: beta crates: - tokio-codec: [] tokio-executor: - current-thread - thread-pool @@ -71,6 +69,8 @@ jobs: - async-traits tokio-macros: [] tokio-test: [] + tokio-util: [] + examples: [] # Test compilation failure - template: ci/azure-test-stable.yml diff --git a/ci/patch.toml b/ci/patch.toml index e6dc9148..6d739341 100644 --- a/ci/patch.toml +++ b/ci/patch.toml @@ -2,10 +2,10 @@ # repository. [patch.crates-io] tokio = { path = "tokio" } -tokio-codec = { path = "tokio-codec" } tokio-executor = { path = "tokio-executor" } tokio-io = { path = "tokio-io" } tokio-macros = { path = "tokio-macros" } tokio-net = { path = "tokio-net" } tokio-sync = { path = "tokio-sync" } tokio-tls = { path = "tokio-tls" } +tokio-util = { path = "tokio-util" } diff --git a/examples/Cargo.toml b/examples/Cargo.toml new file mode 100644 index 00000000..84a546f7 --- /dev/null +++ b/examples/Cargo.toml @@ -0,0 +1,52 @@ +[package] +name = "examples" +version = "0.0.0" +publish = false +edition = "2018" + +[dev-dependencies] +tokio = { version = "=0.2.0-alpha.6", path = "../tokio" } +tokio-util = { version = "=0.2.0-alpha.6", path = "../tokio-util" } + +bytes = "0.4.12" +futures-preview = "=0.3.0-alpha.19" + +[[example]] +name = "chat" +path = "chat.rs" + +[[example]] +name = "connect" +path = "connect.rs" + +[[example]] +name = "echo-udp" +path = "echo-udp.rs" + +[[example]] +name = "echo" +path = "echo.rs" + +[[example]] +name = "hello_world" +path = "hello_world.rs" + +[[example]] +name = "print_each_packet" +path = "print_each_packet.rs" + +[[example]] +name = "proxy" +path = "proxy.rs" + +[[example]] +name = "tinydb" +path = "tinydb.rs" + +[[example]] +name = "udp-client" +path = "udp-client.rs" + +[[example]] +name = "udp-codec" +path = "udp-codec.rs" diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 00000000..802d0aa4 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,6 @@ +## Examples of how to use Tokio + +The `master` branch is currently being updated to use `async` / `await`. +The examples are not fully ported. Examples for stable Tokio can be +found +[here](https://github.com/tokio-rs/tokio/tree/v0.1.x/tokio/examples). diff --git a/examples/chat.rs b/examples/chat.rs new file mode 100644 index 00000000..0a3976d5 --- /dev/null +++ b/examples/chat.rs @@ -0,0 +1,261 @@ +//! A chat server that broadcasts a message to all connections. +//! +//! This example is explicitly more verbose than it has to be. This is to +//! illustrate more concepts. +//! +//! A chat server for telnet clients. After a telnet client connects, the first +//! line should contain the client's name. After that, all lines sent by a +//! client are broadcasted to all other connected clients. +//! +//! Because the client is telnet, lines are delimited by "\r\n". +//! +//! You can test this out by running: +//! +//! cargo run --example chat +//! +//! And then in another terminal run: +//! +//! telnet localhost 6142 +//! +//! You can run the `telnet` command in any number of additional windows. +//! +//! You can run the second command in multiple windows and then chat between the +//! two, seeing the messages from the other client as they're received. For all +//! connected clients they'll all join the same room and see everyone else's +//! messages. + +#![warn(rust_2018_idioms)] + +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::{mpsc, Mutex}; +use tokio_util::codec::{Framed, LinesCodec, LinesCodecError}; + +use futures::{Poll, SinkExt, Stream, StreamExt}; +use std::collections::HashMap; +use std::env; +use std::error::Error; +use std::io; +use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Create the shared state. This is how all the peers communicate. + // + // The server task will hold a handle to this. For every new client, the + // `state` handle is cloned and passed into the task that processes the + // client connection. + let state = Arc::new(Mutex::new(Shared::new())); + + let addr = env::args().nth(1).unwrap_or("127.0.0.1:6142".to_string()); + + // Bind a TCP listener to the socket address. + // + // Note that this is the Tokio TcpListener, which is fully async. + let mut listener = TcpListener::bind(&addr).await?; + + println!("server running on {}", addr); + + loop { + // Asynchronously wait for an inbound TcpStream. + let (stream, addr) = listener.accept().await?; + + // Clone a handle to the `Shared` state for the new connection. + let state = Arc::clone(&state); + + // Spawn our handler to be run asynchronously. + tokio::spawn(async move { + if let Err(e) = process(state, stream, addr).await { + println!("an error occured; error = {:?}", e); + } + }); + } +} + +/// Shorthand for the transmit half of the message channel. +type Tx = mpsc::UnboundedSender; + +/// Shorthand for the receive half of the message channel. +type Rx = mpsc::UnboundedReceiver; + +/// Data that is shared between all peers in the chat server. +/// +/// This is the set of `Tx` handles for all connected clients. Whenever a +/// message is received from a client, it is broadcasted to all peers by +/// iterating over the `peers` entries and sending a copy of the message on each +/// `Tx`. +struct Shared { + peers: HashMap, +} + +/// The state for each connected client. +struct Peer { + /// The TCP socket wrapped with the `Lines` codec, defined below. + /// + /// This handles sending and receiving data on the socket. When using + /// `Lines`, we can work at the line level instead of having to manage the + /// raw byte operations. + lines: Framed, + + /// Receive half of the message channel. + /// + /// This is used to receive messages from peers. When a message is received + /// off of this `Rx`, it will be written to the socket. + rx: Rx, +} + +impl Shared { + /// Create a new, empty, instance of `Shared`. + fn new() -> Self { + Shared { + peers: HashMap::new(), + } + } + + /// Send a `LineCodec` encoded message to every peer, except + /// for the sender. + async fn broadcast( + &mut self, + sender: SocketAddr, + message: &str, + ) -> Result<(), mpsc::error::UnboundedSendError> { + for peer in self.peers.iter_mut() { + if *peer.0 != sender { + peer.1.send(message.into()).await?; + } + } + + Ok(()) + } +} + +impl Peer { + /// Create a new instance of `Peer`. + async fn new( + state: Arc>, + lines: Framed, + ) -> io::Result { + // Get the client socket address + let addr = lines.get_ref().peer_addr()?; + + // Create a channel for this peer + let (tx, rx) = mpsc::unbounded_channel(); + + // Add an entry for this `Peer` in the shared state map. + state.lock().await.peers.insert(addr, tx); + + Ok(Peer { lines, rx }) + } +} + +#[derive(Debug)] +enum Message { + /// A message that should be broadcasted to others. + Broadcast(String), + + /// A message that should be received by a client + Received(String), +} + +// Peer implements `Stream` in a way that polls both the `Rx`, and `Framed` types. +// A message is produced whenever an event is ready until the `Framed` stream returns `None`. +impl Stream for Peer { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // First poll the `UnboundedReceiver`. + + if let Poll::Ready(Some(v)) = self.rx.poll_next_unpin(cx) { + return Poll::Ready(Some(Ok(Message::Received(v)))); + } + + // Secondly poll the `Framed` stream. + let result: Option<_> = futures::ready!(self.lines.poll_next_unpin(cx)); + + Poll::Ready(match result { + // We've received a message we should broadcast to others. + Some(Ok(message)) => Some(Ok(Message::Broadcast(message))), + + // An error occured. + Some(Err(e)) => Some(Err(e)), + + // The stream has been exhausted. + None => None, + }) + } +} + +/// Process an individual chat client +async fn process( + state: Arc>, + stream: TcpStream, + addr: SocketAddr, +) -> Result<(), Box> { + let mut lines = Framed::new(stream, LinesCodec::new()); + + // Send a prompt to the client to enter their username. + lines + .send(String::from("Please enter your username:")) + .await?; + + // Read the first line from the `LineCodec` stream to get the username. + let username = match lines.next().await { + Some(Ok(line)) => line, + // We didn't get a line so we return early here. + _ => { + println!("Failed to get username from {}. Client disconnected.", addr); + return Ok(()); + } + }; + + // Register our peer with state which internally sets up some channels. + let mut peer = Peer::new(state.clone(), lines).await?; + + // A client has connected, let's let everyone know. + { + let mut state = state.lock().await; + let msg = format!("{} has joined the chat", username); + println!("{}", msg); + state.broadcast(addr, &msg).await?; + } + + // Process incoming messages until our stream is exhausted by a disconnect. + while let Some(result) = peer.next().await { + match result { + // A message was received from the current user, we should + // broadcast this message to the other users. + Ok(Message::Broadcast(msg)) => { + let mut state = state.lock().await; + let msg = format!("{}: {}", username, msg); + + state.broadcast(addr, &msg).await?; + } + // A message was received from a peer. Send it to the + // current user. + Ok(Message::Received(msg)) => { + peer.lines.send(msg).await?; + } + Err(e) => { + println!( + "an error occured while processing messages for {}; error = {:?}", + username, e + ); + } + } + } + + // If this section is reached it means that the client was disconnected! + // Let's let everyone still connected know about it. + { + let mut state = state.lock().await; + state.peers.remove(&addr); + + let msg = format!("{} has left the chat", username); + println!("{}", msg); + state.broadcast(addr, &msg).await?; + } + + Ok(()) +} diff --git a/examples/connect.rs b/examples/connect.rs new file mode 100644 index 00000000..0dd14ef2 --- /dev/null +++ b/examples/connect.rs @@ -0,0 +1,207 @@ +//! An example of hooking up stdin/stdout to either a TCP or UDP stream. +//! +//! This example will connect to a socket address specified in the argument list +//! and then forward all data read on stdin to the server, printing out all data +//! received on stdout. An optional `--udp` argument can be passed to specify +//! that the connection should be made over UDP instead of TCP, translating each +//! line entered on stdin to a UDP packet to be sent to the remote address. +//! +//! Note that this is not currently optimized for performance, especially +//! around buffer management. Rather it's intended to show an example of +//! working with a client. +//! +//! This example can be quite useful when interacting with the other examples in +//! this repository! Many of them recommend running this as a simple "hook up +//! stdin/stdout to a server" to get up and running. + +#![warn(rust_2018_idioms)] + +use tokio::io; +use tokio::sync::{mpsc, oneshot}; +use tokio_util::codec::{FramedRead, FramedWrite}; + +use futures::{SinkExt, Stream}; +use std::env; +use std::error::Error; +use std::net::SocketAddr; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let (tx, rx) = oneshot::channel(); + tokio::spawn(async move { + run().await.unwrap(); + tx.send(()).unwrap(); + }); + + rx.await.map_err(Into::into) +} + +// Currently, we need to spawn the initial future due to https://github.com/tokio-rs/tokio/issues/1356 +async fn run() -> Result<(), Box> { + // Determine if we're going to run in TCP or UDP mode + let mut args = env::args().skip(1).collect::>(); + let tcp = match args.iter().position(|a| a == "--udp") { + Some(i) => { + args.remove(i); + false + } + None => true, + }; + + // Parse what address we're going to connect to + let addr = match args.first() { + Some(addr) => addr, + None => Err("this program requires at least one argument")?, + }; + let addr = addr.parse::()?; + + let stdin = stdin(); + let stdout = FramedWrite::new(io::stdout(), codec::Bytes); + + if tcp { + tcp::connect(&addr, stdin, stdout).await?; + } else { + udp::connect(&addr, stdin, stdout).await?; + } + + Ok(()) +} + +// Temporary work around for stdin blocking the stream +fn stdin() -> impl Stream, io::Error>> + Unpin { + let mut stdin = FramedRead::new(io::stdin(), codec::Bytes); + + let (mut tx, rx) = mpsc::unbounded_channel(); + + tokio::spawn(async move { + tx.send_all(&mut stdin).await.unwrap(); + }); + + rx +} + +mod tcp { + use super::codec; + use futures::{future, Sink, SinkExt, Stream, StreamExt}; + use std::{error::Error, io, net::SocketAddr}; + use tokio::net::TcpStream; + use tokio_util::codec::{FramedRead, FramedWrite}; + + pub async fn connect( + addr: &SocketAddr, + stdin: impl Stream, io::Error>> + Unpin, + mut stdout: impl Sink, Error = io::Error> + Unpin, + ) -> Result<(), Box> { + let mut stream = TcpStream::connect(addr).await?; + let (r, w) = stream.split(); + let sink = FramedWrite::new(w, codec::Bytes); + let mut stream = FramedRead::new(r, codec::Bytes).filter_map(|i| match i { + Ok(i) => future::ready(Some(i)), + Err(e) => { + println!("failed to read from socket; error={}", e); + future::ready(None) + } + }); + + match future::join(stdin.forward(sink), stdout.send_all(&mut stream)).await { + (Err(e), _) | (_, Err(e)) => Err(e.into()), + _ => Ok(()), + } + } +} + +mod udp { + use futures::{future, Sink, SinkExt, Stream, StreamExt}; + use std::{error::Error, io, net::SocketAddr}; + use tokio::net::udp::{ + split::{UdpSocketRecvHalf, UdpSocketSendHalf}, + UdpSocket, + }; + + pub async fn connect( + addr: &SocketAddr, + stdin: impl Stream, io::Error>> + Unpin, + stdout: impl Sink, Error = io::Error> + Unpin, + ) -> Result<(), Box> { + // We'll bind our UDP socket to a local IP/port, but for now we + // basically let the OS pick both of those. + let bind_addr = if addr.ip().is_ipv4() { + "0.0.0.0:0" + } else { + "[::]:0" + }; + + let socket = UdpSocket::bind(&bind_addr).await?; + socket.connect(addr).await?; + let (mut r, mut w) = socket.split(); + + future::try_join(send(stdin, &mut w), recv(stdout, &mut r)).await?; + + Ok(()) + } + + async fn send( + mut stdin: impl Stream, io::Error>> + Unpin, + writer: &mut UdpSocketSendHalf, + ) -> Result<(), io::Error> { + while let Some(item) = stdin.next().await { + let buf = item?; + writer.send(&buf[..]).await?; + } + + Ok(()) + } + + async fn recv( + mut stdout: impl Sink, Error = io::Error> + Unpin, + reader: &mut UdpSocketRecvHalf, + ) -> Result<(), io::Error> { + loop { + let mut buf = vec![0; 1024]; + let n = reader.recv(&mut buf[..]).await?; + + if n > 0 { + stdout.send(buf).await?; + } + } + } +} + +mod codec { + use bytes::{BufMut, BytesMut}; + use std::io; + use tokio_util::codec::{Decoder, Encoder}; + + /// A simple `Codec` implementation that just ships bytes around. + /// + /// This type is used for "framing" a TCP/UDP stream of bytes but it's really + /// just a convenient method for us to work with streams/sinks for now. + /// This'll just take any data read and interpret it as a "frame" and + /// conversely just shove data into the output location without looking at + /// it. + pub struct Bytes; + + impl Decoder for Bytes { + type Item = Vec; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> io::Result>> { + if buf.len() > 0 { + let len = buf.len(); + Ok(Some(buf.split_to(len).into_iter().collect())) + } else { + Ok(None) + } + } + } + + impl Encoder for Bytes { + type Item = Vec; + type Error = io::Error; + + fn encode(&mut self, data: Vec, buf: &mut BytesMut) -> io::Result<()> { + buf.put(&data[..]); + Ok(()) + } + } +} diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs new file mode 100644 index 00000000..f1e8134d --- /dev/null +++ b/examples/echo-udp.rs @@ -0,0 +1,69 @@ +//! An UDP echo server that just sends back everything that it receives. +//! +//! If you're on Unix you can test this out by in one terminal executing: +//! +//! cargo run --example echo-udp +//! +//! and in another terminal you can run: +//! +//! cargo run --example connect -- --udp 127.0.0.1:8080 +//! +//! Each line you type in to the `nc` terminal should be echo'd back to you! + +#![warn(rust_2018_idioms)] + +use std::error::Error; +use std::net::SocketAddr; +use std::{env, io}; +use tokio; +use tokio::net::UdpSocket; + +struct Server { + socket: UdpSocket, + buf: Vec, + to_send: Option<(usize, SocketAddr)>, +} + +impl Server { + async fn run(self) -> Result<(), io::Error> { + let Server { + mut socket, + mut buf, + mut to_send, + } = self; + + loop { + // First we check to see if there's a message we need to echo back. + // If so then we try to send it back to the original source, waiting + // until it's writable and we're able to do so. + if let Some((size, peer)) = to_send { + let amt = socket.send_to(&buf[..size], &peer).await?; + + println!("Echoed {}/{} bytes to {}", amt, size, peer); + } + + // If we're here then `to_send` is `None`, so we take a look for the + // next message we're going to echo back. + to_send = Some(socket.recv_from(&mut buf).await?); + } + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + + let socket = UdpSocket::bind(&addr).await?; + println!("Listening on: {}", socket.local_addr()?); + + let server = Server { + socket, + buf: vec![0; 1024], + to_send: None, + }; + + // This starts the server task. + server.run().await?; + + Ok(()) +} diff --git a/examples/echo.rs b/examples/echo.rs new file mode 100644 index 00000000..455aebde --- /dev/null +++ b/examples/echo.rs @@ -0,0 +1,77 @@ +//! A "hello world" echo server with Tokio +//! +//! This server will create a TCP listener, accept connections in a loop, and +//! write back everything that's read off of each TCP connection. +//! +//! Because the Tokio runtime uses a thread pool, each TCP connection is +//! processed concurrently with all other TCP connections across multiple +//! threads. +//! +//! To see this server in action, you can run this in one terminal: +//! +//! cargo run --example echo +//! +//! and in another terminal you can run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! Each line you type in to the `connect` terminal should be echo'd back to +//! you! If you open up multiple terminals running the `connect` example you +//! should be able to see them all make progress simultaneously. + +#![warn(rust_2018_idioms)] + +use tokio; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; + +use std::env; +use std::error::Error; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Allow passing an address to listen on as the first argument of this + // program, but otherwise we'll just set up our TCP listener on + // 127.0.0.1:8080 for connections. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + + // Next up we create a TCP listener which will listen for incoming + // connections. This TCP listener is bound to the address we determined + // above and must be associated with an event loop. + let mut listener = TcpListener::bind(&addr).await?; + println!("Listening on: {}", addr); + + loop { + // Asynchronously wait for an inbound socket. + let (mut socket, _) = listener.accept().await?; + + // And this is where much of the magic of this server happens. We + // crucially want all clients to make progress concurrently, rather than + // blocking one on completion of another. To achieve this we use the + // `tokio::spawn` function to execute the work in the background. + // + // Essentially here we're executing a new task to run concurrently, + // which will allow all of our clients to be processed concurrently. + + tokio::spawn(async move { + let mut buf = [0; 1024]; + + // In a loop, read data from the socket and write the data back. + loop { + let n = socket + .read(&mut buf) + .await + .expect("failed to read data from socket"); + + if n == 0 { + return; + } + + socket + .write_all(&buf[0..n]) + .await + .expect("failed to write data to socket"); + } + }); + } +} diff --git a/examples/hello_world.rs b/examples/hello_world.rs new file mode 100644 index 00000000..8ff40902 --- /dev/null +++ b/examples/hello_world.rs @@ -0,0 +1,33 @@ +//! Hello world server. +//! +//! A simple client that opens a TCP stream, writes "hello world\n", and closes +//! the connection. +//! +//! You can test this out by running: +//! +//! ncat -l 6142 +//! +//! And then in another terminal run: +//! +//! cargo run --example hello_world + +#![warn(rust_2018_idioms)] + +use tokio::io::AsyncWriteExt; +use tokio::net::TcpStream; + +use std::error::Error; + +#[tokio::main] +pub async fn main() -> Result<(), Box> { + // Open a TCP stream to the socket address. + // + // Note that this is the Tokio TcpStream, which is fully async. + let mut stream = TcpStream::connect("127.0.0.1:6142").await?; + println!("created stream"); + + let result = stream.write(b"hello world\n").await; + println!("wrote to stream; success={:?}", result.is_ok()); + + Ok(()) +} diff --git a/examples/print_each_packet.rs b/examples/print_each_packet.rs new file mode 100644 index 00000000..0a275545 --- /dev/null +++ b/examples/print_each_packet.rs @@ -0,0 +1,104 @@ +//! A "print-each-packet" server with Tokio +//! +//! This server will create a TCP listener, accept connections in a loop, and +//! put down in the stdout everything that's read off of each TCP connection. +//! +//! Because the Tokio runtime uses a thread pool, each TCP connection is +//! processed concurrently with all other TCP connections across multiple +//! threads. +//! +//! To see this server in action, you can run this in one terminal: +//! +//! cargo run --example print\_each\_packet +//! +//! and in another terminal you can run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! Each line you type in to the `connect` terminal should be written to terminal! +//! +//! Minimal js example: +//! +//! ```js +//! var net = require("net"); +//! +//! var listenPort = 8080; +//! +//! var server = net.createServer(function (socket) { +//! socket.on("data", function (bytes) { +//! console.log("bytes", bytes); +//! }); +//! +//! socket.on("end", function() { +//! console.log("Socket received FIN packet and closed connection"); +//! }); +//! socket.on("error", function (error) { +//! console.log("Socket closed with error", error); +//! }); +//! +//! socket.on("close", function (with_error) { +//! if (with_error) { +//! console.log("Socket closed with result: Err(SomeError)"); +//! } else { +//! console.log("Socket closed with result: Ok(())"); +//! } +//! }); +//! +//! }); +//! +//! server.listen(listenPort); +//! +//! console.log("Listening on:", listenPort); +//! ``` +//! + +#![warn(rust_2018_idioms)] + +use tokio::net::TcpListener; +use tokio::prelude::*; +use tokio_util::codec::{BytesCodec, Decoder}; + +use std::env; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Allow passing an address to listen on as the first argument of this + // program, but otherwise we'll just set up our TCP listener on + // 127.0.0.1:8080 for connections. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + + // Next up we create a TCP listener which will listen for incoming + // connections. This TCP listener is bound to the address we determined + // above and must be associated with an event loop, so we pass in a handle + // to our event loop. After the socket's created we inform that we're ready + // to go and start accepting connections. + let mut listener = TcpListener::bind(&addr).await?; + println!("Listening on: {}", addr); + + loop { + // Asynchronously wait for an inbound socket. + let (socket, _) = listener.accept().await?; + + // And this is where much of the magic of this server happens. We + // crucially want all clients to make progress concurrently, rather than + // blocking one on completion of another. To achieve this we use the + // `tokio::spawn` function to execute the work in the background. + // + // Essentially here we're executing a new task to run concurrently, + // which will allow all of our clients to be processed concurrently. + tokio::spawn(async move { + // We're parsing each socket with the `BytesCodec` included in `tokio::codec`. + let mut framed = BytesCodec::new().framed(socket); + + // We loop while there are messages coming from the Stream `framed`. + // The stream will return None once the client disconnects. + while let Some(message) = framed.next().await { + match message { + Ok(bytes) => println!("bytes: {:?}", bytes), + Err(err) => println!("Socket closed with error: {:?}", err), + } + } + println!("Socket received FIN packet and closed connection"); + }); + } +} diff --git a/examples/proxy.rs b/examples/proxy.rs new file mode 100644 index 00000000..6886a813 --- /dev/null +++ b/examples/proxy.rs @@ -0,0 +1,67 @@ +//! A proxy that forwards data to another server and forwards that server's +//! responses back to clients. +//! +//! Because the Tokio runtime uses a thread pool, each TCP connection is +//! processed concurrently with all other TCP connections across multiple +//! threads. +//! +//! You can showcase this by running this in one terminal: +//! +//! cargo run --example proxy +//! +//! This in another terminal +//! +//! cargo run --example echo +//! +//! And finally this in another terminal +//! +//! cargo run --example connect 127.0.0.1:8081 +//! +//! This final terminal will connect to our proxy, which will in turn connect to +//! the echo server, and you'll be able to see data flowing between them. + +#![warn(rust_2018_idioms)] + +use futures::{future::try_join, FutureExt, StreamExt}; +use std::{env, error::Error}; +use tokio::{ + io::AsyncReadExt, + net::{TcpListener, TcpStream}, +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string()); + let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string()); + + println!("Listening on: {}", listen_addr); + println!("Proxying to: {}", server_addr); + + let mut incoming = TcpListener::bind(listen_addr).await?.incoming(); + + while let Some(Ok(inbound)) = incoming.next().await { + let transfer = transfer(inbound, server_addr.clone()).map(|r| { + if let Err(e) = r { + println!("Failed to transfer; error={}", e); + } + }); + + tokio::spawn(transfer); + } + + Ok(()) +} + +async fn transfer(mut inbound: TcpStream, proxy_addr: String) -> Result<(), Box> { + let mut outbound = TcpStream::connect(proxy_addr).await?; + + let (mut ri, mut wi) = inbound.split(); + let (mut ro, mut wo) = outbound.split(); + + let client_to_server = ri.copy(&mut wo); + let server_to_client = ro.copy(&mut wi); + + try_join(client_to_server, server_to_client).await?; + + Ok(()) +} diff --git a/examples/tinydb.rs b/examples/tinydb.rs new file mode 100644 index 00000000..3fc88f6b --- /dev/null +++ b/examples/tinydb.rs @@ -0,0 +1,224 @@ +//! A "tiny database" and accompanying protocol +//! +//! This example shows the usage of shared state amongst all connected clients, +//! namely a database of key/value pairs. Each connected client can send a +//! series of GET/SET commands to query the current value of a key or set the +//! value of a key. +//! +//! This example has a simple protocol you can use to interact with the server. +//! To run, first run this in one terminal window: +//! +//! cargo run --example tinydb +//! +//! and next in another windows run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! In the `connect` window you can type in commands where when you hit enter +//! you'll get a response from the server for that command. An example session +//! is: +//! +//! +//! $ cargo run --example connect 127.0.0.1:8080 +//! GET foo +//! foo = bar +//! GET FOOBAR +//! error: no key FOOBAR +//! SET FOOBAR my awesome string +//! set FOOBAR = `my awesome string`, previous: None +//! SET foo tokio +//! set foo = `tokio`, previous: Some("bar") +//! GET foo +//! foo = tokio +//! +//! Namely you can issue two forms of commands: +//! +//! * `GET $key` - this will fetch the value of `$key` from the database and +//! return it. The server's database is initially populated with the key `foo` +//! set to the value `bar` +//! * `SET $key $value` - this will set the value of `$key` to `$value`, +//! returning the previous value, if any. + +#![warn(rust_2018_idioms)] + +use tokio::net::TcpListener; +use tokio_util::codec::{Framed, LinesCodec}; + +use futures::{SinkExt, StreamExt}; +use std::collections::HashMap; +use std::env; +use std::error::Error; +use std::sync::{Arc, Mutex}; + +/// The in-memory database shared amongst all clients. +/// +/// This database will be shared via `Arc`, so to mutate the internal map we're +/// going to use a `Mutex` for interior mutability. +struct Database { + map: Mutex>, +} + +/// Possible requests our clients can send us +enum Request { + Get { key: String }, + Set { key: String, value: String }, +} + +/// Responses to the `Request` commands above +enum Response { + Value { + key: String, + value: String, + }, + Set { + key: String, + value: String, + previous: Option, + }, + Error { + msg: String, + }, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Parse the address we're going to run this server on + // and set up our TCP listener to accept connections. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + + let mut listener = TcpListener::bind(&addr).await?; + println!("Listening on: {}", addr); + + // Create the shared state of this server that will be shared amongst all + // clients. We populate the initial database and then create the `Database` + // structure. Note the usage of `Arc` here which will be used to ensure that + // each independently spawned client will have a reference to the in-memory + // database. + let mut initial_db = HashMap::new(); + initial_db.insert("foo".to_string(), "bar".to_string()); + let db = Arc::new(Database { + map: Mutex::new(initial_db), + }); + + loop { + match listener.accept().await { + Ok((socket, _)) => { + // After getting a new connection first we see a clone of the database + // being created, which is creating a new reference for this connected + // client to use. + let db = db.clone(); + + // Like with other small servers, we'll `spawn` this client to ensure it + // runs concurrently with all other clients. The `move` keyword is used + // here to move ownership of our db handle into the async closure. + tokio::spawn(async move { + // Since our protocol is line-based we use `tokio_codecs`'s `LineCodec` + // to convert our stream of bytes, `socket`, into a `Stream` of lines + // as well as convert our line based responses into a stream of bytes. + let mut lines = Framed::new(socket, LinesCodec::new()); + + // Here for every line we get back from the `Framed` decoder, + // we parse the request, and if it's valid we generate a response + // based on the values in the database. + while let Some(result) = lines.next().await { + match result { + Ok(line) => { + let response = handle_request(&line, &db); + + let response = response.serialize(); + + if let Err(e) = lines.send(response).await { + println!("error on sending response; error = {:?}", e); + } + } + Err(e) => { + println!("error on decoding from socket; error = {:?}", e); + } + } + } + + // The connection will be closed at this point as `lines.next()` has returned `None`. + }); + } + Err(e) => println!("error accepting socket; error = {:?}", e), + } + } +} + +fn handle_request(line: &str, db: &Arc) -> Response { + let request = match Request::parse(&line) { + Ok(req) => req, + Err(e) => return Response::Error { msg: e }, + }; + + let mut db = db.map.lock().unwrap(); + match request { + Request::Get { key } => match db.get(&key) { + Some(value) => Response::Value { + key, + value: value.clone(), + }, + None => Response::Error { + msg: format!("no key {}", key), + }, + }, + Request::Set { key, value } => { + let previous = db.insert(key.clone(), value.clone()); + Response::Set { + key, + value, + previous, + } + } + } +} + +impl Request { + fn parse(input: &str) -> Result { + let mut parts = input.splitn(3, " "); + match parts.next() { + Some("GET") => { + let key = match parts.next() { + Some(key) => key, + None => return Err(format!("GET must be followed by a key")), + }; + if parts.next().is_some() { + return Err(format!("GET's key must not be followed by anything")); + } + Ok(Request::Get { + key: key.to_string(), + }) + } + Some("SET") => { + let key = match parts.next() { + Some(key) => key, + None => return Err(format!("SET must be followed by a key")), + }; + let value = match parts.next() { + Some(value) => value, + None => return Err(format!("SET needs a value")), + }; + Ok(Request::Set { + key: key.to_string(), + value: value.to_string(), + }) + } + Some(cmd) => Err(format!("unknown command: {}", cmd)), + None => Err(format!("empty input")), + } + } +} + +impl Response { + fn serialize(&self) -> String { + match *self { + Response::Value { ref key, ref value } => format!("{} = {}", key, value), + Response::Set { + ref key, + ref value, + ref previous, + } => format!("set {} = `{}`, previous: {:?}", key, value, previous), + Response::Error { ref msg } => format!("error: {}", msg), + } + } +} diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs new file mode 100644 index 00000000..65074018 --- /dev/null +++ b/examples/tinyhttp.rs @@ -0,0 +1,299 @@ +//! A "tiny" example of HTTP request/response handling using transports. +//! +//! This example is intended for *learning purposes* to see how various pieces +//! hook up together and how HTTP can get up and running. Note that this example +//! is written with the restriction that it *can't* use any "big" library other +//! than Tokio, if you'd like a "real world" HTTP library you likely want a +//! crate like Hyper. +//! +//! Code here is based on the `echo-threads` example and implements two paths, +//! the `/plaintext` and `/json` routes to respond with some text and json, +//! respectively. By default this will run I/O on all the cores your system has +//! available, and it doesn't support HTTP request bodies. + +#![warn(rust_2018_idioms)] + +use bytes::BytesMut; +use futures::{SinkExt, StreamExt}; +use http::{header::HeaderValue, Request, Response, StatusCode}; +use serde::Serialize; +use std::{env, error::Error, fmt, io}; +use tokio::{ + codec::{Decoder, Encoder, Framed}, + net::{TcpListener, TcpStream}, +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Parse the arguments, bind the TCP socket we'll be listening to, spin up + // our worker threads, and start shipping sockets to those worker threads. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + + let mut incoming = TcpListener::bind(&addr).await?.incoming(); + println!("Listening on: {}", addr); + + while let Some(Ok(stream)) = incoming.next().await { + tokio::spawn(async move { + if let Err(e) = process(stream).await { + println!("failed to process connection; error = {}", e); + } + }); + } + + Ok(()) +} + +async fn process(stream: TcpStream) -> Result<(), Box> { + let mut transport = Framed::new(stream, Http); + + while let Some(request) = transport.next().await { + match request { + Ok(request) => { + let response = respond(request).await?; + transport.send(response).await?; + } + Err(e) => return Err(e.into()), + } + } + + Ok(()) +} + +async fn respond(req: Request<()>) -> Result, Box> { + let mut response = Response::builder(); + let body = match req.uri().path() { + "/plaintext" => { + response.header("Content-Type", "text/plain"); + "Hello, World!".to_string() + } + "/json" => { + response.header("Content-Type", "application/json"); + + #[derive(Serialize)] + struct Message { + message: &'static str, + } + serde_json::to_string(&Message { + message: "Hello, World!", + })? + } + _ => { + response.status(StatusCode::NOT_FOUND); + String::new() + } + }; + let response = response + .body(body) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + + Ok(response) +} + +struct Http; + +/// Implementation of encoding an HTTP response into a `BytesMut`, basically +/// just writing out an HTTP/1.1 response. +impl Encoder for Http { + type Item = Response; + type Error = io::Error; + + fn encode(&mut self, item: Response, dst: &mut BytesMut) -> io::Result<()> { + use std::fmt::Write; + + write!( + BytesWrite(dst), + "\ + HTTP/1.1 {}\r\n\ + Server: Example\r\n\ + Content-Length: {}\r\n\ + Date: {}\r\n\ + ", + item.status(), + item.body().len(), + date::now() + ) + .unwrap(); + + for (k, v) in item.headers() { + dst.extend_from_slice(k.as_str().as_bytes()); + dst.extend_from_slice(b": "); + dst.extend_from_slice(v.as_bytes()); + dst.extend_from_slice(b"\r\n"); + } + + dst.extend_from_slice(b"\r\n"); + dst.extend_from_slice(item.body().as_bytes()); + + return Ok(()); + + // Right now `write!` on `Vec` goes through io::Write and is not + // super speedy, so inline a less-crufty implementation here which + // doesn't go through io::Error. + struct BytesWrite<'a>(&'a mut BytesMut); + + impl fmt::Write for BytesWrite<'_> { + fn write_str(&mut self, s: &str) -> fmt::Result { + self.0.extend_from_slice(s.as_bytes()); + Ok(()) + } + + fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result { + fmt::write(self, args) + } + } + } +} + +/// Implementation of decoding an HTTP request from the bytes we've read so far. +/// This leverages the `httparse` crate to do the actual parsing and then we use +/// that information to construct an instance of a `http::Request` object, +/// trying to avoid allocations where possible. +impl Decoder for Http { + type Item = Request<()>; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> io::Result>> { + // TODO: we should grow this headers array if parsing fails and asks + // for more headers + let mut headers = [None; 16]; + let (method, path, version, amt) = { + let mut parsed_headers = [httparse::EMPTY_HEADER; 16]; + let mut r = httparse::Request::new(&mut parsed_headers); + let status = r.parse(src).map_err(|e| { + let msg = format!("failed to parse http request: {:?}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let amt = match status { + httparse::Status::Complete(amt) => amt, + httparse::Status::Partial => return Ok(None), + }; + + let toslice = |a: &[u8]| { + let start = a.as_ptr() as usize - src.as_ptr() as usize; + assert!(start < src.len()); + (start, start + a.len()) + }; + + for (i, header) in r.headers.iter().enumerate() { + let k = toslice(header.name.as_bytes()); + let v = toslice(header.value); + headers[i] = Some((k, v)); + } + + ( + toslice(r.method.unwrap().as_bytes()), + toslice(r.path.unwrap().as_bytes()), + r.version.unwrap(), + amt, + ) + }; + if version != 1 { + return Err(io::Error::new( + io::ErrorKind::Other, + "only HTTP/1.1 accepted", + )); + } + let data = src.split_to(amt).freeze(); + let mut ret = Request::builder(); + ret.method(&data[method.0..method.1]); + ret.uri(data.slice(path.0, path.1)); + ret.version(http::Version::HTTP_11); + for header in headers.iter() { + let (k, v) = match *header { + Some((ref k, ref v)) => (k, v), + None => break, + }; + let value = unsafe { HeaderValue::from_shared_unchecked(data.slice(v.0, v.1)) }; + ret.header(&data[k.0..k.1], value); + } + + let req = ret + .body(()) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + Ok(Some(req)) + } +} + +mod date { + use std::cell::RefCell; + use std::fmt::{self, Write}; + use std::str; + + use time::{self, Duration}; + + pub struct Now(()); + + /// Returns a struct, which when formatted, renders an appropriate `Date` + /// header value. + pub fn now() -> Now { + Now(()) + } + + // Gee Alex, doesn't this seem like premature optimization. Well you see + // there Billy, you're absolutely correct! If your server is *bottlenecked* + // on rendering the `Date` header, well then boy do I have news for you, you + // don't need this optimization. + // + // In all seriousness, though, a simple "hello world" benchmark which just + // sends back literally "hello world" with standard headers actually is + // bottlenecked on rendering a date into a byte buffer. Since it was at the + // top of a profile, and this was done for some competitive benchmarks, this + // module was written. + // + // Just to be clear, though, I was not intending on doing this because it + // really does seem kinda absurd, but it was done by someone else [1], so I + // blame them! :) + // + // [1]: https://github.com/rapidoid/rapidoid/blob/f1c55c0555007e986b5d069fe1086e6d09933f7b/rapidoid-commons/src/main/java/org/rapidoid/commons/Dates.java#L48-L66 + + struct LastRenderedNow { + bytes: [u8; 128], + amt: usize, + next_update: time::Timespec, + } + + thread_local!(static LAST: RefCell = RefCell::new(LastRenderedNow { + bytes: [0; 128], + amt: 0, + next_update: time::Timespec::new(0, 0), + })); + + impl fmt::Display for Now { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + LAST.with(|cache| { + let mut cache = cache.borrow_mut(); + let now = time::get_time(); + if now >= cache.next_update { + cache.update(now); + } + f.write_str(cache.buffer()) + }) + } + } + + impl LastRenderedNow { + fn buffer(&self) -> &str { + str::from_utf8(&self.bytes[..self.amt]).unwrap() + } + + fn update(&mut self, now: time::Timespec) { + self.amt = 0; + write!(LocalBuffer(self), "{}", time::at(now).rfc822()).unwrap(); + self.next_update = now + Duration::seconds(1); + self.next_update.nsec = 0; + } + } + + struct LocalBuffer<'a>(&'a mut LastRenderedNow); + + impl fmt::Write for LocalBuffer<'_> { + fn write_str(&mut self, s: &str) -> fmt::Result { + let start = self.0.amt; + let end = start + s.len(); + self.0.bytes[start..end].copy_from_slice(s.as_bytes()); + self.0.amt += s.len(); + Ok(()) + } + } +} diff --git a/examples/udp-client.rs b/examples/udp-client.rs new file mode 100644 index 00000000..5437daf6 --- /dev/null +++ b/examples/udp-client.rs @@ -0,0 +1,72 @@ +//! A UDP client that just sends everything it gets via `stdio` in a single datagram, and then +//! waits for a reply. +//! +//! For the reasons of simplicity data from `stdio` is read until `EOF` in a blocking manner. +//! +//! You can test this out by running an echo server: +//! +//! ``` +//! $ cargo run --example echo-udp -- 127.0.0.1:8080 +//! ``` +//! +//! and running the client in another terminal: +//! +//! ``` +//! $ cargo run --example udp-client +//! ``` +//! +//! You can optionally provide any custom endpoint address for the client: +//! +//! ``` +//! $ cargo run --example udp-client -- 127.0.0.1:8080 +//! ``` +//! +//! Don't forget to pass `EOF` to the standard input of the client! +//! +//! Please mind that since the UDP protocol doesn't have any capabilities to detect a broken +//! connection the server needs to be run first, otherwise the client will block forever. + +#![warn(rust_2018_idioms)] + +use std::env; +use std::error::Error; +use std::io::{stdin, Read}; +use std::net::SocketAddr; +use tokio::net::UdpSocket; + +fn get_stdin_data() -> Result, Box> { + let mut buf = Vec::new(); + stdin().read_to_end(&mut buf)?; + Ok(buf) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let remote_addr: SocketAddr = env::args() + .nth(1) + .unwrap_or("127.0.0.1:8080".into()) + .parse()?; + + // We use port 0 to let the operating system allocate an available port for us. + let local_addr: SocketAddr = if remote_addr.is_ipv4() { + "0.0.0.0:0" + } else { + "[::]:0" + } + .parse()?; + + let mut socket = UdpSocket::bind(local_addr).await?; + const MAX_DATAGRAM_SIZE: usize = 65_507; + socket.connect(&remote_addr).await?; + let data = get_stdin_data()?; + socket.send(&data).await?; + let mut data = vec![0u8; MAX_DATAGRAM_SIZE]; + let len = socket.recv(&mut data).await?; + println!( + "Received {} bytes:\n{}", + len, + String::from_utf8_lossy(&data[..len]) + ); + + Ok(()) +} diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs new file mode 100644 index 00000000..baf64886 --- /dev/null +++ b/examples/udp-codec.rs @@ -0,0 +1,78 @@ +//! This example leverages `BytesCodec` to create a UDP client and server which +//! speak a custom protocol. +//! +//! Here we're using the codec from `tokio-codec` to convert a UDP socket to a stream of +//! client messages. These messages are then processed and returned back as a +//! new message with a new destination. Overall, we then use this to construct a +//! "ping pong" pair where two sockets are sending messages back and forth. + +#![warn(rust_2018_idioms)] + +use tokio::future::FutureExt as TokioFutureExt; +use tokio::io; +use tokio::net::UdpSocket; +use tokio_util::codec::BytesCodec; +use tokio_util::udp::UdpFramed; + +use bytes::Bytes; +use futures::{FutureExt, SinkExt, StreamExt}; +use std::env; +use std::error::Error; +use std::net::SocketAddr; +use std::time::Duration; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let addr = env::args().nth(1).unwrap_or("127.0.0.1:0".to_string()); + + // Bind both our sockets and then figure out what ports we got. + let a = UdpSocket::bind(&addr).await?; + let b = UdpSocket::bind(&addr).await?; + + let b_addr = b.local_addr()?; + + let mut a = UdpFramed::new(a, BytesCodec::new()); + let mut b = UdpFramed::new(b, BytesCodec::new()); + + // Start off by sending a ping from a to b, afterwards we just print out + // what they send us and continually send pings + let a = ping(&mut a, b_addr); + + // The second client we have will receive the pings from `a` and then send + // back pongs. + let b = pong(&mut b); + + // Run both futures simultaneously of `a` and `b` sending messages back and forth. + match futures::future::try_join(a, b).await { + Err(e) => println!("an error occured; error = {:?}", e), + _ => println!("done!"), + } + + Ok(()) +} + +async fn ping(socket: &mut UdpFramed, b_addr: SocketAddr) -> Result<(), io::Error> { + socket.send((Bytes::from(&b"PING"[..]), b_addr)).await?; + + for _ in 0..4usize { + let (bytes, addr) = socket.next().map(|e| e.unwrap()).await?; + + println!("[a] recv: {}", String::from_utf8_lossy(&bytes)); + + socket.send((Bytes::from(&b"PING"[..]), addr)).await?; + } + + Ok(()) +} + +async fn pong(socket: &mut UdpFramed) -> Result<(), io::Error> { + let timeout = Duration::from_millis(200); + + while let Ok(Some(Ok((bytes, addr)))) = socket.next().timeout(timeout).await { + println!("[b] recv: {}", String::from_utf8_lossy(&bytes)); + + socket.send((Bytes::from(&b"PONG"[..]), addr)).await?; + } + + Ok(()) +} diff --git a/tokio-codec/CHANGELOG.md b/tokio-codec/CHANGELOG.md deleted file mode 100644 index 2f0ca5f5..00000000 --- a/tokio-codec/CHANGELOG.md +++ /dev/null @@ -1,35 +0,0 @@ -# 0.2.0-alpha.6 (September 30, 2019) - -- Move to `futures-*-preview 0.3.0-alpha.19` -- Move to `pin-project 0.4` - -# 0.2.0-alpha.5 (September 19, 2019) - -- Track tokio release - -# 0.2.0-alpha.4 (August 29, 2019) - -- Track tokio release. - -# 0.2.0-alpha.3 (August 28, 2019) - -### Fix -- Infinite loop in `LinesCodec` (#1489). - -# 0.2.0-alpha.2 (August 17, 2019) - -### Changed -- Update `futures` dependency to 0.3.0-alpha.18. - -# 0.2.0-alpha.1 (August 8, 2019) - -### Changed -- Switch to `async`, `await`, and `std::future`. - -# 0.1.1 (September 26, 2018) - -* Allow setting max line length with `LinesCodec` (#632) - -# 0.1.0 (June 13, 2018) - -* Initial release (#353) diff --git a/tokio-codec/Cargo.toml b/tokio-codec/Cargo.toml deleted file mode 100644 index 6c0d61f9..00000000 --- a/tokio-codec/Cargo.toml +++ /dev/null @@ -1,37 +0,0 @@ -[package] -name = "tokio-codec" -# When releasing to crates.io: -# - Remove path dependencies -# - Update html_root_url. -# - Update doc url -# - Cargo.toml -# - Update CHANGELOG.md. -# - Create "v0.2.x" git tag. -version = "0.2.0-alpha.6" -edition = "2018" -authors = ["Tokio Contributors "] -license = "MIT" -repository = "https://github.com/tokio-rs/tokio" -homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-codec/0.2.0-alpha.6/tokio_codec" -description = """ -Utilities for encoding and