diff options
author | Evan Cameron <cameron.evan@gmail.com> | 2020-10-22 12:58:00 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-22 09:58:00 -0700 |
commit | 358e4f9f8029b6b289f8ef5a54bd7c6eae5bf969 (patch) | |
tree | 9793f48ae6e239516a093f9d514daeaaba731e11 /tokio/tests | |
parent | adf822f5cc11acdeeae3cf119469a19c524e82b4 (diff) |
tokio: add back poll_* for udp (#2981)
Diffstat (limited to 'tokio/tests')
-rw-r--r-- | tokio/tests/udp.rs | 131 |
1 files changed, 130 insertions, 1 deletions
diff --git a/tokio/tests/udp.rs b/tokio/tests/udp.rs index 0bea83aa..8b79cb85 100644 --- a/tokio/tests/udp.rs +++ b/tokio/tests/udp.rs @@ -1,8 +1,9 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use futures::future::poll_fn; use std::sync::Arc; -use tokio::net::UdpSocket; +use tokio::{io::ReadBuf, net::UdpSocket}; const MSG: &[u8] = b"hello"; const MSG_LEN: usize = MSG.len(); @@ -25,6 +26,24 @@ async fn send_recv() -> std::io::Result<()> { } #[tokio::test] +async fn send_recv_poll() -> std::io::Result<()> { + let sender = UdpSocket::bind("127.0.0.1:0").await?; + let receiver = UdpSocket::bind("127.0.0.1:0").await?; + + sender.connect(receiver.local_addr()?).await?; + receiver.connect(sender.local_addr()?).await?; + + poll_fn(|cx| sender.poll_send(cx, MSG)).await?; + + let mut recv_buf = [0u8; 32]; + let mut read = ReadBuf::new(&mut recv_buf); + let _len = poll_fn(|cx| receiver.poll_recv(cx, &mut read)).await?; + + assert_eq!(read.filled(), MSG); + Ok(()) +} + +#[tokio::test] async fn send_to_recv_from() -> std::io::Result<()> { let sender = UdpSocket::bind("127.0.0.1:0").await?; let receiver = UdpSocket::bind("127.0.0.1:0").await?; @@ -41,6 +60,79 @@ async fn send_to_recv_from() -> std::io::Result<()> { } #[tokio::test] +async fn send_to_recv_from_poll() -> std::io::Result<()> { + let sender = UdpSocket::bind("127.0.0.1:0").await?; + let receiver = UdpSocket::bind("127.0.0.1:0").await?; + + let receiver_addr = receiver.local_addr()?; + poll_fn(|cx| sender.poll_send_to(cx, MSG, &receiver_addr)).await?; + + let mut recv_buf = [0u8; 32]; + let mut read = ReadBuf::new(&mut recv_buf); + let addr = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?; + + assert_eq!(read.filled(), MSG); + assert_eq!(addr, sender.local_addr()?); + Ok(()) +} + +#[tokio::test] +async fn send_to_peek_from() -> std::io::Result<()> { + let sender = UdpSocket::bind("127.0.0.1:0").await?; + let receiver = UdpSocket::bind("127.0.0.1:0").await?; + + let receiver_addr = receiver.local_addr()?; + poll_fn(|cx| sender.poll_send_to(cx, MSG, &receiver_addr)).await?; + + // peek + let mut recv_buf = [0u8; 32]; + let (n, addr) = receiver.peek_from(&mut recv_buf).await?; + assert_eq!(&recv_buf[..n], MSG); + assert_eq!(addr, sender.local_addr()?); + + // peek + let mut recv_buf = [0u8; 32]; + let (n, addr) = receiver.peek_from(&mut recv_buf).await?; + assert_eq!(&recv_buf[..n], MSG); + assert_eq!(addr, sender.local_addr()?); + + let mut recv_buf = [0u8; 32]; + let (n, addr) = receiver.recv_from(&mut recv_buf).await?; + assert_eq!(&recv_buf[..n], MSG); + assert_eq!(addr, sender.local_addr()?); + + Ok(()) +} + +#[tokio::test] +async fn send_to_peek_from_poll() -> std::io::Result<()> { + let sender = UdpSocket::bind("127.0.0.1:0").await?; + let receiver = UdpSocket::bind("127.0.0.1:0").await?; + + let receiver_addr = receiver.local_addr()?; + poll_fn(|cx| sender.poll_send_to(cx, MSG, &receiver_addr)).await?; + + let mut recv_buf = [0u8; 32]; + let mut read = ReadBuf::new(&mut recv_buf); + let addr = poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?; + + assert_eq!(read.filled(), MSG); + assert_eq!(addr, sender.local_addr()?); + + let mut recv_buf = [0u8; 32]; + let mut read = ReadBuf::new(&mut recv_buf); + poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?; + + assert_eq!(read.filled(), MSG); + let mut recv_buf = [0u8; 32]; + let mut read = ReadBuf::new(&mut recv_buf); + + poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?; + assert_eq!(read.filled(), MSG); + Ok(()) +} + +#[tokio::test] async fn split() -> std::io::Result<()> { let socket = UdpSocket::bind("127.0.0.1:0").await?; let s = Arc::new(socket); @@ -88,6 +180,43 @@ async fn split_chan() -> std::io::Result<()> { Ok(()) } +#[tokio::test] +async fn split_chan_poll() -> std::io::Result<()> { + // setup UdpSocket that will echo all sent items + let socket = UdpSocket::bind("127.0.0.1:0").await?; + let addr = socket.local_addr().unwrap(); + let s = Arc::new(socket); + let r = s.clone(); + + let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000); + tokio::spawn(async move { + while let Some((bytes, addr)) = rx.recv().await { + poll_fn(|cx| s.poll_send_to(cx, &bytes, &addr)) + .await + .unwrap(); + } + }); + + tokio::spawn(async move { + let mut recv_buf = [0u8; 32]; + let mut read = ReadBuf::new(&mut recv_buf); + loop { + let addr = poll_fn(|cx| r.poll_recv_from(cx, &mut read)).await.unwrap(); + tx.send((read.filled().to_vec(), addr)).await.unwrap(); + } + }); + + // test that we can send a value and get back some response + let sender = UdpSocket::bind("127.0.0.1:0").await?; + poll_fn(|cx| sender.poll_send_to(cx, MSG, &addr)).await?; + + let mut recv_buf = [0u8; 32]; + let mut read = ReadBuf::new(&mut recv_buf); + let _ = poll_fn(|cx| sender.poll_recv_from(cx, &mut read)).await?; + assert_eq!(read.filled(), MSG); + Ok(()) +} + // # Note // // This test is purposely written such that each time `sender` sends data on |