diff options
author | Carl Lerche <me@carllerche.com> | 2019-10-25 12:50:15 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-25 12:50:15 -0700 |
commit | 227533d456fe32e48ffcd3796f1e6c8f9318b230 (patch) | |
tree | 498029aaf42dd64eeb8ef0e7d7f29802b45d4e95 /tokio/tests | |
parent | 03a9378297c73c2e56a6d6b55db22b92427b850a (diff) |
net: move into tokio crate (#1683)
A step towards collapsing Tokio sub crates into a single `tokio`
crate (#1318).
The `net` implementation is now provided by the main `tokio` crate.
Functionality can be opted out of by using the various net related
feature flags.
Diffstat (limited to 'tokio/tests')
27 files changed, 959 insertions, 7 deletions
diff --git a/tokio/tests/buffered.rs b/tokio/tests/buffered.rs index fe419099..5d77f866 100644 --- a/tokio/tests/buffered.rs +++ b/tokio/tests/buffered.rs @@ -1,5 +1,4 @@ #![warn(rust_2018_idioms)] -#![cfg(feature = "default")] use tokio::net::TcpListener; use tokio::prelude::*; diff --git a/tokio/tests/net_bind_resource.rs b/tokio/tests/net_bind_resource.rs new file mode 100644 index 00000000..bb507fcc --- /dev/null +++ b/tokio/tests/net_bind_resource.rs @@ -0,0 +1,11 @@ +use tokio::net::TcpListener; + +use std::convert::TryFrom; +use std::net; + +#[test] +#[should_panic] +fn no_runtime_panics_binding_net_tcp_listener() { + let listener = net::TcpListener::bind("127.0.0.1:0").expect("failed to bind listener"); + let _ = TcpListener::try_from(listener); +} diff --git a/tokio/tests/reactor.rs b/tokio/tests/net_driver.rs index f1ed8703..998bfcc9 100644 --- a/tokio/tests/reactor.rs +++ b/tokio/tests/net_driver.rs @@ -1,8 +1,7 @@ #![warn(rust_2018_idioms)] -#![cfg(feature = "default")] -use tokio_net::driver::Reactor; -use tokio_net::tcp::TcpListener; +use tokio::net::driver::Reactor; +use tokio::net::TcpListener; use tokio_test::{assert_ok, assert_pending}; use futures_util::task::{waker_ref, ArcWake}; @@ -66,7 +65,7 @@ fn test_drop_on_notify() { { let handle = reactor.handle(); - let _reactor = tokio_net::driver::set_default(&handle); + let _reactor = tokio::net::driver::set_default(&handle); let waker = waker_ref(&task); let mut cx = Context::from_waker(&waker); assert_pending!(task.future.lock().unwrap().as_mut().poll(&mut cx)); diff --git a/tokio/tests/drop-core.rs b/tokio/tests/net_driver_drop.rs index f1906ab3..d52c04d1 100644 --- a/tokio/tests/drop-core.rs +++ b/tokio/tests/net_driver_drop.rs @@ -1,8 +1,7 @@ #![warn(rust_2018_idioms)] -#![cfg(feature = "default")] +use tokio::net::driver::{self, Reactor}; use tokio::net::TcpListener; -use tokio_net::driver::{self, Reactor}; use tokio_test::{assert_err, assert_pending, assert_ready, task}; #[test] diff --git a/tokio/tests/process_issue_42.rs b/tokio/tests/process_issue_42.rs new file mode 100644 index 00000000..3631aa7b --- /dev/null +++ b/tokio/tests/process_issue_42.rs @@ -0,0 +1,56 @@ +#![cfg(feature = "process")] +#![cfg(unix)] +#![warn(rust_2018_idioms)] + +use tokio::process::Command; +use tokio::runtime::current_thread; + +use futures_util::future::FutureExt; +use futures_util::stream::FuturesOrdered; +use std::process::Stdio; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::thread; +use std::time::Duration; + +fn run_test() { + let finished = Arc::new(AtomicBool::new(false)); + let finished_clone = finished.clone(); + + thread::spawn(move || { + let mut rt = current_thread::Runtime::new().expect("failed to get runtime"); + let mut futures = FuturesOrdered::new(); + rt.block_on(async { + for i in 0..2 { + futures.push( + Command::new("echo") + .arg(format!("I am spawned process #{}", i)) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .unwrap() + .boxed(), + ) + } + }); + + drop(rt); + finished_clone.store(true, Ordering::SeqCst); + }); + + thread::sleep(Duration::from_millis(1000)); + assert!( + finished.load(Ordering::SeqCst), + "FINISHED flag not set, maybe we deadlocked?" + ); +} + +#[test] +fn issue_42() { + let max = 10; + for i in 0..max { + println!("running {}/{}", i, max); + run_test() + } +} diff --git a/tokio/tests/process_smoke.rs b/tokio/tests/process_smoke.rs new file mode 100644 index 00000000..a952f7f6 --- /dev/null +++ b/tokio/tests/process_smoke.rs @@ -0,0 +1,29 @@ +#![cfg(feature = "process")] +#![warn(rust_2018_idioms)] + +use tokio::process::Command; +use tokio_test::assert_ok; + +#[tokio::test] +async fn simple() { + let mut cmd; + + if cfg!(windows) { + cmd = Command::new("cmd"); + cmd.arg("/c"); + } else { + cmd = Command::new("sh"); + cmd.arg("-c"); + } + + let mut child = cmd.arg("exit 2").spawn().unwrap(); + + let id = child.id(); + assert!(id > 0); + + let status = assert_ok!((&mut child).await); + assert_eq!(status.code(), Some(2)); + + assert_eq!(child.id(), id); + drop(child.kill()); +} diff --git a/tokio/tests/signal_ctrl_c.rs b/tokio/tests/signal_ctrl_c.rs new file mode 100644 index 00000000..ea4efaa2 --- /dev/null +++ b/tokio/tests/signal_ctrl_c.rs @@ -0,0 +1,28 @@ +#![cfg(unix)] +#![warn(rust_2018_idioms)] + +mod support { + pub mod signal; +} +use support::signal::send_signal; + +use tokio::prelude::*; +use tokio::signal; +use tokio::sync::oneshot; + +#[tokio::test] +async fn ctrl_c() { + let ctrl_c = signal::ctrl_c().expect("failed to init ctrl_c"); + + let (fire, wait) = oneshot::channel(); + + // NB: simulate a signal coming in by exercising our signal handler + // to avoid complications with sending SIGINT to the test process + tokio::spawn(async { + wait.await.expect("wait failed"); + send_signal(libc::SIGINT); + }); + + let _ = fire.send(()); + let _ = ctrl_c.into_future().await; +} diff --git a/tokio/tests/signal_drop_recv.rs b/tokio/tests/signal_drop_recv.rs new file mode 100644 index 00000000..2a5c047f --- /dev/null +++ b/tokio/tests/signal_drop_recv.rs @@ -0,0 +1,22 @@ +#![cfg(unix)] +#![warn(rust_2018_idioms)] + +mod support { + pub mod signal; +} +use support::signal::send_signal; + +use tokio::prelude::*; +use tokio::signal::unix::{signal, SignalKind}; + +#[tokio::test] +async fn drop_then_get_a_signal() { + let kind = SignalKind::user_defined1(); + let sig = signal(kind).expect("failed to create first signal"); + drop(sig); + + send_signal(libc::SIGUSR1); + let sig = signal(kind).expect("failed to create second signal"); + + let _ = sig.into_future().await; +} diff --git a/tokio/tests/signal_drop_rt.rs b/tokio/tests/signal_drop_rt.rs new file mode 100644 index 00000000..22495e8e --- /dev/null +++ b/tokio/tests/signal_drop_rt.rs @@ -0,0 +1,37 @@ +#![cfg(unix)] +#![warn(rust_2018_idioms)] + +mod support { + pub mod signal; +} +use support::signal::send_signal; + +use tokio::prelude::*; +use tokio::runtime::current_thread::Runtime; +use tokio::signal::unix::{signal, SignalKind}; + +#[test] +fn dropping_loops_does_not_cause_starvation() { + let kind = SignalKind::user_defined1(); + + let mut first_rt = Runtime::new().expect("failed to init first runtime"); + let mut first_signal = + first_rt.block_on(async { signal(kind).expect("failed to register first signal") }); + + let mut second_rt = Runtime::new().expect("failed to init second runtime"); + let mut second_signal = + second_rt.block_on(async { signal(kind).expect("failed to register second signal") }); + + send_signal(libc::SIGUSR1); + + first_rt + .block_on(first_signal.next()) + .expect("failed to await first signal"); + + drop(first_rt); + drop(first_signal); + + send_signal(libc::SIGUSR1); + + second_rt.block_on(second_signal.next()); +} diff --git a/tokio/tests/signal_drop_signal.rs b/tokio/tests/signal_drop_signal.rs new file mode 100644 index 00000000..3cf5611f --- /dev/null +++ b/tokio/tests/signal_drop_signal.rs @@ -0,0 +1,26 @@ +#![cfg(unix)] +#![warn(rust_2018_idioms)] + +mod support { + pub mod signal; +} +use support::signal::send_signal; + +use tokio::prelude::*; +use tokio::signal::unix::{signal, SignalKind}; + +#[tokio::test] +async fn dropping_signal_does_not_deregister_any_other_instances() { + let kind = SignalKind::user_defined1(); + + // Signals should not starve based on ordering + let first_duplicate_signal = signal(kind).expect("failed to register first duplicate signal"); + let sig = signal(kind).expect("failed to register signal"); + let second_duplicate_signal = signal(kind).expect("failed to register second duplicate signal"); + + drop(first_duplicate_signal); + drop(second_duplicate_signal); + + send_signal(libc::SIGUSR1); + let _ = sig.into_future().await; +} diff --git a/tokio/tests/signal_multi_rt.rs b/tokio/tests/signal_multi_rt.rs new file mode 100644 index 00000000..9ecf70ed --- /dev/null +++ b/tokio/tests/signal_multi_rt.rs @@ -0,0 +1,47 @@ +#![cfg(unix)] +#![warn(rust_2018_idioms)] + +mod support { + pub mod signal; +} +use support::signal::send_signal; + +use tokio::prelude::*; +use tokio::runtime::current_thread::Runtime; +use tokio::signal::unix::{signal, SignalKind}; + +use std::sync::mpsc::channel; +use std::thread; + +#[test] +fn multi_loop() { + // An "ordinary" (non-future) channel + let (sender, receiver) = channel(); + // Run multiple times, to make sure there are no race conditions + for _ in 0..10 { + // Run multiple event loops, each one in its own thread + let threads: Vec<_> = (0..4) + .map(|_| { + let sender = sender.clone(); + thread::spawn(move || { + let mut rt = Runtime::new().unwrap(); + let _ = rt.block_on(async { + let signal = signal(SignalKind::hangup()).unwrap(); + sender.send(()).unwrap(); + signal.into_future().await + }); + }) + }) + .collect(); + // Wait for them to declare they're ready + for &_ in threads.iter() { + receiver.recv().unwrap(); + } + // Send a signal + send_signal(libc::SIGHUP); + // Make sure the threads terminated correctly + for t in threads { + t.join().unwrap(); + } + } +} diff --git a/tokio/tests/signal_no_rt.rs b/tokio/tests/signal_no_rt.rs new file mode 100644 index 00000000..b512e5e7 --- /dev/null +++ b/tokio/tests/signal_no_rt.rs @@ -0,0 +1,10 @@ +#![cfg(unix)] +#![warn(rust_2018_idioms)] + +use tokio::signal::unix::{signal, SignalKind}; + +#[test] +#[should_panic] +fn no_runtime_panics_creating_signals() { + let _ = signal(SignalKind::hangup()); +} diff --git a/tokio/tests/signal_notify_both.rs b/tokio/tests/signal_notify_both.rs new file mode 100644 index 00000000..00385478 --- /dev/null +++ b/tokio/tests/signal_notify_both.rs @@ -0,0 +1,23 @@ +#![cfg(unix)] +#![warn(rust_2018_idioms)] + +mod support { + pub mod signal; +} +use support::signal::send_signal; + +use tokio::prelude::*; +use tokio::signal::unix::{signal, SignalKind}; + +use futures::future; + +#[tokio::test] +async fn notify_both() { + let kind = SignalKind::user_defined2(); + let signal1 = signal(kind).expect("failed to create signal1"); + + let signal2 = signal(kind).expect("failed to create signal2"); + + send_signal(libc::SIGUSR2); + let _ = future::join(signal1.into_future(), signal2.into_future()).await; +} diff --git a/tokio/tests/signal_twice.rs b/tokio/tests/signal_twice.rs new file mode 100644 index 00000000..d8e0facc --- /dev/null +++ b/tokio/tests/signal_twice.rs @@ -0,0 +1,25 @@ +#![cfg(unix)] +#![warn(rust_2018_idioms)] + +mod support { + pub mod signal; +} +use support::signal::send_signal; + +use tokio::prelude::*; +use tokio::signal::unix::{signal, SignalKind}; + +#[tokio::test] +async fn twice() { + let kind = SignalKind::user_defined1(); + let mut sig = signal(kind).expect("failed to get signal"); + + for _ in 0..2 { + send_signal(libc::SIGUSR1); + + let (item, sig_next) = sig.into_future().await; + assert_eq!(item, Some(())); + + sig = sig_next; + } +} diff --git a/tokio/tests/signal_usr1.rs b/tokio/tests/signal_usr1.rs new file mode 100644 index 00000000..9b6a0dec --- /dev/null +++ b/tokio/tests/signal_usr1.rs @@ -0,0 +1,23 @@ +#![cfg(unix)] +#![warn(rust_2018_idioms)] + +mod support { + pub mod signal; +} +use support::signal::send_signal; + +use tokio::prelude::*; +use tokio::signal::unix::{signal, SignalKind}; +use tokio_test::assert_ok; + +#[tokio::test] +async fn signal_usr1() { + let signal = assert_ok!( + signal(SignalKind::user_defined1()), + "failed to create signal" + ); + + send_signal(libc::SIGUSR1); + + let _ = signal.into_future().await; +} diff --git a/tokio/tests/support/signal.rs b/tokio/tests/support/signal.rs new file mode 100644 index 00000000..ea060587 --- /dev/null +++ b/tokio/tests/support/signal.rs @@ -0,0 +1,7 @@ +pub fn send_signal(signal: libc::c_int) { + use libc::{getpid, kill}; + + unsafe { + assert_eq!(kill(getpid(), signal), 0); + } +} diff --git a/tokio/tests/tcp_accept.rs b/tokio/tests/tcp_accept.rs new file mode 100644 index 00000000..09a2a603 --- /dev/null +++ b/tokio/tests/tcp_accept.rs @@ -0,0 +1,39 @@ +#![warn(rust_2018_idioms)] + +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::oneshot; +use tokio_test::assert_ok; + +use std::net::{IpAddr, SocketAddr}; + +macro_rules! test_accept { + ($(($ident:ident, $target:expr),)*) => { + $( + #[tokio::test] + async fn $ident() { + let mut listener = assert_ok!(TcpListener::bind($target).await); + let addr = listener.local_addr().unwrap(); + + let (tx, rx) = oneshot::channel(); + + tokio::spawn(async move { + let (socket, _) = assert_ok!(listener.accept().await); + assert_ok!(tx.send(socket)); + }); + + let cli = assert_ok!(TcpStream::connect(&addr).await); + let srv = assert_ok!(rx.await); + + assert_eq!(cli.local_addr().unwrap(), srv.peer_addr().unwrap()); + } + )* + } +} + +test_accept! { + (ip_str, "127.0.0.1:0"), + (host_str, "localhost:0"), + (socket_addr, "127.0.0.1:0".parse::<SocketAddr>().unwrap()), + (str_port_tuple, ("127.0.0.1", 0)), + (ip_port_tuple, ("127.0.0.1".parse::<IpAddr>().unwrap(), 0)), +} diff --git a/tokio/tests/tcp_connect.rs b/tokio/tests/tcp_connect.rs new file mode 100644 index 00000000..daf0b3c9 --- /dev/null +++ b/tokio/tests/tcp_connect.rs @@ -0,0 +1,228 @@ +#![warn(rust_2018_idioms)] + +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::oneshot; +use tokio_test::assert_ok; + +use futures::join; + +#[tokio::test] +async fn connect_v4() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + assert!(addr.is_ipv4()); + + let (tx, rx) = oneshot::channel(); + + tokio::spawn(async move { + let (socket, addr) = assert_ok!(srv.accept().await); + assert_eq!(addr, assert_ok!(socket.peer_addr())); + assert_ok!(tx.send(socket)); + }); + + let mine = assert_ok!(TcpStream::connect(&addr).await); + let theirs = assert_ok!(rx.await); + + assert_eq!( + assert_ok!(mine.local_addr()), + assert_ok!(theirs.peer_addr()) + ); + assert_eq!( + assert_ok!(theirs.local_addr()), + assert_ok!(mine.peer_addr()) + ); +} + +#[tokio::test] +async fn connect_v6() { + let mut srv = assert_ok!(TcpListener::bind("[::1]:0").await); + let addr = assert_ok!(srv.local_addr()); + assert!(addr.is_ipv6()); + + let (tx, rx) = oneshot::channel(); + + tokio::spawn(async move { + let (socket, addr) = assert_ok!(srv.accept().await); + assert_eq!(addr, assert_ok!(socket.peer_addr())); + assert_ok!(tx.send(socket)); + }); + + let mine = assert_ok!(TcpStream::connect(&addr).await); + let theirs = assert_ok!(rx.await); + + assert_eq!( + assert_ok!(mine.local_addr()), + assert_ok!(theirs.peer_addr()) + ); + assert_eq!( + assert_ok!(theirs.local_addr()), + assert_ok!(mine.peer_addr()) + ); +} + +#[tokio::test] +async fn connect_addr_ip_string() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + let addr = format!("127.0.0.1:{}", addr.port()); + + let server = async { + assert_ok!(srv.accept().await); + }; + + let client = async { + assert_ok!(TcpStream::connect(addr).await); + }; + + join!(server, client); +} + +#[tokio::test] +async fn connect_addr_ip_str_slice() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + let addr = format!("127.0.0.1:{}", addr.port()); + + let server = async { + assert_ok!(srv.accept().await); + }; + + let client = async { + assert_ok!(TcpStream::connect(&addr[..]).await); + }; + + join!(server, client); +} + +#[tokio::test] +async fn connect_addr_host_string() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + let addr = format!("localhost:{}", addr.port()); + + let server = async { + assert_ok!(srv.accept().await); + }; + + let client = async { + assert_ok!(TcpStream::connect(addr).await); + }; + + join!(server, client); +} + +#[tokio::test] +async fn connect_addr_ip_port_tuple() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + let addr = (addr.ip(), addr.port()); + + let server = async { + assert_ok!(srv.accept().await); + }; + + let client = async { + assert_ok!(TcpStream::connect(&addr).await); + }; + + join!(server, client); +} + +#[tokio::test] +async fn connect_addr_ip_str_port_tuple() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + let addr = ("127.0.0.1", addr.port()); + + let server = async { + assert_ok!(srv.accept().await); + }; + + let client = async { + assert_ok!(TcpStream::connect(&addr).await); + }; + + join!(server, client); +} + +#[tokio::test] +async fn connect_addr_host_str_port_tuple() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + let addr = ("localhost", addr.port()); + + let server = async { + assert_ok!(srv.accept().await); + }; + + let client = async { + assert_ok!(TcpStream::connect(&addr).await); + }; + + join!(server, client); +} + +/* + * TODO: bring this back once TCP exposes HUP again + * +#[cfg(target_os = "linux")] +mod linux { + use tokio::net::{TcpListener, TcpStream}; + use tokio::prelude::*; + use tokio_test::assert_ok; + + use mio::unix::UnixReady; + + use futures_util::future::poll_fn; + use std::io::Write; + use std::time::Duration; + use std::{net, thread}; + + #[tokio::test] + fn poll_hup() { + let addr = assert_ok!("127.0.0.1:0".parse()); + let mut srv = assert_ok!(TcpListener::bind(&addr)); + let addr = assert_ok!(srv.local_addr()); + + tokio::spawn(async move { + let (mut client, _) = assert_ok!(srv.accept().await); + assert_ok!(client.set_linger(Some(Duration::from_millis(0)))); + assert_ok!(client.write_all(b"hello world").await); + + // TODO: Drop? + }); + + /* + let t = thread::spawn(move || { + let mut client = assert_ok!(srv.accept()).0; + client.set_linger(Some(Duration::from_millis(0))).unwrap(); + client.write(b"hello world").unwrap(); + thread::sleep(Duration::from_millis(200)); + }); + */ + + let mut stream = assert_ok!(TcpStream::connect(&addr).await); + + // Poll for HUP before reading. + future::poll_fn(|| stream.poll_read_ready(UnixReady::hup().into())) + .wait() + .unwrap(); + + // Same for write half + future::poll_fn(|| stream.poll_write_ready()) + .wait() + .unwrap(); + + let mut buf = vec![0; 11]; + + // Read the data + future::poll_fn(|| stream.poll_read(&mut buf)) + .wait() + .unwrap(); + + assert_eq!(b"hello world", &buf[..]); + + t.join().unwrap(); + } +} +*/ diff --git a/tokio/tests/tcp_echo.rs b/tokio/tests/tcp_echo.rs new file mode 100644 index 00000000..0325ecbb --- /dev/null +++ b/tokio/tests/tcp_echo.rs @@ -0,0 +1,41 @@ +#![warn(rust_2018_idioms)] + +use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::*; +use tokio::sync::oneshot; +use tokio_test::assert_ok; + +#[tokio::test] +async fn echo_server() { + const ITER: usize = 1024; + + let (tx, rx) = oneshot::channel(); + + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + + let msg = "foo bar baz"; + tokio::spawn(async move { + let mut stream = assert_ok!(TcpStream::connect(&addr).await); + + for _ in 0..ITER { + // write + assert_ok!(stream.write_all(msg.as_bytes()).await); + + // read + let mut buf = [0; 11]; + assert_ok!(stream.read_exact(&mut buf).await); + assert_eq!(&buf[..], msg.as_bytes()); + } + + assert_ok!(tx.send(())); + }); + + let (mut stream, _) = assert_ok!(srv.accept().await); + let (mut rd, mut wr) = stream.split(); + + let n = assert_ok!(rd.copy(&mut wr).await); + assert_eq!(n, (ITER * msg.len()) as u64); + + assert_ok!(rx.await); +} diff --git a/tokio/tests/tcp_peek.rs b/tokio/tests/tcp_peek.rs new file mode 100644 index 00000000..34f1ba3a --- /dev/null +++ b/tokio/tests/tcp_peek.rs @@ -0,0 +1,28 @@ +#![warn(rust_2018_idioms)] + +use tokio::io::AsyncReadExt; +use tokio::net::TcpStream; + +use tokio_test::assert_ok; + +use std::thread; +use std::{convert::TryInto, io::Write, net}; + +#[tokio::test] +async fn peek() { + let listener = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + let t = thread::spawn(move || assert_ok!(listener.accept()).0); + + let left = net::TcpStream::connect(&addr).unwrap(); + let mut right = t.join().unwrap(); + right.write(&[1, 2, 3, 4]).unwrap(); + + let mut left: TcpStream = left.try_into().unwrap(); + let mut buf = [0u8; 16]; + let n = assert_ok!(left.peek(&mut buf).await); + assert_eq!([1, 2, 3, 4], buf[..n]); + + let n = assert_ok!(left.read(&mut buf).await); + assert_eq!([1, 2, 3, 4], buf[..n]); +} diff --git a/tokio/tests/tcp_shutdown.rs b/tokio/tests/tcp_shutdown.rs new file mode 100644 index 00000000..7e0597c9 --- /dev/null +++ b/tokio/tests/tcp_shutdown.rs @@ -0,0 +1,28 @@ +#![warn(rust_2018_idioms)] + +use tokio::io::AsyncWriteExt; +use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::*; +use tokio_test::assert_ok; + +#[tokio::test] +async fn shutdown() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + + tokio::spawn(async move { + let mut stream = assert_ok!(TcpStream::connect(&addr).await); + + assert_ok!(AsyncWriteExt::shutdown(&mut stream).await); + + let mut buf = [0; 1]; + let n = assert_ok!(stream.read(&mut buf).await); + assert_eq!(n, 0); + }); + + let (mut stream, _) = assert_ok!(srv.accept().await); + let (mut rd, mut wr) = stream.split(); + + let n = assert_ok!(rd.copy(&mut wr).await); + assert_eq!(n, 0); +} diff --git a/tokio/tests/tcp_split.rs b/tokio/tests/tcp_split.rs new file mode 100644 index 00000000..ae5f249c --- /dev/null +++ b/tokio/tests/tcp_split.rs @@ -0,0 +1 @@ +// TODO: write tests using TcpStream::split() diff --git a/tokio/tests/udp.rs b/tokio/tests/udp.rs new file mode 100644 index 00000000..a2e4f3a6 --- /dev/null +++ b/tokio/tests/udp.rs @@ -0,0 +1,72 @@ +#![warn(rust_2018_idioms)] + +use tokio::net::UdpSocket; + +#[tokio::test] +async fn send_recv() -> std::io::Result<()> { + let mut sender = UdpSocket::bind("127.0.0.1:0").await?; + let mut receiver = UdpSocket::bind("127.0.0.1:0").await?; + + sender.connect(receiver.local_addr()?).await?; + receiver.connect(sender.local_addr()?).await?; + + let message = b"hello!"; + sender.send(message).await?; + + let mut recv_buf = [0u8; 32]; + let len = receiver.recv(&mut recv_buf[..]).await?; + + assert_eq!(&recv_buf[..len], message); + Ok(()) +} + +#[tokio::test] +async fn send_to_recv_from() -> std::io::Result<()> { + let mut sender = UdpSocket::bind("127.0.0.1:0").await?; + let mut receiver = UdpSocket::bind("127.0.0.1:0").await?; + + let message = b"hello!"; + let receiver_addr = receiver.local_addr()?; + sender.send_to(message, &receiver_addr).await?; + + let mut recv_buf = [0u8; 32]; + let (len, addr) = receiver.recv_from(&mut recv_buf[..]).await?; + + assert_eq!(&recv_buf[..len], message); + assert_eq!(addr, sender.local_addr()?); + Ok(()) +} + +#[tokio::test] +async fn split() -> std::io::Result<()> { + let socket = UdpSocket::bind("127.0.0.1:0").await?; + let (mut r, mut s) = socket.split(); + + let msg = b"hello"; + let addr = s.as_ref().local_addr()?; + tokio::spawn(async move { + s.send_to(msg, &addr).await.unwrap(); + }); + let mut recv_buf = [0u8; 32]; + let (len, _) = r.recv_from(&mut recv_buf[..]).await?; + assert_eq!(&recv_buf[..len], msg); + Ok(()) +} + +#[tokio::test] +async fn reunite() -> std::io::Result<()> { + let socket = UdpSocket::bind("127.0.0.1:0").await?; + let (s, r) = socket.split(); + assert!(s.reunite(r).is_ok()); + Ok(()) +} + +#[tokio::test] +async fn reunite_error() -> std::io::Result<()> { + let socket = UdpSocket::bind("127.0.0.1:0").await?; + let socket1 = UdpSocket::bind("127.0.0.1:0").await?; + let (s, _) = socket.split(); + let (_, r1) = socket1.split(); + assert!(s.reunite(r1).is_err()); + Ok(()) |