summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime
diff options
context:
space:
mode:
authorLucio Franco <luciofranco14@gmail.com>2020-09-24 13:31:49 -0400
committerGitHub <noreply@github.com>2020-09-24 13:31:49 -0400
commit4dfbdbff7e260eb7f046a8dc91ec0c84ae7ab2e8 (patch)
treeb9d06a17d8450f26d8cad8b9068a1ef7a981fafb /tokio/src/runtime
parentc29f13b7a5cc68fa2dfe52991d8b7497b2497725 (diff)
rt: Allow concurrent `Shell:block_on` calls (#2868)
Diffstat (limited to 'tokio/src/runtime')
-rw-r--r--tokio/src/runtime/builder.rs3
-rw-r--r--tokio/src/runtime/mod.rs22
-rw-r--r--tokio/src/runtime/shell.rs100
3 files changed, 88 insertions, 37 deletions
diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs
index 99b34eb3..043371c7 100644
--- a/tokio/src/runtime/builder.rs
+++ b/tokio/src/runtime/builder.rs
@@ -1,4 +1,3 @@
-use crate::loom::sync::Mutex;
use crate::runtime::handle::Handle;
use crate::runtime::shell::Shell;
use crate::runtime::{blocking, driver, io, Callback, Runtime, Spawner};
@@ -377,7 +376,7 @@ impl Builder {
let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime {
- kind: Kind::Shell(Mutex::new(Some(Shell::new(driver)))),
+ kind: Kind::Shell(Shell::new(driver)),
handle: Handle {
spawner,
io_handle: resources.io_handle,
diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs
index e4a1cf08..a6a739be 100644
--- a/tokio/src/runtime/mod.rs
+++ b/tokio/src/runtime/mod.rs
@@ -243,7 +243,6 @@ cfg_rt_core! {
use crate::task::JoinHandle;
}
-use crate::loom::sync::Mutex;
use std::future::Future;
use std::time::Duration;
@@ -292,7 +291,7 @@ pub struct Runtime {
enum Kind {
/// Not able to execute concurrent tasks. This variant is mostly used to get
/// access to the driver handles.
- Shell(Mutex<Option<Shell>>),
+ Shell(Shell),
/// Execute all tasks on the current-thread.
#[cfg(feature = "rt-core")]
@@ -442,24 +441,7 @@ impl Runtime {
/// [handle]: fn@Handle::block_on
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
self.handle.enter(|| match &self.kind {
- Kind::Shell(exec) => {
- // TODO(lucio): clean this up and move this impl into
- // `shell.rs`, this is hacky and bad but will work for
- // now.
- let exec_temp = {
- let mut lock = exec.lock().unwrap();
- lock.take()
- };
-
- if let Some(mut exec_temp) = exec_temp {
- let res = exec_temp.block_on(future);
- exec.lock().unwrap().replace(exec_temp);
- res
- } else {
- let mut enter = crate::runtime::enter(true);
- enter.block_on(future).unwrap()
- }
- }
+ Kind::Shell(exec) => exec.block_on(future),
#[cfg(feature = "rt-core")]
Kind::Basic(exec) => exec.block_on(future),
#[cfg(feature = "rt-threaded")]
diff --git a/tokio/src/runtime/shell.rs b/tokio/src/runtime/shell.rs
index 3d631239..486d4fa5 100644
--- a/tokio/src/runtime/shell.rs
+++ b/tokio/src/runtime/shell.rs
@@ -1,18 +1,21 @@
#![allow(clippy::redundant_clone)]
+use crate::future::poll_fn;
use crate::park::{Park, Unpark};
use crate::runtime::driver::Driver;
-use crate::runtime::enter;
+use crate::sync::Notify;
use crate::util::{waker_ref, Wake};
-use std::future::Future;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use std::task::Context;
-use std::task::Poll::Ready;
+use std::task::Poll::{Pending, Ready};
+use std::{future::Future, sync::PoisonError};
#[derive(Debug)]
pub(super) struct Shell {
- driver: Driver,
+ driver: Mutex<Option<Driver>>,
+
+ notify: Notify,
/// TODO: don't store this
unpark: Arc<Handle>,
@@ -25,28 +28,57 @@ impl Shell {
pub(super) fn new(driver: Driver) -> Shell {
let unpark = Arc::new(Handle(driver.unpark()));
- Shell { driver, unpark }
+ Shell {
+ driver: Mutex::new(Some(driver)),
+ notify: Notify::new(),
+ unpark,
+ }
}
- pub(super) fn block_on<F>(&mut self, f: F) -> F::Output
+ pub(super) fn block_on<F>(&self, f: F) -> F::Output
where
F: Future,
{
- let _e = enter(true);
+ let mut enter = crate::runtime::enter(true);
pin!(f);
- let waker = waker_ref(&self.unpark);
- let mut cx = Context::from_waker(&waker);
-
loop {
- if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
- return v;
- }
+ if let Some(driver) = &mut self.take_driver() {
+ return driver.block_on(f);
+ } else {
+ let notified = self.notify.notified();
+ pin!(notified);
+
+ if let Some(out) = enter
+ .block_on(poll_fn(|cx| {
+ if notified.as_mut().poll(cx).is_ready() {
+ return Ready(None);
+ }
+
+ if let Ready(out) = f.as_mut().poll(cx) {
+ return Ready(Some(out));
+ }
- self.driver.park().unwrap();
+ Pending
+ }))
+ .expect("Failed to `Enter::block_on`")
+ {
+ return out;
+ }
+ }
}
}
+
+ fn take_driver(&self) -> Option<DriverGuard<'_>> {
+ let mut lock = self.driver.lock().unwrap();
+ let driver = lock.take()?;
+
+ Some(DriverGuard {
+ inner: Some(driver),
+ shell: &self,
+ })
+ }
}
impl Wake for Handle {
@@ -60,3 +92,41 @@ impl Wake for Handle {
arc_self.0.unpark();
}
}
+
+struct DriverGuard<'a> {
+ inner: Option<Driver>,
+ shell: &'a Shell,
+}
+
+impl DriverGuard<'_> {
+ fn block_on<F: Future>(&mut self, f: F) -> F::Output {
+ let driver = self.inner.as_mut().unwrap();
+
+ pin!(f);
+
+ let waker = waker_ref(&self.shell.unpark);
+ let mut cx = Context::from_waker(&waker);
+
+ loop {
+ if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
+ return v;
+ }
+
+ driver.park().unwrap();
+ }
+ }
+}
+
+impl Drop for DriverGuard<'_> {
+ fn drop(&mut self) {
+ if let Some(inner) = self.inner.take() {
+ self.shell
+ .driver
+ .lock()
+ .unwrap_or_else(PoisonError::into_inner)
+ .replace(inner);
+
+ self.shell.notify.notify_one();
+ }
+ }
+}