summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2018-09-17 22:23:48 -0700
committerGitHub <noreply@github.com>2018-09-17 22:23:48 -0700
commit4019198706604b107b433499db59d91894aa676e (patch)
tree8eddb76011bf09fcab0428d330faabe4dd88a482
parent24dc85dc5e71526aa74b5587348c973beab7ac47 (diff)
Add some missing future::Executor implementations (#563)
This adds an implementation of future::Executor for `executor::DefaultExecutor` and `runtime::current_thread::Handle`.
-rw-r--r--src/runtime/current_thread/runtime.rs34
-rw-r--r--tests/runtime.rs331
-rw-r--r--tokio-current-thread/src/lib.rs17
-rw-r--r--tokio-current-thread/tests/current_thread.rs435
-rw-r--r--tokio-executor/src/global.rs21
-rw-r--r--tokio-executor/tests/executor.rs27
6 files changed, 616 insertions, 249 deletions
diff --git a/src/runtime/current_thread/runtime.rs b/src/runtime/current_thread/runtime.rs
index 7df915d3..262cb1e7 100644
--- a/src/runtime/current_thread/runtime.rs
+++ b/src/runtime/current_thread/runtime.rs
@@ -7,7 +7,7 @@ use tokio_timer::clock::{self, Clock};
use tokio_timer::timer::{self, Timer};
use tokio_executor;
-use futures::Future;
+use futures::{future, Future};
use std::fmt;
use std::error::Error;
@@ -42,6 +42,38 @@ impl Handle {
where F: Future<Item = (), Error = ()> + Send + 'static {
self.0.spawn(future)
}
+
+ /// Provides a best effort **hint** to whether or not `spawn` will succeed.
+ ///
+ /// This function may return both false positives **and** false negatives.
+ /// If `status` returns `Ok`, then a call to `spawn` will *probably*
+ /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will
+ /// *probably* fail, but may succeed.
+ ///
+ /// This allows a caller to avoid creating the task if the call to `spawn`
+ /// has a high likelihood of failing.
+ pub fn status(&self) -> Result<(), tokio_executor::SpawnError> {
+ self.0.status()
+ }
+}
+
+impl<T> future::Executor<T> for Handle
+where T: Future<Item = (), Error = ()> + Send + 'static,
+{
+ fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> {
+ if let Err(e) = self.status() {
+ let kind = if e.is_at_capacity() {
+ future::ExecuteErrorKind::NoCapacity
+ } else {
+ future::ExecuteErrorKind::Shutdown
+ };
+
+ return Err(future::ExecuteError::new(kind, future));
+ }
+
+ let _ = self.spawn(future);
+ Ok(())
+ }
}
/// Error returned by the `run` function.
diff --git a/tests/runtime.rs b/tests/runtime.rs
index 69251952..ac40a6e0 100644
--- a/tests/runtime.rs
+++ b/tests/runtime.rs
@@ -11,6 +11,11 @@ use tokio::prelude::future::lazy;
use tokio::prelude::*;
use tokio::runtime::Runtime;
+// this import is used in all child modules that have it in scope
+// from importing super::*, but the compiler doesn't realise that
+// and warns about it.
+pub use futures::future::Executor;
+
macro_rules! t {
($e:expr) => (match $e {
Ok(e) => e,
@@ -72,78 +77,130 @@ fn runtime_single_threaded_block_on() {
tokio::runtime::current_thread::block_on_all(create_client_server_future()).unwrap();
}
-#[test]
-fn runtime_single_threaded_block_on_all() {
- let cnt = Arc::new(Mutex::new(0));
- let c = cnt.clone();
+mod runtime_single_threaded_block_on_all {
+ use super::*;
- let msg = tokio::runtime::current_thread::block_on_all(lazy(move || {
- {
- let mut x = c.lock().unwrap();
- *x = 1 + *x;
- }
+ fn test<F>(spawn: F)
+ where
+ F: Fn(Box<Future<Item=(), Error=()> + Send>),
+ {
+ let cnt = Arc::new(Mutex::new(0));
+ let c = cnt.clone();
- // Spawn!
- tokio::spawn(lazy(move || {
+ let msg = tokio::runtime::current_thread::block_on_all(lazy(move || {
{
let mut x = c.lock().unwrap();
*x = 1 + *x;
}
- Ok::<(), ()>(())
- }));
- Ok::<_, ()>("hello")
- })).unwrap();
+ // Spawn!
+ spawn(Box::new(lazy(move || {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+ Ok::<(), ()>(())
+ })));
- assert_eq!(2, *cnt.lock().unwrap());
- assert_eq!(msg, "hello");
+ Ok::<_, ()>("hello")
+ })).unwrap();
+
+ assert_eq!(2, *cnt.lock().unwrap());
+ assert_eq!(msg, "hello");
+ }
+
+ #[test]
+ fn spawn() {
+ test(|f| { tokio::spawn(f); })
+ }
+
+ #[test]
+ fn execute() {
+ test(|f| {
+ tokio::executor::DefaultExecutor::current()
+ .execute(f)
+ .unwrap();
+ })
+ }
}
-#[test]
-fn runtime_single_threaded_racy_spawn() {
- let (trigger, exit) = futures::sync::oneshot::channel();
- let (handle_tx, handle_rx) = ::std::sync::mpsc::channel();
- let jh = ::std::thread::spawn(move || {
- let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
- handle_tx.send(rt.handle()).unwrap();
-
- // don't exit until we are told to
- rt.block_on(exit.map_err(|_| ())).unwrap();
-
- // run until all spawned futures (incl. the "exit" signal future) have completed.
- rt.run().unwrap();
- });
+mod runtime_single_threaded_racy {
+ use super::*;
+ fn test<F>(spawn: F)
+ where
+ F: Fn(
+ tokio::runtime::current_thread::Handle,
+ Box<Future<Item=(), Error=()> + Send>,
+ ),
+ {
+ let (trigger, exit) = futures::sync::oneshot::channel();
+ let (handle_tx, handle_rx) = ::std::sync::mpsc::channel();
+ let jh = ::std::thread::spawn(move || {
+ let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
+ handle_tx.send(rt.handle()).unwrap();
+
+ // don't exit until we are told to
+ rt.block_on(exit.map_err(|_| ())).unwrap();
+
+ // run until all spawned futures (incl. the "exit" signal future) have completed.
+ rt.run().unwrap();
+ });
- let (tx, rx) = futures::sync::oneshot::channel();
+ let (tx, rx) = futures::sync::oneshot::channel();
- let handle = handle_rx.recv().unwrap();
- handle
- .spawn(futures::future::lazy(move || {
+ let handle = handle_rx.recv().unwrap();
+ spawn(handle, Box::new(futures::future::lazy(move || {
tx.send(()).unwrap();
Ok(())
- }))
- .unwrap();
+ })));
- // signal runtime thread to exit
- trigger.send(()).unwrap();
+ // signal runtime thread to exit
+ trigger.send(()).unwrap();
- // wait for runtime thread to exit
- jh.join().unwrap();
+ // wait for runtime thread to exit
+ jh.join().unwrap();
+
+ assert_eq!(rx.wait().unwrap(), ());
+ }
- assert_eq!(rx.wait().unwrap(), ());
+ #[test]
+ fn spawn() {
+ test(|handle, f| { handle.spawn(f).unwrap(); })
+ }
+
+ #[test]
+ fn execute() {
+ test(|handle, f| { handle.execute(f).unwrap(); })
+ }
}
-#[test]
-fn runtime_multi_threaded() {
- let _ = env_logger::try_init();
+mod runtime_multi_threaded {
+ use super::*;
+ fn test<F>(spawn: F)
+ where
+ F: Fn(&mut Runtime) + Send + 'static,
+ {
+ let _ = env_logger::try_init();
- let mut runtime = tokio::runtime::Builder::new()
- .build()
- .unwrap();
- runtime.spawn(create_client_server_future());
- runtime.shutdown_on_idle().wait().unwrap();
+ let mut runtime = tokio::runtime::Builder::new()
+ .build()
+ .unwrap();
+ spawn(&mut runtime);
+ runtime.shutdown_on_idle().wait().unwrap();
+ }
+
+ #[test]
+ fn spawn() {
+ test(|rt| { rt.spawn(create_client_server_future()); });
+ }
+
+ #[test]
+ fn execute() {
+ test(|rt| { rt.executor().execute(create_client_server_future()).unwrap(); });
+ }
}
+
#[test]
fn block_on_timer() {
use std::time::{Duration, Instant};
@@ -161,35 +218,57 @@ fn block_on_timer() {
runtime.shutdown_on_idle().wait().unwrap();
}
-#[test]
-fn spawn_from_block_on() {
- let cnt = Arc::new(Mutex::new(0));
- let c = cnt.clone();
+mod from_block_on {
+ use super::*;
- let mut runtime = Runtime::new().unwrap();
- let msg = runtime
- .block_on(lazy(move || {
- {
- let mut x = c.lock().unwrap();
- *x = 1 + *x;
- }
+ fn test<F>(spawn: F)
+ where
+ F: Fn(Box<Future<Item=(), Error=()> + Send>) + Send + 'static,
+ {
+ let cnt = Arc::new(Mutex::new(0));
+ let c = cnt.clone();
- // Spawn!
- tokio::spawn(lazy(move || {
+ let mut runtime = Runtime::new().unwrap();
+ let msg = runtime
+ .block_on(lazy(move || {
{
let mut x = c.lock().unwrap();
*x = 1 + *x;
}
- Ok::<(), ()>(())
- }));
- Ok::<_, ()>("hello")
- }))
- .unwrap();
+ // Spawn!
+ spawn(Box::new(lazy(move || {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+ Ok::<(), ()>(())
+ })));
+
+ Ok::<_, ()>("hello")
+ }))
+ .unwrap();
+
+ runtime.shutdown_on_idle().wait().unwrap();
+ assert_eq!(2, *cnt.lock().unwrap());
+ assert_eq!(msg, "hello");
+ }
- runtime.shutdown_on_idle().wait().unwrap();
- assert_eq!(2, *cnt.lock().unwrap());
- assert_eq!(msg, "hello");
+ #[test]
+ fn execute() {
+ test(|f| {
+ tokio::executor::DefaultExecutor::current()
+ .execute(f)
+ .unwrap();
+ })
+ }
+
+ #[test]
+ fn spawn() {
+ test(|f| {
+ tokio::spawn(f);
+ })
+ }
}
#[test]
@@ -220,54 +299,94 @@ fn block_waits() {
runtime.shutdown_on_idle().wait().unwrap();
}
-#[test]
-fn spawn_many() {
+mod many {
+ use super::*;
+
const ITER: usize = 200;
+ fn test<F>(spawn: F)
+ where
+ F: Fn(&mut Runtime, Box<Future<Item=(), Error=()> + Send>),
+ {
+ let cnt = Arc::new(Mutex::new(0));
+ let mut runtime = Runtime::new().unwrap();
- let cnt = Arc::new(Mutex::new(0));
- let mut runtime = Runtime::new().unwrap();
+ for _ in 0..ITER {
+ let c = cnt.clone();
+ spawn(&mut runtime, Box::new(lazy(move || {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+ Ok::<(), ()>(())
+ })));
+ }
- for _ in 0..ITER {
- let c = cnt.clone();
- runtime.spawn(lazy(move || {
- {
- let mut x = c.lock().unwrap();
- *x = 1 + *x;
- }
- Ok::<(), ()>(())
- }));
+ runtime.shutdown_on_idle().wait().unwrap();
+ assert_eq!(ITER, *cnt.lock().unwrap());
}
- runtime.shutdown_on_idle().wait().unwrap();
- assert_eq!(ITER, *cnt.lock().unwrap());
+ #[test]
+ fn spawn() {
+ test(|rt, f| { rt.spawn(f); })
+ }
+
+ #[test]
+ fn execute() {
+ test(|rt, f| {
+ rt.executor()
+ .execute(f)
+ .unwrap();
+ })
+ }
}
-#[test]
-fn spawn_from_block_on_all() {
- let cnt = Arc::new(Mutex::new(0));
- let c = cnt.clone();
- let runtime = Runtime::new().unwrap();
- let msg = runtime
- .block_on_all(lazy(move || {
- {
- let mut x = c.lock().unwrap();
- *x = 1 + *x;
- }
+mod from_block_on_all {
+ use super::*;
- // Spawn!
- tokio::spawn(lazy(move || {
+ fn test<F>(spawn: F)
+ where
+ F: Fn(Box<Future<Item=(), Error=()> + Send>) + Send + 'static,
+ {
+ let cnt = Arc::new(Mutex::new(0));
+ let c = cnt.clone();
+
+ let runtime = Runtime::new().unwrap();
+ let msg = runtime
+ .block_on_all(lazy(move || {
{
let mut x = c.lock().unwrap();
*x = 1 + *x;
}
- Ok::<(), ()>(())
- }));
- Ok::<_, ()>("hello")
- }))
- .unwrap();
+ // Spawn!
+ spawn(Box::new(lazy(move || {
+ {
+ let mut x = c.lock().unwrap();
+ *x = 1 + *x;
+ }
+ Ok::<(), ()>(())
+ })));
+
+ Ok::<_, ()>("hello")
+ }))
+ .unwrap();
+
+ assert_eq!(2, *cnt.lock().unwrap());
+ assert_eq!(msg, "hello");
+ }
- assert_eq!(2, *cnt.lock().unwrap());
- assert_eq!(msg, "hello");
+ #[test]
+ fn execute() {
+ test(|f| {
+ tokio::executor::DefaultExecutor::current()
+ .execute(f)
+ .unwrap();
+ })
+ }
+
+ #[test]
+ fn spawn() {
+ test(|f| { tokio::spawn(f); })
+ }
}
diff --git a/tokio-current-thread/src/lib.rs b/tokio-current-thread/src/lib.rs
index fb2519f9..69b571c7 100644
--- a/tokio-current-thread/src/lib.rs
+++ b/tokio-current-thread/src/lib.rs
@@ -682,6 +682,23 @@ impl Handle {
self.notify.notify(0);
Ok(())
}
+
+ /// Provides a best effort **hint** to whether or not `spawn` will succeed.
+ ///
+ /// This function may return both false positives **and** false negatives.
+ /// If `status` returns `Ok`, then a call to `spawn` will *probably*
+ /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will
+ /// *probably* fail, but may succeed.
+ ///
+ /// This allows a caller to avoid creating the task if the call to `spawn`
+ /// has a high likelihood of failing.
+ pub fn status(&self) -> Result<(), SpawnError> {
+ if self.shut_down.get() {
+ return Err(SpawnError::shutdown());
+ }
+
+ Ok(())
+ }
}
// ===== impl TaskExecutor =====
diff --git a/tokio-current-thread/tests/current_thread.rs b/tokio-current-thread/tests/current_thread.rs
index 598587fd..a98ed792 100644
--- a/tokio-current-thread/tests/current_thread.rs
+++ b/tokio-current-thread/tests/current_thread.rs
@@ -12,28 +12,49 @@ use std::time::Duration;
use futures::task;
use futures::future::{self, lazy};
+// This is not actually unused --- we need this trait to be in scope for
+// the tests that sue TaskExecutor::current().execute(). The compiler
+// doesn't realise that.
+#[allow(unused_imports)]
+use futures::future::Executor as _futures_Executor;
use futures::prelude::*;
use futures::sync::oneshot;
-#[test]
-fn spawn_from_block_on_all() {
- let cnt = Rc::new(Cell::new(0));
- let c = cnt.clone();
-
- let msg = tokio_current_thread::block_on_all(lazy(move || {
- c.set(1 + c.get());
+mod from_block_on_all {
+ use super::*;
+ fn test<F: Fn(Box<Future<Item=(), Error=()>>) + 'static>(spawn: F) {
+ let cnt = Rc::new(Cell::new(0));
+ let c = cnt.clone();
- // Spawn!
- tokio_current_thread::spawn(lazy(move || {
+ let msg = tokio_current_thread::block_on_all(lazy(move || {
c.set(1 + c.get());
- Ok::<(), ()>(())
- }));
- Ok::<_, ()>("hello")
- })).unwrap();
+ // Spawn!
+ spawn(Box::new(lazy(move || {
+ c.set(1 + c.get());
+ Ok::<(), ()>(())
+ })));
+
+ Ok::<_, ()>("hello")
+ })).unwrap();
- assert_eq!(2, cnt.get());
- assert_eq!(msg, "hello");
+ assert_eq!(2, cnt.get());
+ assert_eq!(msg, "hello");
+ }
+
+ #[test]
+ fn spawn() {
+ test(tokio_current_thread::spawn)
+ }
+
+ #[test]
+ fn execute() {
+ test(|f| {
+ tokio_current_thread::TaskExecutor::current()
+ .execute(f)
+ .unwrap();
+ });
+ }
}
#[test]
@@ -76,39 +97,61 @@ fn spawn_many() {
assert_eq!(cnt.get(), ITER);
}
-#[test]
-fn does_not_set_global_executor_by_default() {
- use tokio_executor::Executor;
+mod does_not_set_global_executor_by_default {
+ use super::*;
- block_on_all(lazy(|| {
- tokio_executor::DefaultExecutor::current()
- .spawn(Box::new(lazy(|| ok())))
- .unwrap_err();
+ fn test<F: Fn(Box<Future<Item=(), Error=()> + Send>) -> Result<(), E> + 'static, E>(spawn: F) {
+ block_on_all(lazy(|| {
+ spawn(Box::new(lazy(|| ok()))).unwrap_err();
+ ok()
+ })).unwrap()
+ }
- ok()
- })).unwrap();
+ #[test]
+ fn spawn() {
+ use tokio_executor::Executor;
+ test(|f| tokio_executor::DefaultExecutor::current().spawn(f))
+ }
+
+ #[test]
+ fn execute() {
+ test(|f| tokio_executor::DefaultExecutor::current().execute(f))
+ }
}
-#[test]
-fn spawn_from_block_on_future() {
- let cnt = Rc::new(Cell::new(0));
+mod from_block_on_future {
+ use super::*;
- let mut tokio_current_thread = CurrentThread::new();
+ fn test<F: Fn(Box<Future<Item = (), Error = ()>>)>(spawn: F) {
+ let cnt = Rc::new(Cell::new(0));
- tokio_current_thread.block_on(lazy(|| {
- let cnt = cnt.clone();
+ let mut tokio_current_thread = CurrentThread::new();
- tokio_current_thread::spawn(lazy(move || {
- cnt.set(1 + cnt.get());
- Ok(())
- }));
+ tokio_current_thread.block_on(lazy(|| {
+ let cnt = cnt.clone();
- Ok::<_, ()>(())
- })).unwrap();
+ spawn(Box::new(lazy(move || {
+ cnt.set(1 + cnt.get());
+ Ok(())
+ })));
- tokio_current_thread.run().unwrap();
+ Ok::<_, ()>(())
+ })).unwrap();
+
+ tokio_current_thread.run().unwrap();
+
+ assert_eq!(1, cnt.get());
+ }
+
+ #[test]
+ fn spawn() {
+ test(tokio_current_thread::spawn);
+ }
- assert_eq!(1, cnt.get());
+ #[test]
+ fn execute() {
+ test(|f| { tokio_current_thread::TaskExecutor::current().execute(f).unwrap(); });
+ }
}
struct Never(Rc<()>);
@@ -122,33 +165,60 @@ impl Future for Never {
}
}
-#[test]
-fn outstanding_tasks_are_dropped_when_executor_is_dropped() {
- let mut rc = Rc::new(());
+mod outstanding_tasks_are_dropped_when_executor_is_dropped {
+ use super::*;
- let mut tokio_current_thread = CurrentThread::new();
- tokio_current_thread.spawn(Never(rc.clone()));
+ fn test<F, G>(spawn: F, dotspawn: G)
+ where
+ F: Fn(Box<Future<Item=(), Error=()>>) + 'static,
+ G: Fn(&mut CurrentThread, Box<Future<Item=(), Error=()>>)
+ {
+ let mut rc = Rc::new(());
- drop(tokio_current_thread);
+ let mut tokio_current_thread = CurrentThread::new();
+ dotspawn(&mut tokio_current_thread, Box::new(Never(rc.clone())));
- // Ensure the daemon is dropped
- assert!(Rc::get_mut(&mut rc).is_some());
+ drop(tokio_current_thread);
- // Using the global spawn fn
+ // Ensure the daemon is dropped
+ assert!(Rc::get_mut(&mut rc).is_some());
- let mut rc = Rc::new(());
+ // Using the global spawn fn
- let mut tokio_current_thread = CurrentThread::new();
+ let mut rc = Rc::new(());
- tokio_current_thread.block_on(lazy(|| {
- tokio_current_thread::spawn(Never(rc.clone()));
- Ok::<_, ()>(())
- })).unwrap();
+ let mut tokio_current_thread = CurrentThread::new();
+
+ tokio_current_thread.block_on(lazy(|| {
+ spawn(Box::new(Never(rc.clone())));
+ Ok::<_, ()>(())
+ })).unwrap();
+
+ drop(tokio_current_thread);
- drop(tokio_current_thread);
+ // Ensure the daemon is dropped
+ assert!(Rc::get_mut(&mut rc).is_some());
+ }
+
+ #[test]
+ fn spawn() {
+ test(tokio_current_thread::spawn, |rt, f| { rt.spawn(f); })
+ }
- // Ensure the daemon is dropped
- assert!(Rc::get_mut(&mut rc).is_some());
+ #[test]
+ fn execute() {
+ test(|f| {
+ tokio_current_thread::TaskExecutor::current()
+ .execute(f)
+ .unwrap();
+ },
+ // Note: `CurrentThread` doesn't currently implement
+ // `futures::Executor`, so we'll call `.spawn(...)` rather than
+ // `.execute(...)` for now. If `CurrentThread` is changed to
+ // implement Executor, change this to `.execute(...).unwrap()`.
+ |rt, f| { rt.spawn(f); }
+ );
+ }
}
#[test]
@@ -163,20 +233,41 @@ fn nesting_run() {
})).unwrap();
}
-#[test]
-#[should_panic]
-fn run_in_future() {
- block_on_all(lazy(|| {
- tokio_current_thread::spawn(lazy(|| {
- block_on_all(lazy(|| {
+mod run_in_future {
+ use super::*;
+
+ #[test]
+ #[should_panic]
+ fn spawn() {
+ block_on_all(lazy(|| {
+ tokio_current_thread::spawn(lazy(|| {
+ block_on_all(lazy(|| {
+ ok()
+ })).unwrap();
ok()
- })).unwrap();
+ }));
ok()
- }));
- ok()
- })).unwrap();
+ })).unwrap();
+ }
+
+ #[test]
+ #[should_panic]
+ fn execute() {
+ block_on_all(lazy(|| {
+ tokio_current_thread::TaskExecutor::current()
+ .execute(lazy(|| {
+ block_on_all(lazy(|| {
+ ok()
+ })).unwrap();
+ ok()
+ }))
+ .unwrap();
+ ok()
+ })).unwrap();
+ }
}
+
#[test]
fn tick_on_infini_future() {
let num = Rc::new(Cell::new(0));
@@ -206,10 +297,8 @@ fn tick_on_infini_future() {
assert_eq!(1, num.get());
}
-#[test]
-fn tasks_are_scheduled_fairly() {
- let state = Rc::new(RefCell::new([0, 0]));
-
+mod tasks_are_scheduled_fairly {
+ use super::*;
struct Spin {
state: Rc<RefCell<[i32; 2]>>,
idx: usize,
@@ -243,97 +332,171 @@ fn tasks_are_scheduled_fairly() {
}
}
- block_on_all(lazy(|| {
- tokio_current_thread::spawn(Spin {
- state: state.clone(),
- idx: 0,
- });
+ fn test<F: Fn(Spin)>(spawn: F) {
+ let state = Rc::new(RefCell::new([0, 0]));
- tokio_current_thread::spawn(Spin {
- state: state,
- idx: 1,
- });
+ block_on_all(lazy(|| {
+ spawn(Spin {
+ state: state.clone(),
+ idx: 0,
+ });
- ok()
- })).unwrap();
+ spawn(Spin {
+ state: state,
+ idx: 1,
+ });
+
+ ok()
+ })).unwrap();
+ }
+
+ #[test]
+ fn spawn() {
+ test(tokio_current_thread::spawn)
+ }
+
+ #[test]
+ fn execute() {
+ test(|f| {
+ tokio_current_thread::TaskExecutor::current()
+ .execute(f)
+ .unwrap();
+ })
+ }
}
-#[test]
-fn spawn_and_turn() {
- let cnt = Rc::new(Cell::new(0));
- let c = cnt.clone();
+mod and_turn {
+ use super::*;
- let mut tokio_current_thread = CurrentThread::new();
+ fn test<F, G>(spawn: F, dotspawn: G)
+ where
+ F: Fn(Box<Future<Item=(), Error=()>>) + 'static,
+ G: Fn(&mut CurrentThread, Box<Future<Item=(), Error=()>>)
+ {
+ let cnt = Rc::new(Cell::new(0));
+ let c = cnt.clone();
- // Spawn a basic task to get the executor to turn
- tokio_current_thread.spawn(lazy(move || {
- Ok(())
- }));
+ let mut tokio_current_thread = CurrentThread::new();
- // Turn once...
- tokio_current_thread.turn(None).unwrap();
+ // Spawn a basic task to get the executor to turn
+ dotspawn(&mut tokio_current_thread, Box::new(lazy(move || {
+ Ok(())
+ })));
- tokio_current_thread.spawn(lazy(move || {
- c.set(1 + c.get());
+ // Turn once...
+ tokio_current_thread.turn(None).unwrap();
- // Spawn!
- tokio_current_thread::spawn(lazy(move || {
+ dotspawn(&mut tokio_current_thread, Box::new(lazy(move || {
c.set(1 + c.get());
- Ok::<(), ()>(())
- }));
- Ok(())
- }));
+ // Spawn!
+ spawn(Box::new(lazy(move || {
+ c.set(1 + c.get());
+ Ok::<(), ()>(())
+ })));
- // This does not run the newly spawned thread
- tokio_current_thread.turn(None).unwrap();
- assert_eq!(1, cnt.get());
+ Ok(())
+ })));
- // This runs the newly spawned thread
- tokio_current_thread.turn(None).unwrap();
- assert_eq!(2, cnt.get());
-}
+ // This does not run the newly spawned thread
+ tokio_current_thread.turn(None).unwrap();
+ assert_eq!(1, cnt.get());
-#[test]
-fn spawn_in_drop() {
- let mut tokio_current_thread = CurrentThread::new();
+ // This runs the newly spawned thread
+ tokio_current_thread.turn(None).unwrap();
+ assert_eq!(2, cnt.get());
+ }
- let (tx, rx) = oneshot::channel();
+ #[test]
+ fn spawn() {
+ test(tokio_current_thread::spawn, |rt, f| { rt.spawn(f); })
+ }
- tokio_current_thread.spawn({
- struct OnDrop<F: FnOnce()>(Option<F>);
+ #[test]
+ fn execute() {
+ test(|f| {
+ tokio_current_thread::TaskExecutor::current()
+ .execute(f)
+ .unwrap();
+ },
+ // Note: `CurrentThread` doesn't currently implement
+ // `futures::Executor`, so we'll call `.spawn(...)` rather than
+ // `.execute(...)` for now. If `CurrentThread` is changed to
+ // implement Executor, change this to `.execute(...).unwrap()`.
+ |rt, f| { rt.spawn(f); }
+ );
+ }
- impl<F: FnOnce()> Drop for OnDrop<F> {
- fn drop(&mut self) {
- (self.0.take().unwrap())();
- }
+
+}
+
+mod in_drop {
+ use super::*;
+ struct OnDrop<F: FnOnce()>(Option<F>);
+
+ impl<F: FnOnce()> Drop for OnDrop<F> {
+ fn drop(&mut self) {
+ (self.0.take().unwrap())();
}
+ }
+
+ struct MyFuture {
+ _data: Box<Any>,
+ }
+
+ impl Future for MyFuture {
+ type Item = ();
+ type Error = ();
- struct MyFuture {
- _data: Box<Any>,
+ fn poll(&mut self) -> Poll<(), ()> {
+ Ok(().into())
}
+ }
- impl Future for MyFuture {
- type Item = ();
- type Error = ();
+ fn test<F, G>(spawn: F, dotspawn: G)
+ where
+ F: Fn(Box<Future<Item=(), Error=()>>) + 'static,
+ G: Fn(&mut CurrentThread, Box<Future<Item=(), Error=()>>)
+ {
+ let mut tokio_current_thread = CurrentThread::new();
+
+ let (tx, rx) = oneshot::channel();
- fn poll(&mut self) -> Poll<(), ()> {
- Ok(().into())
+ dotspawn(&mut tokio_current_thread, Box::new(
+ MyFuture {
+ _data: Box::new(OnDrop(Some(move || {
+ spawn(Box::new(lazy(move || {
+ tx.send(()).unwrap();
+ Ok(())
+ })));
+ }))),
}
- }
+ ));
- MyFuture {
- _data: Box::new(OnDrop(Some(move || {
- tokio_current_thread::spawn(lazy(move || {
- tx.send(()).unwrap();
- Ok(())
- }));
- }))),
- }
- });
+ tokio_current_thread.block_on(rx).unwrap();
+ tokio_current_thread.run().unwrap();
+ }
+
+ #[test]
+ fn spawn() {
+ test(tokio_current_thread::spawn, |rt, f| { rt.spawn(f); })
+ }
+
+ #[test]
+ fn execute() {
+ test(|f| {
+ tokio_current_thread::TaskExecutor::current()
+ .execute(f)
+ .unwrap();
+ },
+ // Note: `CurrentThread` doesn't currently implement
+ // `futures::Executor`, so we'll call `.spawn(...)` rather than
+ // `.execute(...)` for now. If `CurrentThread` is changed to
+ // implement Executor, change this to `.execute(...).unwrap()`.
+ |rt, f| { rt.spawn(f); }
+ );
+ }
- tokio_current_thread.block_on(rx).unwrap();
- tokio_current_thread.run().unwrap();
}
#[test]
diff --git a/tokio-executor/src/global.rs b/tokio-executor/src/global.rs
index 24867cd5..1420deaf 100644
--- a/tokio-executor/src/global.rs
+++ b/tokio-executor/src/global.rs
@@ -1,6 +1,6 @@
use super::{Executor, Enter, SpawnError};
-use futures::Future;
+use futures::{future, Future};
use std::cell::Cell;
@@ -83,6 +83,25 @@ impl super::Executor for DefaultExecutor {
}
}
+impl<T> future::Executor<T> for DefaultExecutor
+where T: Future<Item = (), Error = ()> + Send + 'static,
+{
+ fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> {
+ if let Err(e) = super::Executor::status(self) {
+ let kind = if e.is_at_capacity() {
+ future::ExecuteErrorKind::NoCapacity
+ } else {
+ future::ExecuteErrorKind::Shutdown
+ };
+
+ return Err(future::ExecuteError::new(kind, future));
+ }
+
+ let _ = DefaultExecutor::with_current(|executor| executor.spawn(Box::new(future)));
+ Ok(())
+ }
+}
+
// ===== global spawn fns =====
/// Submits a future for execution on the default executor -- usually a
diff --git a/tokio-executor/tests/executor.rs b/tokio-executor/tests/executor.rs
index 77436ec9..0c7269fd 100644
--- a/tokio-executor/tests/executor.rs
+++ b/tokio-executor/tests/executor.rs
@@ -2,10 +2,27 @@ extern crate tokio_executor;
extern crate futures;
use tokio_executor::*;
-use futures::future::lazy;
+use futures::{Future, future::lazy};
-#[test]
-fn spawn_out_of_executor_context() {
- let res = DefaultExecutor::current().spawn(Box::new(lazy(|| Ok(()))));
- assert!(res.is_err());
+mod out_of_executor_context {
+ use super::*;
+
+ fn test<F, E>(spawn: F)
+ where
+ F: Fn(Box<Future<Item=(), Error=()> + Send>) -> Result<(), E>,
+ {
+ let res = spawn(Box::new(lazy(|| Ok(()))));
+ assert!(res.is_err());
+ }
+
+ #[test]
+ fn spawn() {
+ test(|f| DefaultExecutor::current().spawn(f));
+ }
+
+ #[test]
+ fn execute() {
+ use futures::future::Executor as FuturesExecutor;
+ test(|f| DefaultExecutor::current().execute(f));
+ }
}