diff options
author | Lucio Franco <luciofranco14@gmail.com> | 2020-09-23 14:35:10 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-23 14:35:10 -0400 |
commit | f25f12d57638a2928b3f738b3b1392d8773e276e (patch) | |
tree | 04da3ba7022a42bf8d1d08a039fcc1fc2fc95313 /tokio/src | |
parent | 0f70530ee7cda68b68f2f8131b5866cfa937ee1f (diff) |
rt: Allow concurrent `block_on`'s with basic_scheduler (#2804)
Diffstat (limited to 'tokio/src')
-rw-r--r-- | tokio/src/runtime/basic_scheduler.rs | 192 | ||||
-rw-r--r-- | tokio/src/runtime/builder.rs | 2 | ||||
-rw-r--r-- | tokio/src/runtime/mod.rs | 23 | ||||
-rw-r--r-- | tokio/src/sync/mod.rs | 7 | ||||
-rw-r--r-- | tokio/src/sync/notify.rs | 7 | ||||
-rw-r--r-- | tokio/src/util/mod.rs | 4 |
6 files changed, 176 insertions, 59 deletions
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 7bf8b445..0c0e95a6 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -1,22 +1,35 @@ +use crate::future::poll_fn; +use crate::loom::sync::Mutex; use crate::park::{Park, Unpark}; -use crate::runtime; use crate::runtime::task::{self, JoinHandle, Schedule, Task}; +use crate::sync::Notify; use crate::util::linked_list::{Link, LinkedList}; -use crate::util::{waker_ref, Wake}; +use crate::util::{waker_ref, Wake, WakerRef}; use std::cell::RefCell; use std::collections::VecDeque; use std::fmt; use std::future::Future; -use std::sync::{Arc, Mutex}; -use std::task::Poll::Ready; +use std::sync::{Arc, PoisonError}; +use std::task::Poll::{Pending, Ready}; use std::time::Duration; /// Executes tasks on the current thread -pub(crate) struct BasicScheduler<P> -where - P: Park, -{ +pub(crate) struct BasicScheduler<P: Park> { + /// Inner state guarded by a mutex that is shared + /// between all `block_on` calls. + inner: Mutex<Option<Inner<P>>>, + + /// Notifier for waking up other threads to steal the + /// parker. + notify: Notify, + + /// Sendable task spawner + spawner: Spawner, +} + +/// The inner scheduler that owns the task queue and the main parker P. +struct Inner<P: Park> { /// Scheduler run queue /// /// When the scheduler is executed, the queue is removed from `self` and @@ -59,7 +72,7 @@ struct Shared { unpark: Box<dyn Unpark>, } -/// Thread-local context +/// Thread-local context. struct Context { /// Shared scheduler state shared: Arc<Shared>, @@ -68,38 +81,43 @@ struct Context { tasks: RefCell<Tasks>, } -/// Initial queue capacity +/// Initial queue capacity. const INITIAL_CAPACITY: usize = 64; /// Max number of tasks to poll per tick. const MAX_TASKS_PER_TICK: usize = 61; -/// How often ot check the remote queue first +/// How often to check the remote queue first. const REMOTE_FIRST_INTERVAL: u8 = 31; -// Tracks the current BasicScheduler +// Tracks the current BasicScheduler. scoped_thread_local!(static CURRENT: Context); -impl<P> BasicScheduler<P> -where - P: Park, -{ +impl<P: Park> BasicScheduler<P> { pub(crate) fn new(park: P) -> BasicScheduler<P> { let unpark = Box::new(park.unpark()); - BasicScheduler { + let spawner = Spawner { + shared: Arc::new(Shared { + queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), + unpark: unpark as Box<dyn Unpark>, + }), + }; + + let inner = Mutex::new(Some(Inner { tasks: Some(Tasks { owned: LinkedList::new(), queue: VecDeque::with_capacity(INITIAL_CAPACITY), }), - spawner: Spawner { - shared: Arc::new(Shared { - queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), - unpark: unpark as Box<dyn Unpark>, - }), - }, + spawner: spawner.clone(), tick: 0, park, + })); + + BasicScheduler { + inner, + notify: Notify::new(), + spawner, } } @@ -108,7 +126,6 @@ where } /// Spawns a future onto the thread pool - #[allow(dead_code)] pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + Send + 'static, @@ -117,13 +134,57 @@ where self.spawner.spawn(future) } - pub(crate) fn block_on<F>(&mut self, future: F) -> F::Output - where - F: Future, - { + pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output { + pin!(future); + + // Attempt to steal the dedicated parker and block_on the future if we can there, + // othwerwise, lets select on a notification that the parker is available + // or the future is complete. + loop { + if let Some(inner) = &mut self.take_inner() { + return inner.block_on(future); + } else { + let mut enter = crate::runtime::enter(false); + + let notified = self.notify.notified(); + pin!(notified); + + if let Some(out) = enter + .block_on(poll_fn(|cx| { + if notified.as_mut().poll(cx).is_ready() { + return Ready(None); + } + + if let Ready(out) = future.as_mut().poll(cx) { + return Ready(Some(out)); + } + + Pending + })) + .expect("Failed to `Enter::block_on`") + { + return out; + } + } + } + } + + fn take_inner(&self) -> Option<InnerGuard<'_, P>> { + let inner = self.inner.lock().unwrap().take()?; + + Some(InnerGuard { + inner: Some(inner), + basic_scheduler: &self, + }) + } +} + +impl<P: Park> Inner<P> { + /// Block on the future provided and drive the runtime's driver. + fn block_on<F: Future>(&mut self, future: F) -> F::Output { enter(self, |scheduler, context| { - let _enter = runtime::enter(false); - let waker = waker_ref(&scheduler.spawner.shared); + let _enter = crate::runtime::enter(false); + let waker = scheduler.spawner.waker_ref(); let mut cx = std::task::Context::from_waker(&waker); pin!(future); @@ -178,16 +239,16 @@ where /// Enter the scheduler context. This sets the queue and other necessary /// scheduler state in the thread-local -fn enter<F, R, P>(scheduler: &mut BasicScheduler<P>, f: F) -> R +fn enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R where - F: FnOnce(&mut BasicScheduler<P>, &Context) -> R, + F: FnOnce(&mut Inner<P>, &Context) -> R, P: Park, { // Ensures the run queue is placed back in the `BasicScheduler` instance // once `block_on` returns.` struct Guard<'a, P: Park> { context: Option<Context>, - scheduler: &'a mut BasicScheduler<P>, + scheduler: &'a mut Inner<P>, } impl<P: Park> Drop for Guard<'_, P> { @@ -214,12 +275,23 @@ where CURRENT.set(context, || f(scheduler, context)) } -impl<P> Drop for BasicScheduler<P> -where - P: Park, -{ +impl<P: Park> Drop for BasicScheduler<P> { fn drop(&mut self) { - enter(self, |scheduler, context| { + // Avoid a double panic if we are currently panicking and + // the lock may be poisoned. + + let mut inner = match self + .inner + .lock() + .unwrap_or_else(PoisonError::into_inner) + .take() + { + Some(inner) => inner, + None if std::thread::panicking() => return, + None => panic!("Oh no! We never placed the Inner state back, this is a bug!"), + }; + + enter(&mut inner, |scheduler, context| { // Loop required here to ensure borrow is dropped between iterations #[allow(clippy::while_let_loop)] loop { @@ -269,6 +341,10 @@ impl Spawner { fn pop(&self) -> Option<task::Notified<Arc<Shared>>> { self.shared.queue.lock().unwrap().pop_front() } + + fn waker_ref(&self) -> WakerRef<'_> { + waker_ref(&self.shared) + } } impl fmt::Debug for Spawner { @@ -325,3 +401,43 @@ impl Wake for Shared { arc_self.unpark.unpark(); } } + +// ===== InnerGuard ===== + +/// Used to ensure we always place the Inner value +/// back into its slot in `BasicScheduler`, even if the +/// future panics. +struct InnerGuard<'a, P: Park> { + inner: Option<Inner<P>>, + basic_scheduler: &'a BasicScheduler<P>, +} + +impl<P: Park> InnerGuard<'_, P> { + fn block_on<F: Future>(&mut self, future: F) -> F::Output { + // The only time inner gets set to `None` is if we have dropped + // already so this unwrap is safe. + self.inner.as_mut().unwrap().block_on(future) + } +} + +impl<P: Park> Drop for InnerGuard<'_, P> { + fn drop(&mut self) { + if let Some(scheduler) = self.inner.take() { + // We can ignore the poison error here since we are + // just replacing the state. + let mut lock = self + .basic_scheduler + .inner + .lock() + .unwrap_or_else(PoisonError::into_inner); + + // Replace old scheduler back into the state to allow + // other threads to pick it up and drive it. + lock.replace(scheduler); + + // Wake up other possible threads that could steal + // the dedicated parker P. + self.basic_scheduler.notify.notify_one() + } + } +} diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 4072b04e..99b34eb3 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -496,7 +496,7 @@ cfg_rt_core! { let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { - kind: Kind::Basic(Mutex::new(Some(scheduler))), + kind: Kind::Basic(scheduler), handle: Handle { spawner, io_handle: resources.io_handle, diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 884c2b46..e4a1cf08 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -296,7 +296,7 @@ enum Kind { /// Execute all tasks on the current-thread. #[cfg(feature = "rt-core")] - Basic(Mutex<Option<BasicScheduler<driver::Driver>>>), + Basic(BasicScheduler<driver::Driver>), /// Execute tasks across multiple threads. #[cfg(feature = "rt-threaded")] @@ -401,7 +401,7 @@ impl Runtime { Kind::Shell(_) => panic!("task execution disabled"), #[cfg(feature = "rt-threaded")] Kind::ThreadPool(exec) => exec.spawn(future), - Kind::Basic(_exec) => self.handle.spawner.spawn(future), + Kind::Basic(exec) => exec.spawn(future), } } @@ -461,24 +461,7 @@ impl Runtime { } } #[cfg(feature = "rt-core")] - Kind::Basic(exec) => { - // TODO(lucio): clean this up and move this impl into - // `basic_scheduler.rs`, this is hacky and bad but will work for - // now. - let exec_temp = { - let mut lock = exec.lock().unwrap(); - lock.take() - }; - - if let Some(mut exec_temp) = exec_temp { - let res = exec_temp.block_on(future); - exec.lock().unwrap().replace(exec_temp); - res - } else { - let mut enter = crate::runtime::enter(true); - enter.block_on(future).unwrap() - } - } + Kind::Basic(exec) => exec.block_on(future), #[cfg(feature = "rt-threaded")] Kind::ThreadPool(exec) => exec.block_on(future), }) diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 1a584383..2e674136 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -457,6 +457,13 @@ cfg_sync! { } cfg_not_sync! { + cfg_rt_core! { + mod notify; + pub(crate) use notify::Notify; + } +} + +cfg_not_sync! { cfg_atomic_waker_impl! { mod task; pub(crate) use task::AtomicWaker; diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 56bbc51b..d319e8aa 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -1,3 +1,10 @@ +// Allow `unreachable_pub` warnings when sync is not enabled +// due to the usage of `Notify` within the `rt-core` feature set. +// When this module is compiled with `sync` enabled we will warn on +// this lint. When `rt-core` is enabled we use `pub(crate)` which +// triggers this warning but it is safe to ignore in this case. +#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))] + use crate::loom::sync::atomic::AtomicU8; use crate::loom::sync::Mutex; use crate::util::linked_list::{self, LinkedList}; diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index c5439f48..278d6343 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -12,6 +12,10 @@ mod rand; mod wake; pub(crate) use wake::{waker_ref, Wake}; +cfg_rt_core! { + pub(crate) use wake::WakerRef; +} + cfg_rt_threaded! { pub(crate) use rand::FastRand; |