author     Lucio Franco <luciofranco14@gmail.com>   2020-09-23 14:35:10 -0400
committer  GitHub <noreply@github.com>              2020-09-23 14:35:10 -0400
commit     f25f12d57638a2928b3f738b3b1392d8773e276e
tree       04da3ba7022a42bf8d1d08a039fcc1fc2fc95313 /tokio/src/runtime
parent     0f70530ee7cda68b68f2f8131b5866cfa937ee1f
rt: Allow concurrent `block_on`'s with basic_scheduler (#2804)
Diffstat (limited to 'tokio/src/runtime')
-rw-r--r--  tokio/src/runtime/basic_scheduler.rs | 192
-rw-r--r--  tokio/src/runtime/builder.rs         |   2
-rw-r--r--  tokio/src/runtime/mod.rs             |  23
3 files changed, 158 insertions(+), 59 deletions(-)
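
With this change, several threads may call `block_on` on the same basic_scheduler runtime concurrently: the first caller steals the dedicated parker and drives the driver, while the others wait on a `Notify` (still polling their own futures) until the parker is returned. A minimal sketch of the resulting usage, assuming the 0.2/0.3-era builder API in which `basic_scheduler()` selects this scheduler:

    use std::sync::Arc;

    fn main() {
        // Build a single-threaded (basic_scheduler) runtime and share it.
        let rt = Arc::new(
            tokio::runtime::Builder::new()
                .basic_scheduler()
                .enable_all()
                .build()
                .unwrap(),
        );

        // Both threads may now call `block_on` at the same time: one steals
        // the parker and drives the timer/IO driver, the other parks on the
        // scheduler's `Notify` while polling only its own future.
        let handles: Vec<_> = (0..2)
            .map(|i| {
                let rt = Arc::clone(&rt);
                std::thread::spawn(move || rt.block_on(async move { i * 2 }))
            })
            .collect();

        for h in handles {
            assert!(h.join().is_ok());
        }
    }
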
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs
index 7bf8b445..0c0e95a6 100644
--- a/tokio/src/runtime/basic_scheduler.rs
+++ b/tokio/src/runtime/basic_scheduler.rs
@@ -1,22 +1,35 @@
+use crate::future::poll_fn;
+use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
-use crate::runtime;
use crate::runtime::task::{self, JoinHandle, Schedule, Task};
+use crate::sync::Notify;
use crate::util::linked_list::{Link, LinkedList};
-use crate::util::{waker_ref, Wake};
+use crate::util::{waker_ref, Wake, WakerRef};
use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
-use std::sync::{Arc, Mutex};
-use std::task::Poll::Ready;
+use std::sync::{Arc, PoisonError};
+use std::task::Poll::{Pending, Ready};
use std::time::Duration;
/// Executes tasks on the current thread
-pub(crate) struct BasicScheduler<P>
-where
- P: Park,
-{
+pub(crate) struct BasicScheduler<P: Park> {
+ /// Inner state guarded by a mutex that is shared
+ /// between all `block_on` calls.
+ inner: Mutex<Option<Inner<P>>>,
+
+ /// Notifier for waking up other threads to steal the
+ /// parker.
+ notify: Notify,
+
+ /// Sendable task spawner
+ spawner: Spawner,
+}
+
+/// The inner scheduler that owns the task queue and the main parker P.
+struct Inner<P: Park> {
/// Scheduler run queue
///
/// When the scheduler is executed, the queue is removed from `self` and
@@ -59,7 +72,7 @@ struct Shared {
unpark: Box<dyn Unpark>,
}
-/// Thread-local context
+/// Thread-local context.
struct Context {
/// Shared scheduler state
shared: Arc<Shared>,
@@ -68,38 +81,43 @@ struct Context {
tasks: RefCell<Tasks>,
}
-/// Initial queue capacity
+/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;
/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;
-/// How often ot check the remote queue first
+/// How often to check the remote queue first.
const REMOTE_FIRST_INTERVAL: u8 = 31;
-// Tracks the current BasicScheduler
+// Tracks the current BasicScheduler.
scoped_thread_local!(static CURRENT: Context);
-impl<P> BasicScheduler<P>
-where
- P: Park,
-{
+impl<P: Park> BasicScheduler<P> {
pub(crate) fn new(park: P) -> BasicScheduler<P> {
let unpark = Box::new(park.unpark());
- BasicScheduler {
+ let spawner = Spawner {
+ shared: Arc::new(Shared {
+ queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
+ unpark: unpark as Box<dyn Unpark>,
+ }),
+ };
+
+ let inner = Mutex::new(Some(Inner {
tasks: Some(Tasks {
owned: LinkedList::new(),
queue: VecDeque::with_capacity(INITIAL_CAPACITY),
}),
- spawner: Spawner {
- shared: Arc::new(Shared {
- queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
- unpark: unpark as Box<dyn Unpark>,
- }),
- },
+ spawner: spawner.clone(),
tick: 0,
park,
+ }));
+
+ BasicScheduler {
+ inner,
+ notify: Notify::new(),
+ spawner,
}
}
@@ -108,7 +126,6 @@ where
}
/// Spawns a future onto the thread pool
- #[allow(dead_code)]
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
@@ -117,13 +134,57 @@ where
self.spawner.spawn(future)
}
- pub(crate) fn block_on<F>(&mut self, future: F) -> F::Output
- where
- F: Future,
- {
+ pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output {
+ pin!(future);
+
+        // Attempt to steal the dedicated parker and block_on the future
+        // there; otherwise, select on a notification that the parker is
+        // available or on completion of the future.
+ loop {
+ if let Some(inner) = &mut self.take_inner() {
+ return inner.block_on(future);
+ } else {
+ let mut enter = crate::runtime::enter(false);
+
+ let notified = self.notify.notified();
+ pin!(notified);
+
+ if let Some(out) = enter
+ .block_on(poll_fn(|cx| {
+ if notified.as_mut().poll(cx).is_ready() {
+ return Ready(None);
+ }
+
+ if let Ready(out) = future.as_mut().poll(cx) {
+ return Ready(Some(out));
+ }
+
+ Pending
+ }))
+ .expect("Failed to `Enter::block_on`")
+ {
+ return out;
+ }
+ }
+ }
+ }
+
+ fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
+ let inner = self.inner.lock().unwrap().take()?;
+
+ Some(InnerGuard {
+ inner: Some(inner),
+ basic_scheduler: &self,
+ })
+ }
+}
+
+impl<P: Park> Inner<P> {
+ /// Block on the future provided and drive the runtime's driver.
+ fn block_on<F: Future>(&mut self, future: F) -> F::Output {
enter(self, |scheduler, context| {
- let _enter = runtime::enter(false);
- let waker = waker_ref(&scheduler.spawner.shared);
+ let _enter = crate::runtime::enter(false);
+ let waker = scheduler.spawner.waker_ref();
let mut cx = std::task::Context::from_waker(&waker);
pin!(future);
@@ -178,16 +239,16 @@ where
/// Enter the scheduler context. This sets the queue and other necessary
/// scheduler state in the thread-local
-fn enter<F, R, P>(scheduler: &mut BasicScheduler<P>, f: F) -> R
+fn enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R
where
- F: FnOnce(&mut BasicScheduler<P>, &Context) -> R,
+ F: FnOnce(&mut Inner<P>, &Context) -> R,
P: Park,
{
// Ensures the run queue is placed back in the `BasicScheduler` instance
// once `block_on` returns.
struct Guard<'a, P: Park> {
context: Option<Context>,
- scheduler: &'a mut BasicScheduler<P>,
+ scheduler: &'a mut Inner<P>,
}
impl<P: Park> Drop for Guard<'_, P> {
@@ -214,12 +275,23 @@ where
CURRENT.set(context, || f(scheduler, context))
}
-impl<P> Drop for BasicScheduler<P>
-where
- P: Park,
-{
+impl<P: Park> Drop for BasicScheduler<P> {
fn drop(&mut self) {
- enter(self, |scheduler, context| {
+        // If we are dropping while already panicking, the lock may be
+        // poisoned; recover the state instead of unwrapping to avoid a
+        // double panic.
+
+ let mut inner = match self
+ .inner
+ .lock()
+ .unwrap_or_else(PoisonError::into_inner)
+ .take()
+ {
+ Some(inner) => inner,
+ None if std::thread::panicking() => return,
+ None => panic!("Oh no! We never placed the Inner state back, this is a bug!"),
+ };
+
+ enter(&mut inner, |scheduler, context| {
// Loop required here to ensure borrow is dropped between iterations
#[allow(clippy::while_let_loop)]
loop {
@@ -269,6 +341,10 @@ impl Spawner {
fn pop(&self) -> Option<task::Notified<Arc<Shared>>> {
self.shared.queue.lock().unwrap().pop_front()
}
+
+ fn waker_ref(&self) -> WakerRef<'_> {
+ waker_ref(&self.shared)
+ }
}
impl fmt::Debug for Spawner {
@@ -325,3 +401,43 @@ impl Wake for Shared {
arc_self.unpark.unpark();
}
}
+
+// ===== InnerGuard =====
+
+/// Used to ensure we always place the Inner value
+/// back into its slot in `BasicScheduler`, even if the
+/// future panics.
+struct InnerGuard<'a, P: Park> {
+ inner: Option<Inner<P>>,
+ basic_scheduler: &'a BasicScheduler<P>,
+}
+
+impl<P: Park> InnerGuard<'_, P> {
+ fn block_on<F: Future>(&mut self, future: F) -> F::Output {
+        // The only time `inner` is set to `None` is when this guard has
+        // already been dropped, so this unwrap is safe.
+ self.inner.as_mut().unwrap().block_on(future)
+ }
+}
+
+impl<P: Park> Drop for InnerGuard<'_, P> {
+ fn drop(&mut self) {
+ if let Some(scheduler) = self.inner.take() {
+ // We can ignore the poison error here since we are
+ // just replacing the state.
+ let mut lock = self
+ .basic_scheduler
+ .inner
+ .lock()
+ .unwrap_or_else(PoisonError::into_inner);
+
+ // Replace old scheduler back into the state to allow
+ // other threads to pick it up and drive it.
+ lock.replace(scheduler);
+
+            // Wake up another thread, if any, waiting to steal
+            // the dedicated parker P.
+ self.basic_scheduler.notify.notify_one()
+ }
+ }
+}
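
The `InnerGuard` above is the heart of the change: at most one thread owns the scheduler's inner state at a time, and the guard's `Drop` puts the state back and wakes a waiter even if the guarded `block_on` panics. The same shape in a standalone sketch using only `std` (`Core` is a hypothetical stand-in for `Inner<P>`; the real code uses the async `Notify` rather than a blocking `Condvar` so that waiting threads can keep polling their own futures):

    use std::sync::{Condvar, Mutex};

    struct Core; // hypothetical stand-in for `Inner<P>`

    struct Shared {
        core: Mutex<Option<Core>>,
        cond: Condvar,
    }

    struct CoreGuard<'a> {
        core: Option<Core>,
        shared: &'a Shared,
    }

    impl Drop for CoreGuard<'_> {
        fn drop(&mut self) {
            // Put the core back even if the caller panicked, then wake one
            // waiter so it can take over driving.
            *self.shared.core.lock().unwrap() = self.core.take();
            self.shared.cond.notify_one();
        }
    }

    impl Shared {
        /// Block until the core is available, then take exclusive ownership.
        fn take_core(&self) -> CoreGuard<'_> {
            let mut lock = self.core.lock().unwrap();
            while lock.is_none() {
                lock = self.cond.wait(lock).unwrap();
            }
            CoreGuard { core: lock.take(), shared: self }
        }
    }

    fn main() {
        let shared = Shared { core: Mutex::new(Some(Core)), cond: Condvar::new() };
        let guard = shared.take_core();
        drop(guard); // returns the core and notifies the next waiter
    }
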
diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs
index 4072b04e..99b34eb3 100644
--- a/tokio/src/runtime/builder.rs
+++ b/tokio/src/runtime/builder.rs
@@ -496,7 +496,7 @@ cfg_rt_core! {
let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime {
- kind: Kind::Basic(Mutex::new(Some(scheduler))),
+ kind: Kind::Basic(scheduler),
handle: Handle {
spawner,
io_handle: resources.io_handle,
diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs
index 884c2b46..e4a1cf08 100644
--- a/tokio/src/runtime/mod.rs
+++ b/tokio/src/runtime/mod.rs
@@ -296,7 +296,7 @@ enum Kind {
/// Execute all tasks on the current-thread.
#[cfg(feature = "rt-core")]
- Basic(Mutex<Option<BasicScheduler<driver::Driver>>>),
+ Basic(BasicScheduler<driver::Driver>),
/// Execute tasks across multiple threads.
#[cfg(feature = "rt-threaded")]
@@ -401,7 +401,7 @@ impl Runtime {
Kind::Shell(_) => panic!("task execution disabled"),
#[cfg(feature = "rt-threaded")]
Kind::ThreadPool(exec) => exec.spawn(future),
- Kind::Basic(_exec) => self.handle.spawner.spawn(future),
+ Kind::Basic(exec) => exec.spawn(future),
}
}
@@ -461,24 +461,7 @@ impl Runtime {
}
}
#[cfg(feature = "rt-core")]
- Kind::Basic(exec) => {
- // TODO(lucio): clean this up and move this impl into
- // `basic_scheduler.rs`, this is hacky and bad but will work for
- // now.
- let exec_temp = {
- let mut lock = exec.lock().unwrap();
- lock.take()
- };
-
- if let Some(mut exec_temp) = exec_temp {
- let res = exec_temp.block_on(future);
- exec.lock().unwrap().replace(exec_temp);
- res
- } else {
- let mut enter = crate::runtime::enter(true);
- enter.block_on(future).unwrap()
- }
- }
+ Kind::Basic(exec) => exec.block_on(future),
#[cfg(feature = "rt-threaded")]
Kind::ThreadPool(exec) => exec.block_on(future),
})
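
The waiter path that replaces the removed fallback above is a hand-rolled two-way select: poll the notification first, then the future, and return `Pending` when neither is ready. The same pattern in isolation, assuming a current toolchain where `std` provides `poll_fn` and `pin!` (the diff uses tokio's internal equivalents):

    use std::future::{poll_fn, Future};
    use std::pin::pin;
    use std::task::Poll;

    /// Drive `fut` to completion (`Some(output)`) unless `stop` finishes
    /// first (`None`). Checking `stop` before `fut` mirrors the ordering
    /// in the new `BasicScheduler::block_on`.
    async fn race<F: Future, S: Future>(fut: F, stop: S) -> Option<F::Output> {
        let mut fut = pin!(fut);
        let mut stop = pin!(stop);
        poll_fn(move |cx| {
            if stop.as_mut().poll(cx).is_ready() {
                return Poll::Ready(None);
            }
            if let Poll::Ready(out) = fut.as_mut().poll(cx) {
                return Poll::Ready(Some(out));
            }
            Poll::Pending
        })
        .await
    }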