diff options
Diffstat (limited to 'tokio/tests/rt_common.rs')
-rw-r--r-- | tokio/tests/rt_common.rs | 140 |
1 files changed, 58 insertions, 82 deletions
diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 4211a667..35e2ea81 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -9,38 +9,41 @@ macro_rules! rt_test { mod basic_scheduler { $($t)* - fn rt() -> Runtime { + fn rt() -> Arc<Runtime> { tokio::runtime::Builder::new() .basic_scheduler() .enable_all() .build() .unwrap() + .into() } } mod threaded_scheduler_4_threads { $($t)* - fn rt() -> Runtime { + fn rt() -> Arc<Runtime> { tokio::runtime::Builder::new() .threaded_scheduler() .core_threads(4) .enable_all() .build() .unwrap() + .into() } } mod threaded_scheduler_1_thread { $($t)* - fn rt() -> Runtime { + fn rt() -> Arc<Runtime> { tokio::runtime::Builder::new() .threaded_scheduler() .core_threads(1) .enable_all() .build() .unwrap() + .into() } } } @@ -72,7 +75,7 @@ rt_test! { #[test] fn block_on_sync() { - let mut rt = rt(); + let rt = rt(); let mut win = false; rt.block_on(async { @@ -82,41 +85,12 @@ rt_test! { assert!(win); } - #[test] - fn block_on_handle_sync() { - let rt = rt(); - - let mut win = false; - rt.handle().block_on(async { - win = true; - }); - - assert!(win); - } #[test] fn block_on_async() { - let mut rt = rt(); - - let out = rt.block_on(async { - let (tx, rx) = oneshot::channel(); - - thread::spawn(move || { - thread::sleep(Duration::from_millis(50)); - tx.send("ZOMG").unwrap(); - }); - - assert_ok!(rx.await) - }); - - assert_eq!(out, "ZOMG"); - } - - #[test] - fn block_on_handle_async() { let rt = rt(); - let out = rt.handle().block_on(async { + let out = rt.block_on(async { let (tx, rx) = oneshot::channel(); thread::spawn(move || { @@ -132,7 +106,7 @@ rt_test! { #[test] fn spawn_one_bg() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (tx, rx) = oneshot::channel(); @@ -149,7 +123,7 @@ rt_test! { #[test] fn spawn_one_join() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (tx, rx) = oneshot::channel(); @@ -172,7 +146,7 @@ rt_test! { #[test] fn spawn_two() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (tx1, rx1) = oneshot::channel(); @@ -199,7 +173,7 @@ rt_test! { const ITER: usize = 200; - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (done_tx, mut done_rx) = mpsc::unbounded_channel(); @@ -249,7 +223,7 @@ rt_test! { const ITER: usize = 500; - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { tokio::spawn(async move { @@ -305,7 +279,7 @@ rt_test! { #[test] fn spawn_await_chain() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { assert_ok!(tokio::spawn(async { @@ -320,7 +294,7 @@ rt_test! { #[test] fn outstanding_tasks_dropped() { - let mut rt = rt(); + let rt = rt(); let cnt = Arc::new(()); @@ -343,16 +317,16 @@ rt_test! { #[test] #[should_panic] fn nested_rt() { - let mut rt1 = rt(); - let mut rt2 = rt(); + let rt1 = rt(); + let rt2 = rt(); rt1.block_on(async { rt2.block_on(async { "hello" }) }); } #[test] fn create_rt_in_block_on() { - let mut rt1 = rt(); - let mut rt2 = rt1.block_on(async { rt() }); + let rt1 = rt(); + let rt2 = rt1.block_on(async { rt() }); let out = rt2.block_on(async { "ZOMG" }); assert_eq!(out, "ZOMG"); @@ -360,7 +334,7 @@ rt_test! { #[test] fn complete_block_on_under_load() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async { let (tx, rx) = oneshot::channel(); @@ -383,7 +357,7 @@ rt_test! { #[test] fn complete_task_under_load() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async { let (tx1, rx1) = oneshot::channel(); @@ -412,8 +386,8 @@ rt_test! { #[test] fn spawn_from_other_thread_idle() { - let mut rt = rt(); - let handle = rt.handle().clone(); + let rt = rt(); + let handle = rt.clone(); let (tx, rx) = oneshot::channel(); @@ -432,8 +406,8 @@ rt_test! { #[test] fn spawn_from_other_thread_under_load() { - let mut rt = rt(); - let handle = rt.handle().clone(); + let rt = rt(); + let handle = rt.clone(); let (tx, rx) = oneshot::channel(); @@ -457,7 +431,7 @@ rt_test! { #[test] fn delay_at_root() { - let mut rt = rt(); + let rt = rt(); let now = Instant::now(); let dur = Duration::from_millis(50); @@ -471,7 +445,7 @@ rt_test! { #[test] fn delay_in_spawn() { - let mut rt = rt(); + let rt = rt(); let now = Instant::now(); let dur = Duration::from_millis(50); @@ -492,7 +466,7 @@ rt_test! { #[test] fn block_on_socket() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async move { let (tx, rx) = oneshot::channel(); @@ -512,7 +486,7 @@ rt_test! { #[test] fn spawn_from_blocking() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async move { let inner = assert_ok!(tokio::task::spawn_blocking(|| { @@ -527,7 +501,7 @@ rt_test! { #[test] fn spawn_blocking_from_blocking() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async move { let inner = assert_ok!(tokio::task::spawn_blocking(|| { @@ -542,7 +516,7 @@ rt_test! { #[test] fn delay_from_blocking() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async move { assert_ok!(tokio::task::spawn_blocking(|| { @@ -562,7 +536,7 @@ rt_test! { #[test] fn socket_from_blocking() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async move { let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); @@ -586,7 +560,7 @@ rt_test! { #[test] fn spawn_blocking_after_shutdown() { let rt = rt(); - let handle = rt.handle().clone(); + let handle = rt.clone(); // Shutdown drop(rt); @@ -615,7 +589,7 @@ rt_test! { // test is disabled. #[cfg(not(windows))] fn io_driver_called_when_under_load() { - let mut rt = rt(); + let rt = rt(); // Create a lot of constant load. The scheduler will always be busy. for _ in 0..100 { @@ -651,7 +625,7 @@ rt_test! { #[test] fn client_server_block_on() { - let mut rt = rt(); + let rt = rt(); let (tx, rx) = mpsc::channel(); rt.block_on(async move { client_server(tx).await }); @@ -662,7 +636,7 @@ rt_test! { #[test] fn panic_in_task() { - let mut rt = rt(); + let rt = rt(); let (tx, rx) = oneshot::channel(); struct Boom(Option<oneshot::Sender<()>>); @@ -689,7 +663,7 @@ rt_test! { #[test] #[should_panic] fn panic_in_block_on() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async { panic!() }); } @@ -709,7 +683,7 @@ rt_test! { #[test] fn enter_and_spawn() { - let mut rt = rt(); + let rt = rt(); let handle = rt.enter(|| { tokio::spawn(async {}) }); @@ -739,7 +713,7 @@ rt_test! { } } - let mut rt = rt(); + let rt = rt(); let (drop_tx, drop_rx) = mpsc::channel(); let (run_tx, run_rx) = oneshot::channel(); @@ -775,17 +749,17 @@ rt_test! { let (tx2, rx2) = oneshot::channel(); let (tx3, rx3) = oneshot::channel(); - let mut rt = rt(); + let rt = rt(); - let h1 = rt.handle().clone(); + let h1 = rt.clone(); - rt.handle().spawn(async move { + rt.spawn(async move { // Ensure a waker gets stored in oneshot 1. let _ = rx1.await; tx3.send(()).unwrap(); }); - rt.handle().spawn(async move { + rt.spawn(async move { // When this task is dropped, we'll be "closing remotes". // We spawn a new task that owns the `tx1`, to move its Drop // out of here. @@ -802,7 +776,7 @@ rt_test! { let _ = rx2.await; }); - rt.handle().spawn(async move { + rt.spawn(async move { let _ = rx3.await; // We'll never get here, but once task 3 drops, this will // force task 2 to re-schedule since it's waiting on oneshot 2. @@ -823,7 +797,7 @@ rt_test! { use std::net::Ipv6Addr; for _ in 1..10 { - let mut runtime = rt(); + let runtime = rt(); runtime.block_on(async { let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap(); @@ -854,7 +828,7 @@ rt_test! { #[test] fn shutdown_timeout() { let (tx, rx) = oneshot::channel(); - let mut runtime = rt(); + let runtime = rt(); runtime.block_on(async move { task::spawn_blocking(move || { @@ -865,18 +839,18 @@ rt_test! { rx.await.unwrap(); }); - runtime.shutdown_timeout(Duration::from_millis(100)); + Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100)); } #[test] fn shutdown_wakeup_time() { - let mut runtime = rt(); + let runtime = rt(); runtime.block_on(async move { tokio::time::delay_for(std::time::Duration::from_millis(100)).await; }); - runtime.shutdown_timeout(Duration::from_secs(10_000)); + Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_secs(10_000)); } // This test is currently ignored on Windows because of a @@ -894,7 +868,9 @@ rt_test! { thread::spawn(|| { R.with(|cell| { - *cell.borrow_mut() = Some(rt()); + let rt = rt(); + let rt = Arc::try_unwrap(rt).unwrap(); + *cell.borrow_mut() = Some(rt); }); let _rt = rt(); @@ -927,10 +903,10 @@ rt_test! { #[test] fn local_set_block_on_socket() { - let mut rt = rt(); + let rt = rt(); let local = task::LocalSet::new(); - local.block_on(&mut rt, async move { + local.block_on(&rt, async move { let (tx, rx) = oneshot::channel(); let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -948,12 +924,12 @@ rt_test! { #[test] fn local_set_client_server_block_on() { - let mut rt = rt(); + let rt = rt(); let (tx, rx) = mpsc::channel(); let local = task::LocalSet::new(); - local.block_on(&mut rt, async move { client_server_local(tx).await }); + local.block_on(&rt, async move { client_server_local(tx).await }); assert_ok!(rx.try_recv()); assert_err!(rx.try_recv()); @@ -987,7 +963,7 @@ rt_test! { fn coop() { use std::task::Poll::Ready; - let mut rt = rt(); + let rt = rt(); rt.block_on(async { // Create a bunch of tasks @@ -1019,7 +995,7 @@ rt_test! { const NUM: usize = 100; - let mut rt = rt(); + let rt = rt(); rt.block_on(async { let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel(); |