summaryrefslogtreecommitdiffstats
path: root/tokio/tests/rt_common.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-21 23:28:39 -0800
committerGitHub <noreply@github.com>2019-11-21 23:28:39 -0800
commit8546ff826db8dba1e39b4119ad909fb6cab2492a (patch)
tree0c1cdd36aaf9d732079a4ff7a71e5c6b138e7d42 /tokio/tests/rt_common.rs
parent6866fe426cfab0e4da3e88c673f7bef141259bb6 (diff)
runtime: cleanup and add config options (#1807)
* runtime: cleanup and add config options This patch finishes the cleanup as part of the transition to Tokio 0.2. A number of changes were made to take advantage of having all Tokio types in a single crate. Also, fixes using Tokio types from `spawn_blocking`. * Many threads, one resource driver Previously, in the threaded scheduler, a resource driver (mio::Poll / timer combo) was created per thread. This was more or less fine, except it required balancing across the available drivers. When using a resource driver from **outside** of the thread pool, balancing is tricky. The change was original done to avoid having a dedicated driver thread. Now, instead of creating many resource drivers, a single resource driver is used. Each scheduler thread will attempt to "lock" the resource driver before parking on it. If the resource driver is already locked, the thread uses a condition variable to park. Contention should remain low as, under load, the scheduler avoids using the drivers. * Add configuration options to enable I/O / time New configuration options are added to `runtime::Builder` to allow enabling I/O and time drivers on a runtime instance basis. This is useful when wanting to create lightweight runtime instances to execute compute only tasks. * Bug fixes The condition variable parker is updated to the same algorithm used in `std`. This is motivated by some potential deadlock cases discovered by `loom`. The basic scheduler is fixed to fairly schedule tasks. `push_front` was accidentally used instead of `push_back`. I/O, time, and spawning now work from within `spawn_blocking` closures. * Misc cleanup The threaded scheduler is no longer generic over `P :Park`. Instead, it is hard coded to a specific parker. Tests, including loom tests, are updated to use `Runtime` directly. This provides greater coverage. The `blocking` module is moved back into `runtime` as all usage is within `runtime` itself.
Diffstat (limited to 'tokio/tests/rt_common.rs')
-rw-r--r--tokio/tests/rt_common.rs160
1 files changed, 150 insertions, 10 deletions
diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs
index 33f779c7..b360158d 100644
--- a/tokio/tests/rt_common.rs
+++ b/tokio/tests/rt_common.rs
@@ -10,16 +10,21 @@ macro_rules! rt_test {
fn rt() -> Runtime {
tokio::runtime::Builder::new()
.basic_scheduler()
+ .enable_all()
.build()
.unwrap()
}
}
- mod thread_pool {
+ mod threaded_scheduler {
$($t)*
fn rt() -> Runtime {
- Runtime::new().unwrap()
+ tokio::runtime::Builder::new()
+ .threaded_scheduler()
+ .enable_all()
+ .build()
+ .unwrap()
}
}
}
@@ -341,7 +346,7 @@ rt_test! {
#[test]
fn block_on_socket() {
- let mut rt = Runtime::new().unwrap();
+ let mut rt = rt();
rt.block_on(async move {
let (tx, rx) = oneshot::channel();
@@ -360,6 +365,100 @@ rt_test! {
}
#[test]
+ fn spawn_from_blocking() {
+ let mut rt = rt();
+
+ let out = rt.block_on(async move {
+ let inner = assert_ok!(tokio::task::spawn_blocking(|| {
+ tokio::spawn(async move { "hello" })
+ }).await);
+
+ assert_ok!(inner.await)
+ });
+
+ assert_eq!(out, "hello")
+ }
+
+ #[test]
+ fn delay_from_blocking() {
+ let mut rt = rt();
+
+ rt.block_on(async move {
+ assert_ok!(tokio::task::spawn_blocking(|| {
+ let now = std::time::Instant::now();
+ let dur = Duration::from_millis(1);
+
+ // use the futures' block_on fn to make sure we aren't setting
+ // any Tokio context
+ futures::executor::block_on(async {
+ tokio::time::delay_for(dur).await;
+ });
+
+ assert!(now.elapsed() >= dur);
+ }).await);
+ });
+ }
+
+ #[test]
+ fn socket_from_blocking() {
+ let mut rt = rt();
+
+ rt.block_on(async move {
+ let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let addr = assert_ok!(listener.local_addr());
+
+ let peer = tokio::task::spawn_blocking(move || {
+ // use the futures' block_on fn to make sure we aren't setting
+ // any Tokio context
+ futures::executor::block_on(async {
+ assert_ok!(TcpStream::connect(addr).await);
+ });
+ });
+
+ // Wait for the client to connect
+ let _ = assert_ok!(listener.accept().await);
+
+ assert_ok!(peer.await);
+ });
+ }
+
+ #[test]
+ fn io_driver_called_when_under_load() {
+ let mut rt = rt();
+
+ // Create a lot of constant load. The scheduler will always be busy.
+ for _ in 0..100 {
+ rt.spawn(async {
+ loop {
+ tokio::task::yield_now().await;
+ }
+ });
+ }
+
+ // Do some I/O work
+ rt.block_on(async {
+ let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let addr = assert_ok!(listener.local_addr());
+
+ let srv = tokio::spawn(async move {
+ let (mut stream, _) = assert_ok!(listener.accept().await);
+ assert_ok!(stream.write_all(b"hello world").await);
+ });
+
+ let cli = tokio::spawn(async move {
+ let mut stream = assert_ok!(TcpStream::connect(addr).await);
+ let mut dst = vec![0; 11];
+
+ assert_ok!(stream.read_exact(&mut dst).await);
+ assert_eq!(dst, b"hello world");
+ });
+
+ assert_ok!(srv.await);
+ assert_ok!(cli.await);
+ });
+ }
+
+ #[test]
fn client_server_block_on() {
let mut rt = rt();
let (tx, rx) = mpsc::channel();
@@ -371,12 +470,11 @@ rt_test! {
}
#[test]
- #[ignore]
fn panic_in_task() {
- let rt = rt();
- let (tx, rx) = mpsc::channel();
+ let mut rt = rt();
+ let (tx, rx) = oneshot::channel();
- struct Boom(mpsc::Sender<()>);
+ struct Boom(Option<oneshot::Sender<()>>);
impl Future for Boom {
type Output = ();
@@ -389,12 +487,12 @@ rt_test! {
impl Drop for Boom {
fn drop(&mut self) {
assert!(::std::thread::panicking());
- self.0.send(()).unwrap();
+ self.0.take().unwrap().send(()).unwrap();
}
}
- rt.spawn(Boom(tx));
- rx.recv().unwrap();
+ rt.spawn(Boom(Some(tx)));
+ assert_ok!(rt.block_on(rx));
}
#[test]
@@ -428,6 +526,48 @@ rt_test! {
assert_ok!(rt.block_on(handle));
}
+ #[test]
+ fn eagerly_drops_futures_on_shutdown() {
+ use std::sync::mpsc;
+
+ struct Never {
+ drop_tx: mpsc::Sender<()>,
+ }
+
+ impl Future for Never {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
+ Poll::Pending
+ }
+ }
+
+ impl Drop for Never {
+ fn drop(&mut self) {
+ self.drop_tx.send(()).unwrap();
+ }
+ }
+
+ let mut rt = rt();
+
+ let (drop_tx, drop_rx) = mpsc::channel();
+ let (run_tx, run_rx) = oneshot::channel();
+
+ rt.block_on(async move {
+ tokio::spawn(async move {
+ assert_ok!(run_tx.send(()));
+
+ Never { drop_tx }.await
+ });
+
+ assert_ok!(run_rx.await);
+ });
+
+ drop(rt);
+
+ assert_ok!(drop_rx.recv());
+ }
+
async fn client_server(tx: mpsc::Sender<()>) {
let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);