summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-03-21 14:30:18 -0700
committerGitHub <noreply@github.com>2019-03-21 14:30:18 -0700
commitb1172f8074b381b543ff15e23e3092fc5dc6de7d (patch)
tree6b0ffa87d724f01166ed4c65f40c3fc413e76cfe
parentcdde2e7a273cbab2085b822efcf54c6bec822681 (diff)
executor: add TypedExecutor (#993)
Adds a `TypedExecutor` trait that describes how to spawn futures of a specific type. This is useful for implementing functions that are generic over an executor and wish to support both `Send` and `!Send` cases.
-rw-r--r--Cargo.toml3
-rw-r--r--ci/azure-patch-crates.yml16
-rw-r--r--ci/azure-test-stable.yml8
-rw-r--r--ci/azure-tsan.yml6
-rw-r--r--tokio-current-thread/Cargo.toml2
-rw-r--r--tokio-current-thread/src/lib.rs19
-rw-r--r--tokio-executor/Cargo.toml3
-rw-r--r--tokio-executor/src/error.rs50
-rw-r--r--tokio-executor/src/executor.rs151
-rw-r--r--tokio-executor/src/global.rs13
-rw-r--r--tokio-executor/src/lib.rs221
-rw-r--r--tokio-executor/src/typed.rs181
-rw-r--r--tokio-executor/tests/executor.rs3
-rw-r--r--tokio-threadpool/Cargo.toml2
-rw-r--r--tokio-threadpool/src/sender.rs13
-rw-r--r--tokio/Cargo.toml2
-rw-r--r--tokio/src/executor/mod.rs2
-rw-r--r--tokio/src/runtime/current_thread/runtime.rs9
-rw-r--r--tokio/src/runtime/threadpool/task_executor.rs9
19 files changed, 499 insertions, 214 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 1c403e43..70e264be 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,3 +21,6 @@ members = [
"tokio-udp",
"tokio-uds",
]
+
+[patch.crates-io]
+tokio-executor = { path = "tokio-executor" }
diff --git a/ci/azure-patch-crates.yml b/ci/azure-patch-crates.yml
new file mode 100644
index 00000000..7bf96e60
--- /dev/null
+++ b/ci/azure-patch-crates.yml
@@ -0,0 +1,16 @@
+steps:
+ - script: |
+ set -e
+
+ # Remove any existing patch statements
+ mv Cargo.toml Cargo.toml.bck
+ sed -n '/\[patch.crates-io\]/q;p' Cargo.toml.bck > Cargo.toml
+
+ # Patch all crates
+ cat ci/patch.toml >> Cargo.toml
+
+ # Print `Cargo.toml` for debugging
+ echo "~~~~ Cargo.toml ~~~~"
+ cat Cargo.toml
+ echo "~~~~~~~~~~~~~~~~~~~~"
+ displayName: Patch Cargo.toml
diff --git a/ci/azure-test-stable.yml b/ci/azure-test-stable.yml
index af378d5e..c385be26 100644
--- a/ci/azure-test-stable.yml
+++ b/ci/azure-test-stable.yml
@@ -27,13 +27,7 @@ jobs:
displayName: cargo test -p ${{ crate }}
workingDirectory: $(Build.SourcesDirectory)/${{ crate }}
- - script: |
- set -e
- cat ci/patch.toml >> Cargo.toml
- echo "~~~~ Cargo.toml ~~~~"
- cat Cargo.toml
- echo "~~~~~~~~~~~~~~~~~~~~"
- displayName: Patch Cargo.toml
+ - template: azure-patch-crates.yml
- ${{ each crate in parameters.crates }}:
- script: cargo test
diff --git a/ci/azure-tsan.yml b/ci/azure-tsan.yml
index 9395f668..519960f3 100644
--- a/ci/azure-tsan.yml
+++ b/ci/azure-tsan.yml
@@ -14,14 +14,10 @@ jobs:
parameters:
rust_version: nightly-2018-11-18
+ - template: azure-patch-crates.yml
- script: |
set -e
- cat ci/patch.toml >> Cargo.toml
- echo "~~~~ Cargo.toml ~~~~"
- cat Cargo.toml
- echo "~~~~~~~~~~~~~~~~~~~~"
-
# Make sure the benchmarks compile
export ASAN_OPTIONS="detect_odr_violation=0 detect_leaks=0"
export TSAN_OPTIONS="suppressions=`pwd`/ci/tsan"
diff --git a/tokio-current-thread/Cargo.toml b/tokio-current-thread/Cargo.toml
index f144c74b..32ed491e 100644
--- a/tokio-current-thread/Cargo.toml
+++ b/tokio-current-thread/Cargo.toml
@@ -21,5 +21,5 @@ keywords = ["futures", "tokio"]
categories = ["concurrency", "asynchronous"]
[dependencies]
-tokio-executor = "0.1.5"
+tokio-executor = { version = "0.1.5", path = "../tokio-executor" }
futures = "0.1.19"
diff --git a/tokio-current-thread/src/lib.rs b/tokio-current-thread/src/lib.rs
index b2199faa..3491e815 100644
--- a/tokio-current-thread/src/lib.rs
+++ b/tokio-current-thread/src/lib.rs
@@ -431,6 +431,16 @@ impl tokio_executor::Executor for CurrentThread {
}
}
+impl<T> tokio_executor::TypedExecutor<T> for CurrentThread
+where
+ T: Future<Item = (), Error = ()> + 'static,
+{
+ fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
+ self.borrow().spawn_local(Box::new(future), false);
+ Ok(())
+ }
+}
+
impl<P: Park> fmt::Debug for CurrentThread<P> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("CurrentThread")
@@ -742,6 +752,15 @@ impl tokio_executor::Executor for TaskExecutor {
}
}
+impl<F> tokio_executor::TypedExecutor<F> for TaskExecutor
+where
+ F: Future<Item = (), Error = ()> + 'static,
+{
+ fn spawn(&mut self, future: F) -> Result<(), SpawnError> {
+ self.spawn_local(Box::new(future))
+ }
+}
+
impl<F> Executor<F> for TaskExecutor
where
F: Future<Item = (), Error = ()> + 'static,
diff --git a/tokio-executor/Cargo.toml b/tokio-executor/Cargo.toml
index 320df263..5f006fe8 100644
--- a/tokio-executor/Cargo.toml
+++ b/tokio-executor/Cargo.toml
@@ -23,3 +23,6 @@ categories = ["concurrency", "asynchronous"]
[dependencies]
crossbeam-utils = "0.6.2"
futures = "0.1.19"
+
+[dev-dependencies]
+tokio = { version = "0.1.17", path = "../tokio" }
diff --git a/tokio-executor/src/error.rs b/tokio-executor/src/error.rs
new file mode 100644
index 00000000..579239f0
--- /dev/null
+++ b/tokio-executor/src/error.rs
@@ -0,0 +1,50 @@
+use std::error::Error;
+use std::fmt;
+
+/// Errors returned by `Executor::spawn`.
+///
+/// Spawn errors should represent relatively rare scenarios. Currently, the two
+/// scenarios represented by `SpawnError` are:
+///
+/// * An executor being at capacity or full. As such, the executor is not able
+/// to accept a new future. This error state is expected to be transient.
+/// * An executor has been shutdown and can no longer accept new futures. This
+/// error state is expected to be permanent.
+#[derive(Debug)]
+pub struct SpawnError {
+ is_shutdown: bool,
+}
+
+impl SpawnError {
+ /// Return a new `SpawnError` reflecting a shutdown executor failure.
+ pub fn shutdown() -> Self {
+ SpawnError { is_shutdown: true }
+ }
+
+ /// Return a new `SpawnError` reflecting an executor at capacity failure.
+ pub fn at_capacity() -> Self {
+ SpawnError { is_shutdown: false }
+ }
+
+ /// Returns `true` if the error reflects a shutdown executor failure.
+ pub fn is_shutdown(&self) -> bool {
+ self.is_shutdown
+ }
+
+ /// Returns `true` if the error reflects an executor at capacity failure.
+ pub fn is_at_capacity(&self) -> bool {
+ !self.is_shutdown
+ }
+}
+
+impl fmt::Display for SpawnError {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "{}", self.description())
+ }
+}
+
+impl Error for SpawnError {
+ fn description(&self) -> &str {
+ "attempted to spawn task while the executor is at capacity or shut down"
+ }
+}
diff --git a/tokio-executor/src/executor.rs b/tokio-executor/src/executor.rs
new file mode 100644
index 00000000..11276d0e
--- /dev/null
+++ b/tokio-executor/src/executor.rs
@@ -0,0 +1,151 @@
+use futures::Future;
+use SpawnError;
+
+/// A value that executes futures.
+///
+/// The [`spawn`] function is used to submit a future to an executor. Once
+/// submitted, the executor takes ownership of the future and becomes
+/// responsible for driving the future to completion.
+///
+/// The strategy employed by the executor to handle the future is less defined
+/// and is left up to the `Executor` implementation. The `Executor` instance is
+/// expected to call [`poll`] on the future once it has been notified, however
+/// the "when" and "how" can vary greatly.
+///
+/// For example, the executor might be a thread pool, in which case a set of
+/// threads have already been spawned up and the future is inserted into a
+/// queue. A thread will acquire the future and poll it.
+///
+/// The `Executor` trait is only for futures that **are** `Send`. These are most
+/// common. There currently is no trait that describes executors that operate
+/// entirely on the current thread (i.e., are able to spawn futures that are not
+/// `Send`). Note that single threaded executors can still implement `Executor`,
+/// but only futures that are `Send` can be spawned via the trait.
+///
+/// This trait is primarily intended to implemented by executors and used to
+/// back `tokio::spawn`. Libraries and applications **may** use this trait to
+/// bound generics, but doing so will limit usage to futures that implement
+/// `Send`. Instead, libraries and applications are recommended to use
+/// [`TypedExecutor`] as a bound.
+///
+/// # Errors
+///
+/// The [`spawn`] function returns `Result` with an error type of `SpawnError`.
+/// This error type represents the reason that the executor was unable to spawn
+/// the future. The two current represented scenarios are:
+///
+/// * An executor being at capacity or full. As such, the executor is not able
+/// to accept a new future. This error state is expected to be transient.
+/// * An executor has been shutdown and can no longer accept new futures. This
+/// error state is expected to be permanent.
+///
+/// If a caller encounters an at capacity error, the caller should try to shed
+/// load. This can be as simple as dropping the future that was spawned.
+///
+/// If the caller encounters a shutdown error, the caller should attempt to
+/// gracefully shutdown.
+///
+/// # Examples
+///
+/// ```rust
+/// # extern crate futures;
+/// # extern crate tokio_executor;
+/// # use tokio_executor::Executor;
+/// # fn docs(my_executor: &mut Executor) {
+/// use futures::future::lazy;
+/// my_executor.spawn(Box::new(lazy(|| {
+/// println!("running on the executor");
+/// Ok(())
+/// }))).unwrap();
+/// # }
+/// # fn main() {}
+/// ```
+///
+/// [`spawn`]: #tymethod.spawn
+/// [`poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll
+/// [`TypedExecutor`]: ../trait.TypedExecutor.html
+pub trait Executor {
+ /// Spawns a future object to run on this executor.
+ ///
+ /// `future` is passed to the executor, which will begin running it. The
+ /// future may run on the current thread or another thread at the discretion
+ /// of the `Executor` implementation.
+ ///
+ /// # Panics
+ ///
+ /// Implementations are encouraged to avoid panics. However, panics are
+ /// permitted and the caller should check the implementation specific
+ /// documentation for more details on possible panics.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// # extern crate futures;
+ /// # extern crate tokio_executor;
+ /// # use tokio_executor::Executor;
+ /// # fn docs(my_executor: &mut Executor) {
+ /// use futures::future::lazy;
+ /// my_executor.spawn(Box::new(lazy(|| {
+ /// println!("running on the executor");
+ /// Ok(())
+ /// }))).unwrap();
+ /// # }
+ /// # fn main() {}
+ /// ```
+ fn spawn(
+ &mut self,
+ future: Box<Future<Item = (), Error = ()> + Send>,
+ ) -> Result<(), SpawnError>;
+
+ /// Provides a best effort **hint** to whether or not `spawn` will succeed.
+ ///
+ /// This function may return both false positives **and** false negatives.
+ /// If `status` returns `Ok`, then a call to `spawn` will *probably*
+ /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will
+ /// *probably* fail, but may succeed.
+ ///
+ /// This allows a caller to avoid creating the task if the call to `spawn`
+ /// has a high likelihood of failing.
+ ///
+ /// # Panics
+ ///
+ /// This function must not panic. Implementers must ensure that panics do
+ /// not happen.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// # extern crate futures;
+ /// # extern crate tokio_executor;
+ /// # use tokio_executor::Executor;
+ /// # fn docs(my_executor: &mut Executor) {
+ /// use futures::future::lazy;
+ ///
+ /// if my_executor.status().is_ok() {
+ /// my_executor.spawn(Box::new(lazy(|| {
+ /// println!("running on the executor");
+ /// Ok(())
+ /// }))).unwrap();
+ /// } else {
+ /// println!("the executor is not in a good state");
+ /// }
+ /// # }
+ /// # fn main() {}
+ /// ```
+ fn status(&self) -> Result<(), SpawnError> {
+ Ok(())
+ }
+}
+
+impl<E: Executor + ?Sized> Executor for Box<E> {
+ fn spawn(
+ &mut self,
+ future: Box<Future<Item = (), Error = ()> + Send>,
+ ) -> Result<(), SpawnError> {
+ (**self).spawn(future)
+ }
+
+ fn status(&self) -> Result<(), SpawnError> {
+ (**self).status()
+ }
+}
diff --git a/tokio-executor/src/global.rs b/tokio-executor/src/global.rs
index 852f6db8..47f7505c 100644
--- a/tokio-executor/src/global.rs
+++ b/tokio-executor/src/global.rs
@@ -84,6 +84,19 @@ impl super::Executor for DefaultExecutor {
}
}
+impl<T> super::TypedExecutor<T> for DefaultExecutor
+where
+ T: Future<Item = (), Error = ()> + Send + 'static,
+{
+ fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
+ super::Executor::spawn(self, Box::new(future))
+ }
+
+ fn status(&self) -> Result<(), SpawnError> {
+ super::Executor::status(self)
+ }
+}
+
impl<T> future::Executor<T> for DefaultExecutor
where
T: Future<Item = (), Error = ()> + Send + 'static,
diff --git a/tokio-executor/src/lib.rs b/tokio-executor/src/lib.rs
index db9c1781..bcb4b23f 100644
--- a/tokio-executor/src/lib.rs
+++ b/tokio-executor/src/lib.rs
@@ -17,8 +17,12 @@
//! This crate provides traits and utilities that are necessary for building an
//! executor, including:
//!
-//! * The [`Executor`] trait describes the API for spawning a future onto an
-//! executor.
+//! * The [`Executor`] trait spawns future object onto an executor.
+//!
+//! * The [`TypedExecutor`] trait spawns futures of a specific type onto an
+//! executor. This is used to be generic over executors that spawn futures
+//! that are either `Send` or `!Send` or implement executors that apply to
+//! specific futures.
//!
//! * [`enter`] marks that the current thread is entering an execution
//! context. This prevents a second executor from accidentally starting from
@@ -29,7 +33,19 @@
//!
//! * [`Park`] abstracts over blocking and unblocking the current thread.
//!
+//! # Implementing an executor
+//!
+//! Executors should always implement `TypedExecutor`. This usually is the bound
+//! that applications and libraries will use when generic over an executor. See
+//! the [trait documentation][`TypedExecutor`] for more details.
+//!
+//! If the executor is able to spawn all futures that are `Send`, then the
+//! executor should also implement the `Executor` trait. This trait is rarely
+//! used directly by applications and libraries. Instead, `tokio::spawn` is
+//! configured to dispatch to type that implements `Executor`.
+//!
//! [`Executor`]: trait.Executor.html
+//! [`TypedExecutor`]: trait.TypedExecutor.html
//! [`enter`]: fn.enter.html
//! [`DefaultExecutor`]: struct.DefaultExecutor.html
//! [`Park`]: park/index.html
@@ -39,203 +55,14 @@ extern crate crossbeam_utils;
extern crate futures;
mod enter;
+mod error;
+mod executor;
mod global;
pub mod park;
+mod typed;
pub use enter::{enter, Enter, EnterError};
+pub use error::SpawnError;
+pub use executor::Executor;
pub use global::{spawn, with_default, DefaultExecutor};
-
-use futures::Future;
-
-use std::error::Error;
-use std::fmt;
-
-/// A value that executes futures.
-///
-/// The [`spawn`] function is used to submit a future to an executor. Once
-/// submitted, the executor takes ownership of the future and becomes
-/// responsible for driving the future to completion.
-///
-/// The strategy employed by the executor to handle the future is less defined
-/// and is left up to the `Executor` implementation. The `Executor` instance is
-/// expected to call [`poll`] on the future once it has been notified, however
-/// the "when" and "how" can vary greatly.
-///
-/// For example, the executor might be a thread pool, in which case a set of
-/// threads have already been spawned up and the future is inserted into a
-/// queue. A thread will acquire the future and poll it.
-///
-/// The `Executor` trait is only for futures that **are** `Send`. These are most
-/// common. There currently is no trait that describes executors that operate
-/// entirely on the current thread (i.e., are able to spawn futures that are not
-/// `Send`). Note that single threaded executors can still implement `Executor`,
-/// but only futures that are `Send` can be spawned via the trait.
-///
-/// # Errors
-///
-/// The [`spawn`] function returns `Result` with an error type of `SpawnError`.
-/// This error type represents the reason that the executor was unable to spawn
-/// the future. The two current represented scenarios are:
-///
-/// * An executor being at capacity or full. As such, the executor is not able
-/// to accept a new future. This error state is expected to be transient.
-/// * An executor has been shutdown and can no longer accept new futures. This
-/// error state is expected to be permanent.
-///
-/// If a caller encounters an at capacity error, the caller should try to shed
-/// load. This can be as simple as dropping the future that was spawned.
-///
-/// If the caller encounters a shutdown error, the caller should attempt to
-/// gracefully shutdown.
-///
-/// # Examples
-///
-/// ```rust
-/// # extern crate futures;
-/// # extern crate tokio_executor;
-/// # use tokio_executor::Executor;
-/// # fn docs(my_executor: &mut Executor) {
-/// use futures::future::lazy;
-/// my_executor.spawn(Box::new(lazy(|| {
-/// println!("running on the executor");
-/// Ok(())
-/// }))).unwrap();
-/// # }
-/// # fn main() {}
-/// ```
-///
-/// [`spawn`]: #tymethod.spawn
-/// [`poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll
-pub trait Executor {
- /// Spawns a future object to run on this executor.
- ///
- /// `future` is passed to the executor, which will begin running it. The
- /// future may run on the current thread or another thread at the discretion
- /// of the `Executor` implementation.
- ///
- /// # Panics
- ///
- /// Implementers are encouraged to avoid panics. However, a panic is
- /// permitted and the caller should check the implementation specific
- /// documentation for more details on possible panics.
- ///
- /// # Examples
- ///
- /// ```rust
- /// # extern crate futures;
- /// # extern crate tokio_executor;
- /// # use tokio_executor::Executor;
- /// # fn docs(my_executor: &mut Executor) {
- /// use futures::future::lazy;
- /// my_executor.spawn(Box::new(lazy(|| {
- /// println!("running on the executor");
- /// Ok(())
- /// }))).unwrap();
- /// # }
- /// # fn main() {}
- /// ```
- fn spawn(
- &mut self,
- future: Box<Future<Item = (), Error = ()> + Send>,
- ) -> Result<(), SpawnError>;
-
- /// Provides a best effort **hint** to whether or not `spawn` will succeed.
- ///
- /// This function may return both false positives **and** false negatives.
- /// If `status` returns `Ok`, then a call to `spawn` will *probably*
- /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will
- /// *probably* fail, but may succeed.
- ///
- /// This allows a caller to avoid creating the task if the call to `spawn`
- /// has a high likelihood of failing.
- ///
- /// # Panics
- ///
- /// This function must not panic. Implementers must ensure that panics do
- /// not happen.
- ///
- /// # Examples
- ///
- /// ```rust
- /// # extern crate futures;
- /// # extern crate tokio_executor;
- /// # use tokio_executor::Executor;
- /// # fn docs(my_executor: &mut Executor) {
- /// use futures::future::lazy;
- ///
- /// if my_executor.status().is_ok() {
- /// my_executor.spawn(Box::new(lazy(|| {
- /// println!("running on the executor");
- /// Ok(())
- /// }))).unwrap();
- /// } else {
- /// println!("the executor is not in a good state");
- /// }
- /// # }
- /// # fn main() {}
- /// ```
- fn status(&self) -> Result<(), SpawnError> {
- Ok(())
- }
-}
-
-impl<E: Executor + ?Sized> Executor for Box<E> {
- fn spawn(
- &mut self,
- future: Box<Future<Item = (), Error = ()> + Send>,
- ) -> Result<(), SpawnError> {
- (**self).spawn(future)
- }
-
- fn status(&self) -> Result<(), SpawnError> {
- (**self).status()
- }
-}
-
-/// Errors returned by `Executor::spawn`.
-///
-/// Spawn errors should represent relatively rare scenarios. Currently, the two
-/// scenarios represented by `SpawnError` are:
-///
-/// * An executor being at capacity or full. As such, the executor is not able
-/// to accept a new future. This error state is expected to be transient.
-/// * An executor has been shutdown and can no longer accept new futures. This
-/// error state is expected to be permanent.
-#[derive(Debug)]
-pub struct SpawnError {
- is_shutdown: bool,
-}
-
-impl SpawnError {
- /// Return a new `SpawnError` reflecting a shutdown executor failure.
- pub fn shutdown() -> Self {
- SpawnError { is_shutdown: true }
- }
-
- /// Return a new `SpawnError` reflecting an executor at capacity failure.
- pub fn at_capacity() -> Self {
- SpawnError { is_shutdown: false }
- }
-
- /// Returns `true` if the error reflects a shutdown executor failure.
- pub fn is_shutdown(&self) -> bool {
- self.is_shutdown
- }
-
- /// Returns `true` if the error reflects an executor at capacity failure.
- pub fn is_at_capacity(&self) -> bool {
- !self.is_shutdown
- }
-}
-
-impl fmt::Display for SpawnError {
- fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
- write!(fmt, "{}", self.description())
- }
-}
-
-impl Error for SpawnError {
- fn description(&self) -> &str {
- "attempted to spawn task while the executor is at capacity or shut down"
- }
-}
+pub use typed::TypedExecutor;
diff --git a/tokio-executor/src/typed.rs b/tokio-executor/src/typed.rs
new file mode 100644
index 00000000..22edf29d
--- /dev/null
+++ b/tokio-executor/src/typed.rs
@@ -0,0 +1,181 @@
+use SpawnError;
+
+/// A value that spawns futures of a specific type.
+///
+/// The trait is generic over `T`: the type of future that can be spawened. This
+/// is useful for implementing an executor that is only able to spawn a specific
+/// type of future.
+///
+/// The [`spawn`] function is used to submit the future to the executor. Once
+/// submitted, the executor takes ownership of the future and becomes
+/// responsible for driving the future to completion.
+///
+/// This trait is useful as a bound for applications and libraries in order to
+/// be generic over futures that are `Send` vs. `!Send`.
+///
+/// # Examples
+///
+/// Consider a function that provides an API for draining a `Stream` in the
+/// background. To do this, a task must be spawned to perform the draining. As
+/// such, the function takes a stream and an executor on which the background
+/// task is spawned.
+///
+/// ```rust
+/// #[macro_use]
+/// extern crate futures;
+/// extern crate tokio;
+///
+/// use futures::{Future, Stream, Poll};
+/// use tokio::executor::TypedExecutor;
+/// use tokio::sync::oneshot;
+///
+/// pub fn drain<T, E>(stream: T, executor: &mut E)
+/// -> impl Future<Item = (), Error = ()>
+/// where
+/// T: Stream,
+/// E: TypedExecutor<Drain<T>>
+/// {
+/// let (tx, rx) = oneshot::channel();
+///
+/// executor.spawn(Drain {
+/// stream,
+/// tx: Some(tx),
+/// }).unwrap();
+///
+/// rx.map_err(|_| ())
+/// }
+///
+/// // The background task
+/// pub struct Drain<T: Stream> {
+/// stream: T,
+/// tx: Option<oneshot::Sender<()>>,
+/// }
+///
+/// impl<T: Stream> Future for Drain<T> {
+/// type Item = ();
+/// type Error = ();
+///
+/// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+/// loop {
+/// let item = try_ready!(
+/// self.stream.poll()
+/// .map_err(|_| ())
+/// );
+///
+/// if item.is_none() { break; }
+/// }
+///
+/// self.tx.take().unwrap().send(()).map_err(|_| ());
+/// Ok(().into())
+/// }
+/// }
+/// # pub fn main() {}
+/// ```
+///
+/// By doing this, the `drain` fn can accept a stream that is `!Send` as long as
+/// the supplied executor is able to spawn `!Send` types.
+pub trait TypedExecutor<T> {
+ /// Spawns a future to run on this executor.
+ ///
+ /// `future` is passed to the executor, which will begin running it. The
+ /// executor takes ownership of the future and becomes responsible for
+ /// driving the future to completion.
+ ///
+ /// # Panics
+ ///
+ /// Implementations are encouraged to avoid panics. However, panics are
+ /// permitted and the caller should check the implementation specific
+ /// documentation for more details on possible panics.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// # extern crate futures;
+ /// # extern crate tokio_executor;
+ /// # use tokio_executor::TypedExecutor;
+ /// # use futures::{Future, Poll};
+ /// fn example<T>(my_executor: &mut T)
+ /// where
+ /// T: TypedExecutor<MyFuture>,
+ /// {
+ /// my_executor.spawn(MyFuture).unwrap();
+ /// }
+ ///
+ /// struct MyFuture;
+ ///
+ /// impl Future for MyFuture {
+ /// type Item = ();
+ /// type Error = ();
+ ///
+ /// fn poll(&mut self) -> Poll<(), ()> {
+ /// println!("running on the executor");
+ /// Ok(().into())
+ /// }
+ /// }
+ /// # fn main() {}
+ /// ```
+ fn spawn(&mut self, future: T) -> Result<(), SpawnError>;
+
+ /// Provides a best effort **hint** to whether or not `spawn` will succeed.
+ ///
+ /// This function may return both false positives **and** false negatives.
+ /// If `status` returns `Ok`, then a call to `spawn` will *probably*
+ /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will
+ /// *probably* fail, but may succeed.
+ ///
+ /// This allows a caller to avoid creating the task if the call to `spawn`
+ /// has a high likelihood of failing.
+ ///
+ /// # Panics
+ ///
+ /// This function must not panic. Implementers must ensure that panics do
+ /// not happen.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// # extern crate futures;
+ /// # extern crate tokio_executor;
+ /// # use tokio_executor::TypedExecutor;
+ /// # use futures::{Future, Poll};
+ /// fn example<T>(my_executor: &mut T)
+ /// where
+ /// T: TypedExecutor<MyFuture>,
+ /// {
+ /// if my_executor.status().is_ok() {
+ /// my_executor.spawn(MyFuture).unwrap();
+ /// } else {
+ /// println!("the executor is not in a good state");
+ /// }
+ /// }
+ ///
+ /// struct MyFuture;
+ ///
+ /// impl Future for MyFuture {
+ /// type Item = ();
+ /// type Error = ();
+ ///
+ /// fn poll(&mut self) -> Poll<(), ()> {
+ /// println!("running on the executor");
+ /// Ok(().into())
+ /// }
+ /// }
+ /// # fn main() {}
+ /// ```
+ fn status(&self) -> Result<(), SpawnError> {
+ Ok(())
+ }
+}
+
+impl<E, T> TypedExecutor<T> for Box<E>
+where
+ E: TypedExecutor<T>,
+{
+ fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
+ (**self).spawn(future)
+ }
+
+ fn status(&self) -> Result<(), SpawnError> {
+ (**self).status()
+ }
+}
diff --git a/tokio-executor/tests/executor.rs b/tokio-executor/tests/executor.rs
index fdfb7735..bb020243 100644
--- a/tokio-executor/tests/executor.rs
+++ b/tokio-executor/tests/executor.rs
@@ -2,10 +2,11 @@ extern crate futures;
extern crate tokio_executor;
use futures::{future::lazy, Future};
-use tokio_executor::*;
+use tokio_executor::DefaultExecutor;
mod out_of_executor_context {
use super::*;
+ use tokio_executor::Executor;
fn test<F, E>(spawn: F)
where
diff --git a/tokio-threadpool/Cargo.toml b/tokio-threadpool/Cargo.toml
index 43a2c6f3..18c757b0 100644
--- a/tokio-threadpool/Cargo.toml
+++ b/tokio-threadpool/Cargo.toml
@@ -20,7 +20,7 @@ keywords = ["futures", "tokio"]
categories = ["concurrency", "asynchronous"]
[dependencies]
-tokio-executor = "0.1.2"
+tokio-executor = { version = "0.1.5", path = "../tokio-executor" }
futures = "0.1.19"
crossbeam-deque = "0.7.0"
crossbeam-queue = "0.1.0"
diff --git a/tokio-threadpool/src/sender.rs b/tokio-threadpool/src/sender.rs
index 15befd43..fadd0cba 100644
--- a/tokio-threadpool/src/sender.rs
+++ b/tokio-threadpool/src/sender.rs
@@ -176,6 +176,19 @@ impl<'a> tokio_executor::Executor for &'a Sender {
}
}
+impl<T> tokio_executor::TypedExecutor<T> for Sender
+where
+ T: Future<Item = (), Error = ()> + Send + 'static,
+{
+ fn status(&self) -> Result<(), tokio_executor::SpawnError> {
+ tokio_executor::Executor::status(self)
+ }
+
+ fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
+ tokio_executor::Executor::spawn(self, Box::new(future))
+ }
+}
+
impl<T> future::Executor<T> for Sender
where
T: Future<Item = (), Error = ()> + Send + 'static,
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index 45a7dc68..76477817 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -71,7 +71,7 @@ tokio-codec = { version = "0.1.0", optional = true }
tokio-current-thread = { version = "0.1.3", optional = true }
tokio-fs = { version = "0.1.6", optional = true }
tokio-io = { version = "0.1.6", optional = true }
-tokio-executor = { version = "0.1.5", optional = true }
+tokio-executor = { version = "0.1.5", optional = true, path = "../tokio-executor" }
tokio-reactor = { version = "0.1.1", o