summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime/shell.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/runtime/shell.rs')
-rw-r--r--tokio/src/runtime/shell.rs100
1 files changed, 85 insertions, 15 deletions
diff --git a/tokio/src/runtime/shell.rs b/tokio/src/runtime/shell.rs
index 3d631239..486d4fa5 100644
--- a/tokio/src/runtime/shell.rs
+++ b/tokio/src/runtime/shell.rs
@@ -1,18 +1,21 @@
#![allow(clippy::redundant_clone)]
+use crate::future::poll_fn;
use crate::park::{Park, Unpark};
use crate::runtime::driver::Driver;
-use crate::runtime::enter;
+use crate::sync::Notify;
use crate::util::{waker_ref, Wake};
-use std::future::Future;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use std::task::Context;
-use std::task::Poll::Ready;
+use std::task::Poll::{Pending, Ready};
+use std::{future::Future, sync::PoisonError};
#[derive(Debug)]
pub(super) struct Shell {
- driver: Driver,
+ driver: Mutex<Option<Driver>>,
+
+ notify: Notify,
/// TODO: don't store this
unpark: Arc<Handle>,
@@ -25,28 +28,57 @@ impl Shell {
pub(super) fn new(driver: Driver) -> Shell {
let unpark = Arc::new(Handle(driver.unpark()));
- Shell { driver, unpark }
+ Shell {
+ driver: Mutex::new(Some(driver)),
+ notify: Notify::new(),
+ unpark,
+ }
}
- pub(super) fn block_on<F>(&mut self, f: F) -> F::Output
+ pub(super) fn block_on<F>(&self, f: F) -> F::Output
where
F: Future,
{
- let _e = enter(true);
+ let mut enter = crate::runtime::enter(true);
pin!(f);
- let waker = waker_ref(&self.unpark);
- let mut cx = Context::from_waker(&waker);
-
loop {
- if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
- return v;
- }
+ if let Some(driver) = &mut self.take_driver() {
+ return driver.block_on(f);
+ } else {
+ let notified = self.notify.notified();
+ pin!(notified);
+
+ if let Some(out) = enter
+ .block_on(poll_fn(|cx| {
+ if notified.as_mut().poll(cx).is_ready() {
+ return Ready(None);
+ }
+
+ if let Ready(out) = f.as_mut().poll(cx) {
+ return Ready(Some(out));
+ }
- self.driver.park().unwrap();
+ Pending
+ }))
+ .expect("Failed to `Enter::block_on`")
+ {
+ return out;
+ }
+ }
}
}
+
+ fn take_driver(&self) -> Option<DriverGuard<'_>> {
+ let mut lock = self.driver.lock().unwrap();
+ let driver = lock.take()?;
+
+ Some(DriverGuard {
+ inner: Some(driver),
+ shell: &self,
+ })
+ }
}
impl Wake for Handle {
@@ -60,3 +92,41 @@ impl Wake for Handle {
arc_self.0.unpark();
}
}
+
+struct DriverGuard<'a> {
+ inner: Option<Driver>,
+ shell: &'a Shell,
+}
+
+impl DriverGuard<'_> {
+ fn block_on<F: Future>(&mut self, f: F) -> F::Output {
+ let driver = self.inner.as_mut().unwrap();
+
+ pin!(f);
+
+ let waker = waker_ref(&self.shell.unpark);
+ let mut cx = Context::from_waker(&waker);
+
+ loop {
+ if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
+ return v;
+ }
+
+ driver.park().unwrap();
+ }
+ }
+}
+
+impl Drop for DriverGuard<'_> {
+ fn drop(&mut self) {
+ if let Some(inner) = self.inner.take() {
+ self.shell
+ .driver
+ .lock()
+ .unwrap_or_else(PoisonError::into_inner)
+ .replace(inner);
+
+ self.shell.notify.notify_one();
+ }
+ }
+}