diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-21 23:28:39 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-21 23:28:39 -0800 |
commit | 8546ff826db8dba1e39b4119ad909fb6cab2492a (patch) | |
tree | 0c1cdd36aaf9d732079a4ff7a71e5c6b138e7d42 /tokio/tests/rt_common.rs | |
parent | 6866fe426cfab0e4da3e88c673f7bef141259bb6 (diff) |
runtime: cleanup and add config options (#1807)
* runtime: cleanup and add config options
This patch finishes the cleanup as part of the transition to Tokio 0.2.
A number of changes were made to take advantage of having all Tokio
types in a single crate. Also, fixes using Tokio types from
`spawn_blocking`.
* Many threads, one resource driver
Previously, in the threaded scheduler, a resource driver (mio::Poll /
timer combo) was created per thread. This was more or less fine, except
it required balancing across the available drivers. When using a
resource driver from **outside** of the thread pool, balancing is
tricky. The change was original done to avoid having a dedicated driver
thread.
Now, instead of creating many resource drivers, a single resource driver
is used. Each scheduler thread will attempt to "lock" the resource
driver before parking on it. If the resource driver is already locked,
the thread uses a condition variable to park. Contention should remain
low as, under load, the scheduler avoids using the drivers.
* Add configuration options to enable I/O / time
New configuration options are added to `runtime::Builder` to allow
enabling I/O and time drivers on a runtime instance basis. This is
useful when wanting to create lightweight runtime instances to execute
compute only tasks.
* Bug fixes
The condition variable parker is updated to the same algorithm used in
`std`. This is motivated by some potential deadlock cases discovered by
`loom`.
The basic scheduler is fixed to fairly schedule tasks. `push_front` was
accidentally used instead of `push_back`.
I/O, time, and spawning now work from within `spawn_blocking` closures.
* Misc cleanup
The threaded scheduler is no longer generic over `P :Park`. Instead, it
is hard coded to a specific parker. Tests, including loom tests, are
updated to use `Runtime` directly. This provides greater coverage.
The `blocking` module is moved back into `runtime` as all usage is
within `runtime` itself.
Diffstat (limited to 'tokio/tests/rt_common.rs')
-rw-r--r-- | tokio/tests/rt_common.rs | 160 |
1 files changed, 150 insertions, 10 deletions
diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 33f779c7..b360158d 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -10,16 +10,21 @@ macro_rules! rt_test { fn rt() -> Runtime { tokio::runtime::Builder::new() .basic_scheduler() + .enable_all() .build() .unwrap() } } - mod thread_pool { + mod threaded_scheduler { $($t)* fn rt() -> Runtime { - Runtime::new().unwrap() + tokio::runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .build() + .unwrap() } } } @@ -341,7 +346,7 @@ rt_test! { #[test] fn block_on_socket() { - let mut rt = Runtime::new().unwrap(); + let mut rt = rt(); rt.block_on(async move { let (tx, rx) = oneshot::channel(); @@ -360,6 +365,100 @@ rt_test! { } #[test] + fn spawn_from_blocking() { + let mut rt = rt(); + + let out = rt.block_on(async move { + let inner = assert_ok!(tokio::task::spawn_blocking(|| { + tokio::spawn(async move { "hello" }) + }).await); + + assert_ok!(inner.await) + }); + + assert_eq!(out, "hello") + } + + #[test] + fn delay_from_blocking() { + let mut rt = rt(); + + rt.block_on(async move { + assert_ok!(tokio::task::spawn_blocking(|| { + let now = std::time::Instant::now(); + let dur = Duration::from_millis(1); + + // use the futures' block_on fn to make sure we aren't setting + // any Tokio context + futures::executor::block_on(async { + tokio::time::delay_for(dur).await; + }); + + assert!(now.elapsed() >= dur); + }).await); + }); + } + + #[test] + fn socket_from_blocking() { + let mut rt = rt(); + + rt.block_on(async move { + let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(listener.local_addr()); + + let peer = tokio::task::spawn_blocking(move || { + // use the futures' block_on fn to make sure we aren't setting + // any Tokio context + futures::executor::block_on(async { + assert_ok!(TcpStream::connect(addr).await); + }); + }); + + // Wait for the client to connect + let _ = assert_ok!(listener.accept().await); + + assert_ok!(peer.await); + }); + } + + #[test] + fn io_driver_called_when_under_load() { + let mut rt = rt(); + + // Create a lot of constant load. The scheduler will always be busy. + for _ in 0..100 { + rt.spawn(async { + loop { + tokio::task::yield_now().await; + } + }); + } + + // Do some I/O work + rt.block_on(async { + let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(listener.local_addr()); + + let srv = tokio::spawn(async move { + let (mut stream, _) = assert_ok!(listener.accept().await); + assert_ok!(stream.write_all(b"hello world").await); + }); + + let cli = tokio::spawn(async move { + let mut stream = assert_ok!(TcpStream::connect(addr).await); + let mut dst = vec![0; 11]; + + assert_ok!(stream.read_exact(&mut dst).await); + assert_eq!(dst, b"hello world"); + }); + + assert_ok!(srv.await); + assert_ok!(cli.await); + }); + } + + #[test] fn client_server_block_on() { let mut rt = rt(); let (tx, rx) = mpsc::channel(); @@ -371,12 +470,11 @@ rt_test! { } #[test] - #[ignore] fn panic_in_task() { - let rt = rt(); - let (tx, rx) = mpsc::channel(); + let mut rt = rt(); + let (tx, rx) = oneshot::channel(); - struct Boom(mpsc::Sender<()>); + struct Boom(Option<oneshot::Sender<()>>); impl Future for Boom { type Output = (); @@ -389,12 +487,12 @@ rt_test! { impl Drop for Boom { fn drop(&mut self) { assert!(::std::thread::panicking()); - self.0.send(()).unwrap(); + self.0.take().unwrap().send(()).unwrap(); } } - rt.spawn(Boom(tx)); - rx.recv().unwrap(); + rt.spawn(Boom(Some(tx))); + assert_ok!(rt.block_on(rx)); } #[test] @@ -428,6 +526,48 @@ rt_test! { assert_ok!(rt.block_on(handle)); } + #[test] + fn eagerly_drops_futures_on_shutdown() { + use std::sync::mpsc; + + struct Never { + drop_tx: mpsc::Sender<()>, + } + + impl Future for Never { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { + Poll::Pending + } + } + + impl Drop for Never { + fn drop(&mut self) { + self.drop_tx.send(()).unwrap(); + } + } + + let mut rt = rt(); + + let (drop_tx, drop_rx) = mpsc::channel(); + let (run_tx, run_rx) = oneshot::channel(); + + rt.block_on(async move { + tokio::spawn(async move { + assert_ok!(run_tx.send(())); + + Never { drop_tx }.await + }); + + assert_ok!(run_rx.await); + }); + + drop(rt); + + assert_ok!(drop_rx.recv()); + } + async fn client_server(tx: mpsc::Sender<()>) { let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); |