summaryrefslogtreecommitdiffstats
path: root/examples/udp-codec.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-10-22 10:13:49 -0700
committerGitHub <noreply@github.com>2019-10-22 10:13:49 -0700
commitcfc15617a5247ea780c32c85b7134b88b6de5845 (patch)
treeef0a46c61c51505a60f386c9760acac9d1f9b7b1 /examples/udp-codec.rs
parentb8cee1a60ad99ef28ec494ae4230e2ef4399fcf9 (diff)
codec: move into tokio-util (#1675)
Related to #1318, Tokio APIs that are "less stable" are moved into a new `tokio-util` crate. This crate will mirror `tokio` and provide additional APIs that may require a greater rate of breaking changes. As examples require `tokio-util`, they are moved into a separate crate (`examples`). This has the added advantage of being able to avoid example only dependencies in the `tokio` crate.
Diffstat (limited to 'examples/udp-codec.rs')
-rw-r--r--examples/udp-codec.rs78
1 files changed, 78 insertions, 0 deletions
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs
new file mode 100644
index 00000000..baf64886
--- /dev/null
+++ b/examples/udp-codec.rs
@@ -0,0 +1,78 @@
+//! This example leverages `BytesCodec` to create a UDP client and server which
+//! speak a custom protocol.
+//!
+//! Here we're using the codec from `tokio-codec` to convert a UDP socket to a stream of
+//! client messages. These messages are then processed and returned back as a
+//! new message with a new destination. Overall, we then use this to construct a
+//! "ping pong" pair where two sockets are sending messages back and forth.
+
+#![warn(rust_2018_idioms)]
+
+use tokio::future::FutureExt as TokioFutureExt;
+use tokio::io;
+use tokio::net::UdpSocket;
+use tokio_util::codec::BytesCodec;
+use tokio_util::udp::UdpFramed;
+
+use bytes::Bytes;
+use futures::{FutureExt, SinkExt, StreamExt};
+use std::env;
+use std::error::Error;
+use std::net::SocketAddr;
+use std::time::Duration;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+ let addr = env::args().nth(1).unwrap_or("127.0.0.1:0".to_string());
+
+ // Bind both our sockets and then figure out what ports we got.
+ let a = UdpSocket::bind(&addr).await?;
+ let b = UdpSocket::bind(&addr).await?;
+
+ let b_addr = b.local_addr()?;
+
+ let mut a = UdpFramed::new(a, BytesCodec::new());
+ let mut b = UdpFramed::new(b, BytesCodec::new());
+
+ // Start off by sending a ping from a to b, afterwards we just print out
+ // what they send us and continually send pings
+ let a = ping(&mut a, b_addr);
+
+ // The second client we have will receive the pings from `a` and then send
+ // back pongs.
+ let b = pong(&mut b);
+
+ // Run both futures simultaneously of `a` and `b` sending messages back and forth.
+ match futures::future::try_join(a, b).await {
+ Err(e) => println!("an error occured; error = {:?}", e),
+ _ => println!("done!"),
+ }
+
+ Ok(())
+}
+
+async fn ping(socket: &mut UdpFramed<BytesCodec>, b_addr: SocketAddr) -> Result<(), io::Error> {
+ socket.send((Bytes::from(&b"PING"[..]), b_addr)).await?;
+
+ for _ in 0..4usize {
+ let (bytes, addr) = socket.next().map(|e| e.unwrap()).await?;
+
+ println!("[a] recv: {}", String::from_utf8_lossy(&bytes));
+
+ socket.send((Bytes::from(&b"PING"[..]), addr)).await?;
+ }
+
+ Ok(())
+}
+
+async fn pong(socket: &mut UdpFramed<BytesCodec>) -> Result<(), io::Error> {
+ let timeout = Duration::from_millis(200);
+
+ while let Ok(Some(Ok((bytes, addr)))) = socket.next().timeout(timeout).await {
+ println!("[b] recv: {}", String::from_utf8_lossy(&bytes));
+
+ socket.send((Bytes::from(&b"PONG"[..]), addr)).await?;
+ }
+
+ Ok(())
+}