summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2018-02-06 07:26:21 -0800
committerGitHub <noreply@github.com>2018-02-06 07:26:21 -0800
commitf0ea9d6f4c0a734ac4c235630f3d8cc51fb48f51 (patch)
tree011aae238269ce6ba1cf29013126e4e45fea4cd4
parent567887cc75170437f75f19f5966f2b32bf49ab72 (diff)
Switch back to futures from crates.io (#113)
Doing so requires copying the `current_thread` executor from GitHub into the repo.
-rw-r--r--Cargo.toml4
-rw-r--r--examples/chat-combinator.rs4
-rw-r--r--examples/chat.rs2
-rw-r--r--examples/compress.rs4
-rw-r--r--examples/connect.rs8
-rw-r--r--examples/echo-threads.rs6
-rw-r--r--examples/echo-udp.rs6
-rw-r--r--examples/echo.rs4
-rw-r--r--examples/hello.rs3
-rw-r--r--examples/hello_world.rs3
-rw-r--r--examples/proxy.rs5
-rw-r--r--examples/sink.rs4
-rw-r--r--examples/tinydb.rs4
-rw-r--r--examples/tinyhttp.rs2
-rw-r--r--examples/udp-codec.rs4
-rw-r--r--src/executor/current_thread.rs413
-rw-r--r--src/executor/mod.rs8
-rw-r--r--src/executor/scheduler.rs663
-rw-r--r--src/executor/sleep.rs169
-rw-r--r--src/lib.rs1
-rw-r--r--tests/buffered.rs3
-rw-r--r--tests/chain.rs3
-rw-r--r--tests/drop-core.rs8
-rw-r--r--tests/echo.rs3
-rw-r--r--tests/global.rs3
-rw-r--r--tests/limit.rs3
-rw-r--r--tests/line-frames.rs16
-rw-r--r--tests/pipe-hup.rs4
-rw-r--r--tests/stream-buffered.rs3
-rw-r--r--tests/tcp.rs7
-rw-r--r--tests/udp.rs13
31 files changed, 1313 insertions, 70 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 4c0293ea..d3720eb3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -50,7 +50,3 @@ serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
time = "0.1"
-
-[patch.crates-io]
-futures = { git = "https://github.com/rust-lang-nursery/futures-rs", branch = "tokio-reform" }
-mio = { git = "https://github.com/carllerche/mio" }
diff --git a/examples/chat-combinator.rs b/examples/chat-combinator.rs
index 667f0e9a..76e689b9 100644
--- a/examples/chat-combinator.rs
+++ b/examples/chat-combinator.rs
@@ -29,7 +29,7 @@ use std::io::{Error, ErrorKind, BufReader};
use std::sync::{Arc, Mutex};
use futures::Future;
-use futures::future::{self, Executor};
+use futures::future::Executor;
use futures::stream::{self, Stream};
use futures_cpupool::CpuPool;
use tokio::net::TcpListener;
@@ -134,5 +134,5 @@ fn main() {
});
// execute server
- future::blocking(srv).wait().unwrap();
+ srv.wait().unwrap();
}
diff --git a/examples/chat.rs b/examples/chat.rs
index da8889fd..1b155427 100644
--- a/examples/chat.rs
+++ b/examples/chat.rs
@@ -33,10 +33,10 @@ extern crate tokio;
extern crate tokio_io;
extern crate bytes;
+use tokio::executor::current_thread;
use tokio::net::{TcpListener, TcpStream};
use tokio_io::{AsyncRead};
use futures::prelude::*;
-use futures::current_thread;
use futures::sync::mpsc;
use futures::future::{self, Either};
use bytes::{BytesMut, Bytes, BufMut};
diff --git a/examples/compress.rs b/examples/compress.rs
index 501548ef..3098abf7 100644
--- a/examples/compress.rs
+++ b/examples/compress.rs
@@ -29,7 +29,7 @@ use std::env;
use std::net::SocketAddr;
use futures::{Future, Stream, Poll};
-use futures::future::{self, Executor};
+use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream};
use tokio_io::{AsyncRead, AsyncWrite};
@@ -62,7 +62,7 @@ fn main() {
Ok(())
});
- future::blocking(server).wait().unwrap();
+ server.wait().unwrap();
}
/// The main workhorse of this example. This'll compress all data read from
diff --git a/examples/connect.rs b/examples/connect.rs
index f0619fbd..a4160449 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -26,7 +26,7 @@ use std::net::SocketAddr;
use std::thread;
use futures::sync::mpsc;
-use futures::{future, Sink, Stream};
+use futures::{Future, Sink, Stream};
use futures_cpupool::CpuPool;
fn main() {
@@ -71,9 +71,9 @@ fn main() {
// loop. In this case, though, we know it's ok as the event loop isn't
// otherwise running anything useful.
let mut out = io::stdout();
- future::blocking(stdout.for_each(|chunk| {
+ stdout.for_each(|chunk| {
out.write_all(&chunk)
- })).wait().unwrap();
+ }).wait().unwrap();
}
mod tcp {
@@ -244,7 +244,7 @@ fn read_stdin(mut tx: mpsc::Sender<Vec<u8>>) {
Ok(n) => n,
};
buf.truncate(n);
- tx = match future::blocking(tx.send(buf)).wait() {
+ tx = match tx.send(buf).wait() {
Ok(tx) => tx,
Err(_) => break,
};
diff --git a/examples/echo-threads.rs b/examples/echo-threads.rs
index e2525c80..6ce8b156 100644
--- a/examples/echo-threads.rs
+++ b/examples/echo-threads.rs
@@ -24,7 +24,7 @@ use std::net::SocketAddr;
use std::thread;
use futures::prelude::*;
-use futures::future::{self, Executor};
+use futures::future::Executor;
use futures::sync::mpsc;
use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
@@ -61,7 +61,7 @@ fn main() {
next = (next + 1) % channels.len();
Ok(())
});
- future::blocking(srv).wait().unwrap();
+ srv.wait().unwrap();
}
fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) {
@@ -88,5 +88,5 @@ fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) {
Ok(())
});
- future::blocking(done).wait().unwrap();
+ done.wait().unwrap();
}
diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs
index 2ce43bc0..f7e2bf09 100644
--- a/examples/echo-udp.rs
+++ b/examples/echo-udp.rs
@@ -18,7 +18,7 @@ extern crate tokio_io;
use std::{env, io};
use std::net::SocketAddr;
-use futures::{future, Future, Poll};
+use futures::{Future, Poll};
use tokio::net::UdpSocket;
struct Server {
@@ -58,9 +58,9 @@ fn main() {
// Next we'll create a future to spawn (the one we defined above) and then
// we'll block our current thread waiting on the result of the future
- future::blocking(Server {
+ Server {
socket: socket,
buf: vec![0; 1024],
to_send: None,
- }).wait().unwrap();
+ }.wait().unwrap();
}
diff --git a/examples/echo.rs b/examples/echo.rs
index 54a28ff7..558f3a68 100644
--- a/examples/echo.rs
+++ b/examples/echo.rs
@@ -26,7 +26,7 @@ use std::env;
use std::net::SocketAddr;
use futures::Future;
-use futures::future::{self, Executor};
+use futures::future::Executor;
use futures::stream::Stream;
use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
@@ -114,5 +114,5 @@ fn main() {
// And finally now that we've define what our server is, we run it! Here we
// just need to execute the future we've created and wait for it to complete
// using the standard methods in the `futures` crate.
- future::blocking(done).wait().unwrap();
+ done.wait().unwrap();
}
diff --git a/examples/hello.rs b/examples/hello.rs
index d9e46d17..5ceb431b 100644
--- a/examples/hello.rs
+++ b/examples/hello.rs
@@ -19,7 +19,6 @@ extern crate tokio_io;
use std::env;
use std::net::SocketAddr;
-use futures::future;
use futures::prelude::*;
use tokio::net::TcpListener;
@@ -41,5 +40,5 @@ fn main() {
Ok(())
});
- future::blocking(server).wait().unwrap();
+ server.wait().unwrap();
}
diff --git a/examples/hello_world.rs b/examples/hello_world.rs
index 5cac1259..fee06607 100644
--- a/examples/hello_world.rs
+++ b/examples/hello_world.rs
@@ -18,9 +18,10 @@ extern crate tokio;
extern crate tokio_io;
extern crate futures;
+use tokio::executor::current_thread;
use tokio::net::TcpListener;
use tokio_io::io;
-use futures::{current_thread, Future, Stream};
+use futures::{Future, Stream};
pub fn main() {
let addr = "127.0.0.1:6142".parse().unwrap();
diff --git a/examples/proxy.rs b/examples/proxy.rs
index f73dd30d..131fa41b 100644
--- a/examples/proxy.rs
+++ b/examples/proxy.rs
@@ -28,7 +28,7 @@ use std::io::{self, Read, Write};
use futures::stream::Stream;
use futures::{Future, Poll};
-use futures::future::{self, Executor};
+use futures::future::{Executor};
use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream};
use tokio_io::{AsyncRead, AsyncWrite};
@@ -92,7 +92,8 @@ fn main() {
Ok(())
});
- future::blocking(done).wait().unwrap();
+
+ done.wait().unwrap();
}
// This is a custom type used to have a custom implementation of the
diff --git a/examples/sink.rs b/examples/sink.rs
index 3fa5f5ed..21456ada 100644
--- a/examples/sink.rs
+++ b/examples/sink.rs
@@ -26,7 +26,7 @@ use std::iter;
use std::net::SocketAddr;
use futures::Future;
-use futures::future::{self, Executor};
+use futures::future::Executor;
use futures::stream::{self, Stream};
use futures_cpupool::CpuPool;
use tokio_io::IoFuture;
@@ -46,7 +46,7 @@ fn main() {
pool.execute(write(socket).or_else(|_| Ok(()))).unwrap();
Ok(())
});
- future::blocking(server).wait().unwrap();
+ server.wait().unwrap();
}
fn write(socket: TcpStream) -> IoFuture<()> {
diff --git a/examples/tinydb.rs b/examples/tinydb.rs
index de750404..0a68a314 100644
--- a/examples/tinydb.rs
+++ b/examples/tinydb.rs
@@ -51,7 +51,7 @@ use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use futures::prelude::*;
-use futures::future::{self, Executor};
+use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::TcpListener;
use tokio_io::AsyncRead;
@@ -160,7 +160,7 @@ fn main() {
Ok(())
});
- future::blocking(done).wait().unwrap();
+ done.wait().unwrap();
}
impl Request {
diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs
index b0106d63..2f982484 100644
--- a/examples/tinyhttp.rs
+++ b/examples/tinyhttp.rs
@@ -90,7 +90,7 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
})).unwrap();
Ok(())
});
- future::blocking(done).wait().unwrap();
+ done.wait().unwrap();
}
/// "Server logic" is implemented in this function.
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs
index 5c11e9f3..c874ebd7 100644
--- a/examples/udp-codec.rs
+++ b/examples/udp-codec.rs
@@ -15,7 +15,7 @@ use std::io;
use std::net::SocketAddr;
use futures::{Future, Stream, Sink};
-use futures::future::{self, Executor};
+use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::{UdpSocket, UdpCodec};
@@ -76,5 +76,5 @@ fn main() {
// Spawn the sender of pongs and then wait for our pinger to finish.
pool.execute(b.then(|_| Ok(()))).unwrap();
- drop(future::blocking(a).wait());
+ drop(a.wait());
}
diff --git a/src/executor/current_thread.rs b/src/executor/current_thread.rs
new file mode 100644
index 00000000..edc011ed
--- /dev/null
+++ b/src/executor/current_thread.rs
@@ -0,0 +1,413 @@
+//! Execute tasks on the current thread
+//!
+//! This module implements an executor that keeps futures on the same thread
+//! that they are submitted on. This allows it to execute futures that are
+//! not `Send`.
+//!
+//! Before being able to spawn futures with this module, an executor
+//! context must be setup by calling [`run`]. From within that context [`spawn`]
+//! may be called with the future to run in the background.
+//!
+//! ```
+//! # extern crate tokio;
+//! # extern crate futures;
+//! # use tokio::executor::current_thread;
+//! use futures::future::lazy;
+//!
+//! // Calling execute here results in a panic
+//! // current_thread::spawn(my_future);
+//!
+//! # pub fn main() {
+//! current_thread::run(|_| {
+//! // The execution context is setup, futures may be executed.
+//! current_thread::spawn(lazy(|| {
+//! println!("called from the current thread executor");
+//! Ok(())
+//! }));
+//! });
+//! # }
+//! ```
+//!
+//! # Execution model
+//!
+//! When an execution context is setup with `run` the current thread will block
+//! and all the futures managed by the executor are driven to completion.
+//! Whenever a future receives a notification, it is pushed to the end of a
+//! scheduled list. The executor will drain this list, advancing the state of
+//! each future.
+//!
+//! All futures managed by this module will remain on the current thread,
+//! as such, this module is able to safely execute futures that are not `Send`.
+//!
+//! Once a future is complete, it is dropped. Once all futures are completed,
+//! [`run`] will unblock and return.
+//!
+//! This module makes a best effort to fairly schedule futures that it manages.
+//!
+//! [`spawn`]: fn.spawn.html
+//! [`run`]: fn.run.html
+
+use super::{scheduler};
+use super::sleep::{self, Sleep, Wakeup};
+
+use futures::Async;
+use futures::executor::{self, Spawn};
+use futures::future::{Future, Executor, ExecuteError, ExecuteErrorKind};
+
+use std::{fmt, thread};
+use std::cell::Cell;
+use std::rc::Rc;
+
+/// Executes futures on the current thread.
+///
+/// All futures executed using this executor will be executed on the current
+/// thread. As such, `run` will wait for these futures to complete before
+/// returning.
+///
+/// For more details, see the [module level](index.html) documentation.
+#[derive(Debug, Clone)]
+pub struct TaskExecutor {
+ // Prevent the handle from moving across threads.
+ _p: ::std::marker::PhantomData<Rc<()>>,
+}
+
+/// A context yielded to the closure provided to `run`.
+///
+/// This context is mostly a future-proofing of the library to add future
+/// contextual information into it. Currently it only contains the `Enter`
+/// instance used to reserve the current thread for blocking on futures.
+#[derive(Debug)]
+pub struct Context<'a> {
+ cancel: &'a Cell<bool>,
+}
+
+/// Implements the "blocking" logic for the current thread executor. A
+/// `TaskRunner` will be created during `run` and will sit on the stack until
+/// execution is complete.
+#[derive(Debug)]
+struct TaskRunner<T> {
+ /// Executes futures.
+ scheduler: Scheduler<T>,
+}
+
+struct CurrentRunner {
+ /// When set to true, the executor should return immediately, even if there
+ /// still futures to run.
+ cancel: Cell<bool>,
+
+ /// Number of futures currently being executed by the runner.
+ num_futures: Cell<usize>,
+
+ /// Raw pointer to the current scheduler pusher.
+ ///
+ /// The raw pointer is required in order to store it in a thread-local slot.
+ schedule: Cell<Option<*mut Schedule>>,
+}
+
+type Scheduler<T> = scheduler::Scheduler<Task, T>;
+type Schedule = scheduler::Schedule<Task>;
+
+struct Task(Spawn<Box<Future<Item = (), Error = ()>>>);
+
+/// Current thread's task runner. This is set in `TaskRunner::with`
+thread_local!(static CURRENT: CurrentRunner = CurrentRunner {
+ cancel: Cell::new(false),
+ num_futures: Cell::new(0),
+ schedule: Cell::new(None),
+});
+
+/// Calls the given closure, then block until all futures submitted for
+/// execution complete.
+///
+/// In more detail, this function will block until:
+/// - All executing futures are complete, or
+/// - `cancel_all_spawned` is invoked.
+pub fn run<F, R>(f: F) -> R
+where F: FnOnce(&mut Context) -> R
+{
+ sleep::BlockThread::with_current(|mut sleep| {
+ TaskRunner::enter(&mut sleep, f)
+ })
+}
+
+/// Calls the given closure with a custom sleep strategy.
+///
+/// This function is the same as `run` except that it allows customizing the
+/// sleep strategy.
+pub fn run_with_sleep<S, F, R>(sleep: &mut S, f: F) -> R
+where F: FnOnce(&mut Context) -> R,
+ S: Sleep,
+{
+ TaskRunner::enter(sleep, f)
+}
+
+/// Executes a future on the current thread.
+///
+/// The provided future must complete or be canceled before `run` will return.
+///
+/// # Panics
+///
+/// This function can only be invoked from the context of a `run` call; any
+/// other use will result in a panic.
+pub fn spawn<F>(future: F)
+where F: Future<Item = (), Error = ()> + 'static
+{
+ execute(future).unwrap_or_else(|_| {
+ panic!("cannot call `execute` unless the thread is already \
+ in the context of a call to `run`")
+ })
+}
+
+/// Returns an executor that executes futures on the current thread.
+///
+/// The user of `TaskExecutor` must ensure that when a future is submitted,
+/// that it is done within the context of a call to `run`.
+///
+/// For more details, see the [module level](index.html) documentation.
+pub fn task_executor() -> TaskExecutor {
+ TaskExecutor {
+ _p: ::std::marker::PhantomData,
+ }
+}
+
+impl<F> Executor<F> for TaskExecutor
+where F: Future<Item = (), Error = ()> + 'static
+{
+ fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
+ execute(future)
+ }
+}
+
+impl<'a> Context<'a> {
+ /// Cancels *all* executing futures.
+ pub fn cancel_all_spawned(&self) {
+ self.cancel.set(true);
+ }
+}
+
+/// Submits a future to the current executor. This is done by
+/// checking the thread-local variable tracking the current executor.
+///
+/// If this function is not called in context of an executor, i.e. outside of
+/// `run`, then `Err` is returned.
+///
+/// This function does not panic.
+fn execute<F>(future: F) -> Result<(), ExecuteError<F>>
+where F: Future<Item = (), Error = ()> + 'static,
+{
+ CURRENT.with(|current| {
+ match current.schedule.get() {
+ Some(schedule) => {
+ let spawned = Task::new(future);
+
+ let num_futures = current.num_futures.get();
+ current.num_futures.set(num_futures + 1);
+
+ unsafe { (*schedule).schedule(spawned); }
+
+ Ok(())
+ }
+ None => {
+ Err(ExecuteError::new(ExecuteErrorKind::Shutdown, future))
+ }
+ }
+ })
+}
+
+impl<T> TaskRunner<T>
+where T: Wakeup,
+{
+ /// Return a new `TaskRunner`
+ fn new(wakeup: T) -> TaskRunner<T> {
+ let scheduler = scheduler::Scheduler::new(wakeup);
+
+ TaskRunner {
+ scheduler: scheduler,
+ }
+ }
+
+ /// Enter a new `TaskRunner` context
+ ///
+ /// This function handles advancing the scheduler state and blocking while
+ /// listening for notified futures.
+ ///
+ /// First, a new task runner is created backed by the current
+ /// `sleep::BlockThread` handle. Passing `sleep::BlockThread` into the
+ /// scheduler is how scheduled futures unblock the thread, signalling that
+ /// there is more work to do.
+ ///
+ /// Before any future is polled, the scheduler must be set to a thread-local
+ /// variable so that `execute` is able to submit new futures to the current
+ /// executor. Because `Scheduler::schedule` requires `&mut self`, this
+ /// introduces a mutability hazard. This hazard is minimized with some
+ /// indirection. See `set_schedule` for more details.
+ ///
+ /// Once all context is setup, the init closure is invoked. This is the
+ /// "boostrapping" process that executes the initial futures into the
+ /// scheduler. After this, the function loops and advances the scheduler
+ /// state until all futures complete. When no scheduled futures are ready to
+ /// be advanced, the thread is blocked using `S: Sleep`.
+ fn enter<S, F, R>(sleep: &mut S, f: F) -> R
+ where F: FnOnce(&mut Context) -> R,
+ S: Sleep<Wakeup = T>,
+ {
+ let mut runner = TaskRunner::new(sleep.wakeup());
+
+ CURRENT.with(|current| {
+ // Make sure that another task runner is not set.
+ //
+ // This should not be ever possible due to how `set_schedule`
+ // is setup, but better safe than sorry!
+ assert!(current.schedule.get().is_none());
+
+ // Enter an execution scope
+ let mut ctx = Context {
+ cancel: &current.cancel,
+ };
+
+ // Set the scheduler to the TLS and perform setup work,
+ // returning a future to execute.
+ //
+ // This could possibly suubmit other futures for execution.
+ let ret = current.set_schedule(&mut runner.scheduler as &mut Schedule, || {
+ f(&mut ctx)
+ });
+
+ // Execute the runner.
+ //
+ // This function will not return until either
+ //
+ // a) All futures have completed execution
+ // b) `cancel_all_spawned` is called, forcing the executor to
+ // return.
+ runner.run(sleep, current);
+
+ // Not technically required, but this makes the fact that `ctx`
+ // needs to live until this point explicit.
+ drop(ctx);
+
+ ret
+ })
+ }
+
+ fn run<S>(&mut self, sleep: &mut S, current: &CurrentRunner)
+ where S: Sleep<Wakeup = T>,
+ {
+ use super::scheduler::Tick;
+
+ while current.is_running() {
+ // Try to advance the scheduler state
+ let res = self.scheduler.tick(|scheduler, spawned, notify| {
+ // `scheduler` is a `&mut Scheduler` reference returned back
+ // from the scheduler to us, but only within the context of this
+ // closure.
+ //
+ // This lets us push new futures into the scheduler. It also
+ // lets us pass the scheduler mutable reference into
+ // `set_schedule`, which sets the thread-local variable that
+ // `spawn` uses for submitting new futures to the
+ // "current" executor.
+ //
+ // See `set_schedule` documentation for more details on how we
+ // guard against mutable pointer aliasing.
+ current.set_schedule(scheduler as &mut Schedule, || {
+ match spawned.0.poll_future_notify(notify, 0) {
+ Ok(Async::Ready(_)) | Err(_) => {
+ Async::Ready(())
+ }
+ Ok(Async::NotReady) => Async::NotReady,
+ }
+ })
+ });
+
+ // Process the result of ticking the scheduler
+ match res {
+ // A future completed. `is_daemon` is true when the future was
+ // submitted as a daemon future.
+ Tick::Data(_) => {
+ let num_futures = current.num_futures.get();
+ debug_assert!(num_futures > 0);
+ current.num_futures.set(num_futures - 1);
+ },
+ Tick::Empty => {
+ // The scheduler did not have any work to process.
+ //
+ // At this point, the scheduler is currently running given
+ // that the `while` condition was true and no user code has
+ // been executed.
+
+ debug_assert!(current.is_running());
+
+ // Block the current thread until a future managed by the scheduler
+ // receives a readiness notification.
+ sleep.sleep();
+ }
+ Tick::Inconsistent => {
+ // Yield the thread and loop
+ thread::yield_now();
+ }
+ }
+ }
+ }
+}
+
+impl CurrentRunner {
+ /// Set the provided schedule handle to the TLS slot for the duration of the
+ /// closure.
+ ///
+ /// `spawn` will access the CURRENT thread-local variable in
+ /// order to push a future into the scheduler. This requires a `&mut`
+ /// reference, introducing mutability hazards.
+ ///
+ /// Rust requires that `&mut` references are not aliases, i.e. there are
+ /// never two "live" mutable references to the same piece of data. In order
+ /// to store a `&mut` reference in a thread-local variable, we must ensure
+ /// that one can not access the scheduler anywhere else.
+ ///
+ /// To do this, we only allow access to the thread local variable from
+ /// within the closure passed to `set_schedule`. This function also takes a
+ /// &mut reference to the scheduler, which is essentially holding a "lock"
+ /// on that reference, preventing any other location in the code from
+ /// also getting that &mut reference.
+ ///
+ /// When `set_schedule` returns, the thread-local variable containing the
+ /// mut reference is set to null. This is done even if the closure panics.
+ ///
+ /// This reduces the odds of introducing pointer aliasing.
+ fn set_schedule<F, R>(&self, schedule: &mut Schedule, f: F) -> R
+ where F: FnOnce() -> R
+ {
+ // Ensure that the runner is removed from the thread-local context
+ // when leaving the scope. This handles cases that involve panicking.
+ struct Reset<'a>(&'a CurrentRunner);
+
+ impl<'a> Drop for Reset<'a> {
+ fn drop(&mut self) {
+ self.0.schedule.set(None);
+ }
+ }
+
+ let _reset = Reset(self);
+
+ self.schedule.set(Some(schedule as *mut Schedule));
+
+ f()
+ }
+
+ fn is_running(&self) -> bool {
+ self.num_futures.get() > 0 && !self.cancel.get()
+ }
+}
+
+impl Task {
+ fn new<T: Future<Item = (), Error = ()> + 'static>(f: T) -> Self {
+ Task(executor::spawn(Box::new(f)))
+ }
+}
+
+impl fmt::Debug for Task {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Task")
+ .finish()
+ }
+}
diff --git a/src/executor/mod.rs b/src/executor/mod.rs
new file mode 100644
index 00000000..896676c8
--- /dev/null
+++ b/src/executor/mod.rs
@@ -0,0 +1,8 @@
+//! Task execution utilities.
+//!
+//! This module only contains `current_thread`, an executor for multiplexing
+//! many tasks on a single thread.
+
+pub mod current_thread;
+mod scheduler;
+mod sleep;
diff --git a/src/executor/scheduler.rs b/src/executor/scheduler.rs
new file mode 100644
index 00000000..e9e6e1a3
--- /dev/null
+++ b/src/executor/scheduler.rs
@@ -0,0 +1,663 @@
+//! An unbounded set of futures.
+
+use super::sleep::Wakeup;
+
+use futures::Async;
+use futures::executor::{self, UnsafeNotify, NotifyHandle};
+
+use std::cell::UnsafeCell;
+use std::fmt::{self, Debug};
+use std::marker::PhantomData;
+use std::mem;
+use std::ptr;
+use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel};
+use std::sync::atomic::{AtomicPtr, AtomicBool};
+use std::sync::{Arc, Weak};
+use std::usize;
+
+/// A generic task-aware scheduler.
+///
+/// This is used both by `FuturesUnordered` and the current-thread executor.
+pub struct Scheduler<T, W> {
+ inner: Arc<Inner<T, W>>,
+ nodes: List<T, W>,
+}
+
+/// Schedule new futures
+pub trait Schedule<T> {
+ /// Schedule a new future.
+ fn schedule(&mut self, item: T);
+}
+
+pub struct Notify<'a, T: 'a, W: 'a>(&'a Arc<Node<T, W>>);
+
+// A linked-list of nodes
+struct List<T, W> {
+ len: usize,
+ head: *const Node<T, W>,
+ tail: *const Node<T, W>,
+}
+
+unsafe impl<T: Send, W: Wakeup> Send for Scheduler<T, W> {}
+unsafe impl<T: Sync, W: Wakeup> Sync for Scheduler<T, W> {}
+
+// Scheduler is implemented using two linked lists. The first linked list tracks
+// all items managed by a `Scheduler`. This list is stored on the `Scheduler`
+// struct and is **not** thread safe. The second linked list is an
+// implementation of the intrusive MPSC queue algorithm described by
+// 1024cores.net and is stored on `Inner`. This linked list can push items to
+// the back concurrently but only one consumer may pop from the front. To
+// enforce this requirement, all popping will be performed via fns on
+// `Scheduler` that take `&mut self`.
+//
+// When a item is submitted to the set a node is allocated and inserted in
+// both linked lists. This means that all insertion operations **must** be
+// originated from `Scheduler` with `&mut self` The next call to `tick` will
+// (eventually) see this node and call `poll` on the item.
+//
+// Nodes are wrapped in `Arc` cells which manage the lifetime of the node.
+// However, `Arc` handles are sometimes cast to `*const Node` pointers.
+// Specifically, when a node is stored in at least one of the two lists
+// described above, this represents a logical `Arc` handle. This is how
+// `Scheduler` maintains its reference to all nodes it manages. Each
+// `NotifyHande` instance is an `Arc<Node>` as well.
+//
+// When `Scheduler` drops, it clears the linked list of all nodes that it
+// manages. When doing so, it must attempt to decrement the reference count (by
+// dropping an Arc handle). However, it can **only** decrement the reference
+// count if the node is not currently stored in the mpsc channel. If the node