diff options
author | Carl Lerche <me@carllerche.com> | 2018-09-17 22:23:48 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-17 22:23:48 -0700 |
commit | 4019198706604b107b433499db59d91894aa676e (patch) | |
tree | 8eddb76011bf09fcab0428d330faabe4dd88a482 | |
parent | 24dc85dc5e71526aa74b5587348c973beab7ac47 (diff) |
Add some missing future::Executor implementations (#563)
This adds an implementation of future::Executor for
`executor::DefaultExecutor` and `runtime::current_thread::Handle`.
-rw-r--r-- | src/runtime/current_thread/runtime.rs | 34 | ||||
-rw-r--r-- | tests/runtime.rs | 331 | ||||
-rw-r--r-- | tokio-current-thread/src/lib.rs | 17 | ||||
-rw-r--r-- | tokio-current-thread/tests/current_thread.rs | 435 | ||||
-rw-r--r-- | tokio-executor/src/global.rs | 21 | ||||
-rw-r--r-- | tokio-executor/tests/executor.rs | 27 |
6 files changed, 616 insertions, 249 deletions
diff --git a/src/runtime/current_thread/runtime.rs b/src/runtime/current_thread/runtime.rs index 7df915d3..262cb1e7 100644 --- a/src/runtime/current_thread/runtime.rs +++ b/src/runtime/current_thread/runtime.rs @@ -7,7 +7,7 @@ use tokio_timer::clock::{self, Clock}; use tokio_timer::timer::{self, Timer}; use tokio_executor; -use futures::Future; +use futures::{future, Future}; use std::fmt; use std::error::Error; @@ -42,6 +42,38 @@ impl Handle { where F: Future<Item = (), Error = ()> + Send + 'static { self.0.spawn(future) } + + /// Provides a best effort **hint** to whether or not `spawn` will succeed. + /// + /// This function may return both false positives **and** false negatives. + /// If `status` returns `Ok`, then a call to `spawn` will *probably* + /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will + /// *probably* fail, but may succeed. + /// + /// This allows a caller to avoid creating the task if the call to `spawn` + /// has a high likelihood of failing. + pub fn status(&self) -> Result<(), tokio_executor::SpawnError> { + self.0.status() + } +} + +impl<T> future::Executor<T> for Handle +where T: Future<Item = (), Error = ()> + Send + 'static, +{ + fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> { + if let Err(e) = self.status() { + let kind = if e.is_at_capacity() { + future::ExecuteErrorKind::NoCapacity + } else { + future::ExecuteErrorKind::Shutdown + }; + + return Err(future::ExecuteError::new(kind, future)); + } + + let _ = self.spawn(future); + Ok(()) + } } /// Error returned by the `run` function. diff --git a/tests/runtime.rs b/tests/runtime.rs index 69251952..ac40a6e0 100644 --- a/tests/runtime.rs +++ b/tests/runtime.rs @@ -11,6 +11,11 @@ use tokio::prelude::future::lazy; use tokio::prelude::*; use tokio::runtime::Runtime; +// this import is used in all child modules that have it in scope +// from importing super::*, but the compiler doesn't realise that +// and warns about it. +pub use futures::future::Executor; + macro_rules! t { ($e:expr) => (match $e { Ok(e) => e, @@ -72,78 +77,130 @@ fn runtime_single_threaded_block_on() { tokio::runtime::current_thread::block_on_all(create_client_server_future()).unwrap(); } -#[test] -fn runtime_single_threaded_block_on_all() { - let cnt = Arc::new(Mutex::new(0)); - let c = cnt.clone(); +mod runtime_single_threaded_block_on_all { + use super::*; - let msg = tokio::runtime::current_thread::block_on_all(lazy(move || { - { - let mut x = c.lock().unwrap(); - *x = 1 + *x; - } + fn test<F>(spawn: F) + where + F: Fn(Box<Future<Item=(), Error=()> + Send>), + { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); - // Spawn! - tokio::spawn(lazy(move || { + let msg = tokio::runtime::current_thread::block_on_all(lazy(move || { { let mut x = c.lock().unwrap(); *x = 1 + *x; } - Ok::<(), ()>(()) - })); - Ok::<_, ()>("hello") - })).unwrap(); + // Spawn! + spawn(Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); - assert_eq!(2, *cnt.lock().unwrap()); - assert_eq!(msg, "hello"); + Ok::<_, ()>("hello") + })).unwrap(); + + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); + } + + #[test] + fn spawn() { + test(|f| { tokio::spawn(f); }) + } + + #[test] + fn execute() { + test(|f| { + tokio::executor::DefaultExecutor::current() + .execute(f) + .unwrap(); + }) + } } -#[test] -fn runtime_single_threaded_racy_spawn() { - let (trigger, exit) = futures::sync::oneshot::channel(); - let (handle_tx, handle_rx) = ::std::sync::mpsc::channel(); - let jh = ::std::thread::spawn(move || { - let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap(); - handle_tx.send(rt.handle()).unwrap(); - - // don't exit until we are told to - rt.block_on(exit.map_err(|_| ())).unwrap(); - - // run until all spawned futures (incl. the "exit" signal future) have completed. - rt.run().unwrap(); - }); +mod runtime_single_threaded_racy { + use super::*; + fn test<F>(spawn: F) + where + F: Fn( + tokio::runtime::current_thread::Handle, + Box<Future<Item=(), Error=()> + Send>, + ), + { + let (trigger, exit) = futures::sync::oneshot::channel(); + let (handle_tx, handle_rx) = ::std::sync::mpsc::channel(); + let jh = ::std::thread::spawn(move || { + let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap(); + handle_tx.send(rt.handle()).unwrap(); + + // don't exit until we are told to + rt.block_on(exit.map_err(|_| ())).unwrap(); + + // run until all spawned futures (incl. the "exit" signal future) have completed. + rt.run().unwrap(); + }); - let (tx, rx) = futures::sync::oneshot::channel(); + let (tx, rx) = futures::sync::oneshot::channel(); - let handle = handle_rx.recv().unwrap(); - handle - .spawn(futures::future::lazy(move || { + let handle = handle_rx.recv().unwrap(); + spawn(handle, Box::new(futures::future::lazy(move || { tx.send(()).unwrap(); Ok(()) - })) - .unwrap(); + }))); - // signal runtime thread to exit - trigger.send(()).unwrap(); + // signal runtime thread to exit + trigger.send(()).unwrap(); - // wait for runtime thread to exit - jh.join().unwrap(); + // wait for runtime thread to exit + jh.join().unwrap(); + + assert_eq!(rx.wait().unwrap(), ()); + } - assert_eq!(rx.wait().unwrap(), ()); + #[test] + fn spawn() { + test(|handle, f| { handle.spawn(f).unwrap(); }) + } + + #[test] + fn execute() { + test(|handle, f| { handle.execute(f).unwrap(); }) + } } -#[test] -fn runtime_multi_threaded() { - let _ = env_logger::try_init(); +mod runtime_multi_threaded { + use super::*; + fn test<F>(spawn: F) + where + F: Fn(&mut Runtime) + Send + 'static, + { + let _ = env_logger::try_init(); - let mut runtime = tokio::runtime::Builder::new() - .build() - .unwrap(); - runtime.spawn(create_client_server_future()); - runtime.shutdown_on_idle().wait().unwrap(); + let mut runtime = tokio::runtime::Builder::new() + .build() + .unwrap(); + spawn(&mut runtime); + runtime.shutdown_on_idle().wait().unwrap(); + } + + #[test] + fn spawn() { + test(|rt| { rt.spawn(create_client_server_future()); }); + } + + #[test] + fn execute() { + test(|rt| { rt.executor().execute(create_client_server_future()).unwrap(); }); + } } + #[test] fn block_on_timer() { use std::time::{Duration, Instant}; @@ -161,35 +218,57 @@ fn block_on_timer() { runtime.shutdown_on_idle().wait().unwrap(); } -#[test] -fn spawn_from_block_on() { - let cnt = Arc::new(Mutex::new(0)); - let c = cnt.clone(); +mod from_block_on { + use super::*; - let mut runtime = Runtime::new().unwrap(); - let msg = runtime - .block_on(lazy(move || { - { - let mut x = c.lock().unwrap(); - *x = 1 + *x; - } + fn test<F>(spawn: F) + where + F: Fn(Box<Future<Item=(), Error=()> + Send>) + Send + 'static, + { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); - // Spawn! - tokio::spawn(lazy(move || { + let mut runtime = Runtime::new().unwrap(); + let msg = runtime + .block_on(lazy(move || { { let mut x = c.lock().unwrap(); *x = 1 + *x; } - Ok::<(), ()>(()) - })); - Ok::<_, ()>("hello") - })) - .unwrap(); + // Spawn! + spawn(Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })) + .unwrap(); + + runtime.shutdown_on_idle().wait().unwrap(); + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); + } - runtime.shutdown_on_idle().wait().unwrap(); - assert_eq!(2, *cnt.lock().unwrap()); - assert_eq!(msg, "hello"); + #[test] + fn execute() { + test(|f| { + tokio::executor::DefaultExecutor::current() + .execute(f) + .unwrap(); + }) + } + + #[test] + fn spawn() { + test(|f| { + tokio::spawn(f); + }) + } } #[test] @@ -220,54 +299,94 @@ fn block_waits() { runtime.shutdown_on_idle().wait().unwrap(); } -#[test] -fn spawn_many() { +mod many { + use super::*; + const ITER: usize = 200; + fn test<F>(spawn: F) + where + F: Fn(&mut Runtime, Box<Future<Item=(), Error=()> + Send>), + { + let cnt = Arc::new(Mutex::new(0)); + let mut runtime = Runtime::new().unwrap(); - let cnt = Arc::new(Mutex::new(0)); - let mut runtime = Runtime::new().unwrap(); + for _ in 0..ITER { + let c = cnt.clone(); + spawn(&mut runtime, Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + } - for _ in 0..ITER { - let c = cnt.clone(); - runtime.spawn(lazy(move || { - { - let mut x = c.lock().unwrap(); - *x = 1 + *x; - } - Ok::<(), ()>(()) - })); + runtime.shutdown_on_idle().wait().unwrap(); + assert_eq!(ITER, *cnt.lock().unwrap()); } - runtime.shutdown_on_idle().wait().unwrap(); - assert_eq!(ITER, *cnt.lock().unwrap()); + #[test] + fn spawn() { + test(|rt, f| { rt.spawn(f); }) + } + + #[test] + fn execute() { + test(|rt, f| { + rt.executor() + .execute(f) + .unwrap(); + }) + } } -#[test] -fn spawn_from_block_on_all() { - let cnt = Arc::new(Mutex::new(0)); - let c = cnt.clone(); - let runtime = Runtime::new().unwrap(); - let msg = runtime - .block_on_all(lazy(move || { - { - let mut x = c.lock().unwrap(); - *x = 1 + *x; - } +mod from_block_on_all { + use super::*; - // Spawn! - tokio::spawn(lazy(move || { + fn test<F>(spawn: F) + where + F: Fn(Box<Future<Item=(), Error=()> + Send>) + Send + 'static, + { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let runtime = Runtime::new().unwrap(); + let msg = runtime + .block_on_all(lazy(move || { { let mut x = c.lock().unwrap(); *x = 1 + *x; } - Ok::<(), ()>(()) - })); - Ok::<_, ()>("hello") - })) - .unwrap(); + // Spawn! + spawn(Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })) + .unwrap(); + + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); + } - assert_eq!(2, *cnt.lock().unwrap()); - assert_eq!(msg, "hello"); + #[test] + fn execute() { + test(|f| { + tokio::executor::DefaultExecutor::current() + .execute(f) + .unwrap(); + }) + } + + #[test] + fn spawn() { + test(|f| { tokio::spawn(f); }) + } } diff --git a/tokio-current-thread/src/lib.rs b/tokio-current-thread/src/lib.rs index fb2519f9..69b571c7 100644 --- a/tokio-current-thread/src/lib.rs +++ b/tokio-current-thread/src/lib.rs @@ -682,6 +682,23 @@ impl Handle { self.notify.notify(0); Ok(()) } + + /// Provides a best effort **hint** to whether or not `spawn` will succeed. + /// + /// This function may return both false positives **and** false negatives. + /// If `status` returns `Ok`, then a call to `spawn` will *probably* + /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will + /// *probably* fail, but may succeed. + /// + /// This allows a caller to avoid creating the task if the call to `spawn` + /// has a high likelihood of failing. + pub fn status(&self) -> Result<(), SpawnError> { + if self.shut_down.get() { + return Err(SpawnError::shutdown()); + } + + Ok(()) + } } // ===== impl TaskExecutor ===== diff --git a/tokio-current-thread/tests/current_thread.rs b/tokio-current-thread/tests/current_thread.rs index 598587fd..a98ed792 100644 --- a/tokio-current-thread/tests/current_thread.rs +++ b/tokio-current-thread/tests/current_thread.rs @@ -12,28 +12,49 @@ use std::time::Duration; use futures::task; use futures::future::{self, lazy}; +// This is not actually unused --- we need this trait to be in scope for +// the tests that sue TaskExecutor::current().execute(). The compiler +// doesn't realise that. +#[allow(unused_imports)] +use futures::future::Executor as _futures_Executor; use futures::prelude::*; use futures::sync::oneshot; -#[test] -fn spawn_from_block_on_all() { - let cnt = Rc::new(Cell::new(0)); - let c = cnt.clone(); - - let msg = tokio_current_thread::block_on_all(lazy(move || { - c.set(1 + c.get()); +mod from_block_on_all { + use super::*; + fn test<F: Fn(Box<Future<Item=(), Error=()>>) + 'static>(spawn: F) { + let cnt = Rc::new(Cell::new(0)); + let c = cnt.clone(); - // Spawn! - tokio_current_thread::spawn(lazy(move || { + let msg = tokio_current_thread::block_on_all(lazy(move || { c.set(1 + c.get()); - Ok::<(), ()>(()) - })); - Ok::<_, ()>("hello") - })).unwrap(); + // Spawn! + spawn(Box::new(lazy(move || { + c.set(1 + c.get()); + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })).unwrap(); - assert_eq!(2, cnt.get()); - assert_eq!(msg, "hello"); + assert_eq!(2, cnt.get()); + assert_eq!(msg, "hello"); + } + + #[test] + fn spawn() { + test(tokio_current_thread::spawn) + } + + #[test] + fn execute() { + test(|f| { + tokio_current_thread::TaskExecutor::current() + .execute(f) + .unwrap(); + }); + } } #[test] @@ -76,39 +97,61 @@ fn spawn_many() { assert_eq!(cnt.get(), ITER); } -#[test] -fn does_not_set_global_executor_by_default() { - use tokio_executor::Executor; +mod does_not_set_global_executor_by_default { + use super::*; - block_on_all(lazy(|| { - tokio_executor::DefaultExecutor::current() - .spawn(Box::new(lazy(|| ok()))) - .unwrap_err(); + fn test<F: Fn(Box<Future<Item=(), Error=()> + Send>) -> Result<(), E> + 'static, E>(spawn: F) { + block_on_all(lazy(|| { + spawn(Box::new(lazy(|| ok()))).unwrap_err(); + ok() + })).unwrap() + } - ok() - })).unwrap(); + #[test] + fn spawn() { + use tokio_executor::Executor; + test(|f| tokio_executor::DefaultExecutor::current().spawn(f)) + } + + #[test] + fn execute() { + test(|f| tokio_executor::DefaultExecutor::current().execute(f)) + } } -#[test] -fn spawn_from_block_on_future() { - let cnt = Rc::new(Cell::new(0)); +mod from_block_on_future { + use super::*; - let mut tokio_current_thread = CurrentThread::new(); + fn test<F: Fn(Box<Future<Item = (), Error = ()>>)>(spawn: F) { + let cnt = Rc::new(Cell::new(0)); - tokio_current_thread.block_on(lazy(|| { - let cnt = cnt.clone(); + let mut tokio_current_thread = CurrentThread::new(); - tokio_current_thread::spawn(lazy(move || { - cnt.set(1 + cnt.get()); - Ok(()) - })); + tokio_current_thread.block_on(lazy(|| { + let cnt = cnt.clone(); - Ok::<_, ()>(()) - })).unwrap(); + spawn(Box::new(lazy(move || { + cnt.set(1 + cnt.get()); + Ok(()) + }))); - tokio_current_thread.run().unwrap(); + Ok::<_, ()>(()) + })).unwrap(); + + tokio_current_thread.run().unwrap(); + + assert_eq!(1, cnt.get()); + } + + #[test] + fn spawn() { + test(tokio_current_thread::spawn); + } - assert_eq!(1, cnt.get()); + #[test] + fn execute() { + test(|f| { tokio_current_thread::TaskExecutor::current().execute(f).unwrap(); }); + } } struct Never(Rc<()>); @@ -122,33 +165,60 @@ impl Future for Never { } } -#[test] -fn outstanding_tasks_are_dropped_when_executor_is_dropped() { - let mut rc = Rc::new(()); +mod outstanding_tasks_are_dropped_when_executor_is_dropped { + use super::*; - let mut tokio_current_thread = CurrentThread::new(); - tokio_current_thread.spawn(Never(rc.clone())); + fn test<F, G>(spawn: F, dotspawn: G) + where + F: Fn(Box<Future<Item=(), Error=()>>) + 'static, + G: Fn(&mut CurrentThread, Box<Future<Item=(), Error=()>>) + { + let mut rc = Rc::new(()); - drop(tokio_current_thread); + let mut tokio_current_thread = CurrentThread::new(); + dotspawn(&mut tokio_current_thread, Box::new(Never(rc.clone()))); - // Ensure the daemon is dropped - assert!(Rc::get_mut(&mut rc).is_some()); + drop(tokio_current_thread); - // Using the global spawn fn + // Ensure the daemon is dropped + assert!(Rc::get_mut(&mut rc).is_some()); - let mut rc = Rc::new(()); + // Using the global spawn fn - let mut tokio_current_thread = CurrentThread::new(); + let mut rc = Rc::new(()); - tokio_current_thread.block_on(lazy(|| { - tokio_current_thread::spawn(Never(rc.clone())); - Ok::<_, ()>(()) - })).unwrap(); + let mut tokio_current_thread = CurrentThread::new(); + + tokio_current_thread.block_on(lazy(|| { + spawn(Box::new(Never(rc.clone()))); + Ok::<_, ()>(()) + })).unwrap(); + + drop(tokio_current_thread); - drop(tokio_current_thread); + // Ensure the daemon is dropped + assert!(Rc::get_mut(&mut rc).is_some()); + } + + #[test] + fn spawn() { + test(tokio_current_thread::spawn, |rt, f| { rt.spawn(f); }) + } - // Ensure the daemon is dropped - assert!(Rc::get_mut(&mut rc).is_some()); + #[test] + fn execute() { + test(|f| { + tokio_current_thread::TaskExecutor::current() + .execute(f) + .unwrap(); + }, + // Note: `CurrentThread` doesn't currently implement + // `futures::Executor`, so we'll call `.spawn(...)` rather than + // `.execute(...)` for now. If `CurrentThread` is changed to + // implement Executor, change this to `.execute(...).unwrap()`. + |rt, f| { rt.spawn(f); } + ); + } } #[test] @@ -163,20 +233,41 @@ fn nesting_run() { })).unwrap(); } -#[test] -#[should_panic] -fn run_in_future() { - block_on_all(lazy(|| { - tokio_current_thread::spawn(lazy(|| { - block_on_all(lazy(|| { +mod run_in_future { + use super::*; + + #[test] + #[should_panic] + fn spawn() { + block_on_all(lazy(|| { + tokio_current_thread::spawn(lazy(|| { + block_on_all(lazy(|| { + ok() + })).unwrap(); ok() - })).unwrap(); + })); ok() - })); - ok() - })).unwrap(); + })).unwrap(); + } + + #[test] + #[should_panic] + fn execute() { + block_on_all(lazy(|| { + tokio_current_thread::TaskExecutor::current() + .execute(lazy(|| { + block_on_all(lazy(|| { + ok() + })).unwrap(); + ok() + })) + .unwrap(); + ok() + })).unwrap(); + } } + #[test] fn tick_on_infini_future() { let num = Rc::new(Cell::new(0)); @@ -206,10 +297,8 @@ fn tick_on_infini_future() { assert_eq!(1, num.get()); } -#[test] -fn tasks_are_scheduled_fairly() { - let state = Rc::new(RefCell::new([0, 0])); - +mod tasks_are_scheduled_fairly { + use super::*; struct Spin { state: Rc<RefCell<[i32; 2]>>, idx: usize, @@ -243,97 +332,171 @@ fn tasks_are_scheduled_fairly() { } } - block_on_all(lazy(|| { - tokio_current_thread::spawn(Spin { - state: state.clone(), - idx: 0, - }); + fn test<F: Fn(Spin)>(spawn: F) { + let state = Rc::new(RefCell::new([0, 0])); - tokio_current_thread::spawn(Spin { - state: state, - idx: 1, - }); + block_on_all(lazy(|| { + spawn(Spin { + state: state.clone(), + idx: 0, + }); - ok() - })).unwrap(); + spawn(Spin { + state: state, + idx: 1, + }); + + ok() + })).unwrap(); + } + + #[test] + fn spawn() { + test(tokio_current_thread::spawn) + } + + #[test] + fn execute() { + test(|f| { + tokio_current_thread::TaskExecutor::current() + .execute(f) + .unwrap(); + }) + } } -#[test] -fn spawn_and_turn() { - let cnt = Rc::new(Cell::new(0)); - let c = cnt.clone(); +mod and_turn { + use super::*; - let mut tokio_current_thread = CurrentThread::new(); + fn test<F, G>(spawn: F, dotspawn: G) + where + F: Fn(Box<Future<Item=(), Error=()>>) + 'static, + G: Fn(&mut CurrentThread, Box<Future<Item=(), Error=()>>) + { + let cnt = Rc::new(Cell::new(0)); + let c = cnt.clone(); - // Spawn a basic task to get the executor to turn - tokio_current_thread.spawn(lazy(move || { - Ok(()) - })); + let mut tokio_current_thread = CurrentThread::new(); - // Turn once... - tokio_current_thread.turn(None).unwrap(); + // Spawn a basic task to get the executor to turn + dotspawn(&mut tokio_current_thread, Box::new(lazy(move || { + Ok(()) + }))); - tokio_current_thread.spawn(lazy(move || { - c.set(1 + c.get()); + // Turn once... + tokio_current_thread.turn(None).unwrap(); - // Spawn! - tokio_current_thread::spawn(lazy(move || { + dotspawn(&mut tokio_current_thread, Box::new(lazy(move || { c.set(1 + c.get()); - Ok::<(), ()>(()) - })); - Ok(()) - })); + // Spawn! + spawn(Box::new(lazy(move || { + c.set(1 + c.get()); + Ok::<(), ()>(()) + }))); - // This does not run the newly spawned thread - tokio_current_thread.turn(None).unwrap(); - assert_eq!(1, cnt.get()); + Ok(()) + }))); - // This runs the newly spawned thread - tokio_current_thread.turn(None).unwrap(); - assert_eq!(2, cnt.get()); -} + // This does not run the newly spawned thread + tokio_current_thread.turn(None).unwrap(); + assert_eq!(1, cnt.get()); -#[test] -fn spawn_in_drop() { - let mut tokio_current_thread = CurrentThread::new(); + // This runs the newly spawned thread + tokio_current_thread.turn(None).unwrap(); + assert_eq!(2, cnt.get()); + } - let (tx, rx) = oneshot::channel(); + #[test] + fn spawn() { + test(tokio_current_thread::spawn, |rt, f| { rt.spawn(f); }) + } - tokio_current_thread.spawn({ - struct OnDrop<F: FnOnce()>(Option<F>); + #[test] + fn execute() { + test(|f| { + tokio_current_thread::TaskExecutor::current() + .execute(f) + .unwrap(); + }, + // Note: `CurrentThread` doesn't currently implement + // `futures::Executor`, so we'll call `.spawn(...)` rather than + // `.execute(...)` for now. If `CurrentThread` is changed to + // implement Executor, change this to `.execute(...).unwrap()`. + |rt, f| { rt.spawn(f); } + ); + } - impl<F: FnOnce()> Drop for OnDrop<F> { - fn drop(&mut self) { - (self.0.take().unwrap())(); - } + +} + +mod in_drop { + use super::*; + struct OnDrop<F: FnOnce()>(Option<F>); + + impl<F: FnOnce()> Drop for OnDrop<F> { + fn drop(&mut self) { + (self.0.take().unwrap())(); } + } + + struct MyFuture { + _data: Box<Any>, + } + + impl Future for MyFuture { + type Item = (); + type Error = (); - struct MyFuture { - _data: Box<Any>, + fn poll(&mut self) -> Poll<(), ()> { + Ok(().into()) } + } - impl Future for MyFuture { - type Item = (); - type Error = (); + fn test<F, G>(spawn: F, dotspawn: G) + where + F: Fn(Box<Future<Item=(), Error=()>>) + 'static, + G: Fn(&mut CurrentThread, Box<Future<Item=(), Error=()>>) + { + let mut tokio_current_thread = CurrentThread::new(); + + let (tx, rx) = oneshot::channel(); - fn poll(&mut self) -> Poll<(), ()> { - Ok(().into()) + dotspawn(&mut tokio_current_thread, Box::new( + MyFuture { + _data: Box::new(OnDrop(Some(move || { + spawn(Box::new(lazy(move || { + tx.send(()).unwrap(); + Ok(()) + }))); + }))), } - } + )); - MyFuture { - _data: Box::new(OnDrop(Some(move || { - tokio_current_thread::spawn(lazy(move || { - tx.send(()).unwrap(); - Ok(()) - })); - }))), - } - }); + tokio_current_thread.block_on(rx).unwrap(); + tokio_current_thread.run().unwrap(); + } + + #[test] + fn spawn() { + test(tokio_current_thread::spawn, |rt, f| { rt.spawn(f); }) + } + + #[test] + fn execute() { + test(|f| { + tokio_current_thread::TaskExecutor::current() + .execute(f) + .unwrap(); + }, + // Note: `CurrentThread` doesn't currently implement + // `futures::Executor`, so we'll call `.spawn(...)` rather than + // `.execute(...)` for now. If `CurrentThread` is changed to + // implement Executor, change this to `.execute(...).unwrap()`. + |rt, f| { rt.spawn(f); } + ); + } - tokio_current_thread.block_on(rx).unwrap(); - tokio_current_thread.run().unwrap(); } #[test] diff --git a/tokio-executor/src/global.rs b/tokio-executor/src/global.rs index 24867cd5..1420deaf 100644 --- a/tokio-executor/src/global.rs +++ b/tokio-executor/src/global.rs @@ -1,6 +1,6 @@ use super::{Executor, Enter, SpawnError}; -use futures::Future; +use futures::{future, Future}; use std::cell::Cell; @@ -83,6 +83,25 @@ impl super::Executor for DefaultExecutor { } } +impl<T> future::Executor<T> for DefaultExecutor +where T: Future<Item = (), Error = ()> + Send + 'static, +{ + fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> { + if let Err(e) = super::Executor::status(self) { + let kind = if e.is_at_capacity() { + future::ExecuteErrorKind::NoCapacity + } else { + future::ExecuteErrorKind::Shutdown + }; + + return Err(future::ExecuteError::new(kind, future)); + } + + let _ = DefaultExecutor::with_current(|executor| executor.spawn(Box::new(future))); + Ok(()) + } +} + // ===== global spawn fns ===== /// Submits a future for execution on the default executor -- usually a diff --git a/tokio-executor/tests/executor.rs b/tokio-executor/tests/executor.rs index 77436ec9..0c7269fd 100644 --- a/tokio-executor/tests/executor.rs +++ b/tokio-executor/tests/executor.rs @@ -2,10 +2,27 @@ extern crate tokio_executor; extern crate futures; use tokio_executor::*; -use futures::future::lazy; +use futures::{Future, future::lazy}; -#[test] -fn spawn_out_of_executor_context() { - let res = DefaultExecutor::current().spawn(Box::new(lazy(|| Ok(())))); - assert!(res.is_err()); +mod out_of_executor_context { + use super::*; + + fn test<F, E>(spawn: F) + where + F: Fn(Box<Future<Item=(), Error=()> + Send>) -> Result<(), E>, + { + let res = spawn(Box::new(lazy(|| Ok(())))); + assert!(res.is_err()); + } + + #[test] + fn spawn() { + test(|f| DefaultExecutor::current().spawn(f)); + } + + #[test] + fn execute() { + use futures::future::Executor as FuturesExecutor; + test(|f| DefaultExecutor::current().execute(f)); + } } |