summaryrefslogtreecommitdiffstats
path: root/tokio/tests/rt_common.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-03-05 10:31:37 -0800
committerGitHub <noreply@github.com>2020-03-05 10:31:37 -0800
commita78b1c65ccfb9692ca5d3ed8ddde934f40091d83 (patch)
treec88e547d6913b204f590aea54dc03328ee3cb094 /tokio/tests/rt_common.rs
parent5ede2e4d6b2f732e83e33f9693682dffc6c9f5b0 (diff)
rt: cleanup and simplify scheduler (scheduler v2.5) (#2273)
A refactor of the scheduler internals focusing on simplifying and reducing unsafety. There are no fundamental logic changes. * The state transitions of the core task component are refined and reduced. * `basic_scheduler` has most unsafety removed. * `local_set` has most unsafety removed. * `threaded_scheduler` limits most unsafety to its queue implementation.
Diffstat (limited to 'tokio/tests/rt_common.rs')
-rw-r--r--tokio/tests/rt_common.rs116
1 files changed, 68 insertions, 48 deletions
diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs
index 64dd3680..52d33a51 100644
--- a/tokio/tests/rt_common.rs
+++ b/tokio/tests/rt_common.rs
@@ -307,7 +307,7 @@ rt_test! {
}
#[test]
- fn spawn_from_other_thread() {
+ fn spawn_from_other_thread_idle() {
let mut rt = rt();
let handle = rt.handle().clone();
@@ -327,6 +327,31 @@ rt_test! {
}
#[test]
+ fn spawn_from_other_thread_under_load() {
+ let mut rt = rt();
+ let handle = rt.handle().clone();
+
+ let (tx, rx) = oneshot::channel();
+
+ thread::spawn(move || {
+ handle.spawn(async move {
+ assert_ok!(tx.send(()));
+ });
+ });
+
+ rt.block_on(async move {
+ // Spin hard
+ tokio::spawn(async {
+ loop {
+ yield_once().await;
+ }
+ });
+
+ assert_ok!(rx.await);
+ });
+ }
+
+ #[test]
fn delay_at_root() {
let mut rt = rt();
@@ -680,7 +705,7 @@ rt_test! {
fn io_notify_while_shutting_down() {
use std::net::Ipv6Addr;
- for _ in 1..100 {
+ for _ in 1..10 {
let mut runtime = rt();
runtime.block_on(async {
@@ -768,66 +793,61 @@ rt_test! {
tx.send(()).unwrap();
}
- mod local_set {
- use tokio::task;
- use super::*;
-
- #[test]
- fn block_on_socket() {
- let mut rt = rt();
- let local = task::LocalSet::new();
-
- local.block_on(&mut rt, async move {
- let (tx, rx) = oneshot::channel();
+ #[test]
+ fn local_set_block_on_socket() {
+ let mut rt = rt();
+ let local = task::LocalSet::new();
- let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
- let addr = listener.local_addr().unwrap();
+ local.block_on(&mut rt, async move {
+ let (tx, rx) = oneshot::channel();
- task::spawn_local(async move {
- let _ = listener.accept().await;
- tx.send(()).unwrap();
- });
+ let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr = listener.local_addr().unwrap();
- TcpStream::connect(&addr).await.unwrap();
- rx.await.unwrap();
+ task::spawn_local(async move {
+ let _ = listener.accept().await;
+ tx.send(()).unwrap();
});
- }
- #[test]
- fn client_server_block_on() {
- let mut rt = rt();
- let (tx, rx) = mpsc::channel();
+ TcpStream::connect(&addr).await.unwrap();
+ rx.await.unwrap();
+ });
+ }
- let local = task::LocalSet::new();
+ #[test]
+ fn local_set_client_server_block_on() {
+ let mut rt = rt();
+ let (tx, rx) = mpsc::channel();
- local.block_on(&mut rt, async move { client_server_local(tx).await });
+ let local = task::LocalSet::new();
- assert_ok!(rx.try_recv());
- assert_err!(rx.try_recv());
- }
+ local.block_on(&mut rt, async move { client_server_local(tx).await });
- async fn client_server_local(tx: mpsc::Sender<()>) {
- let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ assert_ok!(rx.try_recv());
+ assert_err!(rx.try_recv());
+ }
- // Get the assigned address
- let addr = assert_ok!(server.local_addr());
+ async fn client_server_local(tx: mpsc::Sender<()>) {
+ let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
- // Spawn the server
- task::spawn_local(async move {
- // Accept a socket
- let (mut socket, _) = server.accept().await.unwrap();
+ // Get the assigned address
+ let addr = assert_ok!(server.local_addr());
- // Write some data
- socket.write_all(b"hello").await.unwrap();
- });
+ // Spawn the server
+ task::spawn_local(async move {
+ // Accept a socket
+ let (mut socket, _) = server.accept().await.unwrap();
+
+ // Write some data
+ socket.write_all(b"hello").await.unwrap();
+ });
- let mut client = TcpStream::connect(&addr).await.unwrap();
+ let mut client = TcpStream::connect(&addr).await.unwrap();
- let mut buf = vec![];
- client.read_to_end(&mut buf).await.unwrap();
+ let mut buf = vec![];
+ client.read_to_end(&mut buf).await.unwrap();
- assert_eq!(buf, b"hello");
- tx.send(()).unwrap();
- }
+ assert_eq!(buf, b"hello");
+ tx.send(()).unwrap();
}
}