diff options
author | Carl Lerche <me@carllerche.com> | 2019-03-21 14:30:18 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-21 14:30:18 -0700 |
commit | b1172f8074b381b543ff15e23e3092fc5dc6de7d (patch) | |
tree | 6b0ffa87d724f01166ed4c65f40c3fc413e76cfe | |
parent | cdde2e7a273cbab2085b822efcf54c6bec822681 (diff) |
executor: add TypedExecutor (#993)
Adds a `TypedExecutor` trait that describes how to spawn futures of a specific
type. This is useful for implementing functions that are generic over an executor
and wish to support both `Send` and `!Send` cases.
-rw-r--r-- | Cargo.toml | 3 | ||||
-rw-r--r-- | ci/azure-patch-crates.yml | 16 | ||||
-rw-r--r-- | ci/azure-test-stable.yml | 8 | ||||
-rw-r--r-- | ci/azure-tsan.yml | 6 | ||||
-rw-r--r-- | tokio-current-thread/Cargo.toml | 2 | ||||
-rw-r--r-- | tokio-current-thread/src/lib.rs | 19 | ||||
-rw-r--r-- | tokio-executor/Cargo.toml | 3 | ||||
-rw-r--r-- | tokio-executor/src/error.rs | 50 | ||||
-rw-r--r-- | tokio-executor/src/executor.rs | 151 | ||||
-rw-r--r-- | tokio-executor/src/global.rs | 13 | ||||
-rw-r--r-- | tokio-executor/src/lib.rs | 221 | ||||
-rw-r--r-- | tokio-executor/src/typed.rs | 181 | ||||
-rw-r--r-- | tokio-executor/tests/executor.rs | 3 | ||||
-rw-r--r-- | tokio-threadpool/Cargo.toml | 2 | ||||
-rw-r--r-- | tokio-threadpool/src/sender.rs | 13 | ||||
-rw-r--r-- | tokio/Cargo.toml | 2 | ||||
-rw-r--r-- | tokio/src/executor/mod.rs | 2 | ||||
-rw-r--r-- | tokio/src/runtime/current_thread/runtime.rs | 9 | ||||
-rw-r--r-- | tokio/src/runtime/threadpool/task_executor.rs | 9 |
19 files changed, 499 insertions, 214 deletions
@@ -21,3 +21,6 @@ members = [ "tokio-udp", "tokio-uds", ] + +[patch.crates-io] +tokio-executor = { path = "tokio-executor" } diff --git a/ci/azure-patch-crates.yml b/ci/azure-patch-crates.yml new file mode 100644 index 00000000..7bf96e60 --- /dev/null +++ b/ci/azure-patch-crates.yml @@ -0,0 +1,16 @@ +steps: + - script: | + set -e + + # Remove any existing patch statements + mv Cargo.toml Cargo.toml.bck + sed -n '/\[patch.crates-io\]/q;p' Cargo.toml.bck > Cargo.toml + + # Patch all crates + cat ci/patch.toml >> Cargo.toml + + # Print `Cargo.toml` for debugging + echo "~~~~ Cargo.toml ~~~~" + cat Cargo.toml + echo "~~~~~~~~~~~~~~~~~~~~" + displayName: Patch Cargo.toml diff --git a/ci/azure-test-stable.yml b/ci/azure-test-stable.yml index af378d5e..c385be26 100644 --- a/ci/azure-test-stable.yml +++ b/ci/azure-test-stable.yml @@ -27,13 +27,7 @@ jobs: displayName: cargo test -p ${{ crate }} workingDirectory: $(Build.SourcesDirectory)/${{ crate }} - - script: | - set -e - cat ci/patch.toml >> Cargo.toml - echo "~~~~ Cargo.toml ~~~~" - cat Cargo.toml - echo "~~~~~~~~~~~~~~~~~~~~" - displayName: Patch Cargo.toml + - template: azure-patch-crates.yml - ${{ each crate in parameters.crates }}: - script: cargo test diff --git a/ci/azure-tsan.yml b/ci/azure-tsan.yml index 9395f668..519960f3 100644 --- a/ci/azure-tsan.yml +++ b/ci/azure-tsan.yml @@ -14,14 +14,10 @@ jobs: parameters: rust_version: nightly-2018-11-18 + - template: azure-patch-crates.yml - script: | set -e - cat ci/patch.toml >> Cargo.toml - echo "~~~~ Cargo.toml ~~~~" - cat Cargo.toml - echo "~~~~~~~~~~~~~~~~~~~~" - # Make sure the benchmarks compile export ASAN_OPTIONS="detect_odr_violation=0 detect_leaks=0" export TSAN_OPTIONS="suppressions=`pwd`/ci/tsan" diff --git a/tokio-current-thread/Cargo.toml b/tokio-current-thread/Cargo.toml index f144c74b..32ed491e 100644 --- a/tokio-current-thread/Cargo.toml +++ b/tokio-current-thread/Cargo.toml @@ -21,5 +21,5 @@ keywords = ["futures", "tokio"] categories = ["concurrency", "asynchronous"] [dependencies] -tokio-executor = "0.1.5" +tokio-executor = { version = "0.1.5", path = "../tokio-executor" } futures = "0.1.19" diff --git a/tokio-current-thread/src/lib.rs b/tokio-current-thread/src/lib.rs index b2199faa..3491e815 100644 --- a/tokio-current-thread/src/lib.rs +++ b/tokio-current-thread/src/lib.rs @@ -431,6 +431,16 @@ impl tokio_executor::Executor for CurrentThread { } } +impl<T> tokio_executor::TypedExecutor<T> for CurrentThread +where + T: Future<Item = (), Error = ()> + 'static, +{ + fn spawn(&mut self, future: T) -> Result<(), SpawnError> { + self.borrow().spawn_local(Box::new(future), false); + Ok(()) + } +} + impl<P: Park> fmt::Debug for CurrentThread<P> { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("CurrentThread") @@ -742,6 +752,15 @@ impl tokio_executor::Executor for TaskExecutor { } } +impl<F> tokio_executor::TypedExecutor<F> for TaskExecutor +where + F: Future<Item = (), Error = ()> + 'static, +{ + fn spawn(&mut self, future: F) -> Result<(), SpawnError> { + self.spawn_local(Box::new(future)) + } +} + impl<F> Executor<F> for TaskExecutor where F: Future<Item = (), Error = ()> + 'static, diff --git a/tokio-executor/Cargo.toml b/tokio-executor/Cargo.toml index 320df263..5f006fe8 100644 --- a/tokio-executor/Cargo.toml +++ b/tokio-executor/Cargo.toml @@ -23,3 +23,6 @@ categories = ["concurrency", "asynchronous"] [dependencies] crossbeam-utils = "0.6.2" futures = "0.1.19" + +[dev-dependencies] +tokio = { version = "0.1.17", path = "../tokio" } diff --git a/tokio-executor/src/error.rs b/tokio-executor/src/error.rs new file mode 100644 index 00000000..579239f0 --- /dev/null +++ b/tokio-executor/src/error.rs @@ -0,0 +1,50 @@ +use std::error::Error; +use std::fmt; + +/// Errors returned by `Executor::spawn`. +/// +/// Spawn errors should represent relatively rare scenarios. Currently, the two +/// scenarios represented by `SpawnError` are: +/// +/// * An executor being at capacity or full. As such, the executor is not able +/// to accept a new future. This error state is expected to be transient. +/// * An executor has been shutdown and can no longer accept new futures. This +/// error state is expected to be permanent. +#[derive(Debug)] +pub struct SpawnError { + is_shutdown: bool, +} + +impl SpawnError { + /// Return a new `SpawnError` reflecting a shutdown executor failure. + pub fn shutdown() -> Self { + SpawnError { is_shutdown: true } + } + + /// Return a new `SpawnError` reflecting an executor at capacity failure. + pub fn at_capacity() -> Self { + SpawnError { is_shutdown: false } + } + + /// Returns `true` if the error reflects a shutdown executor failure. + pub fn is_shutdown(&self) -> bool { + self.is_shutdown + } + + /// Returns `true` if the error reflects an executor at capacity failure. + pub fn is_at_capacity(&self) -> bool { + !self.is_shutdown + } +} + +impl fmt::Display for SpawnError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.description()) + } +} + +impl Error for SpawnError { + fn description(&self) -> &str { + "attempted to spawn task while the executor is at capacity or shut down" + } +} diff --git a/tokio-executor/src/executor.rs b/tokio-executor/src/executor.rs new file mode 100644 index 00000000..11276d0e --- /dev/null +++ b/tokio-executor/src/executor.rs @@ -0,0 +1,151 @@ +use futures::Future; +use SpawnError; + +/// A value that executes futures. +/// +/// The [`spawn`] function is used to submit a future to an executor. Once +/// submitted, the executor takes ownership of the future and becomes +/// responsible for driving the future to completion. +/// +/// The strategy employed by the executor to handle the future is less defined +/// and is left up to the `Executor` implementation. The `Executor` instance is +/// expected to call [`poll`] on the future once it has been notified, however +/// the "when" and "how" can vary greatly. +/// +/// For example, the executor might be a thread pool, in which case a set of +/// threads have already been spawned up and the future is inserted into a +/// queue. A thread will acquire the future and poll it. +/// +/// The `Executor` trait is only for futures that **are** `Send`. These are most +/// common. There currently is no trait that describes executors that operate +/// entirely on the current thread (i.e., are able to spawn futures that are not +/// `Send`). Note that single threaded executors can still implement `Executor`, +/// but only futures that are `Send` can be spawned via the trait. +/// +/// This trait is primarily intended to implemented by executors and used to +/// back `tokio::spawn`. Libraries and applications **may** use this trait to +/// bound generics, but doing so will limit usage to futures that implement +/// `Send`. Instead, libraries and applications are recommended to use +/// [`TypedExecutor`] as a bound. +/// +/// # Errors +/// +/// The [`spawn`] function returns `Result` with an error type of `SpawnError`. +/// This error type represents the reason that the executor was unable to spawn +/// the future. The two current represented scenarios are: +/// +/// * An executor being at capacity or full. As such, the executor is not able +/// to accept a new future. This error state is expected to be transient. +/// * An executor has been shutdown and can no longer accept new futures. This +/// error state is expected to be permanent. +/// +/// If a caller encounters an at capacity error, the caller should try to shed +/// load. This can be as simple as dropping the future that was spawned. +/// +/// If the caller encounters a shutdown error, the caller should attempt to +/// gracefully shutdown. +/// +/// # Examples +/// +/// ```rust +/// # extern crate futures; +/// # extern crate tokio_executor; +/// # use tokio_executor::Executor; +/// # fn docs(my_executor: &mut Executor) { +/// use futures::future::lazy; +/// my_executor.spawn(Box::new(lazy(|| { +/// println!("running on the executor"); +/// Ok(()) +/// }))).unwrap(); +/// # } +/// # fn main() {} +/// ``` +/// +/// [`spawn`]: #tymethod.spawn +/// [`poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll +/// [`TypedExecutor`]: ../trait.TypedExecutor.html +pub trait Executor { + /// Spawns a future object to run on this executor. + /// + /// `future` is passed to the executor, which will begin running it. The + /// future may run on the current thread or another thread at the discretion + /// of the `Executor` implementation. + /// + /// # Panics + /// + /// Implementations are encouraged to avoid panics. However, panics are + /// permitted and the caller should check the implementation specific + /// documentation for more details on possible panics. + /// + /// # Examples + /// + /// ```rust + /// # extern crate futures; + /// # extern crate tokio_executor; + /// # use tokio_executor::Executor; + /// # fn docs(my_executor: &mut Executor) { + /// use futures::future::lazy; + /// my_executor.spawn(Box::new(lazy(|| { + /// println!("running on the executor"); + /// Ok(()) + /// }))).unwrap(); + /// # } + /// # fn main() {} + /// ``` + fn spawn( + &mut self, + future: Box<Future<Item = (), Error = ()> + Send>, + ) -> Result<(), SpawnError>; + + /// Provides a best effort **hint** to whether or not `spawn` will succeed. + /// + /// This function may return both false positives **and** false negatives. + /// If `status` returns `Ok`, then a call to `spawn` will *probably* + /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will + /// *probably* fail, but may succeed. + /// + /// This allows a caller to avoid creating the task if the call to `spawn` + /// has a high likelihood of failing. + /// + /// # Panics + /// + /// This function must not panic. Implementers must ensure that panics do + /// not happen. + /// + /// # Examples + /// + /// ```rust + /// # extern crate futures; + /// # extern crate tokio_executor; + /// # use tokio_executor::Executor; + /// # fn docs(my_executor: &mut Executor) { + /// use futures::future::lazy; + /// + /// if my_executor.status().is_ok() { + /// my_executor.spawn(Box::new(lazy(|| { + /// println!("running on the executor"); + /// Ok(()) + /// }))).unwrap(); + /// } else { + /// println!("the executor is not in a good state"); + /// } + /// # } + /// # fn main() {} + /// ``` + fn status(&self) -> Result<(), SpawnError> { + Ok(()) + } +} + +impl<E: Executor + ?Sized> Executor for Box<E> { + fn spawn( + &mut self, + future: Box<Future<Item = (), Error = ()> + Send>, + ) -> Result<(), SpawnError> { + (**self).spawn(future) + } + + fn status(&self) -> Result<(), SpawnError> { + (**self).status() + } +} diff --git a/tokio-executor/src/global.rs b/tokio-executor/src/global.rs index 852f6db8..47f7505c 100644 --- a/tokio-executor/src/global.rs +++ b/tokio-executor/src/global.rs @@ -84,6 +84,19 @@ impl super::Executor for DefaultExecutor { } } +impl<T> super::TypedExecutor<T> for DefaultExecutor +where + T: Future<Item = (), Error = ()> + Send + 'static, +{ + fn spawn(&mut self, future: T) -> Result<(), SpawnError> { + super::Executor::spawn(self, Box::new(future)) + } + + fn status(&self) -> Result<(), SpawnError> { + super::Executor::status(self) + } +} + impl<T> future::Executor<T> for DefaultExecutor where T: Future<Item = (), Error = ()> + Send + 'static, diff --git a/tokio-executor/src/lib.rs b/tokio-executor/src/lib.rs index db9c1781..bcb4b23f 100644 --- a/tokio-executor/src/lib.rs +++ b/tokio-executor/src/lib.rs @@ -17,8 +17,12 @@ //! This crate provides traits and utilities that are necessary for building an //! executor, including: //! -//! * The [`Executor`] trait describes the API for spawning a future onto an -//! executor. +//! * The [`Executor`] trait spawns future object onto an executor. +//! +//! * The [`TypedExecutor`] trait spawns futures of a specific type onto an +//! executor. This is used to be generic over executors that spawn futures +//! that are either `Send` or `!Send` or implement executors that apply to +//! specific futures. //! //! * [`enter`] marks that the current thread is entering an execution //! context. This prevents a second executor from accidentally starting from @@ -29,7 +33,19 @@ //! //! * [`Park`] abstracts over blocking and unblocking the current thread. //! +//! # Implementing an executor +//! +//! Executors should always implement `TypedExecutor`. This usually is the bound +//! that applications and libraries will use when generic over an executor. See +//! the [trait documentation][`TypedExecutor`] for more details. +//! +//! If the executor is able to spawn all futures that are `Send`, then the +//! executor should also implement the `Executor` trait. This trait is rarely +//! used directly by applications and libraries. Instead, `tokio::spawn` is +//! configured to dispatch to type that implements `Executor`. +//! //! [`Executor`]: trait.Executor.html +//! [`TypedExecutor`]: trait.TypedExecutor.html //! [`enter`]: fn.enter.html //! [`DefaultExecutor`]: struct.DefaultExecutor.html //! [`Park`]: park/index.html @@ -39,203 +55,14 @@ extern crate crossbeam_utils; extern crate futures; mod enter; +mod error; +mod executor; mod global; pub mod park; +mod typed; pub use enter::{enter, Enter, EnterError}; +pub use error::SpawnError; +pub use executor::Executor; pub use global::{spawn, with_default, DefaultExecutor}; - -use futures::Future; - -use std::error::Error; -use std::fmt; - -/// A value that executes futures. -/// -/// The [`spawn`] function is used to submit a future to an executor. Once -/// submitted, the executor takes ownership of the future and becomes -/// responsible for driving the future to completion. -/// -/// The strategy employed by the executor to handle the future is less defined -/// and is left up to the `Executor` implementation. The `Executor` instance is -/// expected to call [`poll`] on the future once it has been notified, however -/// the "when" and "how" can vary greatly. -/// -/// For example, the executor might be a thread pool, in which case a set of -/// threads have already been spawned up and the future is inserted into a -/// queue. A thread will acquire the future and poll it. -/// -/// The `Executor` trait is only for futures that **are** `Send`. These are most -/// common. There currently is no trait that describes executors that operate -/// entirely on the current thread (i.e., are able to spawn futures that are not -/// `Send`). Note that single threaded executors can still implement `Executor`, -/// but only futures that are `Send` can be spawned via the trait. -/// -/// # Errors -/// -/// The [`spawn`] function returns `Result` with an error type of `SpawnError`. -/// This error type represents the reason that the executor was unable to spawn -/// the future. The two current represented scenarios are: -/// -/// * An executor being at capacity or full. As such, the executor is not able -/// to accept a new future. This error state is expected to be transient. -/// * An executor has been shutdown and can no longer accept new futures. This -/// error state is expected to be permanent. -/// -/// If a caller encounters an at capacity error, the caller should try to shed -/// load. This can be as simple as dropping the future that was spawned. -/// -/// If the caller encounters a shutdown error, the caller should attempt to -/// gracefully shutdown. -/// -/// # Examples -/// -/// ```rust -/// # extern crate futures; -/// # extern crate tokio_executor; -/// # use tokio_executor::Executor; -/// # fn docs(my_executor: &mut Executor) { -/// use futures::future::lazy; -/// my_executor.spawn(Box::new(lazy(|| { -/// println!("running on the executor"); -/// Ok(()) -/// }))).unwrap(); -/// # } -/// # fn main() {} -/// ``` -/// -/// [`spawn`]: #tymethod.spawn -/// [`poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll -pub trait Executor { - /// Spawns a future object to run on this executor. - /// - /// `future` is passed to the executor, which will begin running it. The - /// future may run on the current thread or another thread at the discretion - /// of the `Executor` implementation. - /// - /// # Panics - /// - /// Implementers are encouraged to avoid panics. However, a panic is - /// permitted and the caller should check the implementation specific - /// documentation for more details on possible panics. - /// - /// # Examples - /// - /// ```rust - /// # extern crate futures; - /// # extern crate tokio_executor; - /// # use tokio_executor::Executor; - /// # fn docs(my_executor: &mut Executor) { - /// use futures::future::lazy; - /// my_executor.spawn(Box::new(lazy(|| { - /// println!("running on the executor"); - /// Ok(()) - /// }))).unwrap(); - /// # } - /// # fn main() {} - /// ``` - fn spawn( - &mut self, - future: Box<Future<Item = (), Error = ()> + Send>, - ) -> Result<(), SpawnError>; - - /// Provides a best effort **hint** to whether or not `spawn` will succeed. - /// - /// This function may return both false positives **and** false negatives. - /// If `status` returns `Ok`, then a call to `spawn` will *probably* - /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will - /// *probably* fail, but may succeed. - /// - /// This allows a caller to avoid creating the task if the call to `spawn` - /// has a high likelihood of failing. - /// - /// # Panics - /// - /// This function must not panic. Implementers must ensure that panics do - /// not happen. - /// - /// # Examples - /// - /// ```rust - /// # extern crate futures; - /// # extern crate tokio_executor; - /// # use tokio_executor::Executor; - /// # fn docs(my_executor: &mut Executor) { - /// use futures::future::lazy; - /// - /// if my_executor.status().is_ok() { - /// my_executor.spawn(Box::new(lazy(|| { - /// println!("running on the executor"); - /// Ok(()) - /// }))).unwrap(); - /// } else { - /// println!("the executor is not in a good state"); - /// } - /// # } - /// # fn main() {} - /// ``` - fn status(&self) -> Result<(), SpawnError> { - Ok(()) - } -} - -impl<E: Executor + ?Sized> Executor for Box<E> { - fn spawn( - &mut self, - future: Box<Future<Item = (), Error = ()> + Send>, - ) -> Result<(), SpawnError> { - (**self).spawn(future) - } - - fn status(&self) -> Result<(), SpawnError> { - (**self).status() - } -} - -/// Errors returned by `Executor::spawn`. -/// -/// Spawn errors should represent relatively rare scenarios. Currently, the two -/// scenarios represented by `SpawnError` are: -/// -/// * An executor being at capacity or full. As such, the executor is not able -/// to accept a new future. This error state is expected to be transient. -/// * An executor has been shutdown and can no longer accept new futures. This -/// error state is expected to be permanent. -#[derive(Debug)] -pub struct SpawnError { - is_shutdown: bool, -} - -impl SpawnError { - /// Return a new `SpawnError` reflecting a shutdown executor failure. - pub fn shutdown() -> Self { - SpawnError { is_shutdown: true } - } - - /// Return a new `SpawnError` reflecting an executor at capacity failure. - pub fn at_capacity() -> Self { - SpawnError { is_shutdown: false } - } - - /// Returns `true` if the error reflects a shutdown executor failure. - pub fn is_shutdown(&self) -> bool { - self.is_shutdown - } - - /// Returns `true` if the error reflects an executor at capacity failure. - pub fn is_at_capacity(&self) -> bool { - !self.is_shutdown - } -} - -impl fmt::Display for SpawnError { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "{}", self.description()) - } -} - -impl Error for SpawnError { - fn description(&self) -> &str { - "attempted to spawn task while the executor is at capacity or shut down" - } -} +pub use typed::TypedExecutor; diff --git a/tokio-executor/src/typed.rs b/tokio-executor/src/typed.rs new file mode 100644 index 00000000..22edf29d --- /dev/null +++ b/tokio-executor/src/typed.rs @@ -0,0 +1,181 @@ +use SpawnError; + +/// A value that spawns futures of a specific type. +/// +/// The trait is generic over `T`: the type of future that can be spawened. This +/// is useful for implementing an executor that is only able to spawn a specific +/// type of future. +/// +/// The [`spawn`] function is used to submit the future to the executor. Once +/// submitted, the executor takes ownership of the future and becomes +/// responsible for driving the future to completion. +/// +/// This trait is useful as a bound for applications and libraries in order to +/// be generic over futures that are `Send` vs. `!Send`. +/// +/// # Examples +/// +/// Consider a function that provides an API for draining a `Stream` in the +/// background. To do this, a task must be spawned to perform the draining. As +/// such, the function takes a stream and an executor on which the background +/// task is spawned. +/// +/// ```rust +/// #[macro_use] +/// extern crate futures; +/// extern crate tokio; +/// +/// use futures::{Future, Stream, Poll}; +/// use tokio::executor::TypedExecutor; +/// use tokio::sync::oneshot; +/// +/// pub fn drain<T, E>(stream: T, executor: &mut E) +/// -> impl Future<Item = (), Error = ()> +/// where +/// T: Stream, +/// E: TypedExecutor<Drain<T>> +/// { +/// let (tx, rx) = oneshot::channel(); +/// +/// executor.spawn(Drain { +/// stream, +/// tx: Some(tx), +/// }).unwrap(); +/// +/// rx.map_err(|_| ()) +/// } +/// +/// // The background task +/// pub struct Drain<T: Stream> { +/// stream: T, +/// tx: Option<oneshot::Sender<()>>, +/// } +/// +/// impl<T: Stream> Future for Drain<T> { +/// type Item = (); +/// type Error = (); +/// +/// fn poll(&mut self) -> Poll<Self::Item, Self::Error> { +/// loop { +/// let item = try_ready!( +/// self.stream.poll() +/// .map_err(|_| ()) +/// ); +/// +/// if item.is_none() { break; } +/// } +/// +/// self.tx.take().unwrap().send(()).map_err(|_| ()); +/// Ok(().into()) +/// } +/// } +/// # pub fn main() {} +/// ``` +/// +/// By doing this, the `drain` fn can accept a stream that is `!Send` as long as +/// the supplied executor is able to spawn `!Send` types. +pub trait TypedExecutor<T> { + /// Spawns a future to run on this executor. + /// + /// `future` is passed to the executor, which will begin running it. The + /// executor takes ownership of the future and becomes responsible for + /// driving the future to completion. + /// + /// # Panics + /// + /// Implementations are encouraged to avoid panics. However, panics are + /// permitted and the caller should check the implementation specific + /// documentation for more details on possible panics. + /// + /// # Examples + /// + /// ```rust + /// # extern crate futures; + /// # extern crate tokio_executor; + /// # use tokio_executor::TypedExecutor; + /// # use futures::{Future, Poll}; + /// fn example<T>(my_executor: &mut T) + /// where + /// T: TypedExecutor<MyFuture>, + /// { + /// my_executor.spawn(MyFuture).unwrap(); + /// } + /// + /// struct MyFuture; + /// + /// impl Future for MyFuture { + /// type Item = (); + /// type Error = (); + /// + /// fn poll(&mut self) -> Poll<(), ()> { + /// println!("running on the executor"); + /// Ok(().into()) + /// } + /// } + /// # fn main() {} + /// ``` + fn spawn(&mut self, future: T) -> Result<(), SpawnError>; + + /// Provides a best effort **hint** to whether or not `spawn` will succeed. + /// + /// This function may return both false positives **and** false negatives. + /// If `status` returns `Ok`, then a call to `spawn` will *probably* + /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will + /// *probably* fail, but may succeed. + /// + /// This allows a caller to avoid creating the task if the call to `spawn` + /// has a high likelihood of failing. + /// + /// # Panics + /// + /// This function must not panic. Implementers must ensure that panics do + /// not happen. + /// + /// # Examples + /// + /// ```rust + /// # extern crate futures; + /// # extern crate tokio_executor; + /// # use tokio_executor::TypedExecutor; + /// # use futures::{Future, Poll}; + /// fn example<T>(my_executor: &mut T) + /// where + /// T: TypedExecutor<MyFuture>, + /// { + /// if my_executor.status().is_ok() { + /// my_executor.spawn(MyFuture).unwrap(); + /// } else { + /// println!("the executor is not in a good state"); + /// } + /// } + /// + /// struct MyFuture; + /// + /// impl Future for MyFuture { + /// type Item = (); + /// type Error = (); + /// + /// fn poll(&mut self) -> Poll<(), ()> { + /// println!("running on the executor"); + /// Ok(().into()) + /// } + /// } + /// # fn main() {} + /// ``` + fn status(&self) -> Result<(), SpawnError> { + Ok(()) + } +} + +impl<E, T> TypedExecutor<T> for Box<E> +where + E: TypedExecutor<T>, +{ + fn spawn(&mut self, future: T) -> Result<(), SpawnError> { + (**self).spawn(future) + } + + fn status(&self) -> Result<(), SpawnError> { + (**self).status() + } +} diff --git a/tokio-executor/tests/executor.rs b/tokio-executor/tests/executor.rs index fdfb7735..bb020243 100644 --- a/tokio-executor/tests/executor.rs +++ b/tokio-executor/tests/executor.rs @@ -2,10 +2,11 @@ extern crate futures; extern crate tokio_executor; use futures::{future::lazy, Future}; -use tokio_executor::*; +use tokio_executor::DefaultExecutor; mod out_of_executor_context { use super::*; + use tokio_executor::Executor; fn test<F, E>(spawn: F) where diff --git a/tokio-threadpool/Cargo.toml b/tokio-threadpool/Cargo.toml index 43a2c6f3..18c757b0 100644 --- a/tokio-threadpool/Cargo.toml +++ b/tokio-threadpool/Cargo.toml @@ -20,7 +20,7 @@ keywords = ["futures", "tokio"] categories = ["concurrency", "asynchronous"] [dependencies] -tokio-executor = "0.1.2" +tokio-executor = { version = "0.1.5", path = "../tokio-executor" } futures = "0.1.19" crossbeam-deque = "0.7.0" crossbeam-queue = "0.1.0" diff --git a/tokio-threadpool/src/sender.rs b/tokio-threadpool/src/sender.rs index 15befd43..fadd0cba 100644 --- a/tokio-threadpool/src/sender.rs +++ b/tokio-threadpool/src/sender.rs @@ -176,6 +176,19 @@ impl<'a> tokio_executor::Executor for &'a Sender { } } +impl<T> tokio_executor::TypedExecutor<T> for Sender +where + T: Future<Item = (), Error = ()> + Send + 'static, +{ + fn status(&self) -> Result<(), tokio_executor::SpawnError> { + tokio_executor::Executor::status(self) + } + + fn spawn(&mut self, future: T) -> Result<(), SpawnError> { + tokio_executor::Executor::spawn(self, Box::new(future)) + } +} + impl<T> future::Executor<T> for Sender where T: Future<Item = (), Error = ()> + Send + 'static, diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 45a7dc68..76477817 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -71,7 +71,7 @@ tokio-codec = { version = "0.1.0", optional = true } tokio-current-thread = { version = "0.1.3", optional = true } tokio-fs = { version = "0.1.6", optional = true } tokio-io = { version = "0.1.6", optional = true } -tokio-executor = { version = "0.1.5", optional = true } +tokio-executor = { version = "0.1.5", optional = true, path = "../tokio-executor" } tokio-reactor = { version = "0.1.1", o |