diff options
author | Lucio Franco <luciofranco14@gmail.com> | 2020-09-24 13:31:49 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-24 13:31:49 -0400 |
commit | 4dfbdbff7e260eb7f046a8dc91ec0c84ae7ab2e8 (patch) | |
tree | b9d06a17d8450f26d8cad8b9068a1ef7a981fafb | |
parent | c29f13b7a5cc68fa2dfe52991d8b7497b2497725 (diff) |
rt: Allow concurrent `Shell:block_on` calls (#2868)
-rw-r--r-- | tokio/src/runtime/builder.rs | 3 | ||||
-rw-r--r-- | tokio/src/runtime/mod.rs | 22 | ||||
-rw-r--r-- | tokio/src/runtime/shell.rs | 100 | ||||
-rw-r--r-- | tokio/src/sync/mod.rs | 8 | ||||
-rw-r--r-- | tokio/src/util/mod.rs | 1 |
5 files changed, 90 insertions, 44 deletions
diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 99b34eb3..043371c7 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,4 +1,3 @@ -use crate::loom::sync::Mutex; use crate::runtime::handle::Handle; use crate::runtime::shell::Shell; use crate::runtime::{blocking, driver, io, Callback, Runtime, Spawner}; @@ -377,7 +376,7 @@ impl Builder { let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { - kind: Kind::Shell(Mutex::new(Some(Shell::new(driver)))), + kind: Kind::Shell(Shell::new(driver)), handle: Handle { spawner, io_handle: resources.io_handle, diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index e4a1cf08..a6a739be 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -243,7 +243,6 @@ cfg_rt_core! { use crate::task::JoinHandle; } -use crate::loom::sync::Mutex; use std::future::Future; use std::time::Duration; @@ -292,7 +291,7 @@ pub struct Runtime { enum Kind { /// Not able to execute concurrent tasks. This variant is mostly used to get /// access to the driver handles. - Shell(Mutex<Option<Shell>>), + Shell(Shell), /// Execute all tasks on the current-thread. #[cfg(feature = "rt-core")] @@ -442,24 +441,7 @@ impl Runtime { /// [handle]: fn@Handle::block_on pub fn block_on<F: Future>(&self, future: F) -> F::Output { self.handle.enter(|| match &self.kind { - Kind::Shell(exec) => { - // TODO(lucio): clean this up and move this impl into - // `shell.rs`, this is hacky and bad but will work for - // now. - let exec_temp = { - let mut lock = exec.lock().unwrap(); - lock.take() - }; - - if let Some(mut exec_temp) = exec_temp { - let res = exec_temp.block_on(future); - exec.lock().unwrap().replace(exec_temp); - res - } else { - let mut enter = crate::runtime::enter(true); - enter.block_on(future).unwrap() - } - } + Kind::Shell(exec) => exec.block_on(future), #[cfg(feature = "rt-core")] Kind::Basic(exec) => exec.block_on(future), #[cfg(feature = "rt-threaded")] diff --git a/tokio/src/runtime/shell.rs b/tokio/src/runtime/shell.rs index 3d631239..486d4fa5 100644 --- a/tokio/src/runtime/shell.rs +++ b/tokio/src/runtime/shell.rs @@ -1,18 +1,21 @@ #![allow(clippy::redundant_clone)] +use crate::future::poll_fn; use crate::park::{Park, Unpark}; use crate::runtime::driver::Driver; -use crate::runtime::enter; +use crate::sync::Notify; use crate::util::{waker_ref, Wake}; -use std::future::Future; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::task::Context; -use std::task::Poll::Ready; +use std::task::Poll::{Pending, Ready}; +use std::{future::Future, sync::PoisonError}; #[derive(Debug)] pub(super) struct Shell { - driver: Driver, + driver: Mutex<Option<Driver>>, + + notify: Notify, /// TODO: don't store this unpark: Arc<Handle>, @@ -25,28 +28,57 @@ impl Shell { pub(super) fn new(driver: Driver) -> Shell { let unpark = Arc::new(Handle(driver.unpark())); - Shell { driver, unpark } + Shell { + driver: Mutex::new(Some(driver)), + notify: Notify::new(), + unpark, + } } - pub(super) fn block_on<F>(&mut self, f: F) -> F::Output + pub(super) fn block_on<F>(&self, f: F) -> F::Output where F: Future, { - let _e = enter(true); + let mut enter = crate::runtime::enter(true); pin!(f); - let waker = waker_ref(&self.unpark); - let mut cx = Context::from_waker(&waker); - loop { - if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { - return v; - } + if let Some(driver) = &mut self.take_driver() { + return driver.block_on(f); + } else { + let notified = self.notify.notified(); + pin!(notified); + + if let Some(out) = enter + .block_on(poll_fn(|cx| { + if notified.as_mut().poll(cx).is_ready() { + return Ready(None); + } + + if let Ready(out) = f.as_mut().poll(cx) { + return Ready(Some(out)); + } - self.driver.park().unwrap(); + Pending + })) + .expect("Failed to `Enter::block_on`") + { + return out; + } + } } } + + fn take_driver(&self) -> Option<DriverGuard<'_>> { + let mut lock = self.driver.lock().unwrap(); + let driver = lock.take()?; + + Some(DriverGuard { + inner: Some(driver), + shell: &self, + }) + } } impl Wake for Handle { @@ -60,3 +92,41 @@ impl Wake for Handle { arc_self.0.unpark(); } } + +struct DriverGuard<'a> { + inner: Option<Driver>, + shell: &'a Shell, +} + +impl DriverGuard<'_> { + fn block_on<F: Future>(&mut self, f: F) -> F::Output { + let driver = self.inner.as_mut().unwrap(); + + pin!(f); + + let waker = waker_ref(&self.shell.unpark); + let mut cx = Context::from_waker(&waker); + + loop { + if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { + return v; + } + + driver.park().unwrap(); + } + } +} + +impl Drop for DriverGuard<'_> { + fn drop(&mut self) { + if let Some(inner) = self.inner.take() { + self.shell + .driver + .lock() + .unwrap_or_else(PoisonError::into_inner) + .replace(inner); + + self.shell.notify.notify_one(); + } + } +} diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 2e674136..88cc4b84 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -457,13 +457,9 @@ cfg_sync! { } cfg_not_sync! { - cfg_rt_core! { - mod notify; - pub(crate) use notify::Notify; - } -} + mod notify; + pub(crate) use notify::Notify; -cfg_not_sync! { cfg_atomic_waker_impl! { mod task; pub(crate) use task::AtomicWaker; diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index ad29c0a0..ffe90167 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -3,7 +3,6 @@ cfg_io_driver! { pub(crate) mod slab; } -#[cfg(any(feature = "io-readiness", feature = "sync", feature = "rt-core"))] pub(crate) mod linked_list; #[cfg(any(feature = "rt-threaded", feature = "macros", feature = "stream"))] |