diff options
author | Carl Lerche <me@carllerche.com> | 2020-03-05 10:31:37 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-05 10:31:37 -0800 |
commit | a78b1c65ccfb9692ca5d3ed8ddde934f40091d83 (patch) | |
tree | c88e547d6913b204f590aea54dc03328ee3cb094 /tokio/tests/rt_common.rs | |
parent | 5ede2e4d6b2f732e83e33f9693682dffc6c9f5b0 (diff) |
rt: cleanup and simplify scheduler (scheduler v2.5) (#2273)
A refactor of the scheduler internals focusing on simplifying and
reducing unsafety. There are no fundamental logic changes.
* The state transitions of the core task component are refined and
reduced.
* `basic_scheduler` has most unsafety removed.
* `local_set` has most unsafety removed.
* `threaded_scheduler` limits most unsafety to its queue implementation.
Diffstat (limited to 'tokio/tests/rt_common.rs')
-rw-r--r-- | tokio/tests/rt_common.rs | 116 |
1 files changed, 68 insertions, 48 deletions
diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 64dd3680..52d33a51 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -307,7 +307,7 @@ rt_test! { } #[test] - fn spawn_from_other_thread() { + fn spawn_from_other_thread_idle() { let mut rt = rt(); let handle = rt.handle().clone(); @@ -327,6 +327,31 @@ rt_test! { } #[test] + fn spawn_from_other_thread_under_load() { + let mut rt = rt(); + let handle = rt.handle().clone(); + + let (tx, rx) = oneshot::channel(); + + thread::spawn(move || { + handle.spawn(async move { + assert_ok!(tx.send(())); + }); + }); + + rt.block_on(async move { + // Spin hard + tokio::spawn(async { + loop { + yield_once().await; + } + }); + + assert_ok!(rx.await); + }); + } + + #[test] fn delay_at_root() { let mut rt = rt(); @@ -680,7 +705,7 @@ rt_test! { fn io_notify_while_shutting_down() { use std::net::Ipv6Addr; - for _ in 1..100 { + for _ in 1..10 { let mut runtime = rt(); runtime.block_on(async { @@ -768,66 +793,61 @@ rt_test! { tx.send(()).unwrap(); } - mod local_set { - use tokio::task; - use super::*; - - #[test] - fn block_on_socket() { - let mut rt = rt(); - let local = task::LocalSet::new(); - - local.block_on(&mut rt, async move { - let (tx, rx) = oneshot::channel(); + #[test] + fn local_set_block_on_socket() { + let mut rt = rt(); + let local = task::LocalSet::new(); - let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); + local.block_on(&mut rt, async move { + let (tx, rx) = oneshot::channel(); - task::spawn_local(async move { - let _ = listener.accept().await; - tx.send(()).unwrap(); - }); + let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); - TcpStream::connect(&addr).await.unwrap(); - rx.await.unwrap(); + task::spawn_local(async move { + let _ = listener.accept().await; + tx.send(()).unwrap(); }); - } - #[test] - fn client_server_block_on() { - let mut rt = rt(); - let (tx, rx) = mpsc::channel(); + TcpStream::connect(&addr).await.unwrap(); + rx.await.unwrap(); + }); + } - let local = task::LocalSet::new(); + #[test] + fn local_set_client_server_block_on() { + let mut rt = rt(); + let (tx, rx) = mpsc::channel(); - local.block_on(&mut rt, async move { client_server_local(tx).await }); + let local = task::LocalSet::new(); - assert_ok!(rx.try_recv()); - assert_err!(rx.try_recv()); - } + local.block_on(&mut rt, async move { client_server_local(tx).await }); - async fn client_server_local(tx: mpsc::Sender<()>) { - let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + assert_ok!(rx.try_recv()); + assert_err!(rx.try_recv()); + } - // Get the assigned address - let addr = assert_ok!(server.local_addr()); + async fn client_server_local(tx: mpsc::Sender<()>) { + let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); - // Spawn the server - task::spawn_local(async move { - // Accept a socket - let (mut socket, _) = server.accept().await.unwrap(); + // Get the assigned address + let addr = assert_ok!(server.local_addr()); - // Write some data - socket.write_all(b"hello").await.unwrap(); - }); + // Spawn the server + task::spawn_local(async move { + // Accept a socket + let (mut socket, _) = server.accept().await.unwrap(); + + // Write some data + socket.write_all(b"hello").await.unwrap(); + }); - let mut client = TcpStream::connect(&addr).await.unwrap(); + let mut client = TcpStream::connect(&addr).await.unwrap(); - let mut buf = vec![]; - client.read_to_end(&mut buf).await.unwrap(); + let mut buf = vec![]; + client.read_to_end(&mut buf).await.unwrap(); - assert_eq!(buf, b"hello"); - tx.send(()).unwrap(); - } + assert_eq!(buf, b"hello"); + tx.send(()).unwrap(); } } |