diff options
author | Carl Lerche <me@carllerche.com> | 2019-10-22 10:13:49 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-22 10:13:49 -0700 |
commit | cfc15617a5247ea780c32c85b7134b88b6de5845 (patch) | |
tree | ef0a46c61c51505a60f386c9760acac9d1f9b7b1 /examples/udp-codec.rs | |
parent | b8cee1a60ad99ef28ec494ae4230e2ef4399fcf9 (diff) |
codec: move into tokio-util (#1675)
Related to #1318, Tokio APIs that are "less stable" are moved into a new
`tokio-util` crate. This crate will mirror `tokio` and provide
additional APIs that may require a greater rate of breaking changes.
As examples require `tokio-util`, they are moved into a separate
crate (`examples`). This has the added advantage of being able to avoid
example only dependencies in the `tokio` crate.
Diffstat (limited to 'examples/udp-codec.rs')
-rw-r--r-- | examples/udp-codec.rs | 78 |
1 files changed, 78 insertions, 0 deletions
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs new file mode 100644 index 00000000..baf64886 --- /dev/null +++ b/examples/udp-codec.rs @@ -0,0 +1,78 @@ +//! This example leverages `BytesCodec` to create a UDP client and server which +//! speak a custom protocol. +//! +//! Here we're using the codec from `tokio-codec` to convert a UDP socket to a stream of +//! client messages. These messages are then processed and returned back as a +//! new message with a new destination. Overall, we then use this to construct a +//! "ping pong" pair where two sockets are sending messages back and forth. + +#![warn(rust_2018_idioms)] + +use tokio::future::FutureExt as TokioFutureExt; +use tokio::io; +use tokio::net::UdpSocket; +use tokio_util::codec::BytesCodec; +use tokio_util::udp::UdpFramed; + +use bytes::Bytes; +use futures::{FutureExt, SinkExt, StreamExt}; +use std::env; +use std::error::Error; +use std::net::SocketAddr; +use std::time::Duration; + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { + let addr = env::args().nth(1).unwrap_or("127.0.0.1:0".to_string()); + + // Bind both our sockets and then figure out what ports we got. + let a = UdpSocket::bind(&addr).await?; + let b = UdpSocket::bind(&addr).await?; + + let b_addr = b.local_addr()?; + + let mut a = UdpFramed::new(a, BytesCodec::new()); + let mut b = UdpFramed::new(b, BytesCodec::new()); + + // Start off by sending a ping from a to b, afterwards we just print out + // what they send us and continually send pings + let a = ping(&mut a, b_addr); + + // The second client we have will receive the pings from `a` and then send + // back pongs. + let b = pong(&mut b); + + // Run both futures simultaneously of `a` and `b` sending messages back and forth. + match futures::future::try_join(a, b).await { + Err(e) => println!("an error occured; error = {:?}", e), + _ => println!("done!"), + } + + Ok(()) +} + +async fn ping(socket: &mut UdpFramed<BytesCodec>, b_addr: SocketAddr) -> Result<(), io::Error> { + socket.send((Bytes::from(&b"PING"[..]), b_addr)).await?; + + for _ in 0..4usize { + let (bytes, addr) = socket.next().map(|e| e.unwrap()).await?; + + println!("[a] recv: {}", String::from_utf8_lossy(&bytes)); + + socket.send((Bytes::from(&b"PING"[..]), addr)).await?; + } + + Ok(()) +} + +async fn pong(socket: &mut UdpFramed<BytesCodec>) -> Result<(), io::Error> { + let timeout = Duration::from_millis(200); + + while let Ok(Some(Ok((bytes, addr)))) = socket.next().timeout(timeout).await { + println!("[b] recv: {}", String::from_utf8_lossy(&bytes)); + + socket.send((Bytes::from(&b"PONG"[..]), addr)).await?; + } + + Ok(()) +} |