summaryrefslogtreecommitdiffstats
path: root/tokio-executor/src
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-executor/src')
-rw-r--r--tokio-executor/src/enter.rs24
-rw-r--r--tokio-executor/src/executor.rs15
-rw-r--r--tokio-executor/src/global.rs33
-rw-r--r--tokio-executor/src/park.rs43
4 files changed, 77 insertions, 38 deletions
diff --git a/tokio-executor/src/enter.rs b/tokio-executor/src/enter.rs
index 0ef6ddcb..02b35420 100644
--- a/tokio-executor/src/enter.rs
+++ b/tokio-executor/src/enter.rs
@@ -1,9 +1,8 @@
-use futures::{self, Future};
use std::cell::{Cell, RefCell};
use std::error::Error;
use std::fmt;
+use std::future::Future;
use std::marker::PhantomData;
-use std::prelude::v1::*;
thread_local!(static ENTERED: Cell<bool> = Cell::new(false));
@@ -65,8 +64,25 @@ pub fn enter() -> Result<Enter, EnterError> {
impl Enter {
/// Blocks the thread on the specified future, returning the value with
/// which that future completes.
- pub fn block_on<F: Future>(&mut self, f: F) -> Result<F::Item, F::Error> {
- futures::executor::spawn(f).wait_future()
+ pub fn block_on<F: Future>(&mut self, mut f: F) -> F::Output {
+ use crate::park::{Park, ParkThread};
+ use std::pin::Pin;
+ use std::task::Context;
+ use std::task::Poll::Ready;
+
+ let park = ParkThread::new();
+ let waker = park.unpark().into_waker();
+ let mut cx = Context::from_waker(&waker);
+
+ // `block_on` takes ownership of `f`. Once it is pinned here, the original `f` binding can
+ // no longer be accessed, making the pinning safe.
+ let mut f = unsafe { Pin::new_unchecked(&mut f) };
+
+ loop {
+ if let Ready(v) = f.as_mut().poll(&mut cx) {
+ return v;
+ }
+ }
}
}
diff --git a/tokio-executor/src/executor.rs b/tokio-executor/src/executor.rs
index c0b156f0..45a65ca1 100644
--- a/tokio-executor/src/executor.rs
+++ b/tokio-executor/src/executor.rs
@@ -1,5 +1,6 @@
use crate::SpawnError;
-use futures::Future;
+use std::future::Future;
+use std::pin::Pin;
/// A value that executes futures.
///
@@ -82,16 +83,14 @@ pub trait Executor {
/// use futures::future::lazy;
///
/// # fn docs(my_executor: &mut dyn Executor) {
- /// my_executor.spawn(Box::new(lazy(|| {
+ /// my_executor.spawn(Box::pin(lazy(|| {
/// println!("running on the executor");
/// Ok(())
/// }))).unwrap();
/// # }
/// ```
- fn spawn(
- &mut self,
- future: Box<dyn Future<Item = (), Error = ()> + Send>,
- ) -> Result<(), SpawnError>;
+ fn spawn(&mut self, future: Pin<Box<dyn Future<Output = ()> + Send>>)
+ -> Result<(), SpawnError>;
/// Provides a best effort **hint** to whether or not `spawn` will succeed.
///
@@ -116,7 +115,7 @@ pub trait Executor {
///
/// # fn docs(my_executor: &mut dyn Executor) {
/// if my_executor.status().is_ok() {
- /// my_executor.spawn(Box::new(lazy(|| {
+ /// my_executor.spawn(Box::pin(lazy(|| {
/// println!("running on the executor");
/// Ok(())
/// }))).unwrap();
@@ -133,7 +132,7 @@ pub trait Executor {
impl<E: Executor + ?Sized> Executor for Box<E> {
fn spawn(
&mut self,
- future: Box<dyn Future<Item = (), Error = ()> + Send>,
+ future: Pin<Box<dyn Future<Output = ()> + Send>>,
) -> Result<(), SpawnError> {
(**self).spawn(future)
}
diff --git a/tokio-executor/src/global.rs b/tokio-executor/src/global.rs
index 1c4c53ff..f2745123 100644
--- a/tokio-executor/src/global.rs
+++ b/tokio-executor/src/global.rs
@@ -1,6 +1,7 @@
use super::{Enter, Executor, SpawnError};
-use futures::{future, Future};
use std::cell::Cell;
+use std::future::Future;
+use std::pin::Pin;
/// Executes futures on the default executor for the current execution context.
///
@@ -70,7 +71,7 @@ thread_local! {
impl super::Executor for DefaultExecutor {
fn spawn(
&mut self,
- future: Box<dyn Future<Item = (), Error = ()> + Send>,
+ future: Pin<Box<dyn Future<Output = ()> + Send>>,
) -> Result<(), SpawnError> {
DefaultExecutor::with_current(|executor| executor.spawn(future))
.unwrap_or_else(|| Err(SpawnError::shutdown()))
@@ -84,10 +85,10 @@ impl super::Executor for DefaultExecutor {
impl<T> super::TypedExecutor<T> for DefaultExecutor
where
- T: Future<Item = (), Error = ()> + Send + 'static,
+ T: Future<Output = ()> + Send + 'static,
{
fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
- super::Executor::spawn(self, Box::new(future))
+ super::Executor::spawn(self, Box::pin(future))
}
fn status(&self) -> Result<(), SpawnError> {
@@ -95,26 +96,6 @@ where
}
}
-impl<T> future::Executor<T> for DefaultExecutor
-where
- T: Future<Item = (), Error = ()> + Send + 'static,
-{
- fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> {
- if let Err(e) = super::Executor::status(self) {
- let kind = if e.is_at_capacity() {
- future::ExecuteErrorKind::NoCapacity
- } else {
- future::ExecuteErrorKind::Shutdown
- };
-
- return Err(future::ExecuteError::new(kind, future));
- }
-
- let _ = DefaultExecutor::with_current(|executor| executor.spawn(Box::new(future)));
- Ok(())
- }
-}
-
// ===== global spawn fns =====
/// Submits a future for execution on the default executor -- usually a
@@ -153,9 +134,9 @@ where
/// ```
pub fn spawn<T>(future: T)
where
- T: Future<Item = (), Error = ()> + Send + 'static,
+ T: Future<Output = ()> + Send + 'static,
{
- DefaultExecutor::current().spawn(Box::new(future)).unwrap()
+ DefaultExecutor::current().spawn(Box::pin(future)).unwrap()
}
/// Set the default executor for the duration of the closure
diff --git a/tokio-executor/src/park.rs b/tokio-executor/src/park.rs
index b7c4dbf0..35cd846b 100644
--- a/tokio-executor/src/park.rs
+++ b/tokio-executor/src/park.rs
@@ -46,8 +46,10 @@
use crossbeam_utils::sync::{Parker, Unparker};
use std::marker::PhantomData;
+use std::mem;
use std::rc::Rc;
use std::sync::Arc;
+use std::task::{RawWaker, RawWakerVTable, Waker};
use std::time::Duration;
/// Block the current thread.
@@ -223,3 +225,44 @@ impl Unpark for UnparkThread {
self.inner.unpark();
}
}
+
+static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
+
+impl UnparkThread {
+ pub(crate) fn into_waker(self) -> Waker {
+ unsafe {
+ let raw = unparker_to_raw_waker(self.inner);
+ Waker::from_raw(raw)
+ }
+ }
+}
+
+unsafe fn unparker_to_raw_waker(unparker: Unparker) -> RawWaker {
+ RawWaker::new(Unparker::into_raw(unparker), &VTABLE)
+}
+
+unsafe fn clone(raw: *const ()) -> RawWaker {
+ let unparker = Unparker::from_raw(raw);
+
+ // Increment the ref count
+ mem::forget(unparker.clone());
+
+ unparker_to_raw_waker(unparker)
+}
+
+unsafe fn wake(raw: *const ()) {
+ let unparker = Unparker::from_raw(raw);
+ unparker.unpark();
+}
+
+unsafe fn wake_by_ref(raw: *const ()) {
+ let unparker = Unparker::from_raw(raw);
+ unparker.unpark();
+
+ // We don't actually own a reference to the unparker
+ mem::forget(unparker);
+}
+
+unsafe fn drop(raw: *const ()) {
+ let _ = Unparker::from_raw(raw);
+}