diff options
Diffstat (limited to 'tokio/src/runtime/shell.rs')
-rw-r--r-- | tokio/src/runtime/shell.rs | 100 |
1 files changed, 85 insertions, 15 deletions
diff --git a/tokio/src/runtime/shell.rs b/tokio/src/runtime/shell.rs index 3d631239..486d4fa5 100644 --- a/tokio/src/runtime/shell.rs +++ b/tokio/src/runtime/shell.rs @@ -1,18 +1,21 @@ #![allow(clippy::redundant_clone)] +use crate::future::poll_fn; use crate::park::{Park, Unpark}; use crate::runtime::driver::Driver; -use crate::runtime::enter; +use crate::sync::Notify; use crate::util::{waker_ref, Wake}; -use std::future::Future; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::task::Context; -use std::task::Poll::Ready; +use std::task::Poll::{Pending, Ready}; +use std::{future::Future, sync::PoisonError}; #[derive(Debug)] pub(super) struct Shell { - driver: Driver, + driver: Mutex<Option<Driver>>, + + notify: Notify, /// TODO: don't store this unpark: Arc<Handle>, @@ -25,28 +28,57 @@ impl Shell { pub(super) fn new(driver: Driver) -> Shell { let unpark = Arc::new(Handle(driver.unpark())); - Shell { driver, unpark } + Shell { + driver: Mutex::new(Some(driver)), + notify: Notify::new(), + unpark, + } } - pub(super) fn block_on<F>(&mut self, f: F) -> F::Output + pub(super) fn block_on<F>(&self, f: F) -> F::Output where F: Future, { - let _e = enter(true); + let mut enter = crate::runtime::enter(true); pin!(f); - let waker = waker_ref(&self.unpark); - let mut cx = Context::from_waker(&waker); - loop { - if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { - return v; - } + if let Some(driver) = &mut self.take_driver() { + return driver.block_on(f); + } else { + let notified = self.notify.notified(); + pin!(notified); + + if let Some(out) = enter + .block_on(poll_fn(|cx| { + if notified.as_mut().poll(cx).is_ready() { + return Ready(None); + } + + if let Ready(out) = f.as_mut().poll(cx) { + return Ready(Some(out)); + } - self.driver.park().unwrap(); + Pending + })) + .expect("Failed to `Enter::block_on`") + { + return out; + } + } } } + + fn take_driver(&self) -> Option<DriverGuard<'_>> { + let mut lock = self.driver.lock().unwrap(); + let driver = lock.take()?; + + Some(DriverGuard { + inner: Some(driver), + shell: &self, + }) + } } impl Wake for Handle { @@ -60,3 +92,41 @@ impl Wake for Handle { arc_self.0.unpark(); } } + +struct DriverGuard<'a> { + inner: Option<Driver>, + shell: &'a Shell, +} + +impl DriverGuard<'_> { + fn block_on<F: Future>(&mut self, f: F) -> F::Output { + let driver = self.inner.as_mut().unwrap(); + + pin!(f); + + let waker = waker_ref(&self.shell.unpark); + let mut cx = Context::from_waker(&waker); + + loop { + if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { + return v; + } + + driver.park().unwrap(); + } + } +} + +impl Drop for DriverGuard<'_> { + fn drop(&mut self) { + if let Some(inner) = self.inner.take() { + self.shell + .driver + .lock() + .unwrap_or_else(PoisonError::into_inner) + .replace(inner); + + self.shell.notify.notify_one(); + } + } +} |