diff options
Diffstat (limited to 'tokio/tests')
-rw-r--r-- | tokio/tests/clock.rs | 20 | ||||
-rw-r--r-- | tokio/tests/current_thread.rs | 781 | ||||
-rw-r--r-- | tokio/tests/process_issue_42.rs | 5 | ||||
-rw-r--r-- | tokio/tests/rt_current_thread.rs | 339 | ||||
-rw-r--r-- | tokio/tests/rt_thread_pool.rs (renamed from tokio/tests/runtime_threaded.rs) | 29 | ||||
-rw-r--r-- | tokio/tests/runtime_current_thread.rs | 137 | ||||
-rw-r--r-- | tokio/tests/signal_drop_rt.rs | 13 | ||||
-rw-r--r-- | tokio/tests/signal_multi_rt.rs | 11 | ||||
-rw-r--r-- | tokio/tests/timer_hammer.rs | 13 | ||||
-rw-r--r-- | tokio/tests/timer_rt.rs | 7 |
10 files changed, 405 insertions, 950 deletions
diff --git a/tokio/tests/clock.rs b/tokio/tests/clock.rs index 0000fc6d..29035bfb 100644 --- a/tokio/tests/clock.rs +++ b/tokio/tests/clock.rs @@ -1,6 +1,6 @@ #![warn(rust_2018_idioms)] -use tokio::runtime::{self, current_thread}; +use tokio::runtime; use tokio::timer::clock::Clock; use tokio::timer::*; @@ -20,14 +20,16 @@ fn clock_and_timer_concurrent() { let when = Instant::now() + Duration::from_millis(5_000); let clock = Clock::new_with_now(MockNow(when)); - let rt = runtime::Builder::new().clock(clock).build().unwrap(); + let mut rt = runtime::Builder::new().clock(clock).build().unwrap(); let (tx, rx) = mpsc::channel(); - rt.spawn(async move { - delay(when).await; - assert!(Instant::now() < when); - tx.send(()).unwrap(); + rt.block_on(async move { + tokio::spawn(async move { + delay(when).await; + assert!(Instant::now() < when); + tx.send(()).unwrap(); + }) }); rx.recv().unwrap(); @@ -38,7 +40,11 @@ fn clock_and_timer_single_threaded() { let when = Instant::now() + Duration::from_millis(5_000); let clock = Clock::new_with_now(MockNow(when)); - let mut rt = current_thread::Builder::new().clock(clock).build().unwrap(); + let mut rt = runtime::Builder::new() + .current_thread() + .clock(clock) + .build() + .unwrap(); rt.block_on(async move { delay(when).await; diff --git a/tokio/tests/current_thread.rs b/tokio/tests/current_thread.rs deleted file mode 100644 index 6a788884..00000000 --- a/tokio/tests/current_thread.rs +++ /dev/null @@ -1,781 +0,0 @@ -#![warn(rust_2018_idioms)] -#![cfg(not(miri))] - -use tokio::executor::current_thread::{self, block_on_all, CurrentThread, TaskExecutor}; -use tokio::executor::TypedExecutor; -use tokio::sync::oneshot; - -use std::any::Any; -use std::cell::{Cell, RefCell}; -use std::future::Future; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll}; -use std::thread; -use std::time::Duration; - -mod from_block_on_all { - use super::*; - fn test<F: Fn(Pin<Box<dyn Future<Output = ()>>>) + 'static>(spawn: F) { - let cnt = Rc::new(Cell::new(0)); - let c = cnt.clone(); - - let msg = block_on_all(async move { - c.set(1 + c.get()); - - // Spawn! - spawn(Box::pin(async move { - c.set(1 + c.get()); - })); - - "hello" - }); - - assert_eq!(2, cnt.get()); - assert_eq!(msg, "hello"); - } - - #[test] - fn spawn() { - test(current_thread::spawn) - } - - #[test] - fn execute() { - test(|f| { - TaskExecutor::current().spawn(f).unwrap(); - }); - } -} - -#[test] -fn block_waits() { - let (tx, rx) = oneshot::channel(); - - thread::spawn(|| { - thread::sleep(Duration::from_millis(1000)); - tx.send(()).unwrap(); - }); - - let cnt = Rc::new(Cell::new(0)); - let cnt2 = cnt.clone(); - - block_on_all(async move { - rx.await.unwrap(); - cnt.set(1 + cnt.get()); - }); - - assert_eq!(1, cnt2.get()); -} - -#[test] -fn spawn_many() { - const ITER: usize = 200; - - let cnt = Rc::new(Cell::new(0)); - let mut tokio_current_thread = CurrentThread::new(); - - for _ in 0..ITER { - let cnt = cnt.clone(); - tokio_current_thread.spawn(async move { - cnt.set(1 + cnt.get()); - }); - } - - tokio_current_thread.run().unwrap(); - - assert_eq!(cnt.get(), ITER); -} - -mod does_not_set_global_executor_by_default { - use super::*; - - fn test<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>) -> Result<(), E> + 'static, E>( - spawn: F, - ) { - block_on_all(async { - spawn(Box::pin(async {})).unwrap_err(); - }); - } - - #[test] - fn spawn() { - test(|f| tokio::executor::DefaultExecutor::current().spawn(f)) - } -} - -mod from_block_on_future { - use super::*; - - fn test<F: Fn(Pin<Box<dyn Future<Output = ()>>>)>(spawn: F) { - let cnt = Rc::new(Cell::new(0)); - let cnt2 = cnt.clone(); - - let mut tokio_current_thread = CurrentThread::new(); - - tokio_current_thread.block_on(async move { - let cnt3 = cnt2.clone(); - - spawn(Box::pin(async move { - cnt3.set(1 + cnt3.get()); - })); - }); - - tokio_current_thread.run().unwrap(); - - assert_eq!(1, cnt.get()); - } - - #[test] - fn spawn() { - test(current_thread::spawn); - } - - #[test] - fn execute() { - test(|f| { - current_thread::TaskExecutor::current().spawn(f).unwrap(); - }); - } -} - -mod outstanding_tasks_are_dropped_when_executor_is_dropped { - use super::*; - - #[allow(unreachable_code)] // TODO: remove this when https://github.com/rust-lang/rust/issues/64636 fixed. - async fn never(_rc: Rc<()>) { - loop { - yield_once().await; - } - } - - fn test<F, G>(spawn: F, dotspawn: G) - where - F: Fn(Pin<Box<dyn Future<Output = ()>>>) + 'static, - G: Fn(&mut CurrentThread, Pin<Box<dyn Future<Output = ()>>>), - { - let mut rc = Rc::new(()); - - let mut tokio_current_thread = CurrentThread::new(); - dotspawn(&mut tokio_current_thread, Box::pin(never(rc.clone()))); - - drop(tokio_current_thread); - - // Ensure the daemon is dropped - assert!(Rc::get_mut(&mut rc).is_some()); - - // Using the global spawn fn - - let mut rc = Rc::new(()); - let rc2 = rc.clone(); - - let mut tokio_current_thread = CurrentThread::new(); - - tokio_current_thread.block_on(async move { - spawn(Box::pin(never(rc2))); - }); - - drop(tokio_current_thread); - - // Ensure the daemon is dropped - assert!(Rc::get_mut(&mut rc).is_some()); - } - - #[test] - fn spawn() { - test(current_thread::spawn, |rt, f| { - rt.spawn(f); - }) - } - - #[test] - fn execute() { - test( - |f| { - current_thread::TaskExecutor::current().spawn(f).unwrap(); - }, - // Note: `CurrentThread` doesn't currently implement - // `futures::Executor`, so we'll call `.spawn(...)` rather than - // `.execute(...)` for now. If `CurrentThread` is changed to - // implement Executor, change this to `.execute(...).unwrap()`. - |rt, f| { - rt.spawn(f); - }, - ); - } -} - -#[test] -#[should_panic] -fn nesting_run() { - block_on_all(async { - block_on_all(async {}); - }); -} - -mod run_in_future { - use super::*; - - #[test] - #[should_panic] - fn spawn() { - block_on_all(async { - current_thread::spawn(async { - block_on_all(async {}); - }); - }); - } - - #[test] - #[should_panic] - fn execute() { - block_on_all(async { - current_thread::TaskExecutor::current() - .spawn(async { - block_on_all(async {}); - }) - .unwrap(); - }); - } -} - -#[test] -fn tick_on_infini_future() { - let num = Rc::new(Cell::new(0)); - - #[allow(unreachable_code)] // TODO: remove this when https://github.com/rust-lang/rust/issues/64636 fixed. - async fn infini(num: Rc<Cell<usize>>) { - loop { - num.set(1 + num.get()); - yield_once().await - } - } - - CurrentThread::new() - .spawn(infini(num.clone())) - .turn(None) - .unwrap(); - - assert_eq!(1, num.get()); -} - -mod tasks_are_scheduled_fairly { - use super::*; - - #[allow(unreachable_code)] // TODO: remove this when https://github.com/rust-lang/rust/issues/64636 fixed. - async fn spin(state: Rc<RefCell<[i32; 2]>>, idx: usize) { - loop { - // borrow_mut scope - { - let mut state = state.borrow_mut(); - - if idx == 0 { - let diff = state[0] - state[1]; - - assert!(diff.abs() <= 1); - - if state[0] >= 50 { - return; - } - } - - state[idx] += 1; - - if state[idx] >= 100 { - return; - } - } - - yield_once().await; - } - } - - fn test<F: Fn(Pin<Box<dyn Future<Output = ()>>>)>(spawn: F) { - let state = Rc::new(RefCell::new([0, 0])); - - block_on_all(async move { - spawn(Box::pin(spin(state.clone(), 0))); - spawn(Box::pin(spin(state, 1))); - }); - } - - #[test] - fn spawn() { - test(current_thread::spawn) - } - - #[test] - fn execute() { - test(|f| { - current_thread::TaskExecutor::current().spawn(f).unwrap(); - }) - } -} - -mod and_turn { - use super::*; - - fn test<F, G>(spawn: F, dotspawn: G) - where - F: Fn(Pin<Box<dyn Future<Output = ()>>>) + 'static, - G: Fn(&mut CurrentThread, Pin<Box<dyn Future<Output = ()>>>), - { - let cnt = Rc::new(Cell::new(0)); - let c = cnt.clone(); - - let mut tokio_current_thread = CurrentThread::new(); - - // Spawn a basic task to get the executor to turn - dotspawn(&mut tokio_current_thread, Box::pin(async {})); - - // Turn once... - tokio_current_thread.turn(None).unwrap(); - - dotspawn( - &mut tokio_current_thread, - Box::pin(async move { - c.set(1 + c.get()); - - // Spawn! - spawn(Box::pin(async move { - c.set(1 + c.get()); - })); - }), - ); - - // This does not run the newly spawned thread - tokio_current_thread.turn(None).unwrap(); - assert_eq!(1, cnt.get()); - - // This runs the newly spawned thread - tokio_current_thread.turn(None).unwrap(); - assert_eq!(2, cnt.get()); - } - - #[test] - fn spawn() { - test(current_thread::spawn, |rt, f| { - rt.spawn(f); - }) - } - - #[test] - fn execute() { - test( - |f| { - current_thread::TaskExecutor::current().spawn(f).unwrap(); - }, - // Note: `CurrentThread` doesn't currently implement - // `futures::Executor`, so we'll call `.spawn(...)` rather than - // `.execute(...)` for now. If `CurrentThread` is changed to - // implement Executor, change this to `.execute(...).unwrap()`. - |rt, f| { - rt.spawn(f); - }, - ); - } -} - -mod in_drop { - use super::*; - struct OnDrop<F: FnOnce()>(Option<F>); - - impl<F: FnOnce()> Drop for OnDrop<F> { - fn drop(&mut self) { - (self.0.take().unwrap())(); - } - } - - async fn noop(_data: Box<dyn Any>) {} - - fn test<F, G>(spawn: F, dotspawn: G) - where - F: Fn(Pin<Box<dyn Future<Output = ()>>>) + 'static, - G: Fn(&mut CurrentThread, Pin<Box<dyn Future<Output = ()>>>), - { - let mut tokio_current_thread = CurrentThread::new(); - - let (tx, rx) = oneshot::channel(); - - dotspawn( - &mut tokio_current_thread, - Box::pin(noop(Box::new(OnDrop(Some(move || { - spawn(Box::pin(async move { - tx.send(()).unwrap(); - })); - }))))), - ); - - tokio_current_thread.block_on(rx).unwrap(); - tokio_current_thread.run().unwrap(); - } - - #[test] - fn spawn() { - test(current_thread::spawn, |rt, f| { - rt.spawn(f); - }) - } - - #[test] - fn execute() { - test( - |f| { - current_thread::TaskExecutor::current().spawn(f).unwrap(); - }, - // Note: `CurrentThread` doesn't currently implement - // `futures::Executor`, so we'll call `.spawn(...)` rather than - // `.execute(...)` for now. If `CurrentThread` is changed to - // implement Executor, change this to `.execute(...).unwrap()`. - |rt, f| { - rt.spawn(f); - }, - ); - } -} - -/* -#[test] -fn hammer_turn() { - use futures::sync::mpsc; - - const ITER: usize = 100; - const N: usize = 100; - const THREADS: usize = 4; - - for _ in 0..ITER { - let mut ths = vec![]; - - // Add some jitter - for _ in 0..THREADS { - let th = thread::spawn(|| { - let mut tokio_current_thread = CurrentThread::new(); - - let (tx, rx) = mpsc::unbounded(); - - tokio_current_thread.spawn({ - let cnt = Rc::new(Cell::new(0)); - let c = cnt.clone(); - - rx.for_each(move |_| { - c.set(1 + c.get()); - Ok(()) - }) - .map_err(|e| panic!("err={:?}", e)) - .map(move |v| { - assert_eq!(N, cnt.get()); - v - }) - }); - - thread::spawn(move || { - for _ in 0..N { - tx.unbounded_send(()).unwrap(); - thread::yield_now(); - } - }); - - while !tokio_current_thread.is_idle() { - tokio_current_thread.turn(None).unwrap(); - } - }); - - ths.push(th); - } - - for th in ths { - th.join().unwrap(); - } - } -} -*/ - -#[test] -fn turn_has_polled() { - let mut tokio_current_thread = CurrentThread::new(); - - // Spawn oneshot receiver - let (sender, receiver) = oneshot::channel::<()>(); - tokio_current_thread.spawn(async move { - let _ = receiver.await; - }); - - // Turn once... - let res = tokio_current_thread - .turn(Some(Duration::from_millis(0))) - .unwrap(); - - // Should've polled the receiver once, but considered it not ready - assert!(res.has_polled()); - - // Turn another time - let res = tokio_current_thread - .turn(Some(Duration::from_millis(0))) - .unwrap(); - - // Should've polled nothing, the receiver is not ready yet - assert!(!res.has_polled()); - - // Make the receiver ready - sender.send(()).unwrap(); - - // Turn another time - let res = tokio_current_thread - .turn(Some(Duration::from_millis(0))) - .unwrap(); - - // Should've polled the receiver, it's ready now - assert!(res.has_polled()); - - // Now the executor should be empty - assert!(tokio_current_thread.is_idle()); - let res = tokio_current_thread - .turn(Some(Duration::from_millis(0))) - .unwrap(); - - // So should've polled nothing - assert!(!res.has_polled()); -} - -// Our own mock Park that is never really waiting and the only -// thing it does is to send, on request, something (once) to a oneshot -// channel -struct MyPark { - sender: Option<oneshot::Sender<()>>, - send_now: Rc<Cell<bool>>, -} - -struct MyUnpark; - -impl tokio::executor::park::Park for MyPark { - type Unpark = MyUnpark; - type Error = (); - - fn unpark(&self) -> Self::Unpark { - MyUnpark - } - - fn park(&mut self) -> Result<(), Self::Error> { - // If called twice with send_now, this will intentionally panic - if self.send_now.get() { - self.sender.take().unwrap().send(()).unwrap(); - } - - Ok(()) - } - - fn park_timeout(&mut self, _duration: Duration) -> Result<(), Self::Error> { - self.park() - } -} - -impl tokio::executor::park::Unpark for MyUnpark { - fn unpark(&self) {} -} - -#[test] -fn turn_fair() { - let send_now = Rc::new(Cell::new(false)); - - let (sender, receiver) = oneshot::channel::<()>(); - let (sender_2, receiver_2) = oneshot::channel::<()>(); - let (sender_3, receiver_3) = oneshot::channel::<()>(); - - let my_park = MyPark { - sender: Some(sender_3), - send_now: send_now.clone(), - }; - - let mut tokio_current_thread = CurrentThread::new_with_park(my_park); - - let receiver_1_done = Rc::new(Cell::new(false)); - let receiver_1_done_clone = receiver_1_done.clone(); - - // Once an item is received on the oneshot channel, it will immediately - // immediately make the second oneshot channel ready - - tokio_current_thread.spawn(async move { - receiver.await.unwrap(); - sender_2.send(()).unwrap(); - receiver_1_done_clone.set(true); - }); - - let receiver_2_done = Rc::new(Cell::new(false)); - let receiver_2_done_clone = receiver_2_done.clone(); - - tokio_current_thread.spawn(async move { - receiver_2.await.unwrap(); - receiver_2_done_clone.set(true); - }); - - // The third receiver is only woken up from our Park implementation, it simulates - // e.g. a socket that first has to be polled to know if it is ready now - let receiver_3_done = Rc::new(Cell::new(false)); - let receiver_3_done_clone = receiver_3_done.clone(); - - tokio_current_thread.spawn(async move { - receiver_3.await.unwrap(); - receiver_3_done_clone.set(true); - }); - - // First turn should've polled both and considered them not ready - let res = tokio_current_thread - .turn(Some(Duration::from_millis(0))) - .unwrap(); - assert!(res.has_polled()); - - // Next turn should've polled nothing - let res = tokio_current_thread - .turn(Some(Duration::from_millis(0))) - .unwrap(); - assert!(!res.has_polled()); - - assert!(!receiver_1_done.get()); - assert!(!receiver_2_done.get()); - assert!(!receiver_3_done.get()); - - // After this the receiver future will wake up the second receiver future, - // so there are pending futures again - sender.send(()).unwrap(); - - // Now the first receiver should be done, the second receiver should be ready - // to be polled again and the socket not yet - let res = tokio_current_thread.turn(None).unwrap(); - assert!(res.has_polled()); - - assert!(receiver_1_done.get()); - assert!(!receiver_2_done.get()); - assert!(!receiver_3_done.get()); - - // Now let our park implementation know that it should send something to sender 3 - send_now.set(true); - - // This should resolve the second receiver directly, but also poll the socket - // and read the packet from it. If it didn't do both here, we would handle - // futures that are woken up from the reactor and directly unfairly and would - // favour the ones that are woken up directly. - let res = tokio_current_thread.turn(None).unwrap(); - assert!(res.has_polled()); - - assert!(receiver_1_done.get()); - assert!(receiver_2_done.get()); - assert!(receiver_3_done.get()); - - // Don't send again - send_now.set(false); - - // Now we should be idle and turning should not poll anything - assert!(tokio_current_thread.is_idle()); - let res = tokio_current_thread.turn(None).unwrap(); - assert!(!res.has_polled()); -} - -#[test] -fn spawn_from_other_thread() { - let mut current_thread = CurrentThread::new(); - - let handle = current_thread.handle(); - let (sender, receiver) = oneshot::channel::<()>(); - - thread::spawn(move || { - handle - .spawn(async move { - sender.send(()).unwrap(); - }) - .unwrap(); - }); - - let _ = current_thread.block_on(receiver).unwrap(); -} - -#[test] -fn spawn_from_other_thread_unpark() { - use std::sync::mpsc::channel as mpsc_channel; - - let mut current_thread = CurrentThread::new(); - - let handle = current_thread.handle(); - let (sender_1, receiver_1) = oneshot::channel::<()>(); - let (sender_2, receiver_2) = mpsc_channel::<()>(); - - thread::spawn(move || { - let _ = receiver_2.recv().unwrap(); - - handle - .spawn(async move { - sender_1.send(()).unwrap(); - }) - .unwrap(); - }); - - // Ensure that unparking the executor works correctly. It will first - // check if there are new futures (there are none), then execute the - // lazy future below which will cause the future to be spawned from - // the other thread. Then the executor will park but should be woken - // up because *now* we have a new future to schedule - let _ = current_thread.block_on(async move { - // inlined 'lazy' - async move { - sender_2.send(()).unwrap(); - } - .await; - receiver_1.await.unwrap(); - }); -} - -#[test] -fn spawn_from_executor_with_handle() { - let mut current_thread = CurrentThread::new(); - let handle = current_thread.handle(); - let (tx, rx) = oneshot::channel(); - - current_thread.spawn(async move { - handle - .spawn(async move { - tx.send(()).unwrap(); - }) - .unwrap(); - }); - - current_thread.block_on(rx).unwrap(); -} - -#[test] -fn handle_status() { - let current_thread = CurrentThread::new(); - let handle = current_thread.handle(); - assert!(handle.status().is_ok()); - - drop(current_thread); - assert!(handle.spawn(async { () }).is_err()); - assert!(handle.status().is_err()); -} - -#[test] -fn handle_is_sync() { - let current_thread = CurrentThread::new(); - let handle = current_thread.handle(); - - let _box: Box<dyn Sync> = Box::new(handle); -} - -async fn yield_once() { - YieldOnce(false).await -} - -struct YieldOnce(bool); - -impl Future for YieldOnce { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - if self.0 { - Poll::Ready(()) - } else { - self.0 = true; - // Push to the back of the executor's queue - cx.waker().wake_by_ref(); - Poll::Pending - } - } -} diff --git a/tokio/tests/process_issue_42.rs b/tokio/tests/process_issue_42.rs index 3631aa7b..9de9d0bf 100644 --- a/tokio/tests/process_issue_42.rs +++ b/tokio/tests/process_issue_42.rs @@ -3,7 +3,7 @@ #![warn(rust_2018_idioms)] use tokio::process::Command; -use tokio::runtime::current_thread; +use tokio::runtime; use futures_util::future::FutureExt; use futures_util::stream::FuturesOrdered; @@ -18,7 +18,8 @@ fn run_test() { let finished_clone = finished.clone(); thread::spawn(move || { - let mut rt = current_thread::Runtime::new().expect("failed to get runtime"); + let mut rt = runtime::Builder::new().current_thread().build().unwrap(); + let mut futures = FuturesOrdered::new(); rt.block_on(async { for i in 0..2 { diff --git a/tokio/tests/rt_current_thread.rs b/tokio/tests/rt_current_thread.rs new file mode 100644 index 00000000..1f0fdece --- /dev/null +++ b/tokio/tests/rt_current_thread.rs @@ -0,0 +1,339 @@ +#![warn(rust_2018_idioms)] + +use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::*; +use tokio::runtime::Runtime; +use tokio::sync::oneshot; +use tokio::timer; +use tokio_test::{assert_err, assert_ok}; + +use futures_util::future::poll_fn; +use std::sync::{mpsc, Arc}; +use std::task::Poll; +use std::thread; +use std::time::{Duration, Instant}; + +#[test] +fn block_on_sync() { + let mut rt = rt(); + + let mut win = false; + rt.block_on(async { + win = true; + }); + + assert!(win); +} + +#[test] +fn block_on_async() { + let mut rt = rt(); + + let out = rt.block_on(async { + let (tx, rx) = oneshot::channel(); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(50)); + tx.send("ZOMG").unwrap(); + }); + + assert_ok!(rx.await) + }); + + assert_eq!(out, "ZOMG"); +} + +#[test] +fn spawn_one() { + let mut rt = rt(); + + let out = rt.block_on(async { + let (tx, rx) = oneshot::channel(); + + tokio::spawn(async move { + tx.send("ZOMG").unwrap(); + }); + + assert_ok!(rx.await) + }); + + assert_eq!(out, "ZOMG"); +} + +#[test] +fn spawn_two() { + let mut rt = rt(); + + let out = rt.block_on(async { + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + tokio::spawn(async move { + assert_ok!(tx1.send("ZOMG")); + }); + + tokio::spawn(async move { + let msg = assert_ok!(rx1.await); + assert_ok!(tx2.send(msg)); + }); + + assert_ok!(rx2.await) + }); + + assert_eq!(out, "ZOMG"); +} + +#[test] +fn spawn_many() { + use tokio::sync::mpsc; + + const ITER: usize = 10; + + let mut rt = rt(); + + let out = rt.block_on(async { + let (done_tx, mut done_rx) = mpsc::unbounded_channel(); + + let mut txs = (0..ITER) + .map(|i| { + let (tx, rx) = oneshot::channel(); + let mut done_tx = done_tx.clone(); + + tokio::spawn(async move { + let msg = assert_ok!(rx.await); + assert_eq!(i, msg); + assert_ok!(done_tx.try_send(msg)); + }); + + tx + }) + .collect::<Vec<_>>(); + + drop(done_tx); + + thread::spawn(move || { + for (i, tx) in txs.drain(..).enumerate() { + assert_ok!(tx.send(i)); + } + }); + + let mut out = vec![]; + while let Some(i) = done_rx.recv().await { + out.push(i); + } + + out.sort(); + out + }); + + assert_eq!(ITER, out.len()); + + for i in 0..ITER { + assert_eq!(i, out[i]); + } +} + +#[test] +fn outstanding_tasks_dropped() { + let mut rt = rt(); + + let cnt = Arc::new(()); + + rt.block_on(async { + let cnt = cnt.clone(); + + tokio::spawn(poll_fn(move |_| { + assert_eq!(2, Arc::strong_count(&cnt)); + Poll::Pending + })); + }); + + assert_eq!(2, Arc::strong_count(&cnt)); + + drop(rt); + + assert_eq!(1, Arc::strong_count(&cnt)); +} + +#[test] +#[should_panic] +fn nested_rt() { + let mut rt1 = rt(); + let mut rt2 = rt(); + + rt1.block_on(async { rt2.block_on(async { "hello" }) }); +} + +#[test] +fn create_rt_in_block_on() { + let mut rt1 = rt(); + let mut rt2 = rt1.block_on(async { rt() }); + let out = rt2.block_on(async { "ZOMG" }); + + assert_eq!(out, "ZOMG"); +} + +#[test] +fn complete_block_on_under_load() { + let mut rt = rt(); + + rt.block_on(async { + let (tx, rx) = oneshot::channel(); + + // Spin hard + tokio::spawn(async { + loop { + yield_once().await; + } + }); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(50)); + assert_ok!(tx.send(())); + }); + + assert_ok!(rx.await); + }); +} + +#[test] +fn complete_task_under_load() { + let mut rt = rt(); + + rt.block_on(async { + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + // Spin hard + tokio::spawn(async { + loop { + yield_once().await; + } + }); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(50)); + assert_ok!(tx1.send(())); + }); + + tokio::spawn(async move { + assert_ok!(rx1.await); + assert_ok!(tx2.send(())); + }); + + assert_ok!(rx2.await); + }); +} + +#[test] +fn spawn_from_other_thread() { + let mut rt = rt(); + let sp = rt.spawner(); + + let (tx, rx) = oneshot::channel(); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(50)); + + sp.spawn(async move { + assert_ok!(tx.send(())); + }); + }); + + rt.block_on(async move { + assert_ok!(rx.await); + }); +} + +#[test] +fn delay_at_root() { + let mut rt = rt(); + + let now = Instant::now(); + let dur = Duration::from_millis(50); + + rt.block_on(async move { + timer::delay_for(dur).await; + }); + + assert!(now.elapsed() >= dur); +} + +#[test] +fn delay_in_spawn() { + let mut rt = rt(); + + let now = Instant::now(); + let dur = Duration::from_millis(50); + + rt.block_on(async move { + let (tx, rx) = oneshot::channel(); + + tokio::spawn(async move { + timer::delay_for(dur).await; + assert_ok!(tx.send(())); + }); + + assert_ok!(rx.await); + }); + + assert!(now.elapsed() >= dur); +} + +#[test] +fn client_server_block_on() { + let _ = env_logger::try_init(); + + let mut rt = rt(); + let (tx, rx) = mpsc::channel(); + + rt.block_on(async move { client_server(tx).await }); + + assert_ok!(rx.try_recv()); + assert_err!(rx.try_recv()); +} + +async fn yield_once() { + let mut yielded = false; + poll_fn(|cx| { + if yielded { + Poll::Ready(()) + } else { + yielded = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + }) + .await +} + +async fn client_server(tx: mpsc::Sender<()>) { + let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + + // Get the assigned address + let addr = assert_ok!(server.local_addr()); + + // Spawn the ser |