diff options
author | Lucio Franco <luciofranco14@gmail.com> | 2020-08-27 20:05:48 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-27 20:05:48 -0400 |
commit | d600ab9a8f37e9eff3fa8587069a816b65b6da0b (patch) | |
tree | 06d14901604c5c7822b43d9f4973fdccd15509e7 /tokio/tests | |
parent | d9d909cb4c6d326423ee02fbcf6bbfe5553d2c0a (diff) |
rt: Refactor `Runtime::block_on` to take `&self` (#2782)
Co-authored-by: Eliza Weisman <eliza@buoyant.io>
Diffstat (limited to 'tokio/tests')
-rw-r--r-- | tokio/tests/io_driver.rs | 2 | ||||
-rw-r--r-- | tokio/tests/rt_basic.rs | 6 | ||||
-rw-r--r-- | tokio/tests/rt_common.rs | 140 | ||||
-rw-r--r-- | tokio/tests/rt_threaded.rs | 20 | ||||
-rw-r--r-- | tokio/tests/signal_drop_rt.rs | 4 | ||||
-rw-r--r-- | tokio/tests/signal_multi_rt.rs | 2 | ||||
-rw-r--r-- | tokio/tests/task_blocking.rs | 18 | ||||
-rw-r--r-- | tokio/tests/task_local_set.rs | 18 | ||||
-rw-r--r-- | tokio/tests/time_rt.rs | 2 |
9 files changed, 92 insertions, 120 deletions
diff --git a/tokio/tests/io_driver.rs b/tokio/tests/io_driver.rs index b85abd8c..d4f4f8d4 100644 --- a/tokio/tests/io_driver.rs +++ b/tokio/tests/io_driver.rs @@ -45,7 +45,7 @@ fn test_drop_on_notify() { // shutting down. Then, when the task handle is dropped, the task itself is // dropped. - let mut rt = runtime::Builder::new() + let rt = runtime::Builder::new() .basic_scheduler() .enable_all() .build() diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index 0885992d..3813c480 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -12,7 +12,7 @@ use std::time::Duration; fn spawned_task_does_not_progress_without_block_on() { let (tx, mut rx) = oneshot::channel(); - let mut rt = rt(); + let rt = rt(); rt.spawn(async move { assert_ok!(tx.send("hello")); @@ -65,7 +65,7 @@ fn no_extra_poll() { }; let npolls = Arc::clone(&rx.npolls); - let mut rt = rt(); + let rt = rt(); rt.spawn(async move { while rx.next().await.is_some() {} }); rt.block_on(async { @@ -100,7 +100,7 @@ fn acquire_mutex_in_drop() { let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); - let mut rt = rt(); + let rt = rt(); rt.spawn(async move { let _ = rx2.await; diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 4211a667..35e2ea81 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -9,38 +9,41 @@ macro_rules! rt_test { mod basic_scheduler { $($t)* - fn rt() -> Runtime { + fn rt() -> Arc<Runtime> { tokio::runtime::Builder::new() .basic_scheduler() .enable_all() .build() .unwrap() + .into() } } mod threaded_scheduler_4_threads { $($t)* - fn rt() -> Runtime { + fn rt() -> Arc<Runtime> { tokio::runtime::Builder::new() .threaded_scheduler() .core_threads(4) .enable_all() .build() .unwrap() + .into() } } mod threaded_scheduler_1_thread { $($t)* - fn rt() -> Runtime { + fn rt() -> Arc<Runtime> { tokio::runtime::Builder::new() .threaded_scheduler() .core_threads(1) .enable_all() .build() .unwrap() + .into() } } } @@ -72,7 +75,7 @@ rt_test! { #[test] fn block_on_sync() { - let mut rt = rt(); + let rt = rt(); let mut win = false; rt.block_on(async { @@ -82,41 +85,12 @@ rt_test! { assert!(win); } - #[test] - fn block_on_handle_sync() { - let rt = rt(); - - let mut win = false; - rt.handle().block_on(async { - win = true; - }); - - assert!(win); - } #[test] fn block_on_async() { - let mut rt = rt(); - - let out = rt.block_on(async { - let (tx, rx) = oneshot::channel(); - - thread::spawn(move || { - thread::sleep(Duration::from_millis(50)); - tx.send("ZOMG").unwrap(); - }); - - assert_ok!(rx.await) - }); - - assert_eq!(out, "ZOMG"); - } - - #[test] - fn block_on_handle_async() { let rt = rt(); - let out = rt.handle().block_on(async { + let out = rt.block_on(async { let (tx, rx) = oneshot::channel(); thread::spawn(move || { @@ -132,7 +106,7 @@ rt_test! { #[test] fn spawn_one_bg() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (tx, rx) = oneshot::channel(); @@ -149,7 +123,7 @@ rt_test! { #[test] fn spawn_one_join() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (tx, rx) = oneshot::channel(); @@ -172,7 +146,7 @@ rt_test! { #[test] fn spawn_two() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (tx1, rx1) = oneshot::channel(); @@ -199,7 +173,7 @@ rt_test! { const ITER: usize = 200; - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (done_tx, mut done_rx) = mpsc::unbounded_channel(); @@ -249,7 +223,7 @@ rt_test! { const ITER: usize = 500; - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { tokio::spawn(async move { @@ -305,7 +279,7 @@ rt_test! { #[test] fn spawn_await_chain() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { assert_ok!(tokio::spawn(async { @@ -320,7 +294,7 @@ rt_test! { #[test] fn outstanding_tasks_dropped() { - let mut rt = rt(); + let rt = rt(); let cnt = Arc::new(()); @@ -343,16 +317,16 @@ rt_test! { #[test] #[should_panic] fn nested_rt() { - let mut rt1 = rt(); - let mut rt2 = rt(); + let rt1 = rt(); + let rt2 = rt(); rt1.block_on(async { rt2.block_on(async { "hello" }) }); } #[test] fn create_rt_in_block_on() { - let mut rt1 = rt(); - let mut rt2 = rt1.block_on(async { rt() }); + let rt1 = rt(); + let rt2 = rt1.block_on(async { rt() }); let out = rt2.block_on(async { "ZOMG" }); assert_eq!(out, "ZOMG"); @@ -360,7 +334,7 @@ rt_test! { #[test] fn complete_block_on_under_load() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async { let (tx, rx) = oneshot::channel(); @@ -383,7 +357,7 @@ rt_test! { #[test] fn complete_task_under_load() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async { let (tx1, rx1) = oneshot::channel(); @@ -412,8 +386,8 @@ rt_test! { #[test] fn spawn_from_other_thread_idle() { - let mut rt = rt(); - let handle = rt.handle().clone(); + let rt = rt(); + let handle = rt.clone(); let (tx, rx) = oneshot::channel(); @@ -432,8 +406,8 @@ rt_test! { #[test] fn spawn_from_other_thread_under_load() { - let mut rt = rt(); - let handle = rt.handle().clone(); + let rt = rt(); + let handle = rt.clone(); let (tx, rx) = oneshot::channel(); @@ -457,7 +431,7 @@ rt_test! { #[test] fn delay_at_root() { - let mut rt = rt(); + let rt = rt(); let now = Instant::now(); let dur = Duration::from_millis(50); @@ -471,7 +445,7 @@ rt_test! { #[test] fn delay_in_spawn() { - let mut rt = rt(); + let rt = rt(); let now = Instant::now(); let dur = Duration::from_millis(50); @@ -492,7 +466,7 @@ rt_test! { #[test] fn block_on_socket() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async move { let (tx, rx) = oneshot::channel(); @@ -512,7 +486,7 @@ rt_test! { #[test] fn spawn_from_blocking() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async move { let inner = assert_ok!(tokio::task::spawn_blocking(|| { @@ -527,7 +501,7 @@ rt_test! { #[test] fn spawn_blocking_from_blocking() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async move { let inner = assert_ok!(tokio::task::spawn_blocking(|| { @@ -542,7 +516,7 @@ rt_test! { #[test] fn delay_from_blocking() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async move { assert_ok!(tokio::task::spawn_blocking(|| { @@ -562,7 +536,7 @@ rt_test! { #[test] fn socket_from_blocking() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async move { let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); @@ -586,7 +560,7 @@ rt_test! { #[test] fn spawn_blocking_after_shutdown() { let rt = rt(); - let handle = rt.handle().clone(); + let handle = rt.clone(); // Shutdown drop(rt); @@ -615,7 +589,7 @@ rt_test! { // test is disabled. #[cfg(not(windows))] fn io_driver_called_when_under_load() { - let mut rt = rt(); + let rt = rt(); // Create a lot of constant load. The scheduler will always be busy. for _ in 0..100 { @@ -651,7 +625,7 @@ rt_test! { #[test] fn client_server_block_on() { - let mut rt = rt(); + let rt = rt(); let (tx, rx) = mpsc::channel(); rt.block_on(async move { client_server(tx).await }); @@ -662,7 +636,7 @@ rt_test! { #[test] fn panic_in_task() { - let mut rt = rt(); + let rt = rt(); let (tx, rx) = oneshot::channel(); struct Boom(Option<oneshot::Sender<()>>); @@ -689,7 +663,7 @@ rt_test! { #[test] #[should_panic] fn panic_in_block_on() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async { panic!() }); } @@ -709,7 +683,7 @@ rt_test! { #[test] fn enter_and_spawn() { - let mut rt = rt(); + let rt = rt(); let handle = rt.enter(|| { tokio::spawn(async {}) }); @@ -739,7 +713,7 @@ rt_test! { } } - let mut rt = rt(); + let rt = rt(); let (drop_tx, drop_rx) = mpsc::channel(); let (run_tx, run_rx) = oneshot::channel(); @@ -775,17 +749,17 @@ rt_test! { let (tx2, rx2) = oneshot::channel(); let (tx3, rx3) = oneshot::channel(); - let mut rt = rt(); + let rt = rt(); - let h1 = rt.handle().clone(); + let h1 = rt.clone(); - rt.handle().spawn(async move { + rt.spawn(async move { // Ensure a waker gets stored in oneshot 1. let _ = rx1.await; tx3.send(()).unwrap(); }); - rt.handle().spawn(async move { + rt.spawn(async move { // When this task is dropped, we'll be "closing remotes". // We spawn a new task that owns the `tx1`, to move its Drop // out of here. @@ -802,7 +776,7 @@ rt_test! { let _ = rx2.await; }); - rt.handle().spawn(async move { + rt.spawn(async move { let _ = rx3.await; // We'll never get here, but once task 3 drops, this will // force task 2 to re-schedule since it's waiting on oneshot 2. @@ -823,7 +797,7 @@ rt_test! { use std::net::Ipv6Addr; for _ in 1..10 { - let mut runtime = rt(); + let runtime = rt(); runtime.block_on(async { let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap(); @@ -854,7 +828,7 @@ rt_test! { #[test] fn shutdown_timeout() { let (tx, rx) = oneshot::channel(); - let mut runtime = rt(); + let runtime = rt(); runtime.block_on(async move { task::spawn_blocking(move || { @@ -865,18 +839,18 @@ rt_test! { rx.await.unwrap(); }); - runtime.shutdown_timeout(Duration::from_millis(100)); + Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100)); } #[test] fn shutdown_wakeup_time() { - let mut runtime = rt(); + let runtime = rt(); runtime.block_on(async move { tokio::time::delay_for(std::time::Duration::from_millis(100)).await; }); - runtime.shutdown_timeout(Duration::from_secs(10_000)); + Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_secs(10_000)); } // This test is currently ignored on Windows because of a @@ -894,7 +868,9 @@ rt_test! { thread::spawn(|| { R.with(|cell| { - *cell.borrow_mut() = Some(rt()); + let rt = rt(); + let rt = Arc::try_unwrap(rt).unwrap(); + *cell.borrow_mut() = Some(rt); }); let _rt = rt(); @@ -927,10 +903,10 @@ rt_test! { #[test] fn local_set_block_on_socket() { - let mut rt = rt(); + let rt = rt(); let local = task::LocalSet::new(); - local.block_on(&mut rt, async move { + local.block_on(&rt, async move { let (tx, rx) = oneshot::channel(); let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -948,12 +924,12 @@ rt_test! { #[test] fn local_set_client_server_block_on() { - let mut rt = rt(); + let rt = rt(); let (tx, rx) = mpsc::channel(); let local = task::LocalSet::new(); - local.block_on(&mut rt, async move { client_server_local(tx).await }); + local.block_on(&rt, async move { client_server_local(tx).await }); assert_ok!(rx.try_recv()); assert_err!(rx.try_recv()); @@ -987,7 +963,7 @@ rt_test! { fn coop() { use std::task::Poll::Ready; - let mut rt = rt(); + let rt = rt(); rt.block_on(async { // Create a bunch of tasks @@ -1019,7 +995,7 @@ rt_test! { const NUM: usize = 100; - let mut rt = rt(); + let rt = rt(); rt.block_on(async { let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel(); diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index b5ec96de..a67c090e 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -57,22 +57,20 @@ fn many_oneshot_futures() { } #[test] fn many_multishot_futures() { - use tokio::sync::mpsc; - const CHAIN: usize = 200; const CYCLES: usize = 5; const TRACKS: usize = 50; for _ in 0..50 { - let mut rt = rt(); + let rt = rt(); let mut start_txs = Vec::with_capacity(TRACKS); let mut final_rxs = Vec::with_capacity(TRACKS); for _ in 0..TRACKS { - let (start_tx, mut chain_rx) = mpsc::channel(10); + let (start_tx, mut chain_rx) = tokio::sync::mpsc::channel(10); for _ in 0..CHAIN { - let (mut next_tx, next_rx) = mpsc::channel(10); + let (mut next_tx, next_rx) = tokio::sync::mpsc::channel(10); // Forward all the messages rt.spawn(async move { @@ -85,7 +83,7 @@ fn many_multishot_futures() { } // This final task cycles if needed - let (mut final_tx, final_rx) = mpsc::channel(10); + let (mut final_tx, final_rx) = tokio::sync::mpsc::channel(10); let mut cycle_tx = start_tx.clone(); let mut rem = CYCLES; @@ -123,7 +121,7 @@ fn many_multishot_futures() { #[test] fn spawn_shutdown() { - let mut rt = rt(); + let rt = rt(); let (tx, rx) = mpsc::channel(); rt.block_on(async { @@ -230,7 +228,7 @@ fn start_stop_callbacks_called() { let after_inner = after_start.clone(); let before_inner = before_stop.clone(); - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .threaded_scheduler() .enable_all() .on_thread_start(move || { @@ -331,9 +329,7 @@ fn multi_threadpool() { // channel yields occasionally even if there are values ready to receive. #[test] fn coop_and_block_in_place() { - use tokio::sync::mpsc; - - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .threaded_scheduler() // Setting max threads to 1 prevents another thread from claiming the // runtime worker yielded as part of `block_in_place` and guarantees the @@ -344,7 +340,7 @@ fn coop_and_block_in_place() { .unwrap(); rt.block_on(async move { - let (mut tx, mut rx) = mpsc::channel(1024); + let (mut tx, mut rx) = tokio::sync::mpsc::channel(1024); // Fill the channel for _ in 0..1024 { diff --git a/tokio/tests/signal_drop_rt.rs b/tokio/tests/signal_drop_rt.rs index aeedd96e..709e0d41 100644 --- a/tokio/tests/signal_drop_rt.rs +++ b/tokio/tests/signal_drop_rt.rs @@ -14,11 +14,11 @@ use tokio::signal::unix::{signal, SignalKind}; fn dropping_loops_does_not_cause_starvation() { let kind = SignalKind::user_defined1(); - let mut first_rt = rt(); + let first_rt = rt(); let mut first_signal = first_rt.block_on(async { signal(kind).expect("failed to register first signal") }); - let mut second_rt = rt(); + let second_rt = rt(); let mut second_signal = second_rt.block_on(async { signal(kind).expect("failed to register second signal") }); diff --git a/tokio/tests/signal_multi_rt.rs b/tokio/tests/signal_multi_rt.rs index 9d784695..78319a75 100644 --- a/tokio/tests/signal_multi_rt.rs +++ b/tokio/tests/signal_multi_rt.rs @@ -24,7 +24,7 @@ fn multi_loop() { .map(|_| { let sender = sender.clone(); thread::spawn(move || { - let mut rt = rt(); + let rt = rt(); let _ = rt.block_on(async { let mut signal = signal(SignalKind::hangup()).unwrap(); sender.send(()).unwrap(); diff --git a/tokio/tests/task_blocking.rs b/tokio/tests/task_blocking.rs index 4ca1596e..6cb11584 100644 --- a/tokio/tests/task_blocking.rs +++ b/tokio/tests/task_blocking.rs @@ -79,7 +79,7 @@ async fn no_block_in_basic_scheduler() { #[test] fn yes_block_in_threaded_block_on() { - let mut rt = runtime::Builder::new() + let rt = runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); @@ -91,7 +91,7 @@ fn yes_block_in_threaded_block_on() { #[test] #[should_panic] fn no_block_in_basic_block_on() { - let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap(); + let rt = runtime::Builder::new().basic_scheduler().build().unwrap(); rt.block_on(async { task::block_in_place(|| {}); }); @@ -99,14 +99,14 @@ fn no_block_in_basic_block_on() { #[test] fn can_enter_basic_rt_from_within_block_in_place() { - let mut outer = tokio::runtime::Builder::new() + let outer = tokio::runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); outer.block_on(async { tokio::task::block_in_place(|| { - let mut inner = tokio::runtime::Builder::new() + let inner = tokio::runtime::Builder::new() .basic_scheduler() .build() .unwrap(); @@ -120,7 +120,7 @@ fn can_enter_basic_rt_from_within_block_in_place() { fn useful_panic_message_when_dropping_rt_in_rt() { use std::panic::{catch_unwind, AssertUnwindSafe}; - let mut outer = tokio::runtime::Builder::new() + let outer = tokio::runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); @@ -147,7 +147,7 @@ fn useful_panic_message_when_dropping_rt_in_rt() { #[test] fn can_shutdown_with_zero_timeout_in_runtime() { - let mut outer = tokio::runtime::Builder::new() + let outer = tokio::runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); @@ -163,7 +163,7 @@ fn can_shutdown_with_zero_timeout_in_runtime() { #[test] fn can_shutdown_now_in_runtime() { - let mut outer = tokio::runtime::Builder::new() + let outer = tokio::runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); @@ -179,7 +179,7 @@ fn can_shutdown_now_in_runtime() { #[test] fn coop_disabled_in_block_in_place() { - let mut outer = tokio::runtime::Builder::new() + let outer = tokio::runtime::Builder::new() .threaded_scheduler() .enable_time() .build() @@ -213,7 +213,7 @@ fn coop_disabled_in_block_in_place_in_block_on() { let (done_tx, done_rx) = std::sync::mpsc::channel(); let done = done_tx.clone(); thread::spawn(move || { - let mut outer = tokio::runtime::Builder::new() + let outer = tokio::runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); diff --git a/tokio/tests/task_local_set.rs b/tokio/tests/task_local_set.rs index bf80b8ee..23e92586 100644 --- a/tokio/tests/task_local_set.rs +++ b/tokio/tests/task_local_set.rs @@ -133,12 +133,12 @@ fn local_threadpool_blocking_in_place() { ON_RT_THREAD.with(|cell| cell.set(true)); - let mut rt = runtime::Builder::new() + let rt = runtime::Builder::new() .threaded_scheduler() .enable_all() .build() .unwrap(); - LocalSet::new().block_on(&mut rt, async { + LocalSet::new().block_on(&rt, async { assert!(ON_RT_THREAD.with(|cell| cell.get())); let join = task::spawn_local(async move { assert!(ON_RT_THREAD.with(|cell| cell.get())); @@ -246,12 +246,12 @@ fn join_local_future_elsewhere() { ON_RT_THREAD.with(|cell| cell.set(true)); - let mut rt = runtime::Builder::new() + let rt = runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); let local = LocalSet::new(); - local.block_on(&mut rt, async move { + local.block_on(&rt, async move { let (tx, rx) = oneshot::channel(); let join = task::spawn_local(async move { println!("hello world running..."); @@ -286,7 +286,7 @@ fn drop_cancels_tasks() { use std::rc::Rc; // This test reproduces issue #1842 - let mut rt = rt(); + let rt = rt(); let rc1 = Rc::new(()); let rc2 = rc1.clone(); @@ -303,7 +303,7 @@ fn drop_cancels_tasks() { } }); - local.block_on(&mut rt, async { + local.block_on(&rt, async { started_rx.await.unwrap(); }); drop(local); @@ -362,11 +362,11 @@ fn drop_cancels_remote_tasks() { with_timeout(Duration::from_secs(60), || { let (tx, mut rx) = mpsc::channel::<()>(1024); - let mut rt = rt(); + let rt = rt(); let local = LocalSet::new(); local.spawn_local(async move { while rx.recv().await.is_some() {} }); - local.block_on(&mut rt, async { + local.block_on(&rt, async { time::delay_for(Duration::from_millis(1)).await; }); @@ -385,7 +385,7 @@ fn local_tasks_wake_join_all() { use futures::future::join_all; use tokio::task::LocalSet; - let mut rt = rt(); + let rt = rt(); let set = LocalSet::new(); let mut handles = Vec::new(); diff --git a/tokio/tests/time_rt.rs b/tokio/tests/time_rt.rs index b739f1b2..19bcd27d 100644 --- a/tokio/tests/time_rt.rs +++ b/tokio/tests/time_rt.rs @@ -28,7 +28,7 @@ fn timer_with_threaded_runtime() { fn timer_with_basic_scheduler() { use tokio::runtime::Builder; - let mut rt = Builder::new() + let rt = Builder::new() .basic_scheduler() .enable_all() .build() |