summaryrefslogtreecommitdiffstats
path: root/tokio/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/tests')
-rw-r--r--tokio/tests/io_driver.rs6
-rw-r--r--tokio/tests/io_driver_drop.rs12
-rw-r--r--tokio/tests/process_issue_42.rs6
-rw-r--r--tokio/tests/rt_basic.rs1
-rw-r--r--tokio/tests/rt_common.rs160
-rw-r--r--tokio/tests/rt_threaded.rs13
-rw-r--r--tokio/tests/signal_drop_rt.rs1
-rw-r--r--tokio/tests/signal_multi_rt.rs1
-rw-r--r--tokio/tests/time_rt.rs6
9 files changed, 186 insertions, 20 deletions
diff --git a/tokio/tests/io_driver.rs b/tokio/tests/io_driver.rs
index a97de8d2..ec51373f 100644
--- a/tokio/tests/io_driver.rs
+++ b/tokio/tests/io_driver.rs
@@ -44,7 +44,11 @@ fn test_drop_on_notify() {
// shutting down. Then, when the task handle is dropped, the task itself is
// dropped.
- let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap();
+ let mut rt = runtime::Builder::new()
+ .basic_scheduler()
+ .enable_all()
+ .build()
+ .unwrap();
let (addr_tx, addr_rx) = mpsc::channel();
diff --git a/tokio/tests/io_driver_drop.rs b/tokio/tests/io_driver_drop.rs
index c75631b7..6b923dd3 100644
--- a/tokio/tests/io_driver_drop.rs
+++ b/tokio/tests/io_driver_drop.rs
@@ -6,7 +6,7 @@ use tokio_test::{assert_err, assert_pending, assert_ready, task};
#[test]
fn tcp_doesnt_block() {
- let rt = runtime::Builder::new().basic_scheduler().build().unwrap();
+ let rt = rt();
let mut listener = rt.enter(|| {
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
@@ -24,7 +24,7 @@ fn tcp_doesnt_block() {
#[test]
fn drop_wakes() {
- let rt = runtime::Builder::new().basic_scheduler().build().unwrap();
+ let rt = rt();
let mut listener = rt.enter(|| {
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
@@ -42,3 +42,11 @@ fn drop_wakes() {
assert!(task.is_woken());
assert_ready!(task.poll());
}
+
+fn rt() -> runtime::Runtime {
+ runtime::Builder::new()
+ .basic_scheduler()
+ .enable_all()
+ .build()
+ .unwrap()
+}
diff --git a/tokio/tests/process_issue_42.rs b/tokio/tests/process_issue_42.rs
index 5571c199..1b8dcc95 100644
--- a/tokio/tests/process_issue_42.rs
+++ b/tokio/tests/process_issue_42.rs
@@ -18,7 +18,11 @@ fn run_test() {
let finished_clone = finished.clone();
thread::spawn(move || {
- let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap();
+ let mut rt = runtime::Builder::new()
+ .basic_scheduler()
+ .enable_all()
+ .build()
+ .unwrap();
let mut futures = FuturesOrdered::new();
rt.block_on(async {
diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs
index 039bb22f..700f1029 100644
--- a/tokio/tests/rt_basic.rs
+++ b/tokio/tests/rt_basic.rs
@@ -29,6 +29,7 @@ fn spawned_task_does_not_progress_without_block_on() {
fn rt() -> Runtime {
tokio::runtime::Builder::new()
.basic_scheduler()
+ .enable_all()
.build()
.unwrap()
}
diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs
index 33f779c7..b360158d 100644
--- a/tokio/tests/rt_common.rs
+++ b/tokio/tests/rt_common.rs
@@ -10,16 +10,21 @@ macro_rules! rt_test {
fn rt() -> Runtime {
tokio::runtime::Builder::new()
.basic_scheduler()
+ .enable_all()
.build()
.unwrap()
}
}
- mod thread_pool {
+ mod threaded_scheduler {
$($t)*
fn rt() -> Runtime {
- Runtime::new().unwrap()
+ tokio::runtime::Builder::new()
+ .threaded_scheduler()
+ .enable_all()
+ .build()
+ .unwrap()
}
}
}
@@ -341,7 +346,7 @@ rt_test! {
#[test]
fn block_on_socket() {
- let mut rt = Runtime::new().unwrap();
+ let mut rt = rt();
rt.block_on(async move {
let (tx, rx) = oneshot::channel();
@@ -360,6 +365,100 @@ rt_test! {
}
#[test]
+ fn spawn_from_blocking() {
+ let mut rt = rt();
+
+ let out = rt.block_on(async move {
+ let inner = assert_ok!(tokio::task::spawn_blocking(|| {
+ tokio::spawn(async move { "hello" })
+ }).await);
+
+ assert_ok!(inner.await)
+ });
+
+ assert_eq!(out, "hello")
+ }
+
+ #[test]
+ fn delay_from_blocking() {
+ let mut rt = rt();
+
+ rt.block_on(async move {
+ assert_ok!(tokio::task::spawn_blocking(|| {
+ let now = std::time::Instant::now();
+ let dur = Duration::from_millis(1);
+
+ // use the futures' block_on fn to make sure we aren't setting
+ // any Tokio context
+ futures::executor::block_on(async {
+ tokio::time::delay_for(dur).await;
+ });
+
+ assert!(now.elapsed() >= dur);
+ }).await);
+ });
+ }
+
+ #[test]
+ fn socket_from_blocking() {
+ let mut rt = rt();
+
+ rt.block_on(async move {
+ let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let addr = assert_ok!(listener.local_addr());
+
+ let peer = tokio::task::spawn_blocking(move || {
+ // use the futures' block_on fn to make sure we aren't setting
+ // any Tokio context
+ futures::executor::block_on(async {
+ assert_ok!(TcpStream::connect(addr).await);
+ });
+ });
+
+ // Wait for the client to connect
+ let _ = assert_ok!(listener.accept().await);
+
+ assert_ok!(peer.await);
+ });
+ }
+
+ #[test]
+ fn io_driver_called_when_under_load() {
+ let mut rt = rt();
+
+ // Create a lot of constant load. The scheduler will always be busy.
+ for _ in 0..100 {
+ rt.spawn(async {
+ loop {
+ tokio::task::yield_now().await;
+ }
+ });
+ }
+
+ // Do some I/O work
+ rt.block_on(async {
+ let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let addr = assert_ok!(listener.local_addr());
+
+ let srv = tokio::spawn(async move {
+ let (mut stream, _) = assert_ok!(listener.accept().await);
+ assert_ok!(stream.write_all(b"hello world").await);
+ });
+
+ let cli = tokio::spawn(async move {
+ let mut stream = assert_ok!(TcpStream::connect(addr).await);
+ let mut dst = vec![0; 11];
+
+ assert_ok!(stream.read_exact(&mut dst).await);
+ assert_eq!(dst, b"hello world");
+ });
+
+ assert_ok!(srv.await);
+ assert_ok!(cli.await);
+ });
+ }
+
+ #[test]
fn client_server_block_on() {
let mut rt = rt();
let (tx, rx) = mpsc::channel();
@@ -371,12 +470,11 @@ rt_test! {
}
#[test]
- #[ignore]
fn panic_in_task() {
- let rt = rt();
- let (tx, rx) = mpsc::channel();
+ let mut rt = rt();
+ let (tx, rx) = oneshot::channel();
- struct Boom(mpsc::Sender<()>);
+ struct Boom(Option<oneshot::Sender<()>>);
impl Future for Boom {
type Output = ();
@@ -389,12 +487,12 @@ rt_test! {
impl Drop for Boom {
fn drop(&mut self) {
assert!(::std::thread::panicking());
- self.0.send(()).unwrap();
+ self.0.take().unwrap().send(()).unwrap();
}
}
- rt.spawn(Boom(tx));
- rx.recv().unwrap();
+ rt.spawn(Boom(Some(tx)));
+ assert_ok!(rt.block_on(rx));
}
#[test]
@@ -428,6 +526,48 @@ rt_test! {
assert_ok!(rt.block_on(handle));
}
+ #[test]
+ fn eagerly_drops_futures_on_shutdown() {
+ use std::sync::mpsc;
+
+ struct Never {
+ drop_tx: mpsc::Sender<()>,
+ }
+
+ impl Future for Never {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
+ Poll::Pending
+ }
+ }
+
+ impl Drop for Never {
+ fn drop(&mut self) {
+ self.drop_tx.send(()).unwrap();
+ }
+ }
+
+ let mut rt = rt();
+
+ let (drop_tx, drop_rx) = mpsc::channel();
+ let (run_tx, run_rx) = oneshot::channel();
+
+ rt.block_on(async move {
+ tokio::spawn(async move {
+ assert_ok!(run_tx.send(()));
+
+ Never { drop_tx }.await
+ });
+
+ assert_ok!(run_rx.await);
+ });
+
+ drop(rt);
+
+ assert_ok!(drop_rx.recv());
+ }
+
async fn client_server(tx: mpsc::Sender<()>) {
let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs
index 8be6d036..4b4c880e 100644
--- a/tokio/tests/rt_threaded.rs
+++ b/tokio/tests/rt_threaded.rs
@@ -18,6 +18,7 @@ fn single_thread() {
// No panic when starting a runtime w/ a single thread
let _ = runtime::Builder::new()
.threaded_scheduler()
+ .enable_all()
.num_threads(1)
.build();
}
@@ -189,10 +190,11 @@ fn drop_threadpool_drops_futures() {
let rt = runtime::Builder::new()
.threaded_scheduler()
- .after_start(move || {
+ .enable_all()
+ .on_thread_start(move || {
a.fetch_add(1, Relaxed);
})
- .before_stop(move || {
+ .on_thread_stop(move || {
b.fetch_add(1, Relaxed);
})
.build()
@@ -218,7 +220,7 @@ fn drop_threadpool_drops_futures() {
}
#[test]
-fn after_start_and_before_stop_is_called() {
+fn start_stop_callbacks_called() {
use std::sync::atomic::{AtomicUsize, Ordering};
let after_start = Arc::new(AtomicUsize::new(0));
@@ -228,10 +230,11 @@ fn after_start_and_before_stop_is_called() {
let before_inner = before_stop.clone();
let mut rt = tokio::runtime::Builder::new()
.threaded_scheduler()
- .after_start(move || {
+ .enable_all()
+ .on_thread_start(move || {
after_inner.clone().fetch_add(1, Ordering::Relaxed);
})
- .before_stop(move || {
+ .on_thread_stop(move || {
before_inner.clone().fetch_add(1, Ordering::Relaxed);
})
.build()
diff --git a/tokio/tests/signal_drop_rt.rs b/tokio/tests/signal_drop_rt.rs
index 0cb7d482..fc790c28 100644
--- a/tokio/tests/signal_drop_rt.rs
+++ b/tokio/tests/signal_drop_rt.rs
@@ -38,6 +38,7 @@ fn dropping_loops_does_not_cause_starvation() {
fn rt() -> Runtime {
tokio::runtime::Builder::new()
.basic_scheduler()
+ .enable_all()
.build()
.unwrap()
}
diff --git a/tokio/tests/signal_multi_rt.rs b/tokio/tests/signal_multi_rt.rs
index 8020c593..e34e7c09 100644
--- a/tokio/tests/signal_multi_rt.rs
+++ b/tokio/tests/signal_multi_rt.rs
@@ -48,6 +48,7 @@ fn multi_loop() {
fn rt() -> Runtime {
tokio::runtime::Builder::new()
.basic_scheduler()
+ .enable_all()
.build()
.unwrap()
}
diff --git a/tokio/tests/time_rt.rs b/tokio/tests/time_rt.rs
index 235d1960..7de9c345 100644
--- a/tokio/tests/time_rt.rs
+++ b/tokio/tests/time_rt.rs
@@ -27,7 +27,11 @@ fn timer_with_threaded_runtime() {
fn timer_with_basic_scheduler() {
use tokio::runtime::Builder;
- let mut rt = Builder::new().basic_scheduler().build().unwrap();
+ let mut rt = Builder::new()
+ .basic_scheduler()
+ .enable_all()
+ .build()
+ .unwrap();
let (tx, rx) = mpsc::channel();
rt.block_on(async move {