summaryrefslogtreecommitdiffstats
path: root/tokio-current-thread
diff options
context:
space:
mode:
authorjpbriquet <jpdigitbox@gmail.com>2018-06-12 19:26:03 +0200
committerCarl Lerche <me@carllerche.com>2018-06-12 10:26:03 -0700
commit2cd854c2c7dd918fe67106e79e7c8eadbe47f1c1 (patch)
treeccf95f0a925ea66a05103d3f9834f751dd36274a /tokio-current-thread
parentba05c39d65cd48b0263b37a32b96721fb8a47933 (diff)
tokio-current-thread crate (#370)
Extract `tokio::executor::current_thread` to a tokio-current-thread crate. Deprecated fns stay in the old location. The new crate only contains thee most recent API.
Diffstat (limited to 'tokio-current-thread')
-rw-r--r--tokio-current-thread/CHANGELOG.md3
-rw-r--r--tokio-current-thread/Cargo.toml22
-rw-r--r--tokio-current-thread/LICENSE25
-rw-r--r--tokio-current-thread/README.md19
-rw-r--r--tokio-current-thread/src/lib.rs709
-rw-r--r--tokio-current-thread/src/scheduler.rs772
-rw-r--r--tokio-current-thread/tests/current_thread.rs622
7 files changed, 2172 insertions, 0 deletions
diff --git a/tokio-current-thread/CHANGELOG.md b/tokio-current-thread/CHANGELOG.md
new file mode 100644
index 00000000..066575d4
--- /dev/null
+++ b/tokio-current-thread/CHANGELOG.md
@@ -0,0 +1,3 @@
+# Unreleased
+
+* Extract `tokio::executor::current_thread` to a tokio-current-thread crate (#356)
diff --git a/tokio-current-thread/Cargo.toml b/tokio-current-thread/Cargo.toml
new file mode 100644
index 00000000..ce227d74
--- /dev/null
+++ b/tokio-current-thread/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "tokio-current-thread"
+
+# When releasing to crates.io:
+# - Update html_root_url.
+# - Update CHANGELOG.md.
+# - Create "v0.1.x" git tag.
+version = "0.1.0"
+documentation = "https://docs.rs/tokio-current-thread"
+repository = "https://github.com/tokio-rs/tokio"
+homepage = "https://github.com/tokio-rs/tokio"
+license = "MIT"
+authors = ["Carl Lerche <me@carllerche.com>"]
+description = """
+Single threaded executor which manage many tasks concurrently on the current thread.
+"""
+keywords = ["futures", "tokio"]
+categories = ["concurrency", "asynchronous"]
+
+[dependencies]
+tokio-executor = { version = "0.1.2", path = "../tokio-executor" }
+futures = "0.1.19"
diff --git a/tokio-current-thread/LICENSE b/tokio-current-thread/LICENSE
new file mode 100644
index 00000000..38c1e27b
--- /dev/null
+++ b/tokio-current-thread/LICENSE
@@ -0,0 +1,25 @@
+Copyright (c) 2018 Tokio Contributors
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/tokio-current-thread/README.md b/tokio-current-thread/README.md
new file mode 100644
index 00000000..eb60e7d8
--- /dev/null
+++ b/tokio-current-thread/README.md
@@ -0,0 +1,19 @@
+# tokio-current-thread
+
+Single threaded executor for Tokio.
+
+[Documentation](https://tokio-rs.github.io/tokio/tokio_current_thread/)
+
+## Overview
+
+This crate provides the single threaded executor which execute many tasks concurrently.
+
+## License
+
+This project is licensed under the [MIT license](LICENSE).
+
+### Contribution
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in Tokio by you, shall be licensed as MIT, without any additional
+terms or conditions.
diff --git a/tokio-current-thread/src/lib.rs b/tokio-current-thread/src/lib.rs
new file mode 100644
index 00000000..81c74297
--- /dev/null
+++ b/tokio-current-thread/src/lib.rs
@@ -0,0 +1,709 @@
+//! A single-threaded executor which executes tasks on the same thread from which
+//! they are spawned.
+//!
+//!
+//! The crate provides:
+//!
+//! * [`CurrentThread`] is the main type of this crate. It executes tasks on the current thread.
+//! The easiest way to start a new [`CurrentThread`] executor is to call
+//! [`block_on_all`] with an initial task to seed the executor.
+//! All tasks that are being managed by a [`CurrentThread`] executor are able to
+//! spawn additional tasks by calling [`spawn`].
+//!
+//!
+//! Application authors will not use this crate directly. Instead, they will use the
+//! `tokio` crate. Library authors should only depend on `tokio-current-thread` if they
+//! are building a custom task executor.
+//!
+//! For more details, see [executor module] documentation in the Tokio crate.
+//!
+//! [`CurrentThread`]: struct.CurrentThread.html
+//! [`spawn`]: fn.spawn.html
+//! [`block_on_all`]: fn.block_on_all.html
+//! [executor module]: https://docs.rs/tokio/0.1/tokio/executor/index.html
+
+#![doc(html_root_url = "https://docs.rs/tokio-current-thread/0.1.0")]
+#![deny(warnings, missing_docs, missing_debug_implementations)]
+
+extern crate futures;
+extern crate tokio_executor;
+
+mod scheduler;
+
+use self::scheduler::Scheduler;
+
+use tokio_executor::{Enter, SpawnError};
+use tokio_executor::park::{Park, Unpark, ParkThread};
+
+use futures::{executor, Async, Future};
+use futures::future::{Executor, ExecuteError, ExecuteErrorKind};
+
+use std::fmt;
+use std::cell::Cell;
+use std::rc::Rc;
+use std::time::{Duration, Instant};
+use std::sync::mpsc;
+
+#[cfg(feature = "unstable-futures")]
+use futures2;
+
+/// Executes tasks on the current thread
+pub struct CurrentThread<P: Park = ParkThread> {
+ /// Execute futures and receive unpark notifications.
+ scheduler: Scheduler<P::Unpark>,
+
+ /// Current number of futures being executed
+ num_futures: usize,
+
+ /// Thread park handle
+ park: P,
+
+ /// Handle for spawning new futures from other threads
+ spawn_handle: Handle,
+
+ /// Receiver for futures spawned from other threads
+ spawn_receiver: mpsc::Receiver<Box<Future<Item = (), Error = ()> + Send + 'static>>,
+}
+
+/// Executes futures on the current thread.
+///
+/// All futures executed using this executor will be executed on the current
+/// thread. As such, `run` will wait for these futures to complete before
+/// returning.
+///
+/// For more details, see the [module level](index.html) documentation.
+#[derive(Debug, Clone)]
+pub struct TaskExecutor {
+ // Prevent the handle from moving across threads.
+ _p: ::std::marker::PhantomData<Rc<()>>,
+}
+
+/// Returned by the `turn` function.
+#[derive(Debug)]
+pub struct Turn {
+ polled: bool
+}
+
+impl Turn {
+ /// `true` if any futures were polled at all and `false` otherwise.
+ pub fn has_polled(&self) -> bool {
+ self.polled
+ }
+}
+
+/// A `CurrentThread` instance bound to a supplied execution context.
+pub struct Entered<'a, P: Park + 'a> {
+ executor: &'a mut CurrentThread<P>,
+ enter: &'a mut Enter,
+}
+
+/// Error returned by the `run` function.
+#[derive(Debug)]
+pub struct RunError {
+ _p: (),
+}
+
+/// Error returned by the `run_timeout` function.
+#[derive(Debug)]
+pub struct RunTimeoutError {
+ timeout: bool,
+}
+
+/// Error returned by the `turn` function.
+#[derive(Debug)]
+pub struct TurnError {
+ _p: (),
+}
+
+/// Error returned by the `block_on` function.
+#[derive(Debug)]
+pub struct BlockError<T> {
+ inner: Option<T>,
+}
+
+/// This is mostly split out to make the borrow checker happy.
+struct Borrow<'a, U: 'a> {
+ scheduler: &'a mut Scheduler<U>,
+ num_futures: &'a mut usize,
+}
+
+trait SpawnLocal {
+ fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>);
+}
+
+struct CurrentRunner {
+ spawn: Cell<Option<*mut SpawnLocal>>,
+}
+
+/// Current thread's task runner. This is set in `TaskRunner::with`
+thread_local!(static CURRENT: CurrentRunner = CurrentRunner {
+ spawn: Cell::new(None),
+});
+
+/// Run the executor bootstrapping the execution with the provided future.
+///
+/// This creates a new [`CurrentThread`] executor, spawns the provided future,
+/// and blocks the current thread until the provided future and **all**
+/// subsequently spawned futures complete. In other words:
+///
+/// * If the provided bootstrap future does **not** spawn any additional tasks,
+/// `block_on_all` returns once `future` completes.
+/// * If the provided bootstrap future **does** spawn additional tasks, then
+/// `block_on_all` returns once **all** spawned futures complete.
+///
+/// See [module level][mod] documentation for more details.
+///
+/// [`CurrentThread`]: struct.CurrentThread.html
+/// [mod]: index.html
+pub fn block_on_all<F>(future: F) -> Result<F::Item, F::Error>
+where F: Future,
+{
+ let mut current_thread = CurrentThread::new();
+
+ let ret = current_thread.block_on(future);
+ current_thread.run().unwrap();
+
+ ret.map_err(|e| e.into_inner().expect("unexpected execution error"))
+}
+
+/// Executes a future on the current thread.
+///
+/// The provided future must complete or be canceled before `run` will return.
+///
+/// Unlike [`tokio::spawn`], this function will always spawn on a
+/// `CurrentThread` executor and is able to spawn futures that are not `Send`.
+///
+/// # Panics
+///
+/// This function can only be invoked from the context of a `run` call; any
+/// other use will result in a panic.
+///
+/// [`tokio::spawn`]: ../fn.spawn.html
+pub fn spawn<F>(future: F)
+where F: Future<Item = (), Error = ()> + 'static
+{
+ TaskExecutor::current()
+ .spawn_local(Box::new(future))
+ .unwrap();
+}
+
+// ===== impl CurrentThread =====
+
+impl CurrentThread<ParkThread> {
+ /// Create a new instance of `CurrentThread`.
+ pub fn new() -> Self {
+ CurrentThread::new_with_park(ParkThread::new())
+ }
+}
+
+impl<P: Park> CurrentThread<P> {
+ /// Create a new instance of `CurrentThread` backed by the given park
+ /// handle.
+ pub fn new_with_park(park: P) -> Self {
+ let unpark = park.unpark();
+
+ let (spawn_sender, spawn_receiver) = mpsc::channel();
+
+ let scheduler = Scheduler::new(unpark);
+ let notify = scheduler.notify();
+
+ CurrentThread {
+ scheduler: scheduler,
+ num_futures: 0,
+ park,
+ spawn_handle: Handle { sender: spawn_sender, notify: notify },
+ spawn_receiver: spawn_receiver,
+ }
+ }
+
+ /// Returns `true` if the executor is currently idle.
+ ///
+ /// An idle executor is defined by not currently having any spawned tasks.
+ pub fn is_idle(&self) -> bool {
+ self.num_futures == 0
+ }
+
+ /// Spawn the future on the executor.
+ ///
+ /// This internally queues the future to be executed once `run` is called.
+ pub fn spawn<F>(&mut self, future: F) -> &mut Self
+ where F: Future<Item = (), Error = ()> + 'static,
+ {
+ self.borrow().spawn_local(Box::new(future));
+ self
+ }
+
+ /// Synchronously waits for the provided `future` to complete.
+ ///
+ /// This function can be used to synchronously block the current thread
+ /// until the provided `future` has resolved either successfully or with an
+ /// error. The result of the future is then returned from this function
+ /// call.
+ ///
+ /// Note that this function will **also** execute any spawned futures on the
+ /// current thread, but will **not** block until these other spawned futures
+ /// have completed.
+ ///
+ /// The caller is responsible for ensuring that other spawned futures
+ /// complete execution.
+ pub fn block_on<F>(&mut self, future: F)
+ -> Result<F::Item, BlockError<F::Error>>
+ where F: Future
+ {
+ let mut enter = tokio_executor::enter().unwrap();
+ self.enter(&mut enter).block_on(future)
+ }
+
+ /// Run the executor to completion, blocking the thread until **all**
+ /// spawned futures have completed.
+ pub fn run(&mut self) -> Result<(), RunError> {
+ let mut enter = tokio_executor::enter().unwrap();
+ self.enter(&mut enter).run()
+ }
+
+ /// Run the executor to completion, blocking the thread until all
+ /// spawned futures have completed **or** `duration` time has elapsed.
+ pub fn run_timeout(&mut self, duration: Duration)
+ -> Result<(), RunTimeoutError>
+ {
+ let mut enter = tokio_executor::enter().unwrap();
+ self.enter(&mut enter).run_timeout(duration)
+ }
+
+ /// Perform a single iteration of the event loop.
+ ///
+ /// This function blocks the current thread even if the executor is idle.
+ pub fn turn(&mut self, duration: Option<Duration>)
+ -> Result<Turn, TurnError>
+ {
+ let mut enter = tokio_executor::enter().unwrap();
+ self.enter(&mut enter).turn(duration)
+ }
+
+ /// Bind `CurrentThread` instance with an execution context.
+ pub fn enter<'a>(&'a mut self, enter: &'a mut Enter) -> Entered<'a, P> {
+ Entered {
+ executor: self,
+ enter,
+ }
+ }
+
+ /// Returns a reference to the underlying `Park` instance.
+ pub fn get_park(&self) -> &P {
+ &self.park
+ }
+
+ /// Returns a mutable reference to the underlying `Park` instance.
+ pub fn get_park_mut(&mut self) -> &mut P {
+ &mut self.park
+ }
+
+ fn borrow(&mut self) -> Borrow<P::Unpark> {
+ Borrow {
+ scheduler: &mut self.scheduler,
+ num_futures: &mut self.num_futures,
+ }
+ }
+
+ /// Get a new handle to spawn futures on the executor
+ ///
+ /// Different to the executor itself, the handle can be sent to different
+ /// threads and can be used to spawn futures on the executor.
+ pub fn handle(&self) -> Handle {
+ self.spawn_handle.clone()
+ }
+}
+
+impl tokio_executor::Executor for CurrentThread {
+ fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>)
+ -> Result<(), SpawnError>
+ {
+ self.borrow().spawn_local(future);
+ Ok(())
+ }
+
+ #[cfg(feature = "unstable-futures")]
+ fn spawn2(&mut self, _future: Box<futures2::Future<Item = (), Error = futures2::Never> + Send>)
+ -> Result<(), futures2::executor::SpawnError>
+ {
+ panic!("Futures 0.2 integration is not available for current_thread");
+ }
+}
+
+impl<P: Park> fmt::Debug for CurrentThread<P> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("CurrentThread")
+ .field("scheduler", &self.scheduler)
+ .field("num_futures", &self.num_futures)
+ .finish()
+ }
+}
+
+// ===== impl Entered =====
+
+impl<'a, P: Park> Entered<'a, P> {
+ /// Spawn the future on the executor.
+ ///
+ /// This internally queues the future to be executed once `run` is called.
+ pub fn spawn<F>(&mut self, future: F) -> &mut Self
+ where F: Future<Item = (), Error = ()> + 'static,
+ {
+ self.executor.borrow().spawn_local(Box::new(future));
+ self
+ }
+
+ /// Synchronously waits for the provided `future` to complete.
+ ///
+ /// This function can be used to synchronously block the current thread
+ /// until the provided `future` has resolved either successfully or with an
+ /// error. The result of the future is then returned from this function
+ /// call.
+ ///
+ /// Note that this function will **also** execute any spawned futures on the
+ /// current thread, but will **not** block until these other spawned futures
+ /// have completed.
+ ///
+ /// The caller is responsible for ensuring that other spawned futures
+ /// complete execution.
+ pub fn block_on<F>(&mut self, future: F)
+ -> Result<F::Item, BlockError<F::Error>>
+ where F: Future
+ {
+ let mut future = executor::spawn(future);
+ let notify = self.executor.scheduler.notify();
+
+ loop {
+ let res = self.executor.borrow().enter(self.enter, || {
+ future.poll_future_notify(&notify, 0)
+ });
+
+ match res {
+ Ok(Async::Ready(e)) => return Ok(e),
+ Err(e) => return Err(BlockError { inner: Some(e) }),
+ Ok(Async::NotReady) => {}
+ }
+
+ self.tick();
+
+ if let Err(_) = self.executor.park.park() {
+ return Err(BlockError { inner: None });
+ }
+ }
+ }
+
+ /// Run the executor to completion, blocking the thread until **all**
+ /// spawned futures have completed.
+ pub fn run(&mut self) -> Result<(), RunError> {
+ self.run_timeout2(None)
+ .map_err(|_| RunError { _p: () })
+ }
+
+ /// Run the executor to completion, blocking the thread until all
+ /// spawned futures have completed **or** `duration` time has elapsed.
+ pub fn run_timeout(&mut self, duration: Duration)
+ -> Result<(), RunTimeoutError>
+ {
+ self.run_timeout2(Some(duration))
+ }
+
+ /// Perform a single iteration of the event loop.
+ ///
+ /// This function blocks the current thread even if the executor is idle.
+ pub fn turn(&mut self, duration: Option<Duration>)
+ -> Result<Turn, TurnError>
+ {
+ let res = if self.executor.scheduler.has_pending_futures() {
+ self.executor.park.park_timeout(Duration::from_millis(0))
+ } else {
+ match duration {
+ Some(duration) => self.executor.park.park_timeout(duration),
+ None => self.executor.park.park(),
+ }
+ };
+
+ if res.is_err() {
+ return Err(TurnError { _p: () });
+ }
+
+ let polled = self.tick();
+
+ Ok(Turn { polled })
+ }
+
+ /// Returns a reference to the underlying `Park` instance.
+ pub fn get_park(&self) -> &P {
+ &self.executor.park
+ }
+
+ /// Returns a mutable reference to the underlying `Park` instance.
+ pub fn get_park_mut(&mut self) -> &mut P {
+ &mut self.executor.park
+ }
+
+ fn run_timeout2(&mut self, dur: Option<Duration>)
+ -> Result<(), RunTimeoutError>
+ {
+ if self.executor.is_idle() {
+ // Nothing to do
+ return Ok(());
+ }
+
+ let mut time = dur.map(|dur| (Instant::now() + dur, dur));
+
+ loop {
+ self.tick();
+
+ if self.executor.is_idle() {
+ return Ok(());
+ }
+
+ match time {
+ Some((until, rem)) => {
+ if let Err(_) = self.executor.park.park_timeout(rem) {
+ return Err(RunTimeoutError::new(false));
+ }
+
+ let now = Instant::now();
+
+ if now >= until {
+ return Err(RunTimeoutError::new(true));
+ }
+
+ time = Some((until, until - now));
+ }
+ None => {
+ if let Err(_) = self.executor.park.park() {
+ return Err(RunTimeoutError::new(false));
+ }
+ }
+ }
+ }
+ }
+
+ /// Returns `true` if any futures were processed
+ fn tick(&mut self) -> bool {
+ // Spawn any futures that were spawned from other threads by manually
+ // looping over the receiver stream
+
+ // FIXME: Slightly ugly but needed to make the borrow checker happy
+ let (mut borrow, spawn_receiver) = (
+ Borrow {
+ scheduler: &mut self.executor.scheduler,
+ num_futures: &mut self.executor.num_futures,
+ },
+ &mut self.executor.spawn_receiver,
+ );
+
+ while let Ok(future) = spawn_receiver.try_recv() {
+ borrow.spawn_local(future);
+ }
+
+ // After any pending futures were scheduled, do the actual tick
+ borrow.scheduler.tick(
+ &mut *self.enter,
+ borrow.num_futures)
+ }
+}
+
+impl<'a, P: Park> fmt::Debug for Entered<'a, P> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Entered")
+ .field("executor", &self.executor)
+ .field("enter", &self.enter)
+ .finish()
+ }
+}
+
+// ===== impl Handle =====
+
+/// Handle to spawn a future on the corresponding `CurrentThread` instance
+#[derive(Clone)]
+pub struct Handle {
+ sender: mpsc::Sender<Box<Future<Item = (), Error = ()> + Send + 'static>>,
+ notify: executor::NotifyHandle,
+}
+
+// Manual implementation because the Sender does not implement Debug
+impl fmt::Debug for Handle {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Handle")
+ .finish()
+ }
+}
+
+impl Handle {
+ /// Spawn a future onto the `CurrentThread` instance corresponding to this handle
+ ///
+ /// # Panics
+ ///
+ /// This function panics if the spawn fails. Failure occurs if the `CurrentThread`
+ /// instance of the `Handle` does not exist anymore.
+ pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError>
+ where F: Future<Item = (), Error = ()> + Send + 'static {
+ self.sender.send(Box::new(future))
+ .expect("CurrentThread does not exist anymore");
+ // use 0 for the id, CurrentThread does not make use of it
+ self.notify.notify(0);
+
+ Ok(())
+ }
+}
+
+// ===== impl TaskExecutor =====
+
+impl TaskExecutor {
+ /// Returns an executor that executes futures on the current thread.
+ ///
+ /// The user of `TaskExecutor` must ensure that when a future is submitted,
+ /// that it is done within the context of a call to `run`.
+ ///
+ /// For more details, see the [module level](index.html) documentation.
+ pub fn current() -> TaskExecutor {
+ TaskExecutor {
+ _p: ::std::marker::PhantomData,
+ }
+ }
+
+ /// Spawn a future onto the current `CurrentThread` instance.
+ pub fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>)
+ -> Result<(), SpawnError>
+ {
+ CURRENT.with(|current| {
+ match current.spawn.get() {
+ Some(spawn) => {
+ unsafe { (*spawn).spawn_local(future) };
+ Ok(())
+ }
+ None => {
+ Err(SpawnError::shutdown())
+ }
+ }
+ })
+ }
+}
+
+impl tokio_executor::Executor for TaskExecutor {
+ fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>)
+ -> Result<(), SpawnError>
+ {
+ self.spawn_local(future)
+ }
+
+ #[cfg(feature = "unstable-futures")]
+ fn spawn2(&mut self, _future: Box<futures2::Future<Item = (), Error = futures2::Never> + Send>)
+ -> Result<(), futures2::executor::SpawnError>
+ {
+ panic!("Futures 0.2 integration is not available for current_thread");
+ }
+
+ fn status(&self) -> Result<(), SpawnError> {
+ CURRENT.with(|current| {
+ if current.spawn.get().is_some() {
+ Ok(())
+ } else {
+ Err(SpawnError::shutdown())
+ }
+ })
+ }
+}
+
+impl<F> Executor<F> for TaskExecutor
+where F: Future<Item = (), Error = ()> + 'static
+{
+ fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
+ CURRENT.with(|current| {
+ match current.spawn.get() {
+ Some(spawn) => {
+ unsafe { (*spawn).spawn_local(Box::new(future)) };
+ Ok(())
+ }
+ None => {
+ Err(ExecuteError::new(ExecuteErrorKind::Shutdown, future))
+ }
+ }
+ })
+ }
+}
+
+// ===== impl Borrow =====
+
+impl<'a, U: Unpark> Borrow<'a, U> {
+ fn enter<F, R>(&mut self, _: &mut Enter, f: F) -> R
+ where F: FnOnce() -> R,
+ {
+ CURRENT.with(|current| {
+ current.set_spawn(self, || {
+ f()
+ })
+ })
+ }
+}
+
+impl<'a, U: Unpark> SpawnLocal for Borrow<'a, U> {
+ fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>) {
+ *self.num_futures += 1;
+ self.scheduler.schedule(future);
+ }
+}
+
+// ===== impl CurrentRunner =====
+
+impl CurrentRunner {
+ fn set_spawn<F, R>(&self, spawn: &mut SpawnLocal, f: F) -> R
+ where F: FnOnce() -> R
+ {
+ struct Reset<'a>(&'a CurrentRunner);
+
+ impl<'a> Drop for Reset<'a> {
+ fn drop(&mut self) {
+ self.0.spawn.set(None);
+ }
+ }
+
+ let _reset = Reset(self);
+
+ let spawn = unsafe { hide_lt(spawn as *mut SpawnLocal) };
+ self.spawn.set(Some(spawn));
+
+ f()
+ }
+}
+
+unsafe fn hide_lt<'a>(p: *mut (SpawnLocal + 'a)) -> *mut (SpawnLocal + 'static) {
+ use std::mem;
+ mem::transmute(p)
+}
+
+// ===== impl RunTimeoutError =====
+
+impl RunTimeoutError {
+ fn new(timeout: bool) -> Self {
+ RunTimeoutError { timeout }
+ }
+
+ /// Returns `true` if the error was caused by the operation timing out.
+ pub fn is_timeout(&self) -> bool {
+ self.timeout
+ }
+}
+
+impl From<tokio_executor::EnterError> for RunTimeoutError {
+ fn from(_: tokio_executor::EnterError) -> Self {
+ RunTimeoutError::new(false)
+ }
+}
+
+// ===== impl BlockError =====
+
+impl<T> BlockError<T> {
+ /// Returns the error yielded by the future being blocked on
+ pub fn into_inner(self) -> Option<T> {
+ self.inner
+ }
+}
+
+impl<T> From<tokio_executor::EnterError> for BlockError<T> {
+ fn from(_: tokio_executor::EnterError) -> Self {
+ BlockError { inner: None }
+ }
+}
diff --git a/tokio-current-thread/src/scheduler.rs b/tokio-current-thread/src/scheduler.rs
new file mode 100644
index 00000000..c66523bf
--- /dev/null
+++ b/tokio-current-thread/src/scheduler.rs
@@ -0,0 +1,772 @@
+use super::Borrow;
+use tokio_executor::Enter;
+use tokio_executor::park::Unpark;
+
+use futures::{Future, Async};
+use futures::executor::{self, Spawn, UnsafeNotify, NotifyHandle};
+
+use std::cell::UnsafeCell;
+use std::fmt::{self, Debug};
+use std::mem;
+use std::ptr;
+use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel};
+use std::sync::atomic::{AtomicPtr, AtomicBool, AtomicUsize};
+use std::sync::{Arc, Weak};
+use std::usize;
+use std::thread;
+use std::marker::PhantomData;
+
+/// A generic task-aware scheduler.
+///
+/// This is used both by `FuturesUnordered` and the current-thread executor.
+pub struct Scheduler<U> {
+ inner: Arc<Inner<U>>,
+ nodes: List<U>,
+}
+
+pub struct Notify<'a, U: 'a>(&'a Arc<Node<U>>);
+
+// A linked-list of nodes
+struct List<U> {
+ len: usize,
+ head: *const Node<U>,
+ tail: *const Node<U>,
+}
+
+// Scheduler is implemented using two linked lists. The first linked list tracks
+// all items managed by a `Scheduler`. This list is stored on the `Scheduler`
+// struct and is **not** thread safe. The second linked list is an
+// implementation of the intrusive MPSC queue algorithm described by
+// 1024cores.net and is stored on `Inner`. This linked list can push items to
+// the back concurrently but only one consumer may pop from the front. To
+// enforce this requirement, all popping will be performed via fns on
+// `Scheduler` that take `&mut self`.
+//
+// When a item is submitted to the set a node is allocated and inserted in
+// both linked lists. This means that all insertion operations **must** be
+// originated from `Scheduler` with `&mut self` The next call to `tick` will
+// (eventually) see this node and call `poll` on the item.
+//
+// Nodes are wrapped in `Arc` cells which manage the lifetime of the node.
+// However, `Arc` handles are sometimes cast to `*const Node` pointers.
+// Specifically, when a node is stored in at least one of the two lists
+// described above, this represents a logical `Arc` handle. This is how
+// `Scheduler` maintains its reference to all nodes it manages. Each
+// `NotifyHandle` instance is an `Arc<Node>` as well.
+//
+// When `Scheduler` drops, it clears the linked list of all nodes that it
+// manages. When doing so, it must attempt to decrement the reference count (by
+// dropping an Arc handle). However, it can **only** decrement the reference
+// count if the node is not currently stored in the mpsc channel. If the node
+// **is** "queued" in the mpsc channel, then the arc reference count cannot be
+// decremented. Once the node is popped from the mpsc channel, then the final
+// arc reference count can be decremented, thus freeing the node.
+
+struct Inner<U> {
+ // Thread unpark handle
+ unpark: U,
+
+ // Tick number
+ tick_num: AtomicUsize,
+
+ // Head/tail of the readiness queue
+ head_readiness: AtomicPtr<Node<U>>,
+ tail_readiness: UnsafeCell<*const Node<U>>,
+
+ // Used as part of the MPSC queue algorithm
+ stub: Arc<Node<U>>,
+}
+
+unsafe impl<U: Sync + Send> Send for Inner<U> {}
+unsafe impl<U: Sync + Send> Sync for Inner<U> {}
+
+impl<U: Unpark> executor::Notify for Inner<U> {
+ fn notify(&self, _: usize) {
+ self.unpark.unpark();
+ }
+}
+
+struct Node<U> {
+ // The item
+ item: UnsafeCell<Option<Task>>,
+
+ // The tick at which this node was notified
+ notified_at: AtomicUsize,
+
+ // Next pointer for linked list tracking all active nodes
+ next_all: UnsafeCell<*const Node<U>>,
+
+ // Previous node in linked list tracking all active nodes
+ prev_all: UnsafeCell<*const Node<U>>,
+
+ // Next pointer in readiness queue
+ next_readiness: AtomicPtr<Node<U>>,
+
+ // Whether or not this node is currently in the mpsc queue.
+ queued: AtomicBool,
+
+ // Queue that we'll be enqueued to when notified
+ queue: Weak<Inner<U>>,
+}
+
+/// Returned by `Inner::dequeue`, representing either a dequeue success (with
+/// the dequeued node), an empty list, or an inconsistent state.
+///
+/// The inconsistent state is described in more detail at [1024cores], but
+/// roughly indicates that a node will be ready to dequeue sometime shortly in
+/// the future and the caller should try again soon.
+///
+/// [1024cores]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
+enum Dequeue<U> {
+ Data(*const Node<U>),
+ Empty,
+ Yield,
+ Inconsistent,
+}
+
+/// Wraps a spawned boxed future
+struct Task(Spawn<Box<Future<Item = (), Error = ()>>>);
+
+/// A task that is scheduled. `turn` must be called
+pub struct Scheduled<'a, U: 'a> {
+ task: &'a mut Task,
+ notify: &'a Notify<'a, U>,
+ done: &'a mut bool,
+}
+
+impl<U> Scheduler<U>
+where U: Unpark,
+{
+ /// Constructs a new, empty `Scheduler`
+ ///
+ /// The returned `Scheduler` does not contain any items and, in this
+ /// state, `Scheduler::poll` will return `Ok(Async::Ready(None))`.
+ pub fn new(unpark: U) -> Self {
+ let stub = Arc::new(Node {
+ item: UnsafeCell::new(None),
+ notified_at: AtomicUsize::new(0),
+ next_all: UnsafeCell::new(ptr::null()),
+ prev_all: UnsafeCell::new(ptr::null()),
+ next_readiness: AtomicPtr::new(ptr::null_mut()),
+ queued: AtomicBool::new(true),
+ queue: Weak::new(),
+ });
+ let stub_ptr = &*stub as *const Node<U>;
+ let inner = Arc::new(Inner {
+ unpark,
+ tick_num: AtomicUsize::new(0),
+ head_readiness: AtomicPtr::new(stub_ptr as *mut _),
+ tail_readiness: UnsafeCell::new(stub_ptr),
+ stub: stub,
+ });
+
+ Scheduler {
+ inner: inner,
+ nodes: List::new(),
+ }
+ }
+
+ pub fn notify(&self) -> NotifyHandle {
+ self.inner.clone().into()
+ }
+
+ pub fn schedule(&mut self, item: Box<Future<Item = (), Error = ()>>) {
+ // Get the current scheduler tick
+ let tick_num = self.inner.tick_num.load(SeqCst);
+
+ let node = Arc::new(Node {
+ item: UnsafeCell::new(Some(Task::new(item))),
+ notified_at: AtomicUsize::new(tick_num),
+ next_all: UnsafeCell::new(ptr::null_mut()),
+ prev_all: UnsafeCell::new(ptr::null_mut()),
+ next_readiness: AtomicPtr::new(ptr::null_mut()),
+ queued: AtomicBool::new(true),
+ queue: Arc::downgrade(&self.inner),
+ });
+
+ // Right now our node has a strong reference count of 1. We transfer
+ // ownership of this reference count to our internal linked list
+ // and we'll reclaim ownership through the `unlink` function below.
+ let ptr = self.nodes.push_back(node);
+
+ // We'll need to get the item "into the system" to start tracking it,
+ // e.g. getting its unpark notifications going to us tracking which
+ // items are ready. To do that we unconditionally enqueue it for
+ // polling here.