summaryrefslogtreecommitdiffstats
path: root/tokio/tests/rt_common.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/tests/rt_common.rs')
-rw-r--r--tokio/tests/rt_common.rs140
1 files changed, 58 insertions, 82 deletions
diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs
index 4211a667..35e2ea81 100644
--- a/tokio/tests/rt_common.rs
+++ b/tokio/tests/rt_common.rs
@@ -9,38 +9,41 @@ macro_rules! rt_test {
mod basic_scheduler {
$($t)*
- fn rt() -> Runtime {
+ fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap()
+ .into()
}
}
mod threaded_scheduler_4_threads {
$($t)*
- fn rt() -> Runtime {
+ fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new()
.threaded_scheduler()
.core_threads(4)
.enable_all()
.build()
.unwrap()
+ .into()
}
}
mod threaded_scheduler_1_thread {
$($t)*
- fn rt() -> Runtime {
+ fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new()
.threaded_scheduler()
.core_threads(1)
.enable_all()
.build()
.unwrap()
+ .into()
}
}
}
@@ -72,7 +75,7 @@ rt_test! {
#[test]
fn block_on_sync() {
- let mut rt = rt();
+ let rt = rt();
let mut win = false;
rt.block_on(async {
@@ -82,41 +85,12 @@ rt_test! {
assert!(win);
}
- #[test]
- fn block_on_handle_sync() {
- let rt = rt();
-
- let mut win = false;
- rt.handle().block_on(async {
- win = true;
- });
-
- assert!(win);
- }
#[test]
fn block_on_async() {
- let mut rt = rt();
-
- let out = rt.block_on(async {
- let (tx, rx) = oneshot::channel();
-
- thread::spawn(move || {
- thread::sleep(Duration::from_millis(50));
- tx.send("ZOMG").unwrap();
- });
-
- assert_ok!(rx.await)
- });
-
- assert_eq!(out, "ZOMG");
- }
-
- #[test]
- fn block_on_handle_async() {
let rt = rt();
- let out = rt.handle().block_on(async {
+ let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
@@ -132,7 +106,7 @@ rt_test! {
#[test]
fn spawn_one_bg() {
- let mut rt = rt();
+ let rt = rt();
let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
@@ -149,7 +123,7 @@ rt_test! {
#[test]
fn spawn_one_join() {
- let mut rt = rt();
+ let rt = rt();
let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
@@ -172,7 +146,7 @@ rt_test! {
#[test]
fn spawn_two() {
- let mut rt = rt();
+ let rt = rt();
let out = rt.block_on(async {
let (tx1, rx1) = oneshot::channel();
@@ -199,7 +173,7 @@ rt_test! {
const ITER: usize = 200;
- let mut rt = rt();
+ let rt = rt();
let out = rt.block_on(async {
let (done_tx, mut done_rx) = mpsc::unbounded_channel();
@@ -249,7 +223,7 @@ rt_test! {
const ITER: usize = 500;
- let mut rt = rt();
+ let rt = rt();
let out = rt.block_on(async {
tokio::spawn(async move {
@@ -305,7 +279,7 @@ rt_test! {
#[test]
fn spawn_await_chain() {
- let mut rt = rt();
+ let rt = rt();
let out = rt.block_on(async {
assert_ok!(tokio::spawn(async {
@@ -320,7 +294,7 @@ rt_test! {
#[test]
fn outstanding_tasks_dropped() {
- let mut rt = rt();
+ let rt = rt();
let cnt = Arc::new(());
@@ -343,16 +317,16 @@ rt_test! {
#[test]
#[should_panic]
fn nested_rt() {
- let mut rt1 = rt();
- let mut rt2 = rt();
+ let rt1 = rt();
+ let rt2 = rt();
rt1.block_on(async { rt2.block_on(async { "hello" }) });
}
#[test]
fn create_rt_in_block_on() {
- let mut rt1 = rt();
- let mut rt2 = rt1.block_on(async { rt() });
+ let rt1 = rt();
+ let rt2 = rt1.block_on(async { rt() });
let out = rt2.block_on(async { "ZOMG" });
assert_eq!(out, "ZOMG");
@@ -360,7 +334,7 @@ rt_test! {
#[test]
fn complete_block_on_under_load() {
- let mut rt = rt();
+ let rt = rt();
rt.block_on(async {
let (tx, rx) = oneshot::channel();
@@ -383,7 +357,7 @@ rt_test! {
#[test]
fn complete_task_under_load() {
- let mut rt = rt();
+ let rt = rt();
rt.block_on(async {
let (tx1, rx1) = oneshot::channel();
@@ -412,8 +386,8 @@ rt_test! {
#[test]
fn spawn_from_other_thread_idle() {
- let mut rt = rt();
- let handle = rt.handle().clone();
+ let rt = rt();
+ let handle = rt.clone();
let (tx, rx) = oneshot::channel();
@@ -432,8 +406,8 @@ rt_test! {
#[test]
fn spawn_from_other_thread_under_load() {
- let mut rt = rt();
- let handle = rt.handle().clone();
+ let rt = rt();
+ let handle = rt.clone();
let (tx, rx) = oneshot::channel();
@@ -457,7 +431,7 @@ rt_test! {
#[test]
fn delay_at_root() {
- let mut rt = rt();
+ let rt = rt();
let now = Instant::now();
let dur = Duration::from_millis(50);
@@ -471,7 +445,7 @@ rt_test! {
#[test]
fn delay_in_spawn() {
- let mut rt = rt();
+ let rt = rt();
let now = Instant::now();
let dur = Duration::from_millis(50);
@@ -492,7 +466,7 @@ rt_test! {
#[test]
fn block_on_socket() {
- let mut rt = rt();
+ let rt = rt();
rt.block_on(async move {
let (tx, rx) = oneshot::channel();
@@ -512,7 +486,7 @@ rt_test! {
#[test]
fn spawn_from_blocking() {
- let mut rt = rt();
+ let rt = rt();
let out = rt.block_on(async move {
let inner = assert_ok!(tokio::task::spawn_blocking(|| {
@@ -527,7 +501,7 @@ rt_test! {
#[test]
fn spawn_blocking_from_blocking() {
- let mut rt = rt();
+ let rt = rt();
let out = rt.block_on(async move {
let inner = assert_ok!(tokio::task::spawn_blocking(|| {
@@ -542,7 +516,7 @@ rt_test! {
#[test]
fn delay_from_blocking() {
- let mut rt = rt();
+ let rt = rt();
rt.block_on(async move {
assert_ok!(tokio::task::spawn_blocking(|| {
@@ -562,7 +536,7 @@ rt_test! {
#[test]
fn socket_from_blocking() {
- let mut rt = rt();
+ let rt = rt();
rt.block_on(async move {
let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
@@ -586,7 +560,7 @@ rt_test! {
#[test]
fn spawn_blocking_after_shutdown() {
let rt = rt();
- let handle = rt.handle().clone();
+ let handle = rt.clone();
// Shutdown
drop(rt);
@@ -615,7 +589,7 @@ rt_test! {
// test is disabled.
#[cfg(not(windows))]
fn io_driver_called_when_under_load() {
- let mut rt = rt();
+ let rt = rt();
// Create a lot of constant load. The scheduler will always be busy.
for _ in 0..100 {
@@ -651,7 +625,7 @@ rt_test! {
#[test]
fn client_server_block_on() {
- let mut rt = rt();
+ let rt = rt();
let (tx, rx) = mpsc::channel();
rt.block_on(async move { client_server(tx).await });
@@ -662,7 +636,7 @@ rt_test! {
#[test]
fn panic_in_task() {
- let mut rt = rt();
+ let rt = rt();
let (tx, rx) = oneshot::channel();
struct Boom(Option<oneshot::Sender<()>>);
@@ -689,7 +663,7 @@ rt_test! {
#[test]
#[should_panic]
fn panic_in_block_on() {
- let mut rt = rt();
+ let rt = rt();
rt.block_on(async { panic!() });
}
@@ -709,7 +683,7 @@ rt_test! {
#[test]
fn enter_and_spawn() {
- let mut rt = rt();
+ let rt = rt();
let handle = rt.enter(|| {
tokio::spawn(async {})
});
@@ -739,7 +713,7 @@ rt_test! {
}
}
- let mut rt = rt();
+ let rt = rt();
let (drop_tx, drop_rx) = mpsc::channel();
let (run_tx, run_rx) = oneshot::channel();
@@ -775,17 +749,17 @@ rt_test! {
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();
- let mut rt = rt();
+ let rt = rt();
- let h1 = rt.handle().clone();
+ let h1 = rt.clone();
- rt.handle().spawn(async move {
+ rt.spawn(async move {
// Ensure a waker gets stored in oneshot 1.
let _ = rx1.await;
tx3.send(()).unwrap();
});
- rt.handle().spawn(async move {
+ rt.spawn(async move {
// When this task is dropped, we'll be "closing remotes".
// We spawn a new task that owns the `tx1`, to move its Drop
// out of here.
@@ -802,7 +776,7 @@ rt_test! {
let _ = rx2.await;
});
- rt.handle().spawn(async move {
+ rt.spawn(async move {
let _ = rx3.await;
// We'll never get here, but once task 3 drops, this will
// force task 2 to re-schedule since it's waiting on oneshot 2.
@@ -823,7 +797,7 @@ rt_test! {
use std::net::Ipv6Addr;
for _ in 1..10 {
- let mut runtime = rt();
+ let runtime = rt();
runtime.block_on(async {
let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
@@ -854,7 +828,7 @@ rt_test! {
#[test]
fn shutdown_timeout() {
let (tx, rx) = oneshot::channel();
- let mut runtime = rt();
+ let runtime = rt();
runtime.block_on(async move {
task::spawn_blocking(move || {
@@ -865,18 +839,18 @@ rt_test! {
rx.await.unwrap();
});
- runtime.shutdown_timeout(Duration::from_millis(100));
+ Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100));
}
#[test]
fn shutdown_wakeup_time() {
- let mut runtime = rt();
+ let runtime = rt();
runtime.block_on(async move {
tokio::time::delay_for(std::time::Duration::from_millis(100)).await;
});
- runtime.shutdown_timeout(Duration::from_secs(10_000));
+ Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_secs(10_000));
}
// This test is currently ignored on Windows because of a
@@ -894,7 +868,9 @@ rt_test! {
thread::spawn(|| {
R.with(|cell| {
- *cell.borrow_mut() = Some(rt());
+ let rt = rt();
+ let rt = Arc::try_unwrap(rt).unwrap();
+ *cell.borrow_mut() = Some(rt);
});
let _rt = rt();
@@ -927,10 +903,10 @@ rt_test! {
#[test]
fn local_set_block_on_socket() {
- let mut rt = rt();
+ let rt = rt();
let local = task::LocalSet::new();
- local.block_on(&mut rt, async move {
+ local.block_on(&rt, async move {
let (tx, rx) = oneshot::channel();
let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
@@ -948,12 +924,12 @@ rt_test! {
#[test]
fn local_set_client_server_block_on() {
- let mut rt = rt();
+ let rt = rt();
let (tx, rx) = mpsc::channel();
let local = task::LocalSet::new();
- local.block_on(&mut rt, async move { client_server_local(tx).await });
+ local.block_on(&rt, async move { client_server_local(tx).await });
assert_ok!(rx.try_recv());
assert_err!(rx.try_recv());
@@ -987,7 +963,7 @@ rt_test! {
fn coop() {
use std::task::Poll::Ready;
- let mut rt = rt();
+ let rt = rt();
rt.block_on(async {
// Create a bunch of tasks
@@ -1019,7 +995,7 @@ rt_test! {
const NUM: usize = 100;
- let mut rt = rt();
+ let rt = rt();
rt.block_on(async {
let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();