diff options
author | Carl Lerche <me@carllerche.com> | 2018-02-06 07:26:21 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-02-06 07:26:21 -0800 |
commit | f0ea9d6f4c0a734ac4c235630f3d8cc51fb48f51 (patch) | |
tree | 011aae238269ce6ba1cf29013126e4e45fea4cd4 | |
parent | 567887cc75170437f75f19f5966f2b32bf49ab72 (diff) |
Switch back to futures from crates.io (#113)
Doing so requires copying the `current_thread` executor from GitHub into
the repo.
-rw-r--r-- | Cargo.toml | 4 | ||||
-rw-r--r-- | examples/chat-combinator.rs | 4 | ||||
-rw-r--r-- | examples/chat.rs | 2 | ||||
-rw-r--r-- | examples/compress.rs | 4 | ||||
-rw-r--r-- | examples/connect.rs | 8 | ||||
-rw-r--r-- | examples/echo-threads.rs | 6 | ||||
-rw-r--r-- | examples/echo-udp.rs | 6 | ||||
-rw-r--r-- | examples/echo.rs | 4 | ||||
-rw-r--r-- | examples/hello.rs | 3 | ||||
-rw-r--r-- | examples/hello_world.rs | 3 | ||||
-rw-r--r-- | examples/proxy.rs | 5 | ||||
-rw-r--r-- | examples/sink.rs | 4 | ||||
-rw-r--r-- | examples/tinydb.rs | 4 | ||||
-rw-r--r-- | examples/tinyhttp.rs | 2 | ||||
-rw-r--r-- | examples/udp-codec.rs | 4 | ||||
-rw-r--r-- | src/executor/current_thread.rs | 413 | ||||
-rw-r--r-- | src/executor/mod.rs | 8 | ||||
-rw-r--r-- | src/executor/scheduler.rs | 663 | ||||
-rw-r--r-- | src/executor/sleep.rs | 169 | ||||
-rw-r--r-- | src/lib.rs | 1 | ||||
-rw-r--r-- | tests/buffered.rs | 3 | ||||
-rw-r--r-- | tests/chain.rs | 3 | ||||
-rw-r--r-- | tests/drop-core.rs | 8 | ||||
-rw-r--r-- | tests/echo.rs | 3 | ||||
-rw-r--r-- | tests/global.rs | 3 | ||||
-rw-r--r-- | tests/limit.rs | 3 | ||||
-rw-r--r-- | tests/line-frames.rs | 16 | ||||
-rw-r--r-- | tests/pipe-hup.rs | 4 | ||||
-rw-r--r-- | tests/stream-buffered.rs | 3 | ||||
-rw-r--r-- | tests/tcp.rs | 7 | ||||
-rw-r--r-- | tests/udp.rs | 13 |
31 files changed, 1313 insertions, 70 deletions
@@ -50,7 +50,3 @@ serde = "1.0" serde_derive = "1.0" serde_json = "1.0" time = "0.1" - -[patch.crates-io] -futures = { git = "https://github.com/rust-lang-nursery/futures-rs", branch = "tokio-reform" } -mio = { git = "https://github.com/carllerche/mio" } diff --git a/examples/chat-combinator.rs b/examples/chat-combinator.rs index 667f0e9a..76e689b9 100644 --- a/examples/chat-combinator.rs +++ b/examples/chat-combinator.rs @@ -29,7 +29,7 @@ use std::io::{Error, ErrorKind, BufReader}; use std::sync::{Arc, Mutex}; use futures::Future; -use futures::future::{self, Executor}; +use futures::future::Executor; use futures::stream::{self, Stream}; use futures_cpupool::CpuPool; use tokio::net::TcpListener; @@ -134,5 +134,5 @@ fn main() { }); // execute server - future::blocking(srv).wait().unwrap(); + srv.wait().unwrap(); } diff --git a/examples/chat.rs b/examples/chat.rs index da8889fd..1b155427 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -33,10 +33,10 @@ extern crate tokio; extern crate tokio_io; extern crate bytes; +use tokio::executor::current_thread; use tokio::net::{TcpListener, TcpStream}; use tokio_io::{AsyncRead}; use futures::prelude::*; -use futures::current_thread; use futures::sync::mpsc; use futures::future::{self, Either}; use bytes::{BytesMut, Bytes, BufMut}; diff --git a/examples/compress.rs b/examples/compress.rs index 501548ef..3098abf7 100644 --- a/examples/compress.rs +++ b/examples/compress.rs @@ -29,7 +29,7 @@ use std::env; use std::net::SocketAddr; use futures::{Future, Stream, Poll}; -use futures::future::{self, Executor}; +use futures::future::Executor; use futures_cpupool::CpuPool; use tokio::net::{TcpListener, TcpStream}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -62,7 +62,7 @@ fn main() { Ok(()) }); - future::blocking(server).wait().unwrap(); + server.wait().unwrap(); } /// The main workhorse of this example. This'll compress all data read from diff --git a/examples/connect.rs b/examples/connect.rs index f0619fbd..a4160449 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -26,7 +26,7 @@ use std::net::SocketAddr; use std::thread; use futures::sync::mpsc; -use futures::{future, Sink, Stream}; +use futures::{Future, Sink, Stream}; use futures_cpupool::CpuPool; fn main() { @@ -71,9 +71,9 @@ fn main() { // loop. In this case, though, we know it's ok as the event loop isn't // otherwise running anything useful. let mut out = io::stdout(); - future::blocking(stdout.for_each(|chunk| { + stdout.for_each(|chunk| { out.write_all(&chunk) - })).wait().unwrap(); + }).wait().unwrap(); } mod tcp { @@ -244,7 +244,7 @@ fn read_stdin(mut tx: mpsc::Sender<Vec<u8>>) { Ok(n) => n, }; buf.truncate(n); - tx = match future::blocking(tx.send(buf)).wait() { + tx = match tx.send(buf).wait() { Ok(tx) => tx, Err(_) => break, }; diff --git a/examples/echo-threads.rs b/examples/echo-threads.rs index e2525c80..6ce8b156 100644 --- a/examples/echo-threads.rs +++ b/examples/echo-threads.rs @@ -24,7 +24,7 @@ use std::net::SocketAddr; use std::thread; use futures::prelude::*; -use futures::future::{self, Executor}; +use futures::future::Executor; use futures::sync::mpsc; use futures_cpupool::CpuPool; use tokio_io::AsyncRead; @@ -61,7 +61,7 @@ fn main() { next = (next + 1) % channels.len(); Ok(()) }); - future::blocking(srv).wait().unwrap(); + srv.wait().unwrap(); } fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) { @@ -88,5 +88,5 @@ fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) { Ok(()) }); - future::blocking(done).wait().unwrap(); + done.wait().unwrap(); } diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs index 2ce43bc0..f7e2bf09 100644 --- a/examples/echo-udp.rs +++ b/examples/echo-udp.rs @@ -18,7 +18,7 @@ extern crate tokio_io; use std::{env, io}; use std::net::SocketAddr; -use futures::{future, Future, Poll}; +use futures::{Future, Poll}; use tokio::net::UdpSocket; struct Server { @@ -58,9 +58,9 @@ fn main() { // Next we'll create a future to spawn (the one we defined above) and then // we'll block our current thread waiting on the result of the future - future::blocking(Server { + Server { socket: socket, buf: vec![0; 1024], to_send: None, - }).wait().unwrap(); + }.wait().unwrap(); } diff --git a/examples/echo.rs b/examples/echo.rs index 54a28ff7..558f3a68 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -26,7 +26,7 @@ use std::env; use std::net::SocketAddr; use futures::Future; -use futures::future::{self, Executor}; +use futures::future::Executor; use futures::stream::Stream; use futures_cpupool::CpuPool; use tokio_io::AsyncRead; @@ -114,5 +114,5 @@ fn main() { // And finally now that we've define what our server is, we run it! Here we // just need to execute the future we've created and wait for it to complete // using the standard methods in the `futures` crate. - future::blocking(done).wait().unwrap(); + done.wait().unwrap(); } diff --git a/examples/hello.rs b/examples/hello.rs index d9e46d17..5ceb431b 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -19,7 +19,6 @@ extern crate tokio_io; use std::env; use std::net::SocketAddr; -use futures::future; use futures::prelude::*; use tokio::net::TcpListener; @@ -41,5 +40,5 @@ fn main() { Ok(()) }); - future::blocking(server).wait().unwrap(); + server.wait().unwrap(); } diff --git a/examples/hello_world.rs b/examples/hello_world.rs index 5cac1259..fee06607 100644 --- a/examples/hello_world.rs +++ b/examples/hello_world.rs @@ -18,9 +18,10 @@ extern crate tokio; extern crate tokio_io; extern crate futures; +use tokio::executor::current_thread; use tokio::net::TcpListener; use tokio_io::io; -use futures::{current_thread, Future, Stream}; +use futures::{Future, Stream}; pub fn main() { let addr = "127.0.0.1:6142".parse().unwrap(); diff --git a/examples/proxy.rs b/examples/proxy.rs index f73dd30d..131fa41b 100644 --- a/examples/proxy.rs +++ b/examples/proxy.rs @@ -28,7 +28,7 @@ use std::io::{self, Read, Write}; use futures::stream::Stream; use futures::{Future, Poll}; -use futures::future::{self, Executor}; +use futures::future::{Executor}; use futures_cpupool::CpuPool; use tokio::net::{TcpListener, TcpStream}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -92,7 +92,8 @@ fn main() { Ok(()) }); - future::blocking(done).wait().unwrap(); + + done.wait().unwrap(); } // This is a custom type used to have a custom implementation of the diff --git a/examples/sink.rs b/examples/sink.rs index 3fa5f5ed..21456ada 100644 --- a/examples/sink.rs +++ b/examples/sink.rs @@ -26,7 +26,7 @@ use std::iter; use std::net::SocketAddr; use futures::Future; -use futures::future::{self, Executor}; +use futures::future::Executor; use futures::stream::{self, Stream}; use futures_cpupool::CpuPool; use tokio_io::IoFuture; @@ -46,7 +46,7 @@ fn main() { pool.execute(write(socket).or_else(|_| Ok(()))).unwrap(); Ok(()) }); - future::blocking(server).wait().unwrap(); + server.wait().unwrap(); } fn write(socket: TcpStream) -> IoFuture<()> { diff --git a/examples/tinydb.rs b/examples/tinydb.rs index de750404..0a68a314 100644 --- a/examples/tinydb.rs +++ b/examples/tinydb.rs @@ -51,7 +51,7 @@ use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use futures::prelude::*; -use futures::future::{self, Executor}; +use futures::future::Executor; use futures_cpupool::CpuPool; use tokio::net::TcpListener; use tokio_io::AsyncRead; @@ -160,7 +160,7 @@ fn main() { Ok(()) }); - future::blocking(done).wait().unwrap(); + done.wait().unwrap(); } impl Request { diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs index b0106d63..2f982484 100644 --- a/examples/tinyhttp.rs +++ b/examples/tinyhttp.rs @@ -90,7 +90,7 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) { })).unwrap(); Ok(()) }); - future::blocking(done).wait().unwrap(); + done.wait().unwrap(); } /// "Server logic" is implemented in this function. diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index 5c11e9f3..c874ebd7 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -15,7 +15,7 @@ use std::io; use std::net::SocketAddr; use futures::{Future, Stream, Sink}; -use futures::future::{self, Executor}; +use futures::future::Executor; use futures_cpupool::CpuPool; use tokio::net::{UdpSocket, UdpCodec}; @@ -76,5 +76,5 @@ fn main() { // Spawn the sender of pongs and then wait for our pinger to finish. pool.execute(b.then(|_| Ok(()))).unwrap(); - drop(future::blocking(a).wait()); + drop(a.wait()); } diff --git a/src/executor/current_thread.rs b/src/executor/current_thread.rs new file mode 100644 index 00000000..edc011ed --- /dev/null +++ b/src/executor/current_thread.rs @@ -0,0 +1,413 @@ +//! Execute tasks on the current thread +//! +//! This module implements an executor that keeps futures on the same thread +//! that they are submitted on. This allows it to execute futures that are +//! not `Send`. +//! +//! Before being able to spawn futures with this module, an executor +//! context must be setup by calling [`run`]. From within that context [`spawn`] +//! may be called with the future to run in the background. +//! +//! ``` +//! # extern crate tokio; +//! # extern crate futures; +//! # use tokio::executor::current_thread; +//! use futures::future::lazy; +//! +//! // Calling execute here results in a panic +//! // current_thread::spawn(my_future); +//! +//! # pub fn main() { +//! current_thread::run(|_| { +//! // The execution context is setup, futures may be executed. +//! current_thread::spawn(lazy(|| { +//! println!("called from the current thread executor"); +//! Ok(()) +//! })); +//! }); +//! # } +//! ``` +//! +//! # Execution model +//! +//! When an execution context is setup with `run` the current thread will block +//! and all the futures managed by the executor are driven to completion. +//! Whenever a future receives a notification, it is pushed to the end of a +//! scheduled list. The executor will drain this list, advancing the state of +//! each future. +//! +//! All futures managed by this module will remain on the current thread, +//! as such, this module is able to safely execute futures that are not `Send`. +//! +//! Once a future is complete, it is dropped. Once all futures are completed, +//! [`run`] will unblock and return. +//! +//! This module makes a best effort to fairly schedule futures that it manages. +//! +//! [`spawn`]: fn.spawn.html +//! [`run`]: fn.run.html + +use super::{scheduler}; +use super::sleep::{self, Sleep, Wakeup}; + +use futures::Async; +use futures::executor::{self, Spawn}; +use futures::future::{Future, Executor, ExecuteError, ExecuteErrorKind}; + +use std::{fmt, thread}; +use std::cell::Cell; +use std::rc::Rc; + +/// Executes futures on the current thread. +/// +/// All futures executed using this executor will be executed on the current +/// thread. As such, `run` will wait for these futures to complete before +/// returning. +/// +/// For more details, see the [module level](index.html) documentation. +#[derive(Debug, Clone)] +pub struct TaskExecutor { + // Prevent the handle from moving across threads. + _p: ::std::marker::PhantomData<Rc<()>>, +} + +/// A context yielded to the closure provided to `run`. +/// +/// This context is mostly a future-proofing of the library to add future +/// contextual information into it. Currently it only contains the `Enter` +/// instance used to reserve the current thread for blocking on futures. +#[derive(Debug)] +pub struct Context<'a> { + cancel: &'a Cell<bool>, +} + +/// Implements the "blocking" logic for the current thread executor. A +/// `TaskRunner` will be created during `run` and will sit on the stack until +/// execution is complete. +#[derive(Debug)] +struct TaskRunner<T> { + /// Executes futures. + scheduler: Scheduler<T>, +} + +struct CurrentRunner { + /// When set to true, the executor should return immediately, even if there + /// still futures to run. + cancel: Cell<bool>, + + /// Number of futures currently being executed by the runner. + num_futures: Cell<usize>, + + /// Raw pointer to the current scheduler pusher. + /// + /// The raw pointer is required in order to store it in a thread-local slot. + schedule: Cell<Option<*mut Schedule>>, +} + +type Scheduler<T> = scheduler::Scheduler<Task, T>; +type Schedule = scheduler::Schedule<Task>; + +struct Task(Spawn<Box<Future<Item = (), Error = ()>>>); + +/// Current thread's task runner. This is set in `TaskRunner::with` +thread_local!(static CURRENT: CurrentRunner = CurrentRunner { + cancel: Cell::new(false), + num_futures: Cell::new(0), + schedule: Cell::new(None), +}); + +/// Calls the given closure, then block until all futures submitted for +/// execution complete. +/// +/// In more detail, this function will block until: +/// - All executing futures are complete, or +/// - `cancel_all_spawned` is invoked. +pub fn run<F, R>(f: F) -> R +where F: FnOnce(&mut Context) -> R +{ + sleep::BlockThread::with_current(|mut sleep| { + TaskRunner::enter(&mut sleep, f) + }) +} + +/// Calls the given closure with a custom sleep strategy. +/// +/// This function is the same as `run` except that it allows customizing the +/// sleep strategy. +pub fn run_with_sleep<S, F, R>(sleep: &mut S, f: F) -> R +where F: FnOnce(&mut Context) -> R, + S: Sleep, +{ + TaskRunner::enter(sleep, f) +} + +/// Executes a future on the current thread. +/// +/// The provided future must complete or be canceled before `run` will return. +/// +/// # Panics +/// +/// This function can only be invoked from the context of a `run` call; any +/// other use will result in a panic. +pub fn spawn<F>(future: F) +where F: Future<Item = (), Error = ()> + 'static +{ + execute(future).unwrap_or_else(|_| { + panic!("cannot call `execute` unless the thread is already \ + in the context of a call to `run`") + }) +} + +/// Returns an executor that executes futures on the current thread. +/// +/// The user of `TaskExecutor` must ensure that when a future is submitted, +/// that it is done within the context of a call to `run`. +/// +/// For more details, see the [module level](index.html) documentation. +pub fn task_executor() -> TaskExecutor { + TaskExecutor { + _p: ::std::marker::PhantomData, + } +} + +impl<F> Executor<F> for TaskExecutor +where F: Future<Item = (), Error = ()> + 'static +{ + fn execute(&self, future: F) -> Result<(), ExecuteError<F>> { + execute(future) + } +} + +impl<'a> Context<'a> { + /// Cancels *all* executing futures. + pub fn cancel_all_spawned(&self) { + self.cancel.set(true); + } +} + +/// Submits a future to the current executor. This is done by +/// checking the thread-local variable tracking the current executor. +/// +/// If this function is not called in context of an executor, i.e. outside of +/// `run`, then `Err` is returned. +/// +/// This function does not panic. +fn execute<F>(future: F) -> Result<(), ExecuteError<F>> +where F: Future<Item = (), Error = ()> + 'static, +{ + CURRENT.with(|current| { + match current.schedule.get() { + Some(schedule) => { + let spawned = Task::new(future); + + let num_futures = current.num_futures.get(); + current.num_futures.set(num_futures + 1); + + unsafe { (*schedule).schedule(spawned); } + + Ok(()) + } + None => { + Err(ExecuteError::new(ExecuteErrorKind::Shutdown, future)) + } + } + }) +} + +impl<T> TaskRunner<T> +where T: Wakeup, +{ + /// Return a new `TaskRunner` + fn new(wakeup: T) -> TaskRunner<T> { + let scheduler = scheduler::Scheduler::new(wakeup); + + TaskRunner { + scheduler: scheduler, + } + } + + /// Enter a new `TaskRunner` context + /// + /// This function handles advancing the scheduler state and blocking while + /// listening for notified futures. + /// + /// First, a new task runner is created backed by the current + /// `sleep::BlockThread` handle. Passing `sleep::BlockThread` into the + /// scheduler is how scheduled futures unblock the thread, signalling that + /// there is more work to do. + /// + /// Before any future is polled, the scheduler must be set to a thread-local + /// variable so that `execute` is able to submit new futures to the current + /// executor. Because `Scheduler::schedule` requires `&mut self`, this + /// introduces a mutability hazard. This hazard is minimized with some + /// indirection. See `set_schedule` for more details. + /// + /// Once all context is setup, the init closure is invoked. This is the + /// "boostrapping" process that executes the initial futures into the + /// scheduler. After this, the function loops and advances the scheduler + /// state until all futures complete. When no scheduled futures are ready to + /// be advanced, the thread is blocked using `S: Sleep`. + fn enter<S, F, R>(sleep: &mut S, f: F) -> R + where F: FnOnce(&mut Context) -> R, + S: Sleep<Wakeup = T>, + { + let mut runner = TaskRunner::new(sleep.wakeup()); + + CURRENT.with(|current| { + // Make sure that another task runner is not set. + // + // This should not be ever possible due to how `set_schedule` + // is setup, but better safe than sorry! + assert!(current.schedule.get().is_none()); + + // Enter an execution scope + let mut ctx = Context { + cancel: ¤t.cancel, + }; + + // Set the scheduler to the TLS and perform setup work, + // returning a future to execute. + // + // This could possibly suubmit other futures for execution. + let ret = current.set_schedule(&mut runner.scheduler as &mut Schedule, || { + f(&mut ctx) + }); + + // Execute the runner. + // + // This function will not return until either + // + // a) All futures have completed execution + // b) `cancel_all_spawned` is called, forcing the executor to + // return. + runner.run(sleep, current); + + // Not technically required, but this makes the fact that `ctx` + // needs to live until this point explicit. + drop(ctx); + + ret + }) + } + + fn run<S>(&mut self, sleep: &mut S, current: &CurrentRunner) + where S: Sleep<Wakeup = T>, + { + use super::scheduler::Tick; + + while current.is_running() { + // Try to advance the scheduler state + let res = self.scheduler.tick(|scheduler, spawned, notify| { + // `scheduler` is a `&mut Scheduler` reference returned back + // from the scheduler to us, but only within the context of this + // closure. + // + // This lets us push new futures into the scheduler. It also + // lets us pass the scheduler mutable reference into + // `set_schedule`, which sets the thread-local variable that + // `spawn` uses for submitting new futures to the + // "current" executor. + // + // See `set_schedule` documentation for more details on how we + // guard against mutable pointer aliasing. + current.set_schedule(scheduler as &mut Schedule, || { + match spawned.0.poll_future_notify(notify, 0) { + Ok(Async::Ready(_)) | Err(_) => { + Async::Ready(()) + } + Ok(Async::NotReady) => Async::NotReady, + } + }) + }); + + // Process the result of ticking the scheduler + match res { + // A future completed. `is_daemon` is true when the future was + // submitted as a daemon future. + Tick::Data(_) => { + let num_futures = current.num_futures.get(); + debug_assert!(num_futures > 0); + current.num_futures.set(num_futures - 1); + }, + Tick::Empty => { + // The scheduler did not have any work to process. + // + // At this point, the scheduler is currently running given + // that the `while` condition was true and no user code has + // been executed. + + debug_assert!(current.is_running()); + + // Block the current thread until a future managed by the scheduler + // receives a readiness notification. + sleep.sleep(); + } + Tick::Inconsistent => { + // Yield the thread and loop + thread::yield_now(); + } + } + } + } +} + +impl CurrentRunner { + /// Set the provided schedule handle to the TLS slot for the duration of the + /// closure. + /// + /// `spawn` will access the CURRENT thread-local variable in + /// order to push a future into the scheduler. This requires a `&mut` + /// reference, introducing mutability hazards. + /// + /// Rust requires that `&mut` references are not aliases, i.e. there are + /// never two "live" mutable references to the same piece of data. In order + /// to store a `&mut` reference in a thread-local variable, we must ensure + /// that one can not access the scheduler anywhere else. + /// + /// To do this, we only allow access to the thread local variable from + /// within the closure passed to `set_schedule`. This function also takes a + /// &mut reference to the scheduler, which is essentially holding a "lock" + /// on that reference, preventing any other location in the code from + /// also getting that &mut reference. + /// + /// When `set_schedule` returns, the thread-local variable containing the + /// mut reference is set to null. This is done even if the closure panics. + /// + /// This reduces the odds of introducing pointer aliasing. + fn set_schedule<F, R>(&self, schedule: &mut Schedule, f: F) -> R + where F: FnOnce() -> R + { + // Ensure that the runner is removed from the thread-local context + // when leaving the scope. This handles cases that involve panicking. + struct Reset<'a>(&'a CurrentRunner); + + impl<'a> Drop for Reset<'a> { + fn drop(&mut self) { + self.0.schedule.set(None); + } + } + + let _reset = Reset(self); + + self.schedule.set(Some(schedule as *mut Schedule)); + + f() + } + + fn is_running(&self) -> bool { + self.num_futures.get() > 0 && !self.cancel.get() + } +} + +impl Task { + fn new<T: Future<Item = (), Error = ()> + 'static>(f: T) -> Self { + Task(executor::spawn(Box::new(f))) + } +} + +impl fmt::Debug for Task { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Task") + .finish() + } +} diff --git a/src/executor/mod.rs b/src/executor/mod.rs new file mode 100644 index 00000000..896676c8 --- /dev/null +++ b/src/executor/mod.rs @@ -0,0 +1,8 @@ +//! Task execution utilities. +//! +//! This module only contains `current_thread`, an executor for multiplexing +//! many tasks on a single thread. + +pub mod current_thread; +mod scheduler; +mod sleep; diff --git a/src/executor/scheduler.rs b/src/executor/scheduler.rs new file mode 100644 index 00000000..e9e6e1a3 --- /dev/null +++ b/src/executor/scheduler.rs @@ -0,0 +1,663 @@ +//! An unbounded set of futures. + +use super::sleep::Wakeup; + +use futures::Async; +use futures::executor::{self, UnsafeNotify, NotifyHandle}; + +use std::cell::UnsafeCell; +use std::fmt::{self, Debug}; +use std::marker::PhantomData; +use std::mem; +use std::ptr; +use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel}; +use std::sync::atomic::{AtomicPtr, AtomicBool}; +use std::sync::{Arc, Weak}; +use std::usize; + +/// A generic task-aware scheduler. +/// +/// This is used both by `FuturesUnordered` and the current-thread executor. +pub struct Scheduler<T, W> { + inner: Arc<Inner<T, W>>, + nodes: List<T, W>, +} + +/// Schedule new futures +pub trait Schedule<T> { + /// Schedule a new future. + fn schedule(&mut self, item: T); +} + +pub struct Notify<'a, T: 'a, W: 'a>(&'a Arc<Node<T, W>>); + +// A linked-list of nodes +struct List<T, W> { + len: usize, + head: *const Node<T, W>, + tail: *const Node<T, W>, +} + +unsafe impl<T: Send, W: Wakeup> Send for Scheduler<T, W> {} +unsafe impl<T: Sync, W: Wakeup> Sync for Scheduler<T, W> {} + +// Scheduler is implemented using two linked lists. The first linked list tracks +// all items managed by a `Scheduler`. This list is stored on the `Scheduler` +// struct and is **not** thread safe. The second linked list is an +// implementation of the intrusive MPSC queue algorithm described by +// 1024cores.net and is stored on `Inner`. This linked list can push items to +// the back concurrently but only one consumer may pop from the front. To +// enforce this requirement, all popping will be performed via fns on +// `Scheduler` that take `&mut self`. +// +// When a item is submitted to the set a node is allocated and inserted in +// both linked lists. This means that all insertion operations **must** be +// originated from `Scheduler` with `&mut self` The next call to `tick` will +// (eventually) see this node and call `poll` on the item. +// +// Nodes are wrapped in `Arc` cells which manage the lifetime of the node. +// However, `Arc` handles are sometimes cast to `*const Node` pointers. +// Specifically, when a node is stored in at least one of the two lists +// described above, this represents a logical `Arc` handle. This is how +// `Scheduler` maintains its reference to all nodes it manages. Each +// `NotifyHande` instance is an `Arc<Node>` as well. +// +// When `Scheduler` drops, it clears the linked list of all nodes that it +// manages. When doing so, it must attempt to decrement the reference count (by +// dropping an Arc handle). However, it can **only** decrement the reference +// count if the node is not currently stored in the mpsc channel. If the node |