diff options
Diffstat (limited to 'tokio/tests')
-rw-r--r-- | tokio/tests/io_driver.rs | 6 | ||||
-rw-r--r-- | tokio/tests/io_driver_drop.rs | 12 | ||||
-rw-r--r-- | tokio/tests/process_issue_42.rs | 6 | ||||
-rw-r--r-- | tokio/tests/rt_basic.rs | 1 | ||||
-rw-r--r-- | tokio/tests/rt_common.rs | 160 | ||||
-rw-r--r-- | tokio/tests/rt_threaded.rs | 13 | ||||
-rw-r--r-- | tokio/tests/signal_drop_rt.rs | 1 | ||||
-rw-r--r-- | tokio/tests/signal_multi_rt.rs | 1 | ||||
-rw-r--r-- | tokio/tests/time_rt.rs | 6 |
9 files changed, 186 insertions, 20 deletions
diff --git a/tokio/tests/io_driver.rs b/tokio/tests/io_driver.rs index a97de8d2..ec51373f 100644 --- a/tokio/tests/io_driver.rs +++ b/tokio/tests/io_driver.rs @@ -44,7 +44,11 @@ fn test_drop_on_notify() { // shutting down. Then, when the task handle is dropped, the task itself is // dropped. - let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap(); + let mut rt = runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap(); let (addr_tx, addr_rx) = mpsc::channel(); diff --git a/tokio/tests/io_driver_drop.rs b/tokio/tests/io_driver_drop.rs index c75631b7..6b923dd3 100644 --- a/tokio/tests/io_driver_drop.rs +++ b/tokio/tests/io_driver_drop.rs @@ -6,7 +6,7 @@ use tokio_test::{assert_err, assert_pending, assert_ready, task}; #[test] fn tcp_doesnt_block() { - let rt = runtime::Builder::new().basic_scheduler().build().unwrap(); + let rt = rt(); let mut listener = rt.enter(|| { let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); @@ -24,7 +24,7 @@ fn tcp_doesnt_block() { #[test] fn drop_wakes() { - let rt = runtime::Builder::new().basic_scheduler().build().unwrap(); + let rt = rt(); let mut listener = rt.enter(|| { let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); @@ -42,3 +42,11 @@ fn drop_wakes() { assert!(task.is_woken()); assert_ready!(task.poll()); } + +fn rt() -> runtime::Runtime { + runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap() +} diff --git a/tokio/tests/process_issue_42.rs b/tokio/tests/process_issue_42.rs index 5571c199..1b8dcc95 100644 --- a/tokio/tests/process_issue_42.rs +++ b/tokio/tests/process_issue_42.rs @@ -18,7 +18,11 @@ fn run_test() { let finished_clone = finished.clone(); thread::spawn(move || { - let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap(); + let mut rt = runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap(); let mut futures = FuturesOrdered::new(); rt.block_on(async { diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index 039bb22f..700f1029 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -29,6 +29,7 @@ fn spawned_task_does_not_progress_without_block_on() { fn rt() -> Runtime { tokio::runtime::Builder::new() .basic_scheduler() + .enable_all() .build() .unwrap() } diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 33f779c7..b360158d 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -10,16 +10,21 @@ macro_rules! rt_test { fn rt() -> Runtime { tokio::runtime::Builder::new() .basic_scheduler() + .enable_all() .build() .unwrap() } } - mod thread_pool { + mod threaded_scheduler { $($t)* fn rt() -> Runtime { - Runtime::new().unwrap() + tokio::runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .build() + .unwrap() } } } @@ -341,7 +346,7 @@ rt_test! { #[test] fn block_on_socket() { - let mut rt = Runtime::new().unwrap(); + let mut rt = rt(); rt.block_on(async move { let (tx, rx) = oneshot::channel(); @@ -360,6 +365,100 @@ rt_test! { } #[test] + fn spawn_from_blocking() { + let mut rt = rt(); + + let out = rt.block_on(async move { + let inner = assert_ok!(tokio::task::spawn_blocking(|| { + tokio::spawn(async move { "hello" }) + }).await); + + assert_ok!(inner.await) + }); + + assert_eq!(out, "hello") + } + + #[test] + fn delay_from_blocking() { + let mut rt = rt(); + + rt.block_on(async move { + assert_ok!(tokio::task::spawn_blocking(|| { + let now = std::time::Instant::now(); + let dur = Duration::from_millis(1); + + // use the futures' block_on fn to make sure we aren't setting + // any Tokio context + futures::executor::block_on(async { + tokio::time::delay_for(dur).await; + }); + + assert!(now.elapsed() >= dur); + }).await); + }); + } + + #[test] + fn socket_from_blocking() { + let mut rt = rt(); + + rt.block_on(async move { + let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(listener.local_addr()); + + let peer = tokio::task::spawn_blocking(move || { + // use the futures' block_on fn to make sure we aren't setting + // any Tokio context + futures::executor::block_on(async { + assert_ok!(TcpStream::connect(addr).await); + }); + }); + + // Wait for the client to connect + let _ = assert_ok!(listener.accept().await); + + assert_ok!(peer.await); + }); + } + + #[test] + fn io_driver_called_when_under_load() { + let mut rt = rt(); + + // Create a lot of constant load. The scheduler will always be busy. + for _ in 0..100 { + rt.spawn(async { + loop { + tokio::task::yield_now().await; + } + }); + } + + // Do some I/O work + rt.block_on(async { + let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(listener.local_addr()); + + let srv = tokio::spawn(async move { + let (mut stream, _) = assert_ok!(listener.accept().await); + assert_ok!(stream.write_all(b"hello world").await); + }); + + let cli = tokio::spawn(async move { + let mut stream = assert_ok!(TcpStream::connect(addr).await); + let mut dst = vec![0; 11]; + + assert_ok!(stream.read_exact(&mut dst).await); + assert_eq!(dst, b"hello world"); + }); + + assert_ok!(srv.await); + assert_ok!(cli.await); + }); + } + + #[test] fn client_server_block_on() { let mut rt = rt(); let (tx, rx) = mpsc::channel(); @@ -371,12 +470,11 @@ rt_test! { } #[test] - #[ignore] fn panic_in_task() { - let rt = rt(); - let (tx, rx) = mpsc::channel(); + let mut rt = rt(); + let (tx, rx) = oneshot::channel(); - struct Boom(mpsc::Sender<()>); + struct Boom(Option<oneshot::Sender<()>>); impl Future for Boom { type Output = (); @@ -389,12 +487,12 @@ rt_test! { impl Drop for Boom { fn drop(&mut self) { assert!(::std::thread::panicking()); - self.0.send(()).unwrap(); + self.0.take().unwrap().send(()).unwrap(); } } - rt.spawn(Boom(tx)); - rx.recv().unwrap(); + rt.spawn(Boom(Some(tx))); + assert_ok!(rt.block_on(rx)); } #[test] @@ -428,6 +526,48 @@ rt_test! { assert_ok!(rt.block_on(handle)); } + #[test] + fn eagerly_drops_futures_on_shutdown() { + use std::sync::mpsc; + + struct Never { + drop_tx: mpsc::Sender<()>, + } + + impl Future for Never { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { + Poll::Pending + } + } + + impl Drop for Never { + fn drop(&mut self) { + self.drop_tx.send(()).unwrap(); + } + } + + let mut rt = rt(); + + let (drop_tx, drop_rx) = mpsc::channel(); + let (run_tx, run_rx) = oneshot::channel(); + + rt.block_on(async move { + tokio::spawn(async move { + assert_ok!(run_tx.send(())); + + Never { drop_tx }.await + }); + + assert_ok!(run_rx.await); + }); + + drop(rt); + + assert_ok!(drop_rx.recv()); + } + async fn client_server(tx: mpsc::Sender<()>) { let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 8be6d036..4b4c880e 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -18,6 +18,7 @@ fn single_thread() { // No panic when starting a runtime w/ a single thread let _ = runtime::Builder::new() .threaded_scheduler() + .enable_all() .num_threads(1) .build(); } @@ -189,10 +190,11 @@ fn drop_threadpool_drops_futures() { let rt = runtime::Builder::new() .threaded_scheduler() - .after_start(move || { + .enable_all() + .on_thread_start(move || { a.fetch_add(1, Relaxed); }) - .before_stop(move || { + .on_thread_stop(move || { b.fetch_add(1, Relaxed); }) .build() @@ -218,7 +220,7 @@ fn drop_threadpool_drops_futures() { } #[test] -fn after_start_and_before_stop_is_called() { +fn start_stop_callbacks_called() { use std::sync::atomic::{AtomicUsize, Ordering}; let after_start = Arc::new(AtomicUsize::new(0)); @@ -228,10 +230,11 @@ fn after_start_and_before_stop_is_called() { let before_inner = before_stop.clone(); let mut rt = tokio::runtime::Builder::new() .threaded_scheduler() - .after_start(move || { + .enable_all() + .on_thread_start(move || { after_inner.clone().fetch_add(1, Ordering::Relaxed); }) - .before_stop(move || { + .on_thread_stop(move || { before_inner.clone().fetch_add(1, Ordering::Relaxed); }) .build() diff --git a/tokio/tests/signal_drop_rt.rs b/tokio/tests/signal_drop_rt.rs index 0cb7d482..fc790c28 100644 --- a/tokio/tests/signal_drop_rt.rs +++ b/tokio/tests/signal_drop_rt.rs @@ -38,6 +38,7 @@ fn dropping_loops_does_not_cause_starvation() { fn rt() -> Runtime { tokio::runtime::Builder::new() .basic_scheduler() + .enable_all() .build() .unwrap() } diff --git a/tokio/tests/signal_multi_rt.rs b/tokio/tests/signal_multi_rt.rs index 8020c593..e34e7c09 100644 --- a/tokio/tests/signal_multi_rt.rs +++ b/tokio/tests/signal_multi_rt.rs @@ -48,6 +48,7 @@ fn multi_loop() { fn rt() -> Runtime { tokio::runtime::Builder::new() .basic_scheduler() + .enable_all() .build() .unwrap() } diff --git a/tokio/tests/time_rt.rs b/tokio/tests/time_rt.rs index 235d1960..7de9c345 100644 --- a/tokio/tests/time_rt.rs +++ b/tokio/tests/time_rt.rs @@ -27,7 +27,11 @@ fn timer_with_threaded_runtime() { fn timer_with_basic_scheduler() { use tokio::runtime::Builder; - let mut rt = Builder::new().basic_scheduler().build().unwrap(); + let mut rt = Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap(); let (tx, rx) = mpsc::channel(); rt.block_on(async move { |