diff options
author | jpbriquet <jpdigitbox@gmail.com> | 2018-06-12 19:26:03 +0200 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2018-06-12 10:26:03 -0700 |
commit | 2cd854c2c7dd918fe67106e79e7c8eadbe47f1c1 (patch) | |
tree | ccf95f0a925ea66a05103d3f9834f751dd36274a /tokio-current-thread | |
parent | ba05c39d65cd48b0263b37a32b96721fb8a47933 (diff) |
tokio-current-thread crate (#370)
Extract `tokio::executor::current_thread` to a tokio-current-thread
crate. Deprecated fns stay in the old location. The new crate only
contains thee most recent API.
Diffstat (limited to 'tokio-current-thread')
-rw-r--r-- | tokio-current-thread/CHANGELOG.md | 3 | ||||
-rw-r--r-- | tokio-current-thread/Cargo.toml | 22 | ||||
-rw-r--r-- | tokio-current-thread/LICENSE | 25 | ||||
-rw-r--r-- | tokio-current-thread/README.md | 19 | ||||
-rw-r--r-- | tokio-current-thread/src/lib.rs | 709 | ||||
-rw-r--r-- | tokio-current-thread/src/scheduler.rs | 772 | ||||
-rw-r--r-- | tokio-current-thread/tests/current_thread.rs | 622 |
7 files changed, 2172 insertions, 0 deletions
diff --git a/tokio-current-thread/CHANGELOG.md b/tokio-current-thread/CHANGELOG.md new file mode 100644 index 00000000..066575d4 --- /dev/null +++ b/tokio-current-thread/CHANGELOG.md @@ -0,0 +1,3 @@ +# Unreleased + +* Extract `tokio::executor::current_thread` to a tokio-current-thread crate (#356) diff --git a/tokio-current-thread/Cargo.toml b/tokio-current-thread/Cargo.toml new file mode 100644 index 00000000..ce227d74 --- /dev/null +++ b/tokio-current-thread/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "tokio-current-thread" + +# When releasing to crates.io: +# - Update html_root_url. +# - Update CHANGELOG.md. +# - Create "v0.1.x" git tag. +version = "0.1.0" +documentation = "https://docs.rs/tokio-current-thread" +repository = "https://github.com/tokio-rs/tokio" +homepage = "https://github.com/tokio-rs/tokio" +license = "MIT" +authors = ["Carl Lerche <me@carllerche.com>"] +description = """ +Single threaded executor which manage many tasks concurrently on the current thread. +""" +keywords = ["futures", "tokio"] +categories = ["concurrency", "asynchronous"] + +[dependencies] +tokio-executor = { version = "0.1.2", path = "../tokio-executor" } +futures = "0.1.19" diff --git a/tokio-current-thread/LICENSE b/tokio-current-thread/LICENSE new file mode 100644 index 00000000..38c1e27b --- /dev/null +++ b/tokio-current-thread/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2018 Tokio Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/tokio-current-thread/README.md b/tokio-current-thread/README.md new file mode 100644 index 00000000..eb60e7d8 --- /dev/null +++ b/tokio-current-thread/README.md @@ -0,0 +1,19 @@ +# tokio-current-thread + +Single threaded executor for Tokio. + +[Documentation](https://tokio-rs.github.io/tokio/tokio_current_thread/) + +## Overview + +This crate provides the single threaded executor which execute many tasks concurrently. + +## License + +This project is licensed under the [MIT license](LICENSE). + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in Tokio by you, shall be licensed as MIT, without any additional +terms or conditions. diff --git a/tokio-current-thread/src/lib.rs b/tokio-current-thread/src/lib.rs new file mode 100644 index 00000000..81c74297 --- /dev/null +++ b/tokio-current-thread/src/lib.rs @@ -0,0 +1,709 @@ +//! A single-threaded executor which executes tasks on the same thread from which +//! they are spawned. +//! +//! +//! The crate provides: +//! +//! * [`CurrentThread`] is the main type of this crate. It executes tasks on the current thread. +//! The easiest way to start a new [`CurrentThread`] executor is to call +//! [`block_on_all`] with an initial task to seed the executor. +//! All tasks that are being managed by a [`CurrentThread`] executor are able to +//! spawn additional tasks by calling [`spawn`]. +//! +//! +//! Application authors will not use this crate directly. Instead, they will use the +//! `tokio` crate. Library authors should only depend on `tokio-current-thread` if they +//! are building a custom task executor. +//! +//! For more details, see [executor module] documentation in the Tokio crate. +//! +//! [`CurrentThread`]: struct.CurrentThread.html +//! [`spawn`]: fn.spawn.html +//! [`block_on_all`]: fn.block_on_all.html +//! [executor module]: https://docs.rs/tokio/0.1/tokio/executor/index.html + +#![doc(html_root_url = "https://docs.rs/tokio-current-thread/0.1.0")] +#![deny(warnings, missing_docs, missing_debug_implementations)] + +extern crate futures; +extern crate tokio_executor; + +mod scheduler; + +use self::scheduler::Scheduler; + +use tokio_executor::{Enter, SpawnError}; +use tokio_executor::park::{Park, Unpark, ParkThread}; + +use futures::{executor, Async, Future}; +use futures::future::{Executor, ExecuteError, ExecuteErrorKind}; + +use std::fmt; +use std::cell::Cell; +use std::rc::Rc; +use std::time::{Duration, Instant}; +use std::sync::mpsc; + +#[cfg(feature = "unstable-futures")] +use futures2; + +/// Executes tasks on the current thread +pub struct CurrentThread<P: Park = ParkThread> { + /// Execute futures and receive unpark notifications. + scheduler: Scheduler<P::Unpark>, + + /// Current number of futures being executed + num_futures: usize, + + /// Thread park handle + park: P, + + /// Handle for spawning new futures from other threads + spawn_handle: Handle, + + /// Receiver for futures spawned from other threads + spawn_receiver: mpsc::Receiver<Box<Future<Item = (), Error = ()> + Send + 'static>>, +} + +/// Executes futures on the current thread. +/// +/// All futures executed using this executor will be executed on the current +/// thread. As such, `run` will wait for these futures to complete before +/// returning. +/// +/// For more details, see the [module level](index.html) documentation. +#[derive(Debug, Clone)] +pub struct TaskExecutor { + // Prevent the handle from moving across threads. + _p: ::std::marker::PhantomData<Rc<()>>, +} + +/// Returned by the `turn` function. +#[derive(Debug)] +pub struct Turn { + polled: bool +} + +impl Turn { + /// `true` if any futures were polled at all and `false` otherwise. + pub fn has_polled(&self) -> bool { + self.polled + } +} + +/// A `CurrentThread` instance bound to a supplied execution context. +pub struct Entered<'a, P: Park + 'a> { + executor: &'a mut CurrentThread<P>, + enter: &'a mut Enter, +} + +/// Error returned by the `run` function. +#[derive(Debug)] +pub struct RunError { + _p: (), +} + +/// Error returned by the `run_timeout` function. +#[derive(Debug)] +pub struct RunTimeoutError { + timeout: bool, +} + +/// Error returned by the `turn` function. +#[derive(Debug)] +pub struct TurnError { + _p: (), +} + +/// Error returned by the `block_on` function. +#[derive(Debug)] +pub struct BlockError<T> { + inner: Option<T>, +} + +/// This is mostly split out to make the borrow checker happy. +struct Borrow<'a, U: 'a> { + scheduler: &'a mut Scheduler<U>, + num_futures: &'a mut usize, +} + +trait SpawnLocal { + fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>); +} + +struct CurrentRunner { + spawn: Cell<Option<*mut SpawnLocal>>, +} + +/// Current thread's task runner. This is set in `TaskRunner::with` +thread_local!(static CURRENT: CurrentRunner = CurrentRunner { + spawn: Cell::new(None), +}); + +/// Run the executor bootstrapping the execution with the provided future. +/// +/// This creates a new [`CurrentThread`] executor, spawns the provided future, +/// and blocks the current thread until the provided future and **all** +/// subsequently spawned futures complete. In other words: +/// +/// * If the provided bootstrap future does **not** spawn any additional tasks, +/// `block_on_all` returns once `future` completes. +/// * If the provided bootstrap future **does** spawn additional tasks, then +/// `block_on_all` returns once **all** spawned futures complete. +/// +/// See [module level][mod] documentation for more details. +/// +/// [`CurrentThread`]: struct.CurrentThread.html +/// [mod]: index.html +pub fn block_on_all<F>(future: F) -> Result<F::Item, F::Error> +where F: Future, +{ + let mut current_thread = CurrentThread::new(); + + let ret = current_thread.block_on(future); + current_thread.run().unwrap(); + + ret.map_err(|e| e.into_inner().expect("unexpected execution error")) +} + +/// Executes a future on the current thread. +/// +/// The provided future must complete or be canceled before `run` will return. +/// +/// Unlike [`tokio::spawn`], this function will always spawn on a +/// `CurrentThread` executor and is able to spawn futures that are not `Send`. +/// +/// # Panics +/// +/// This function can only be invoked from the context of a `run` call; any +/// other use will result in a panic. +/// +/// [`tokio::spawn`]: ../fn.spawn.html +pub fn spawn<F>(future: F) +where F: Future<Item = (), Error = ()> + 'static +{ + TaskExecutor::current() + .spawn_local(Box::new(future)) + .unwrap(); +} + +// ===== impl CurrentThread ===== + +impl CurrentThread<ParkThread> { + /// Create a new instance of `CurrentThread`. + pub fn new() -> Self { + CurrentThread::new_with_park(ParkThread::new()) + } +} + +impl<P: Park> CurrentThread<P> { + /// Create a new instance of `CurrentThread` backed by the given park + /// handle. + pub fn new_with_park(park: P) -> Self { + let unpark = park.unpark(); + + let (spawn_sender, spawn_receiver) = mpsc::channel(); + + let scheduler = Scheduler::new(unpark); + let notify = scheduler.notify(); + + CurrentThread { + scheduler: scheduler, + num_futures: 0, + park, + spawn_handle: Handle { sender: spawn_sender, notify: notify }, + spawn_receiver: spawn_receiver, + } + } + + /// Returns `true` if the executor is currently idle. + /// + /// An idle executor is defined by not currently having any spawned tasks. + pub fn is_idle(&self) -> bool { + self.num_futures == 0 + } + + /// Spawn the future on the executor. + /// + /// This internally queues the future to be executed once `run` is called. + pub fn spawn<F>(&mut self, future: F) -> &mut Self + where F: Future<Item = (), Error = ()> + 'static, + { + self.borrow().spawn_local(Box::new(future)); + self + } + + /// Synchronously waits for the provided `future` to complete. + /// + /// This function can be used to synchronously block the current thread + /// until the provided `future` has resolved either successfully or with an + /// error. The result of the future is then returned from this function + /// call. + /// + /// Note that this function will **also** execute any spawned futures on the + /// current thread, but will **not** block until these other spawned futures + /// have completed. + /// + /// The caller is responsible for ensuring that other spawned futures + /// complete execution. + pub fn block_on<F>(&mut self, future: F) + -> Result<F::Item, BlockError<F::Error>> + where F: Future + { + let mut enter = tokio_executor::enter().unwrap(); + self.enter(&mut enter).block_on(future) + } + + /// Run the executor to completion, blocking the thread until **all** + /// spawned futures have completed. + pub fn run(&mut self) -> Result<(), RunError> { + let mut enter = tokio_executor::enter().unwrap(); + self.enter(&mut enter).run() + } + + /// Run the executor to completion, blocking the thread until all + /// spawned futures have completed **or** `duration` time has elapsed. + pub fn run_timeout(&mut self, duration: Duration) + -> Result<(), RunTimeoutError> + { + let mut enter = tokio_executor::enter().unwrap(); + self.enter(&mut enter).run_timeout(duration) + } + + /// Perform a single iteration of the event loop. + /// + /// This function blocks the current thread even if the executor is idle. + pub fn turn(&mut self, duration: Option<Duration>) + -> Result<Turn, TurnError> + { + let mut enter = tokio_executor::enter().unwrap(); + self.enter(&mut enter).turn(duration) + } + + /// Bind `CurrentThread` instance with an execution context. + pub fn enter<'a>(&'a mut self, enter: &'a mut Enter) -> Entered<'a, P> { + Entered { + executor: self, + enter, + } + } + + /// Returns a reference to the underlying `Park` instance. + pub fn get_park(&self) -> &P { + &self.park + } + + /// Returns a mutable reference to the underlying `Park` instance. + pub fn get_park_mut(&mut self) -> &mut P { + &mut self.park + } + + fn borrow(&mut self) -> Borrow<P::Unpark> { + Borrow { + scheduler: &mut self.scheduler, + num_futures: &mut self.num_futures, + } + } + + /// Get a new handle to spawn futures on the executor + /// + /// Different to the executor itself, the handle can be sent to different + /// threads and can be used to spawn futures on the executor. + pub fn handle(&self) -> Handle { + self.spawn_handle.clone() + } +} + +impl tokio_executor::Executor for CurrentThread { + fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>) + -> Result<(), SpawnError> + { + self.borrow().spawn_local(future); + Ok(()) + } + + #[cfg(feature = "unstable-futures")] + fn spawn2(&mut self, _future: Box<futures2::Future<Item = (), Error = futures2::Never> + Send>) + -> Result<(), futures2::executor::SpawnError> + { + panic!("Futures 0.2 integration is not available for current_thread"); + } +} + +impl<P: Park> fmt::Debug for CurrentThread<P> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("CurrentThread") + .field("scheduler", &self.scheduler) + .field("num_futures", &self.num_futures) + .finish() + } +} + +// ===== impl Entered ===== + +impl<'a, P: Park> Entered<'a, P> { + /// Spawn the future on the executor. + /// + /// This internally queues the future to be executed once `run` is called. + pub fn spawn<F>(&mut self, future: F) -> &mut Self + where F: Future<Item = (), Error = ()> + 'static, + { + self.executor.borrow().spawn_local(Box::new(future)); + self + } + + /// Synchronously waits for the provided `future` to complete. + /// + /// This function can be used to synchronously block the current thread + /// until the provided `future` has resolved either successfully or with an + /// error. The result of the future is then returned from this function + /// call. + /// + /// Note that this function will **also** execute any spawned futures on the + /// current thread, but will **not** block until these other spawned futures + /// have completed. + /// + /// The caller is responsible for ensuring that other spawned futures + /// complete execution. + pub fn block_on<F>(&mut self, future: F) + -> Result<F::Item, BlockError<F::Error>> + where F: Future + { + let mut future = executor::spawn(future); + let notify = self.executor.scheduler.notify(); + + loop { + let res = self.executor.borrow().enter(self.enter, || { + future.poll_future_notify(¬ify, 0) + }); + + match res { + Ok(Async::Ready(e)) => return Ok(e), + Err(e) => return Err(BlockError { inner: Some(e) }), + Ok(Async::NotReady) => {} + } + + self.tick(); + + if let Err(_) = self.executor.park.park() { + return Err(BlockError { inner: None }); + } + } + } + + /// Run the executor to completion, blocking the thread until **all** + /// spawned futures have completed. + pub fn run(&mut self) -> Result<(), RunError> { + self.run_timeout2(None) + .map_err(|_| RunError { _p: () }) + } + + /// Run the executor to completion, blocking the thread until all + /// spawned futures have completed **or** `duration` time has elapsed. + pub fn run_timeout(&mut self, duration: Duration) + -> Result<(), RunTimeoutError> + { + self.run_timeout2(Some(duration)) + } + + /// Perform a single iteration of the event loop. + /// + /// This function blocks the current thread even if the executor is idle. + pub fn turn(&mut self, duration: Option<Duration>) + -> Result<Turn, TurnError> + { + let res = if self.executor.scheduler.has_pending_futures() { + self.executor.park.park_timeout(Duration::from_millis(0)) + } else { + match duration { + Some(duration) => self.executor.park.park_timeout(duration), + None => self.executor.park.park(), + } + }; + + if res.is_err() { + return Err(TurnError { _p: () }); + } + + let polled = self.tick(); + + Ok(Turn { polled }) + } + + /// Returns a reference to the underlying `Park` instance. + pub fn get_park(&self) -> &P { + &self.executor.park + } + + /// Returns a mutable reference to the underlying `Park` instance. + pub fn get_park_mut(&mut self) -> &mut P { + &mut self.executor.park + } + + fn run_timeout2(&mut self, dur: Option<Duration>) + -> Result<(), RunTimeoutError> + { + if self.executor.is_idle() { + // Nothing to do + return Ok(()); + } + + let mut time = dur.map(|dur| (Instant::now() + dur, dur)); + + loop { + self.tick(); + + if self.executor.is_idle() { + return Ok(()); + } + + match time { + Some((until, rem)) => { + if let Err(_) = self.executor.park.park_timeout(rem) { + return Err(RunTimeoutError::new(false)); + } + + let now = Instant::now(); + + if now >= until { + return Err(RunTimeoutError::new(true)); + } + + time = Some((until, until - now)); + } + None => { + if let Err(_) = self.executor.park.park() { + return Err(RunTimeoutError::new(false)); + } + } + } + } + } + + /// Returns `true` if any futures were processed + fn tick(&mut self) -> bool { + // Spawn any futures that were spawned from other threads by manually + // looping over the receiver stream + + // FIXME: Slightly ugly but needed to make the borrow checker happy + let (mut borrow, spawn_receiver) = ( + Borrow { + scheduler: &mut self.executor.scheduler, + num_futures: &mut self.executor.num_futures, + }, + &mut self.executor.spawn_receiver, + ); + + while let Ok(future) = spawn_receiver.try_recv() { + borrow.spawn_local(future); + } + + // After any pending futures were scheduled, do the actual tick + borrow.scheduler.tick( + &mut *self.enter, + borrow.num_futures) + } +} + +impl<'a, P: Park> fmt::Debug for Entered<'a, P> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Entered") + .field("executor", &self.executor) + .field("enter", &self.enter) + .finish() + } +} + +// ===== impl Handle ===== + +/// Handle to spawn a future on the corresponding `CurrentThread` instance +#[derive(Clone)] +pub struct Handle { + sender: mpsc::Sender<Box<Future<Item = (), Error = ()> + Send + 'static>>, + notify: executor::NotifyHandle, +} + +// Manual implementation because the Sender does not implement Debug +impl fmt::Debug for Handle { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Handle") + .finish() + } +} + +impl Handle { + /// Spawn a future onto the `CurrentThread` instance corresponding to this handle + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the `CurrentThread` + /// instance of the `Handle` does not exist anymore. + pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError> + where F: Future<Item = (), Error = ()> + Send + 'static { + self.sender.send(Box::new(future)) + .expect("CurrentThread does not exist anymore"); + // use 0 for the id, CurrentThread does not make use of it + self.notify.notify(0); + + Ok(()) + } +} + +// ===== impl TaskExecutor ===== + +impl TaskExecutor { + /// Returns an executor that executes futures on the current thread. + /// + /// The user of `TaskExecutor` must ensure that when a future is submitted, + /// that it is done within the context of a call to `run`. + /// + /// For more details, see the [module level](index.html) documentation. + pub fn current() -> TaskExecutor { + TaskExecutor { + _p: ::std::marker::PhantomData, + } + } + + /// Spawn a future onto the current `CurrentThread` instance. + pub fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>) + -> Result<(), SpawnError> + { + CURRENT.with(|current| { + match current.spawn.get() { + Some(spawn) => { + unsafe { (*spawn).spawn_local(future) }; + Ok(()) + } + None => { + Err(SpawnError::shutdown()) + } + } + }) + } +} + +impl tokio_executor::Executor for TaskExecutor { + fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>) + -> Result<(), SpawnError> + { + self.spawn_local(future) + } + + #[cfg(feature = "unstable-futures")] + fn spawn2(&mut self, _future: Box<futures2::Future<Item = (), Error = futures2::Never> + Send>) + -> Result<(), futures2::executor::SpawnError> + { + panic!("Futures 0.2 integration is not available for current_thread"); + } + + fn status(&self) -> Result<(), SpawnError> { + CURRENT.with(|current| { + if current.spawn.get().is_some() { + Ok(()) + } else { + Err(SpawnError::shutdown()) + } + }) + } +} + +impl<F> Executor<F> for TaskExecutor +where F: Future<Item = (), Error = ()> + 'static +{ + fn execute(&self, future: F) -> Result<(), ExecuteError<F>> { + CURRENT.with(|current| { + match current.spawn.get() { + Some(spawn) => { + unsafe { (*spawn).spawn_local(Box::new(future)) }; + Ok(()) + } + None => { + Err(ExecuteError::new(ExecuteErrorKind::Shutdown, future)) + } + } + }) + } +} + +// ===== impl Borrow ===== + +impl<'a, U: Unpark> Borrow<'a, U> { + fn enter<F, R>(&mut self, _: &mut Enter, f: F) -> R + where F: FnOnce() -> R, + { + CURRENT.with(|current| { + current.set_spawn(self, || { + f() + }) + }) + } +} + +impl<'a, U: Unpark> SpawnLocal for Borrow<'a, U> { + fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>) { + *self.num_futures += 1; + self.scheduler.schedule(future); + } +} + +// ===== impl CurrentRunner ===== + +impl CurrentRunner { + fn set_spawn<F, R>(&self, spawn: &mut SpawnLocal, f: F) -> R + where F: FnOnce() -> R + { + struct Reset<'a>(&'a CurrentRunner); + + impl<'a> Drop for Reset<'a> { + fn drop(&mut self) { + self.0.spawn.set(None); + } + } + + let _reset = Reset(self); + + let spawn = unsafe { hide_lt(spawn as *mut SpawnLocal) }; + self.spawn.set(Some(spawn)); + + f() + } +} + +unsafe fn hide_lt<'a>(p: *mut (SpawnLocal + 'a)) -> *mut (SpawnLocal + 'static) { + use std::mem; + mem::transmute(p) +} + +// ===== impl RunTimeoutError ===== + +impl RunTimeoutError { + fn new(timeout: bool) -> Self { + RunTimeoutError { timeout } + } + + /// Returns `true` if the error was caused by the operation timing out. + pub fn is_timeout(&self) -> bool { + self.timeout + } +} + +impl From<tokio_executor::EnterError> for RunTimeoutError { + fn from(_: tokio_executor::EnterError) -> Self { + RunTimeoutError::new(false) + } +} + +// ===== impl BlockError ===== + +impl<T> BlockError<T> { + /// Returns the error yielded by the future being blocked on + pub fn into_inner(self) -> Option<T> { + self.inner + } +} + +impl<T> From<tokio_executor::EnterError> for BlockError<T> { + fn from(_: tokio_executor::EnterError) -> Self { + BlockError { inner: None } + } +} diff --git a/tokio-current-thread/src/scheduler.rs b/tokio-current-thread/src/scheduler.rs new file mode 100644 index 00000000..c66523bf --- /dev/null +++ b/tokio-current-thread/src/scheduler.rs @@ -0,0 +1,772 @@ +use super::Borrow; +use tokio_executor::Enter; +use tokio_executor::park::Unpark; + +use futures::{Future, Async}; +use futures::executor::{self, Spawn, UnsafeNotify, NotifyHandle}; + +use std::cell::UnsafeCell; +use std::fmt::{self, Debug}; +use std::mem; +use std::ptr; +use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel}; +use std::sync::atomic::{AtomicPtr, AtomicBool, AtomicUsize}; +use std::sync::{Arc, Weak}; +use std::usize; +use std::thread; +use std::marker::PhantomData; + +/// A generic task-aware scheduler. +/// +/// This is used both by `FuturesUnordered` and the current-thread executor. +pub struct Scheduler<U> { + inner: Arc<Inner<U>>, + nodes: List<U>, +} + +pub struct Notify<'a, U: 'a>(&'a Arc<Node<U>>); + +// A linked-list of nodes +struct List<U> { + len: usize, + head: *const Node<U>, + tail: *const Node<U>, +} + +// Scheduler is implemented using two linked lists. The first linked list tracks +// all items managed by a `Scheduler`. This list is stored on the `Scheduler` +// struct and is **not** thread safe. The second linked list is an +// implementation of the intrusive MPSC queue algorithm described by +// 1024cores.net and is stored on `Inner`. This linked list can push items to +// the back concurrently but only one consumer may pop from the front. To +// enforce this requirement, all popping will be performed via fns on +// `Scheduler` that take `&mut self`. +// +// When a item is submitted to the set a node is allocated and inserted in +// both linked lists. This means that all insertion operations **must** be +// originated from `Scheduler` with `&mut self` The next call to `tick` will +// (eventually) see this node and call `poll` on the item. +// +// Nodes are wrapped in `Arc` cells which manage the lifetime of the node. +// However, `Arc` handles are sometimes cast to `*const Node` pointers. +// Specifically, when a node is stored in at least one of the two lists +// described above, this represents a logical `Arc` handle. This is how +// `Scheduler` maintains its reference to all nodes it manages. Each +// `NotifyHandle` instance is an `Arc<Node>` as well. +// +// When `Scheduler` drops, it clears the linked list of all nodes that it +// manages. When doing so, it must attempt to decrement the reference count (by +// dropping an Arc handle). However, it can **only** decrement the reference +// count if the node is not currently stored in the mpsc channel. If the node +// **is** "queued" in the mpsc channel, then the arc reference count cannot be +// decremented. Once the node is popped from the mpsc channel, then the final +// arc reference count can be decremented, thus freeing the node. + +struct Inner<U> { + // Thread unpark handle + unpark: U, + + // Tick number + tick_num: AtomicUsize, + + // Head/tail of the readiness queue + head_readiness: AtomicPtr<Node<U>>, + tail_readiness: UnsafeCell<*const Node<U>>, + + // Used as part of the MPSC queue algorithm + stub: Arc<Node<U>>, +} + +unsafe impl<U: Sync + Send> Send for Inner<U> {} +unsafe impl<U: Sync + Send> Sync for Inner<U> {} + +impl<U: Unpark> executor::Notify for Inner<U> { + fn notify(&self, _: usize) { + self.unpark.unpark(); + } +} + +struct Node<U> { + // The item + item: UnsafeCell<Option<Task>>, + + // The tick at which this node was notified + notified_at: AtomicUsize, + + // Next pointer for linked list tracking all active nodes + next_all: UnsafeCell<*const Node<U>>, + + // Previous node in linked list tracking all active nodes + prev_all: UnsafeCell<*const Node<U>>, + + // Next pointer in readiness queue + next_readiness: AtomicPtr<Node<U>>, + + // Whether or not this node is currently in the mpsc queue. + queued: AtomicBool, + + // Queue that we'll be enqueued to when notified + queue: Weak<Inner<U>>, +} + +/// Returned by `Inner::dequeue`, representing either a dequeue success (with +/// the dequeued node), an empty list, or an inconsistent state. +/// +/// The inconsistent state is described in more detail at [1024cores], but +/// roughly indicates that a node will be ready to dequeue sometime shortly in +/// the future and the caller should try again soon. +/// +/// [1024cores]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue +enum Dequeue<U> { + Data(*const Node<U>), + Empty, + Yield, + Inconsistent, +} + +/// Wraps a spawned boxed future +struct Task(Spawn<Box<Future<Item = (), Error = ()>>>); + +/// A task that is scheduled. `turn` must be called +pub struct Scheduled<'a, U: 'a> { + task: &'a mut Task, + notify: &'a Notify<'a, U>, + done: &'a mut bool, +} + +impl<U> Scheduler<U> +where U: Unpark, +{ + /// Constructs a new, empty `Scheduler` + /// + /// The returned `Scheduler` does not contain any items and, in this + /// state, `Scheduler::poll` will return `Ok(Async::Ready(None))`. + pub fn new(unpark: U) -> Self { + let stub = Arc::new(Node { + item: UnsafeCell::new(None), + notified_at: AtomicUsize::new(0), + next_all: UnsafeCell::new(ptr::null()), + prev_all: UnsafeCell::new(ptr::null()), + next_readiness: AtomicPtr::new(ptr::null_mut()), + queued: AtomicBool::new(true), + queue: Weak::new(), + }); + let stub_ptr = &*stub as *const Node<U>; + let inner = Arc::new(Inner { + unpark, + tick_num: AtomicUsize::new(0), + head_readiness: AtomicPtr::new(stub_ptr as *mut _), + tail_readiness: UnsafeCell::new(stub_ptr), + stub: stub, + }); + + Scheduler { + inner: inner, + nodes: List::new(), + } + } + + pub fn notify(&self) -> NotifyHandle { + self.inner.clone().into() + } + + pub fn schedule(&mut self, item: Box<Future<Item = (), Error = ()>>) { + // Get the current scheduler tick + let tick_num = self.inner.tick_num.load(SeqCst); + + let node = Arc::new(Node { + item: UnsafeCell::new(Some(Task::new(item))), + notified_at: AtomicUsize::new(tick_num), + next_all: UnsafeCell::new(ptr::null_mut()), + prev_all: UnsafeCell::new(ptr::null_mut()), + next_readiness: AtomicPtr::new(ptr::null_mut()), + queued: AtomicBool::new(true), + queue: Arc::downgrade(&self.inner), + }); + + // Right now our node has a strong reference count of 1. We transfer + // ownership of this reference count to our internal linked list + // and we'll reclaim ownership through the `unlink` function below. + let ptr = self.nodes.push_back(node); + + // We'll need to get the item "into the system" to start tracking it, + // e.g. getting its unpark notifications going to us tracking which + // items are ready. To do that we unconditionally enqueue it for + // polling here. |