diff options
Diffstat (limited to 'tokio-executor/src')
-rw-r--r-- | tokio-executor/src/enter.rs | 24 | ||||
-rw-r--r-- | tokio-executor/src/executor.rs | 15 | ||||
-rw-r--r-- | tokio-executor/src/global.rs | 33 | ||||
-rw-r--r-- | tokio-executor/src/park.rs | 43 |
4 files changed, 77 insertions, 38 deletions
diff --git a/tokio-executor/src/enter.rs b/tokio-executor/src/enter.rs index 0ef6ddcb..02b35420 100644 --- a/tokio-executor/src/enter.rs +++ b/tokio-executor/src/enter.rs @@ -1,9 +1,8 @@ -use futures::{self, Future}; use std::cell::{Cell, RefCell}; use std::error::Error; use std::fmt; +use std::future::Future; use std::marker::PhantomData; -use std::prelude::v1::*; thread_local!(static ENTERED: Cell<bool> = Cell::new(false)); @@ -65,8 +64,25 @@ pub fn enter() -> Result<Enter, EnterError> { impl Enter { /// Blocks the thread on the specified future, returning the value with /// which that future completes. - pub fn block_on<F: Future>(&mut self, f: F) -> Result<F::Item, F::Error> { - futures::executor::spawn(f).wait_future() + pub fn block_on<F: Future>(&mut self, mut f: F) -> F::Output { + use crate::park::{Park, ParkThread}; + use std::pin::Pin; + use std::task::Context; + use std::task::Poll::Ready; + + let park = ParkThread::new(); + let waker = park.unpark().into_waker(); + let mut cx = Context::from_waker(&waker); + + // `block_on` takes ownership of `f`. Once it is pinned here, the original `f` binding can + // no longer be accessed, making the pinning safe. + let mut f = unsafe { Pin::new_unchecked(&mut f) }; + + loop { + if let Ready(v) = f.as_mut().poll(&mut cx) { + return v; + } + } } } diff --git a/tokio-executor/src/executor.rs b/tokio-executor/src/executor.rs index c0b156f0..45a65ca1 100644 --- a/tokio-executor/src/executor.rs +++ b/tokio-executor/src/executor.rs @@ -1,5 +1,6 @@ use crate::SpawnError; -use futures::Future; +use std::future::Future; +use std::pin::Pin; /// A value that executes futures. /// @@ -82,16 +83,14 @@ pub trait Executor { /// use futures::future::lazy; /// /// # fn docs(my_executor: &mut dyn Executor) { - /// my_executor.spawn(Box::new(lazy(|| { + /// my_executor.spawn(Box::pin(lazy(|| { /// println!("running on the executor"); /// Ok(()) /// }))).unwrap(); /// # } /// ``` - fn spawn( - &mut self, - future: Box<dyn Future<Item = (), Error = ()> + Send>, - ) -> Result<(), SpawnError>; + fn spawn(&mut self, future: Pin<Box<dyn Future<Output = ()> + Send>>) + -> Result<(), SpawnError>; /// Provides a best effort **hint** to whether or not `spawn` will succeed. /// @@ -116,7 +115,7 @@ pub trait Executor { /// /// # fn docs(my_executor: &mut dyn Executor) { /// if my_executor.status().is_ok() { - /// my_executor.spawn(Box::new(lazy(|| { + /// my_executor.spawn(Box::pin(lazy(|| { /// println!("running on the executor"); /// Ok(()) /// }))).unwrap(); @@ -133,7 +132,7 @@ pub trait Executor { impl<E: Executor + ?Sized> Executor for Box<E> { fn spawn( &mut self, - future: Box<dyn Future<Item = (), Error = ()> + Send>, + future: Pin<Box<dyn Future<Output = ()> + Send>>, ) -> Result<(), SpawnError> { (**self).spawn(future) } diff --git a/tokio-executor/src/global.rs b/tokio-executor/src/global.rs index 1c4c53ff..f2745123 100644 --- a/tokio-executor/src/global.rs +++ b/tokio-executor/src/global.rs @@ -1,6 +1,7 @@ use super::{Enter, Executor, SpawnError}; -use futures::{future, Future}; use std::cell::Cell; +use std::future::Future; +use std::pin::Pin; /// Executes futures on the default executor for the current execution context. /// @@ -70,7 +71,7 @@ thread_local! { impl super::Executor for DefaultExecutor { fn spawn( &mut self, - future: Box<dyn Future<Item = (), Error = ()> + Send>, + future: Pin<Box<dyn Future<Output = ()> + Send>>, ) -> Result<(), SpawnError> { DefaultExecutor::with_current(|executor| executor.spawn(future)) .unwrap_or_else(|| Err(SpawnError::shutdown())) @@ -84,10 +85,10 @@ impl super::Executor for DefaultExecutor { impl<T> super::TypedExecutor<T> for DefaultExecutor where - T: Future<Item = (), Error = ()> + Send + 'static, + T: Future<Output = ()> + Send + 'static, { fn spawn(&mut self, future: T) -> Result<(), SpawnError> { - super::Executor::spawn(self, Box::new(future)) + super::Executor::spawn(self, Box::pin(future)) } fn status(&self) -> Result<(), SpawnError> { @@ -95,26 +96,6 @@ where } } -impl<T> future::Executor<T> for DefaultExecutor -where - T: Future<Item = (), Error = ()> + Send + 'static, -{ - fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> { - if let Err(e) = super::Executor::status(self) { - let kind = if e.is_at_capacity() { - future::ExecuteErrorKind::NoCapacity - } else { - future::ExecuteErrorKind::Shutdown - }; - - return Err(future::ExecuteError::new(kind, future)); - } - - let _ = DefaultExecutor::with_current(|executor| executor.spawn(Box::new(future))); - Ok(()) - } -} - // ===== global spawn fns ===== /// Submits a future for execution on the default executor -- usually a @@ -153,9 +134,9 @@ where /// ``` pub fn spawn<T>(future: T) where - T: Future<Item = (), Error = ()> + Send + 'static, + T: Future<Output = ()> + Send + 'static, { - DefaultExecutor::current().spawn(Box::new(future)).unwrap() + DefaultExecutor::current().spawn(Box::pin(future)).unwrap() } /// Set the default executor for the duration of the closure diff --git a/tokio-executor/src/park.rs b/tokio-executor/src/park.rs index b7c4dbf0..35cd846b 100644 --- a/tokio-executor/src/park.rs +++ b/tokio-executor/src/park.rs @@ -46,8 +46,10 @@ use crossbeam_utils::sync::{Parker, Unparker}; use std::marker::PhantomData; +use std::mem; use std::rc::Rc; use std::sync::Arc; +use std::task::{RawWaker, RawWakerVTable, Waker}; use std::time::Duration; /// Block the current thread. @@ -223,3 +225,44 @@ impl Unpark for UnparkThread { self.inner.unpark(); } } + +static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop); + +impl UnparkThread { + pub(crate) fn into_waker(self) -> Waker { + unsafe { + let raw = unparker_to_raw_waker(self.inner); + Waker::from_raw(raw) + } + } +} + +unsafe fn unparker_to_raw_waker(unparker: Unparker) -> RawWaker { + RawWaker::new(Unparker::into_raw(unparker), &VTABLE) +} + +unsafe fn clone(raw: *const ()) -> RawWaker { + let unparker = Unparker::from_raw(raw); + + // Increment the ref count + mem::forget(unparker.clone()); + + unparker_to_raw_waker(unparker) +} + +unsafe fn wake(raw: *const ()) { + let unparker = Unparker::from_raw(raw); + unparker.unpark(); +} + +unsafe fn wake_by_ref(raw: *const ()) { + let unparker = Unparker::from_raw(raw); + unparker.unpark(); + + // We don't actually own a reference to the unparker + mem::forget(unparker); +} + +unsafe fn drop(raw: *const ()) { + let _ = Unparker::from_raw(raw); +} |