summaryrefslogtreecommitdiffstats
path: root/tokio/tests
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-10-25 12:50:15 -0700
committerGitHub <noreply@github.com>2019-10-25 12:50:15 -0700
commit227533d456fe32e48ffcd3796f1e6c8f9318b230 (patch)
tree498029aaf42dd64eeb8ef0e7d7f29802b45d4e95 /tokio/tests
parent03a9378297c73c2e56a6d6b55db22b92427b850a (diff)
net: move into tokio crate (#1683)
A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The `net` implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags.
Diffstat (limited to 'tokio/tests')
-rw-r--r--tokio/tests/buffered.rs1
-rw-r--r--tokio/tests/net_bind_resource.rs11
-rw-r--r--tokio/tests/net_driver.rs (renamed from tokio/tests/reactor.rs)7
-rw-r--r--tokio/tests/net_driver_drop.rs (renamed from tokio/tests/drop-core.rs)3
-rw-r--r--tokio/tests/process_issue_42.rs56
-rw-r--r--tokio/tests/process_smoke.rs29
-rw-r--r--tokio/tests/signal_ctrl_c.rs28
-rw-r--r--tokio/tests/signal_drop_recv.rs22
-rw-r--r--tokio/tests/signal_drop_rt.rs37
-rw-r--r--tokio/tests/signal_drop_signal.rs26
-rw-r--r--tokio/tests/signal_multi_rt.rs47
-rw-r--r--tokio/tests/signal_no_rt.rs10
-rw-r--r--tokio/tests/signal_notify_both.rs23
-rw-r--r--tokio/tests/signal_twice.rs25
-rw-r--r--tokio/tests/signal_usr1.rs23
-rw-r--r--tokio/tests/support/signal.rs7
-rw-r--r--tokio/tests/tcp_accept.rs39
-rw-r--r--tokio/tests/tcp_connect.rs228
-rw-r--r--tokio/tests/tcp_echo.rs41
-rw-r--r--tokio/tests/tcp_peek.rs28
-rw-r--r--tokio/tests/tcp_shutdown.rs28
-rw-r--r--tokio/tests/tcp_split.rs1
-rw-r--r--tokio/tests/udp.rs72
-rw-r--r--tokio/tests/uds_cred.rs29
-rw-r--r--tokio/tests/uds_datagram.rs69
-rw-r--r--tokio/tests/uds_split.rs42
-rw-r--r--tokio/tests/uds_stream.rs34
27 files changed, 959 insertions, 7 deletions
diff --git a/tokio/tests/buffered.rs b/tokio/tests/buffered.rs
index fe419099..5d77f866 100644
--- a/tokio/tests/buffered.rs
+++ b/tokio/tests/buffered.rs
@@ -1,5 +1,4 @@
#![warn(rust_2018_idioms)]
-#![cfg(feature = "default")]
use tokio::net::TcpListener;
use tokio::prelude::*;
diff --git a/tokio/tests/net_bind_resource.rs b/tokio/tests/net_bind_resource.rs
new file mode 100644
index 00000000..bb507fcc
--- /dev/null
+++ b/tokio/tests/net_bind_resource.rs
@@ -0,0 +1,11 @@
+use tokio::net::TcpListener;
+
+use std::convert::TryFrom;
+use std::net;
+
+#[test]
+#[should_panic]
+fn no_runtime_panics_binding_net_tcp_listener() {
+ let listener = net::TcpListener::bind("127.0.0.1:0").expect("failed to bind listener");
+ let _ = TcpListener::try_from(listener);
+}
diff --git a/tokio/tests/reactor.rs b/tokio/tests/net_driver.rs
index f1ed8703..998bfcc9 100644
--- a/tokio/tests/reactor.rs
+++ b/tokio/tests/net_driver.rs
@@ -1,8 +1,7 @@
#![warn(rust_2018_idioms)]
-#![cfg(feature = "default")]
-use tokio_net::driver::Reactor;
-use tokio_net::tcp::TcpListener;
+use tokio::net::driver::Reactor;
+use tokio::net::TcpListener;
use tokio_test::{assert_ok, assert_pending};
use futures_util::task::{waker_ref, ArcWake};
@@ -66,7 +65,7 @@ fn test_drop_on_notify() {
{
let handle = reactor.handle();
- let _reactor = tokio_net::driver::set_default(&handle);
+ let _reactor = tokio::net::driver::set_default(&handle);
let waker = waker_ref(&task);
let mut cx = Context::from_waker(&waker);
assert_pending!(task.future.lock().unwrap().as_mut().poll(&mut cx));
diff --git a/tokio/tests/drop-core.rs b/tokio/tests/net_driver_drop.rs
index f1906ab3..d52c04d1 100644
--- a/tokio/tests/drop-core.rs
+++ b/tokio/tests/net_driver_drop.rs
@@ -1,8 +1,7 @@
#![warn(rust_2018_idioms)]
-#![cfg(feature = "default")]
+use tokio::net::driver::{self, Reactor};
use tokio::net::TcpListener;
-use tokio_net::driver::{self, Reactor};
use tokio_test::{assert_err, assert_pending, assert_ready, task};
#[test]
diff --git a/tokio/tests/process_issue_42.rs b/tokio/tests/process_issue_42.rs
new file mode 100644
index 00000000..3631aa7b
--- /dev/null
+++ b/tokio/tests/process_issue_42.rs
@@ -0,0 +1,56 @@
+#![cfg(feature = "process")]
+#![cfg(unix)]
+#![warn(rust_2018_idioms)]
+
+use tokio::process::Command;
+use tokio::runtime::current_thread;
+
+use futures_util::future::FutureExt;
+use futures_util::stream::FuturesOrdered;
+use std::process::Stdio;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+
+fn run_test() {
+ let finished = Arc::new(AtomicBool::new(false));
+ let finished_clone = finished.clone();
+
+ thread::spawn(move || {
+ let mut rt = current_thread::Runtime::new().expect("failed to get runtime");
+ let mut futures = FuturesOrdered::new();
+ rt.block_on(async {
+ for i in 0..2 {
+ futures.push(
+ Command::new("echo")
+ .arg(format!("I am spawned process #{}", i))
+ .stdin(Stdio::null())
+ .stdout(Stdio::null())
+ .stderr(Stdio::null())
+ .spawn()
+ .unwrap()
+ .boxed(),
+ )
+ }
+ });
+
+ drop(rt);
+ finished_clone.store(true, Ordering::SeqCst);
+ });
+
+ thread::sleep(Duration::from_millis(1000));
+ assert!(
+ finished.load(Ordering::SeqCst),
+ "FINISHED flag not set, maybe we deadlocked?"
+ );
+}
+
+#[test]
+fn issue_42() {
+ let max = 10;
+ for i in 0..max {
+ println!("running {}/{}", i, max);
+ run_test()
+ }
+}
diff --git a/tokio/tests/process_smoke.rs b/tokio/tests/process_smoke.rs
new file mode 100644
index 00000000..a952f7f6
--- /dev/null
+++ b/tokio/tests/process_smoke.rs
@@ -0,0 +1,29 @@
+#![cfg(feature = "process")]
+#![warn(rust_2018_idioms)]
+
+use tokio::process::Command;
+use tokio_test::assert_ok;
+
+#[tokio::test]
+async fn simple() {
+ let mut cmd;
+
+ if cfg!(windows) {
+ cmd = Command::new("cmd");
+ cmd.arg("/c");
+ } else {
+ cmd = Command::new("sh");
+ cmd.arg("-c");
+ }
+
+ let mut child = cmd.arg("exit 2").spawn().unwrap();
+
+ let id = child.id();
+ assert!(id > 0);
+
+ let status = assert_ok!((&mut child).await);
+ assert_eq!(status.code(), Some(2));
+
+ assert_eq!(child.id(), id);
+ drop(child.kill());
+}
diff --git a/tokio/tests/signal_ctrl_c.rs b/tokio/tests/signal_ctrl_c.rs
new file mode 100644
index 00000000..ea4efaa2
--- /dev/null
+++ b/tokio/tests/signal_ctrl_c.rs
@@ -0,0 +1,28 @@
+#![cfg(unix)]
+#![warn(rust_2018_idioms)]
+
+mod support {
+ pub mod signal;
+}
+use support::signal::send_signal;
+
+use tokio::prelude::*;
+use tokio::signal;
+use tokio::sync::oneshot;
+
+#[tokio::test]
+async fn ctrl_c() {
+ let ctrl_c = signal::ctrl_c().expect("failed to init ctrl_c");
+
+ let (fire, wait) = oneshot::channel();
+
+ // NB: simulate a signal coming in by exercising our signal handler
+ // to avoid complications with sending SIGINT to the test process
+ tokio::spawn(async {
+ wait.await.expect("wait failed");
+ send_signal(libc::SIGINT);
+ });
+
+ let _ = fire.send(());
+ let _ = ctrl_c.into_future().await;
+}
diff --git a/tokio/tests/signal_drop_recv.rs b/tokio/tests/signal_drop_recv.rs
new file mode 100644
index 00000000..2a5c047f
--- /dev/null
+++ b/tokio/tests/signal_drop_recv.rs
@@ -0,0 +1,22 @@
+#![cfg(unix)]
+#![warn(rust_2018_idioms)]
+
+mod support {
+ pub mod signal;
+}
+use support::signal::send_signal;
+
+use tokio::prelude::*;
+use tokio::signal::unix::{signal, SignalKind};
+
+#[tokio::test]
+async fn drop_then_get_a_signal() {
+ let kind = SignalKind::user_defined1();
+ let sig = signal(kind).expect("failed to create first signal");
+ drop(sig);
+
+ send_signal(libc::SIGUSR1);
+ let sig = signal(kind).expect("failed to create second signal");
+
+ let _ = sig.into_future().await;
+}
diff --git a/tokio/tests/signal_drop_rt.rs b/tokio/tests/signal_drop_rt.rs
new file mode 100644
index 00000000..22495e8e
--- /dev/null
+++ b/tokio/tests/signal_drop_rt.rs
@@ -0,0 +1,37 @@
+#![cfg(unix)]
+#![warn(rust_2018_idioms)]
+
+mod support {
+ pub mod signal;
+}
+use support::signal::send_signal;
+
+use tokio::prelude::*;
+use tokio::runtime::current_thread::Runtime;
+use tokio::signal::unix::{signal, SignalKind};
+
+#[test]
+fn dropping_loops_does_not_cause_starvation() {
+ let kind = SignalKind::user_defined1();
+
+ let mut first_rt = Runtime::new().expect("failed to init first runtime");
+ let mut first_signal =
+ first_rt.block_on(async { signal(kind).expect("failed to register first signal") });
+
+ let mut second_rt = Runtime::new().expect("failed to init second runtime");
+ let mut second_signal =
+ second_rt.block_on(async { signal(kind).expect("failed to register second signal") });
+
+ send_signal(libc::SIGUSR1);
+
+ first_rt
+ .block_on(first_signal.next())
+ .expect("failed to await first signal");
+
+ drop(first_rt);
+ drop(first_signal);
+
+ send_signal(libc::SIGUSR1);
+
+ second_rt.block_on(second_signal.next());
+}
diff --git a/tokio/tests/signal_drop_signal.rs b/tokio/tests/signal_drop_signal.rs
new file mode 100644
index 00000000..3cf5611f
--- /dev/null
+++ b/tokio/tests/signal_drop_signal.rs
@@ -0,0 +1,26 @@
+#![cfg(unix)]
+#![warn(rust_2018_idioms)]
+
+mod support {
+ pub mod signal;
+}
+use support::signal::send_signal;
+
+use tokio::prelude::*;
+use tokio::signal::unix::{signal, SignalKind};
+
+#[tokio::test]
+async fn dropping_signal_does_not_deregister_any_other_instances() {
+ let kind = SignalKind::user_defined1();
+
+ // Signals should not starve based on ordering
+ let first_duplicate_signal = signal(kind).expect("failed to register first duplicate signal");
+ let sig = signal(kind).expect("failed to register signal");
+ let second_duplicate_signal = signal(kind).expect("failed to register second duplicate signal");
+
+ drop(first_duplicate_signal);
+ drop(second_duplicate_signal);
+
+ send_signal(libc::SIGUSR1);
+ let _ = sig.into_future().await;
+}
diff --git a/tokio/tests/signal_multi_rt.rs b/tokio/tests/signal_multi_rt.rs
new file mode 100644
index 00000000..9ecf70ed
--- /dev/null
+++ b/tokio/tests/signal_multi_rt.rs
@@ -0,0 +1,47 @@
+#![cfg(unix)]
+#![warn(rust_2018_idioms)]
+
+mod support {
+ pub mod signal;
+}
+use support::signal::send_signal;
+
+use tokio::prelude::*;
+use tokio::runtime::current_thread::Runtime;
+use tokio::signal::unix::{signal, SignalKind};
+
+use std::sync::mpsc::channel;
+use std::thread;
+
+#[test]
+fn multi_loop() {
+ // An "ordinary" (non-future) channel
+ let (sender, receiver) = channel();
+ // Run multiple times, to make sure there are no race conditions
+ for _ in 0..10 {
+ // Run multiple event loops, each one in its own thread
+ let threads: Vec<_> = (0..4)
+ .map(|_| {
+ let sender = sender.clone();
+ thread::spawn(move || {
+ let mut rt = Runtime::new().unwrap();
+ let _ = rt.block_on(async {
+ let signal = signal(SignalKind::hangup()).unwrap();
+ sender.send(()).unwrap();
+ signal.into_future().await
+ });
+ })
+ })
+ .collect();
+ // Wait for them to declare they're ready
+ for &_ in threads.iter() {
+ receiver.recv().unwrap();
+ }
+ // Send a signal
+ send_signal(libc::SIGHUP);
+ // Make sure the threads terminated correctly
+ for t in threads {
+ t.join().unwrap();
+ }
+ }
+}
diff --git a/tokio/tests/signal_no_rt.rs b/tokio/tests/signal_no_rt.rs
new file mode 100644
index 00000000..b512e5e7
--- /dev/null
+++ b/tokio/tests/signal_no_rt.rs
@@ -0,0 +1,10 @@
+#![cfg(unix)]
+#![warn(rust_2018_idioms)]
+
+use tokio::signal::unix::{signal, SignalKind};
+
+#[test]
+#[should_panic]
+fn no_runtime_panics_creating_signals() {
+ let _ = signal(SignalKind::hangup());
+}
diff --git a/tokio/tests/signal_notify_both.rs b/tokio/tests/signal_notify_both.rs
new file mode 100644
index 00000000..00385478
--- /dev/null
+++ b/tokio/tests/signal_notify_both.rs
@@ -0,0 +1,23 @@
+#![cfg(unix)]
+#![warn(rust_2018_idioms)]
+
+mod support {
+ pub mod signal;
+}
+use support::signal::send_signal;
+
+use tokio::prelude::*;
+use tokio::signal::unix::{signal, SignalKind};
+
+use futures::future;
+
+#[tokio::test]
+async fn notify_both() {
+ let kind = SignalKind::user_defined2();
+ let signal1 = signal(kind).expect("failed to create signal1");
+
+ let signal2 = signal(kind).expect("failed to create signal2");
+
+ send_signal(libc::SIGUSR2);
+ let _ = future::join(signal1.into_future(), signal2.into_future()).await;
+}
diff --git a/tokio/tests/signal_twice.rs b/tokio/tests/signal_twice.rs
new file mode 100644
index 00000000..d8e0facc
--- /dev/null
+++ b/tokio/tests/signal_twice.rs
@@ -0,0 +1,25 @@
+#![cfg(unix)]
+#![warn(rust_2018_idioms)]
+
+mod support {
+ pub mod signal;
+}
+use support::signal::send_signal;
+
+use tokio::prelude::*;
+use tokio::signal::unix::{signal, SignalKind};
+
+#[tokio::test]
+async fn twice() {
+ let kind = SignalKind::user_defined1();
+ let mut sig = signal(kind).expect("failed to get signal");
+
+ for _ in 0..2 {
+ send_signal(libc::SIGUSR1);
+
+ let (item, sig_next) = sig.into_future().await;
+ assert_eq!(item, Some(()));
+
+ sig = sig_next;
+ }
+}
diff --git a/tokio/tests/signal_usr1.rs b/tokio/tests/signal_usr1.rs
new file mode 100644
index 00000000..9b6a0dec
--- /dev/null
+++ b/tokio/tests/signal_usr1.rs
@@ -0,0 +1,23 @@
+#![cfg(unix)]
+#![warn(rust_2018_idioms)]
+
+mod support {
+ pub mod signal;
+}
+use support::signal::send_signal;
+
+use tokio::prelude::*;
+use tokio::signal::unix::{signal, SignalKind};
+use tokio_test::assert_ok;
+
+#[tokio::test]
+async fn signal_usr1() {
+ let signal = assert_ok!(
+ signal(SignalKind::user_defined1()),
+ "failed to create signal"
+ );
+
+ send_signal(libc::SIGUSR1);
+
+ let _ = signal.into_future().await;
+}
diff --git a/tokio/tests/support/signal.rs b/tokio/tests/support/signal.rs
new file mode 100644
index 00000000..ea060587
--- /dev/null
+++ b/tokio/tests/support/signal.rs
@@ -0,0 +1,7 @@
+pub fn send_signal(signal: libc::c_int) {
+ use libc::{getpid, kill};
+
+ unsafe {
+ assert_eq!(kill(getpid(), signal), 0);
+ }
+}
diff --git a/tokio/tests/tcp_accept.rs b/tokio/tests/tcp_accept.rs
new file mode 100644
index 00000000..09a2a603
--- /dev/null
+++ b/tokio/tests/tcp_accept.rs
@@ -0,0 +1,39 @@
+#![warn(rust_2018_idioms)]
+
+use tokio::net::{TcpListener, TcpStream};
+use tokio::sync::oneshot;
+use tokio_test::assert_ok;
+
+use std::net::{IpAddr, SocketAddr};
+
+macro_rules! test_accept {
+ ($(($ident:ident, $target:expr),)*) => {
+ $(
+ #[tokio::test]
+ async fn $ident() {
+ let mut listener = assert_ok!(TcpListener::bind($target).await);
+ let addr = listener.local_addr().unwrap();
+
+ let (tx, rx) = oneshot::channel();
+
+ tokio::spawn(async move {
+ let (socket, _) = assert_ok!(listener.accept().await);
+ assert_ok!(tx.send(socket));
+ });
+
+ let cli = assert_ok!(TcpStream::connect(&addr).await);
+ let srv = assert_ok!(rx.await);
+
+ assert_eq!(cli.local_addr().unwrap(), srv.peer_addr().unwrap());
+ }
+ )*
+ }
+}
+
+test_accept! {
+ (ip_str, "127.0.0.1:0"),
+ (host_str, "localhost:0"),
+ (socket_addr, "127.0.0.1:0".parse::<SocketAddr>().unwrap()),
+ (str_port_tuple, ("127.0.0.1", 0)),
+ (ip_port_tuple, ("127.0.0.1".parse::<IpAddr>().unwrap(), 0)),
+}
diff --git a/tokio/tests/tcp_connect.rs b/tokio/tests/tcp_connect.rs
new file mode 100644
index 00000000..daf0b3c9
--- /dev/null
+++ b/tokio/tests/tcp_connect.rs
@@ -0,0 +1,228 @@
+#![warn(rust_2018_idioms)]
+
+use tokio::net::{TcpListener, TcpStream};
+use tokio::sync::oneshot;
+use tokio_test::assert_ok;
+
+use futures::join;
+
+#[tokio::test]
+async fn connect_v4() {
+ let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let addr = assert_ok!(srv.local_addr());
+ assert!(addr.is_ipv4());
+
+ let (tx, rx) = oneshot::channel();
+
+ tokio::spawn(async move {
+ let (socket, addr) = assert_ok!(srv.accept().await);
+ assert_eq!(addr, assert_ok!(socket.peer_addr()));
+ assert_ok!(tx.send(socket));
+ });
+
+ let mine = assert_ok!(TcpStream::connect(&addr).await);
+ let theirs = assert_ok!(rx.await);
+
+ assert_eq!(
+ assert_ok!(mine.local_addr()),
+ assert_ok!(theirs.peer_addr())
+ );
+ assert_eq!(
+ assert_ok!(theirs.local_addr()),
+ assert_ok!(mine.peer_addr())
+ );
+}
+
+#[tokio::test]
+async fn connect_v6() {
+ let mut srv = assert_ok!(TcpListener::bind("[::1]:0").await);
+ let addr = assert_ok!(srv.local_addr());
+ assert!(addr.is_ipv6());
+
+ let (tx, rx) = oneshot::channel();
+
+ tokio::spawn(async move {
+ let (socket, addr) = assert_ok!(srv.accept().await);
+ assert_eq!(addr, assert_ok!(socket.peer_addr()));
+ assert_ok!(tx.send(socket));
+ });
+
+ let mine = assert_ok!(TcpStream::connect(&addr).await);
+ let theirs = assert_ok!(rx.await);
+
+ assert_eq!(
+ assert_ok!(mine.local_addr()),
+ assert_ok!(theirs.peer_addr())
+ );
+ assert_eq!(
+ assert_ok!(theirs.local_addr()),
+ assert_ok!(mine.peer_addr())
+ );
+}
+
+#[tokio::test]
+async fn connect_addr_ip_string() {
+ let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let addr = assert_ok!(srv.local_addr());
+ let addr = format!("127.0.0.1:{}", addr.port());
+
+ let server = async {
+ assert_ok!(srv.accept().await);
+ };
+
+ let client = async {
+ assert_ok!(TcpStream::connect(addr).await);
+ };
+
+ join!(server, client);
+}
+
+#[tokio::test]
+async fn connect_addr_ip_str_slice() {
+ let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let addr = assert_ok!(srv.local_addr());
+ let addr = format!("127.0.0.1:{}", addr.port());
+
+ let server = async {
+ assert_ok!(srv.accept().await);
+ };
+
+ let client = async {
+ assert_ok!(TcpStream::connect(&addr[..]).await);
+ };
+
+ join!(server, client);
+}
+
+#[tokio::test]
+async fn connect_addr_host_string() {
+ let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let addr = assert_ok!(srv.local_addr());
+ let addr = format!("localhost:{}", addr.port());
+
+ let server = async {
+ assert_ok!(srv.accept().await);
+ };
+
+ let client = async {
+ assert_ok!(TcpStream::connect(addr).await);
+ };
+
+ join!(server, client);
+}
+
+#[tokio::test]
+async fn connect_addr_ip_port_tuple() {
+ let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let addr = assert_ok!(srv.local_addr());
+ let addr = (addr.ip(), addr.port());
+
+ let server = async {
+ assert_ok!(srv.accept().await);
+ };
+
+ let client = async {
+ assert_ok!(TcpStream::connect(&addr).await);
+ };
+
+ join!(server, client);
+}
+
+#[tokio::test]
+async fn connect_addr_ip_str_port_tuple() {
+ let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let addr = assert_ok!(srv.local_addr());
+ let addr = ("127.0.0.1", addr.port());
+
+ let server = async {
+ assert_ok!(srv.accept().await);
+ };
+
+ let client = async {
+ assert_ok!(TcpStream::connect(&addr).await);
+ };
+
+ join!(server, client);
+}
+
+#[tokio::test]
+async fn connect_addr_host_str_port_tuple() {
+ let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let addr = assert_ok!(srv.local_addr());
+ let addr = ("localhost", addr.port());
+
+ let server = async {
+ assert_ok!(srv.accept().await);
+ };
+
+ let client = async {
+ assert_ok!(TcpStream::connect(&addr).await);
+ };
+
+ join!(server, client);
+}
+
+/*
+ * TODO: bring this back once TCP exposes HUP again
+ *
+#[cfg(target_os = "linux")]
+mod linux {
+ use tokio::net::{TcpListener, TcpStream};
+ use tokio::prelude::*;
+ use tokio_test::assert_ok;
+
+ use mio::unix::UnixReady;
+
+ use futures_util::future::poll_fn;
+ use std::io::Write;
+ use std::time::Duration;
+ use std::{net, thread};
+
+ #[tokio::test]
+ fn poll_hup() {
+ let addr = assert_ok!("127.0.0.1:0".parse());
+ let mut srv = assert_ok!(TcpListener::bind(&addr));
+ let addr = assert_ok!(srv.local_addr());
+
+ tokio::spawn(async move {
+ let (mut client, _) = assert_ok!(srv.accept().await);
+ assert_ok!(client.set_linger(Some(Duration::from_millis(0))));
+ assert_ok!(client.write_all(b"hello world").await);
+
+ // TODO: Drop?
+ });
+
+ /*
+ let t = thread::spawn(move || {
+ let mut client = assert_ok!(srv.accept()).0;
+ client.set_linger(Some(Duration::from_millis(0))).unwrap();
+ client.write(b"hello world").unwrap();
+ thread::sleep(Duration::from_millis(200));
+ });
+ */
+
+ let mut stream = assert_ok!(TcpStream::connect(&addr).await);
+
+ // Poll for HUP before reading.
+ future::poll_fn(|| stream.poll_read_ready(UnixReady::hup().into()))
+ .wait()
+ .unwrap();
+
+ // Same for write half
+ future::poll_fn(|| stream.poll_write_ready())
+ .wait()
+ .unwrap();
+
+ let mut buf = vec![0; 11];
+
+ // Read the data
+ future::poll_fn(|| stream.poll_read(&mut buf))
+ .wait()
+ .unwrap();
+
+ assert_eq!(b"hello world", &buf[..]);
+
+ t.join().unwrap();
+ }
+}
+*/
diff --git a/tokio/tests/tcp_echo.rs b/tokio/tests/tcp_echo.rs
new file mode 100644
index 00000000..0325ecbb
--- /dev/null
+++ b/tokio/tests/tcp_echo.rs
@@ -0,0 +1,41 @@
+#![warn(rust_2018_idioms)]
+
+use tokio::net::{TcpListener, TcpStream};
+use tokio::prelude::*;
+use tokio::sync::oneshot;
+use tokio_test::assert_ok;
+
+#[tokio::test]
+async fn echo_server() {
+ const ITER: usize = 1024;
+
+ let (tx, rx) = oneshot::channel();
+
+ let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let addr = assert_ok!(srv.local_addr());
+
+ let msg = "foo bar baz";
+ tokio::spawn(async move {
+ let mut stream = assert_ok!(TcpStream::connect(&addr).await);
+
+ for _ in 0..ITER {
+ // write
+ assert_ok!(stream.write_all(msg.as_bytes()).await);
+
+ // read
+ let mut buf = [0; 11];
+ assert_ok!(stream.read_exact(&mut buf).await);
+ assert_eq!(&buf[..], msg.as_bytes());
+ }
+
+ assert_ok!(tx.send(()));
+ });
+
+ let (mut stream, _) = assert_ok!(srv.accept().await);
+ let (mut rd, mut wr) = stream.split();
+
+ let n = assert_ok!(rd.copy(&mut wr).await);
+ assert_eq!(n, (ITER * msg.len()) as u64);
+
+ assert_ok!(rx.await);
+}
diff --git a/tokio/tests/tcp_peek.rs b/tokio/tests/tcp_peek.rs
new file mode 100644
index 00000000..34f1ba3a
--- /dev/null
+++ b/tokio/tests/tcp_peek.rs
@@ -0,0 +1,28 @@
+#![warn(rust_2018_idioms)]
+
+use tokio::io::AsyncReadExt;
+use tokio::net::TcpStream;
+
+use tokio_test::assert_ok;
+
+use std::thread;
+use std::{convert::TryInto, io::Write, net};
+
+#[tokio::test]
+async fn peek() {
+ let listener = net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = listener.local_addr().unwrap();
+ let t = thread::spawn(move || assert_ok!(listener.accept()).0);
+
+ let left = net::TcpStream::connect(&addr).unwrap();
+ let mut right = t.join().unwrap();
+ right.write(&[1, 2, 3, 4]).unwrap();
+
+ let mut left: TcpStream = left.try_into().unwrap();
+ let mut buf = [0u8; 16];
+ let n = assert_ok!(left.peek(&mut buf).await);
+ assert_eq!([1, 2, 3, 4], buf[..n]);
+
+ let n = assert_ok!(left.read(&mut buf).await);
+ assert_eq!([1, 2, 3, 4], buf[..n]);
+}
diff --git a/tokio/tests/tcp_shutdown.rs b/tokio/tests/tcp_shutdown.rs
new file mode 100644
index 00000000..7e0597c9
--- /dev/null
+++ b/tokio/tests/tcp_shutdown.rs
@@ -0,0 +1,28 @@
+#![warn(rust_2018_idioms)]
+
+use tokio::io::AsyncWriteExt;
+use tokio::net::{TcpListener, TcpStream};
+use tokio::prelude::*;
+use tokio_test::assert_ok;
+
+#[tokio::test]
+async fn shutdown() {
+ let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let addr = assert_ok!(srv.local_addr());
+
+ tokio::spawn(async move {
+ let mut stream = assert_ok!(TcpStream::connect(&addr).await);
+
+ assert_ok!(AsyncWriteExt::shutdown(&mut stream).await);
+
+ let mut buf = [0; 1];
+ let n = assert_ok!(stream.read(&mut buf).await);
+ assert_eq!(n, 0);
+ });
+
+ let (mut stream, _) = assert_ok!(srv.accept().await);
+ let (mut rd, mut wr) = stream.split();
+
+ let n = assert_ok!(rd.copy(&mut wr).await);
+ assert_eq!(n, 0);
+}
diff --git a/tokio/tests/tcp_split.rs b/tokio/tests/tcp_split.rs
new file mode 100644
index 00000000..ae5f249c
--- /dev/null
+++ b/tokio/tests/tcp_split.rs
@@ -0,0 +1 @@
+// TODO: write tests using TcpStream::split()
diff --git a/tokio/tests/udp.rs b/tokio/tests/udp.rs
new file mode 100644
index 00000000..a2e4f3a6
--- /dev/null
+++ b/tokio/tests/udp.rs
@@ -0,0 +1,72 @@
+#![warn(rust_2018_idioms)]
+
+use tokio::net::UdpSocket;
+
+#[tokio::test]
+async fn send_recv() -> std::io::Result<()> {
+ let mut sender = UdpSocket::bind("127.0.0.1:0").await?;
+ let mut receiver = UdpSocket::bind("127.0.0.1:0").await?;
+
+ sender.connect(receiver.local_addr()?).await?;
+ receiver.connect(sender.local_addr()?).await?;
+
+ let message = b"hello!";
+ sender.send(message).await?;
+
+ let mut recv_buf = [0u8; 32];
+ let len = receiver.recv(&mut recv_buf[..]).await?;
+
+ assert_eq!(&recv_buf[..len], message);
+ Ok(())
+}
+
+#[tokio::test]
+async fn send_to_recv_from() -> std::io::Result<()> {
+ let mut sender = UdpSocket::bind("127.0.0.1:0").await?;
+ let mut receiver = UdpSocket::bind("127.0.0.1:0").await?;
+
+ let message = b"hello!";
+ let receiver_addr = receiver.local_addr()?;
+ sender.send_to(message, &receiver_addr).await?;
+
+ let mut recv_buf = [0u8; 32];
+ let (len, addr) = receiver.recv_from(&mut recv_buf[..]).await?;
+
+ assert_eq!(&recv_buf[..len], message);
+ assert_eq!(addr, sender.local_addr()?);
+ Ok(())
+}
+
+#[tokio::test]
+async fn split() -> std::io::Result<()> {
+ let socket = UdpSocket::bind("127.0.0.1:0").await?;
+ let (mut r, mut s) = socket.split();
+
+ let msg = b"hello";
+ let addr = s.as_ref().local_addr()?;
+ tokio::spawn(async move {
+ s.send_to(msg, &addr).await.unwrap();
+ });
+ let mut recv_buf = [0u8; 32];
+ let (len, _) = r.recv_from(&mut recv_buf[..]).await?;
+ assert_eq!(&recv_buf[..len], msg);
+ Ok(())
+}
+
+#[tokio::test]
+async fn reunite() -> std::io::Result<()> {
+ let socket = UdpSocket::bind("127.0.0.1:0").await?;
+ let (s, r) = socket.split();
+ assert!(s.reunite(r).is_ok());
+ Ok(())
+}
+
+#[tokio::test]
+async fn reunite_error() -> std::io::Result<()> {
+ let socket = UdpSocket::bind("127.0.0.1:0").await?;
+ let socket1 = UdpSocket::bind("127.0.0.1:0").await?;
+ let (s, _) = socket.split();
+ let (_, r1) = socket1.split();
+ assert!(s.reunite(r1).is_err());
+ Ok(())