diff options
author | Sean McArthur <sean@seanmonstar.com> | 2019-01-04 11:42:33 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-01-04 11:42:33 -0800 |
commit | 76198f63d73019311cc09d870b8397e46d8c0f2d (patch) | |
tree | 5621e6ed5daa984bbd637887d5f73d43b97e5fb2 /src | |
parent | 39dc5706b71898f5127704a4c323fa28b29154af (diff) |
Provide optional features on tokio crate (#808)
Disabling all features means the only dependency is `futures`.
Relevant pieces of the API can then be enabled with the following features:
- `codec`
- `fs`
- `io`
- `reactor`
- `tcp`
- `timer`
- `udp`
- `uds`
This also introduces the beginnings of enabling only certain pieces of the `Runtime`. As a start, the entire default runtime API is enabled via the `rt-full` feature.
Diffstat (limited to 'src')
-rw-r--r-- | src/io.rs | 1 | ||||
-rw-r--r-- | src/lib.rs | 46 | ||||
-rw-r--r-- | src/net.rs | 12 | ||||
-rw-r--r-- | src/prelude.rs | 1 | ||||
-rw-r--r-- | src/runtime/current_thread/mod.rs | 15 | ||||
-rw-r--r-- | src/runtime/mod.rs | 401 | ||||
-rw-r--r-- | src/runtime/threadpool/builder.rs (renamed from src/runtime/builder.rs) | 2 | ||||
-rw-r--r-- | src/runtime/threadpool/mod.rs | 395 | ||||
-rw-r--r-- | src/runtime/threadpool/shutdown.rs (renamed from src/runtime/shutdown.rs) | 2 | ||||
-rw-r--r-- | src/runtime/threadpool/task_executor.rs (renamed from src/runtime/task_executor.rs) | 0 | ||||
-rw-r--r-- | src/util/future.rs | 6 | ||||
-rw-r--r-- | src/util/stream.rs | 4 |
12 files changed, 480 insertions, 405 deletions
@@ -51,6 +51,7 @@ pub use tokio_io::{ }; // standard input, output, and error +#[cfg(feature = "fs")] pub use tokio_fs::{ stdin, Stdin, @@ -72,42 +72,72 @@ //! } //! ``` -extern crate bytes; -#[macro_use] +macro_rules! if_runtime { + ($($i:item)*) => ($( + #[cfg(any(feature = "rt-full"))] + $i + )*) +} + +#[cfg_attr(feature = "rt-full", macro_use)] extern crate futures; + +#[cfg(feature = "io")] +extern crate bytes; +#[cfg(feature = "reactor")] extern crate mio; +#[cfg(feature = "rt-full")] extern crate num_cpus; +#[cfg(feature = "rt-full")] extern crate tokio_current_thread; +#[cfg(feature = "io")] extern crate tokio_io; -extern crate tokio_executor; +#[cfg(feature = "codec")] extern crate tokio_codec; +#[cfg(feature = "fs")] extern crate tokio_fs; +#[cfg(feature = "reactor")] extern crate tokio_reactor; +#[cfg(feature = "rt-full")] extern crate tokio_threadpool; +#[cfg(feature = "timer")] extern crate tokio_timer; +#[cfg(feature = "tcp")] extern crate tokio_tcp; +#[cfg(feature = "udp")] extern crate tokio_udp; #[cfg(feature = "async-await-preview")] extern crate tokio_async_await; -#[cfg(unix)] +#[cfg(all(unix, feature = "uds"))] extern crate tokio_uds; +#[cfg(feature = "timer")] pub mod clock; +#[cfg(feature = "codec")] pub mod codec; -pub mod executor; +#[cfg(feature = "fs")] pub mod fs; +#[cfg(feature = "io")] pub mod io; +#[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))] pub mod net; pub mod prelude; +#[cfg(feature = "reactor")] pub mod reactor; -pub mod runtime; +#[cfg(feature = "timer")] pub mod timer; pub mod util; -pub use executor::spawn; -pub use runtime::run; +if_runtime! { + extern crate tokio_executor; + pub mod executor; + pub mod runtime; + + pub use executor::spawn; + pub use runtime::run; +} // ===== Experimental async/await support ===== @@ -22,6 +22,7 @@ //! [`UnixDatagram`]: struct.UnixDatagram.html //! [`UnixDatagramFramed`]: struct.UnixDatagramFramed.html +#[cfg(feature = "tcp")] pub mod tcp { //! TCP bindings for `tokio`. //! @@ -42,15 +43,19 @@ pub mod tcp { //! [`Incoming`]: struct.Incoming.html pub use tokio_tcp::{ConnectFuture, Incoming, TcpListener, TcpStream}; } +#[cfg(feature = "tcp")] pub use self::tcp::{TcpListener, TcpStream}; +#[cfg(feature = "tcp")] #[deprecated(note = "use `tokio::net::tcp::ConnectFuture` instead")] #[doc(hidden)] pub type ConnectFuture = self::tcp::ConnectFuture; +#[cfg(feature = "tcp")] #[deprecated(note = "use `tokio::net::tcp::Incoming` instead")] #[doc(hidden)] pub type Incoming = self::tcp::Incoming; +#[cfg(feature = "udp")] pub mod udp { //! UDP bindings for `tokio`. //! @@ -68,16 +73,19 @@ pub mod udp { //! [`framed`]: struct.UdpSocket.html#method.framed pub use tokio_udp::{RecvDgram, SendDgram, UdpFramed, UdpSocket}; } +#[cfg(feature = "udp")] pub use self::udp::{UdpFramed, UdpSocket}; +#[cfg(feature = "udp")] #[deprecated(note = "use `tokio::net::udp::RecvDgram` instead")] #[doc(hidden)] pub type RecvDgram<T> = self::udp::RecvDgram<T>; +#[cfg(feature = "udp")] #[deprecated(note = "use `tokio::net::udp::SendDgram` instead")] #[doc(hidden)] pub type SendDgram<T> = self::udp::SendDgram<T>; -#[cfg(unix)] +#[cfg(all(unix, feature = "uds"))] pub mod unix { //! Unix domain socket bindings for `tokio` (only available on unix systems). @@ -86,5 +94,5 @@ pub mod unix { UnixListener, UnixStream, }; } -#[cfg(unix)] +#[cfg(all(unix, feature = "uds"))] pub use self::unix::{UnixDatagram, UnixDatagramFramed, UnixListener, UnixStream}; diff --git a/src/prelude.rs b/src/prelude.rs index ecd82fe4..5ca20399 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -10,6 +10,7 @@ //! //! The prelude may grow over time as additional items see ubiquitous use. +#[cfg(feature = "io")] pub use tokio_io::{ AsyncRead, AsyncWrite, diff --git a/src/runtime/current_thread/mod.rs b/src/runtime/current_thread/mod.rs index dca41711..45dc4efe 100644 --- a/src/runtime/current_thread/mod.rs +++ b/src/runtime/current_thread/mod.rs @@ -90,3 +90,18 @@ where r.run().expect("failed to resolve remaining futures"); Ok(v) } + +/// Start a current-thread runtime using the supplied future to bootstrap execution. +/// +/// # Panics +/// +/// This function panics if called from the context of an executor. +pub fn run<F>(future: F) +where + F: Future<Item = (), Error = ()> + 'static, +{ + + let mut r = Runtime::new().expect("failed to start runtime on current thread"); + r.spawn(future); + r.run().expect("failed to resolve remaining futures"); +} diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 2ea0d548..9e7b700c 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -112,399 +112,14 @@ //! [`tokio::spawn`]: ../executor/fn.spawn.html //! [`Timer`]: https://docs.rs/tokio-timer/0.2/tokio_timer/timer/struct.Timer.html -mod builder; pub mod current_thread; -mod shutdown; -mod task_executor; +mod threadpool; -pub use self::builder::Builder; -pub use self::shutdown::Shutdown; -pub use self::task_executor::TaskExecutor; +pub use self::threadpool::{ + Builder, + Runtime, + Shutdown, + TaskExecutor, + run, +}; -use reactor::{Handle, Reactor}; - -use std::io; -use std::sync::Mutex; - -use tokio_executor::enter; -use tokio_threadpool as threadpool; - -use futures; -use futures::future::Future; - -/// Handle to the Tokio runtime. -/// -/// The Tokio runtime includes a reactor as well as an executor for running -/// tasks. -/// -/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However, -/// most users will use [`tokio::run`], which uses a `Runtime` internally. -/// -/// See [module level][mod] documentation for more details. -/// -/// [mod]: index.html -/// [`new`]: #method.new -/// [`Builder`]: struct.Builder.html -/// [`tokio::run`]: fn.run.html -#[derive(Debug)] -pub struct Runtime { - inner: Option<Inner>, -} - -#[derive(Debug)] -struct Inner { - /// A handle to the reactor in the background thread. - reactor_handle: Handle, - - // TODO: This should go away in 0.2 - reactor: Mutex<Option<Reactor>>, - - /// Task execution pool. - pool: threadpool::ThreadPool, -} - -// ===== impl Runtime ===== - -/// Start the Tokio runtime using the supplied future to bootstrap execution. -/// -/// This function is used to bootstrap the execution of a Tokio application. It -/// does the following: -/// -/// * Start the Tokio runtime using a default configuration. -/// * Spawn the given future onto the thread pool. -/// * Block the current thread until the runtime shuts down. -/// -/// Note that the function will not return immediately once `future` has -/// completed. Instead it waits for the entire runtime to become idle. -/// -/// See the [module level][mod] documentation for more details. -/// -/// # Examples -/// -/// ```rust -/// # extern crate tokio; -/// # extern crate futures; -/// # use futures::{Future, Stream}; -/// use tokio::net::TcpListener; -/// -/// # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> { -/// # unimplemented!(); -/// # } -/// # fn dox() { -/// # let addr = "127.0.0.1:8080".parse().unwrap(); -/// let listener = TcpListener::bind(&addr).unwrap(); -/// -/// let server = listener.incoming() -/// .map_err(|e| println!("error = {:?}", e)) -/// .for_each(|socket| { -/// tokio::spawn(process(socket)) -/// }); -/// -/// tokio::run(server); -/// # } -/// # pub fn main() {} -/// ``` -/// -/// # Panics -/// -/// This function panics if called from the context of an executor. -/// -/// [mod]: ../index.html -pub fn run<F>(future: F) -where F: Future<Item = (), Error = ()> + Send + 'static, -{ - // Check enter before creating a new Runtime... - let mut entered = enter().expect("nested tokio::run"); - let mut runtime = Runtime::new().expect("failed to start new Runtime"); - runtime.spawn(future); - entered - .block_on(runtime.shutdown_on_idle()) - .expect("shutdown cannot error") -} - -impl Runtime { - /// Create a new runtime instance with default configuration values. - /// - /// This results in a reactor, thread pool, and timer being initialized. The - /// thread pool will not spawn any worker threads until it needs to, i.e. - /// tasks are scheduled to run. - /// - /// Most users will not need to call this function directly, instead they - /// will use [`tokio::run`](fn.run.html). - /// - /// See [module level][mod] documentation for more details. - /// - /// # Examples - /// - /// Creating a new `Runtime` with default configuration values. - /// - /// ``` - /// use tokio::runtime::Runtime; - /// use tokio::prelude::*; - /// - /// let rt = Runtime::new() - /// .unwrap(); - /// - /// // Use the runtime... - /// - /// // Shutdown the runtime - /// rt.shutdown_now() - /// .wait().unwrap(); - /// ``` - /// - /// [mod]: index.html - pub fn new() -> io::Result<Self> { - Builder::new().build() - } - - #[deprecated(since = "0.1.5", note = "use `reactor` instead")] - #[doc(hidden)] - pub fn handle(&self) -> &Handle { - #[allow(deprecated)] - self.reactor() - } - - /// Return a reference to the reactor handle for this runtime instance. - /// - /// The returned handle reference can be cloned in order to get an owned - /// value of the handle. This handle can be used to initialize I/O resources - /// (like TCP or UDP sockets) that will not be used on the runtime. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// let rt = Runtime::new() - /// .unwrap(); - /// - /// let reactor_handle = rt.reactor().clone(); - /// - /// // use `reactor_handle` - /// ``` - #[deprecated(since = "0.1.11", note = "there is now a reactor per worker thread")] - pub fn reactor(&self) -> &Handle { - let mut reactor = self.inner().reactor.lock().unwrap(); - if let Some(reactor) = reactor.take() { - if let Ok(background) = reactor.background() { - background.forget(); - } - } - - &self.inner().reactor_handle - } - - /// Return a handle to the runtime's executor. - /// - /// The returned handle can be used to spawn tasks that run on this runtime. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// let rt = Runtime::new() - /// .unwrap(); - /// - /// let executor_handle = rt.executor(); - /// - /// // use `executor_handle` - /// ``` - pub fn executor(&self) -> TaskExecutor { - let inner = self.inner().pool.sender().clone(); - TaskExecutor { inner } - } - - /// Spawn a future onto the Tokio runtime. - /// - /// This spawns the given future onto the runtime's executor, usually a - /// thread pool. The thread pool is then responsible for polling the future - /// until it completes. - /// - /// See [module level][mod] documentation for more details. - /// - /// [mod]: index.html - /// - /// # Examples - /// - /// ```rust - /// # extern crate tokio; - /// # extern crate futures; - /// # use futures::{future, Future, Stream}; - /// use tokio::runtime::Runtime; - /// - /// # fn dox() { - /// // Create the runtime - /// let mut rt = Runtime::new().unwrap(); - /// - /// // Spawn a future onto the runtime - /// rt.spawn(future::lazy(|| { - /// println!("now running on a worker thread"); - /// Ok(()) - /// })); - /// # } - /// # pub fn main() {} - /// ``` - /// - /// # Panics - /// - /// This function panics if the spawn fails. Failure occurs if the executor - /// is currently at capacity and is unable to spawn a new future. - pub fn spawn<F>(&mut self, future: F) -> &mut Self - where F: Future<Item = (), Error = ()> + Send + 'static, - { - self.inner_mut().pool.sender().spawn(future).unwrap(); - self - } - - /// Run a future to completion on the Tokio runtime. - /// - /// This runs the given future on the runtime, blocking until it is - /// complete, and yielding its resolved result. Any tasks or timers which - /// the future spawns internally will be executed on the runtime. - /// - /// This method should not be called from an asynchronous context. - /// - /// # Panics - /// - /// This function panics if the executor is at capacity, if the provided - /// future panics, or if called within an asynchronous execution context. - pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E> - where - F: Send + 'static + Future<Item = R, Error = E>, - R: Send + 'static, - E: Send + 'static, - { - let mut entered = enter().expect("nested block_on"); - let (tx, rx) = futures::sync::oneshot::channel(); - self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); - entered.block_on(rx).unwrap() - } - - /// Run a future to completion on the Tokio runtime, then wait for all - /// background futures to complete too. - /// - /// This runs the given future on the runtime, blocking until it is - /// complete, waiting for background futures to complete, and yielding - /// its resolved result. Any tasks or timers which the future spawns - /// internally will be executed on the runtime and waited for completion. - /// - /// This method should not be called from an asynchronous context. - /// - /// # Panics - /// - /// This function panics if the executor is at capacity, if the provided - /// future panics, or if called within an asynchronous execution context. - pub fn block_on_all<F, R, E>(mut self, future: F) -> Result<R, E> - where - F: Send + 'static + Future<Item = R, Error = E>, - R: Send + 'static, - E: Send + 'static, - { - let mut entered = enter().expect("nested block_on_all"); - let (tx, rx) = futures::sync::oneshot::channel(); - self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); - let block = rx - .map_err(|_| unreachable!()) - .and_then(move |r| { - self.shutdown_on_idle() - .map(move |()| r) - }); - entered.block_on(block).unwrap() - } - - /// Signals the runtime to shutdown once it becomes idle. - /// - /// Returns a future that completes once the shutdown operation has - /// completed. - /// - /// This function can be used to perform a graceful shutdown of the runtime. - /// - /// The runtime enters an idle state once **all** of the following occur. - /// - /// * The thread pool has no tasks to execute, i.e., all tasks that were - /// spawned have completed. - /// * The reactor is not managing any I/O resources. - /// - /// See [module level][mod] documentation for more details. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// use tokio::prelude::*; - /// - /// let rt = Runtime::new() - /// .unwrap(); - /// - /// // Use the runtime... - /// - /// // Shutdown the runtime - /// rt.shutdown_on_idle() - /// .wait().unwrap(); - /// ``` - /// - /// [mod]: index.html - pub fn shutdown_on_idle(mut self) -> Shutdown { - let inner = self.inner.take().unwrap(); - let inner = inner.pool.shutdown_on_idle(); - Shutdown { inner } - } - - /// Signals the runtime to shutdown immediately. - /// - /// Returns a future that completes once the shutdown operation has - /// completed. - /// - /// This function will forcibly shutdown the runtime, causing any - /// in-progress work to become canceled. The shutdown steps are: - /// - /// * Drain any scheduled work queues. - /// * Drop any futures that have not yet completed. - /// * Drop the reactor. - /// - /// Once the reactor has dropped, any outstanding I/O resources bound to - /// that reactor will no longer function. Calling any method on them will - /// result in an error. - /// - /// See [module level][mod] documentation for more details. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// use tokio::prelude::*; - /// - /// let rt = Runtime::new() - /// .unwrap(); - /// - /// // Use the runtime... - /// - /// // Shutdown the runtime - /// rt.shutdown_now() - /// .wait().unwrap(); - /// ``` - /// - /// [mod]: index.html - pub fn shutdown_now(mut self) -> Shutdown { - let inner = self.inner.take().unwrap(); - Shutdown::shutdown_now(inner) - } - - fn inner(&self) -> &Inner { - self.inner.as_ref().unwrap() - } - - fn inner_mut(&mut self) -> &mut Inner { - self.inner.as_mut().unwrap() - } -} - -impl Drop for Runtime { - fn drop(&mut self) { - if let Some(inner) = self.inner.take() { - let shutdown = Shutdown::shutdown_now(inner); - let _ = shutdown.wait(); - } - } -} diff --git a/src/runtime/builder.rs b/src/runtime/threadpool/builder.rs index 8a5dca77..f7af4493 100644 --- a/src/runtime/builder.rs +++ b/src/runtime/threadpool/builder.rs @@ -1,4 +1,4 @@ -use runtime::{Inner, Runtime}; +use super::{Inner, Runtime}; use reactor::Reactor; diff --git a/src/runtime/threadpool/mod.rs b/src/runtime/threadpool/mod.rs new file mode 100644 index 00000000..77d89498 --- /dev/null +++ b/src/runtime/threadpool/mod.rs @@ -0,0 +1,395 @@ +mod builder; +mod shutdown; +mod task_executor; + +pub use self::builder::Builder; +pub use self::shutdown::Shutdown; +pub use self::task_executor::TaskExecutor; + +use reactor::{Handle, Reactor}; + +use std::io; +use std::sync::Mutex; + +use tokio_executor::enter; +use tokio_threadpool as threadpool; + +use futures; +use futures::future::Future; + +/// Handle to the Tokio runtime. +/// +/// The Tokio runtime includes a reactor as well as an executor for running +/// tasks. +/// +/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However, +/// most users will use [`tokio::run`], which uses a `Runtime` internally. +/// +/// See [module level][mod] documentation for more details. +/// +/// [mod]: index.html +/// [`new`]: #method.new +/// [`Builder`]: struct.Builder.html +/// [`tokio::run`]: fn.run.html +#[derive(Debug)] +pub struct Runtime { + inner: Option<Inner>, +} + +#[derive(Debug)] +struct Inner { + /// A handle to the reactor in the background thread. + reactor_handle: Handle, + + // TODO: This should go away in 0.2 + reactor: Mutex<Option<Reactor>>, + + /// Task execution pool. + pool: threadpool::ThreadPool, +} + +// ===== impl Runtime ===== + +/// Start the Tokio runtime using the supplied future to bootstrap execution. +/// +/// This function is used to bootstrap the execution of a Tokio application. It +/// does the following: +/// +/// * Start the Tokio runtime using a default configuration. +/// * Spawn the given future onto the thread pool. +/// * Block the current thread until the runtime shuts down. +/// +/// Note that the function will not return immediately once `future` has +/// completed. Instead it waits for the entire runtime to become idle. +/// +/// See the [module level][mod] documentation for more details. +/// +/// # Examples +/// +/// ```rust +/// # extern crate tokio; +/// # extern crate futures; +/// # use futures::{Future, Stream}; +/// use tokio::net::TcpListener; +/// +/// # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> { +/// # unimplemented!(); +/// # } +/// # fn dox() { +/// # let addr = "127.0.0.1:8080".parse().unwrap(); +/// let listener = TcpListener::bind(&addr).unwrap(); +/// +/// let server = listener.incoming() +/// .map_err(|e| println!("error = {:?}", e)) +/// .for_each(|socket| { +/// tokio::spawn(process(socket)) +/// }); +/// +/// tokio::run(server); +/// # } +/// # pub fn main() {} +/// ``` +/// +/// # Panics +/// +/// This function panics if called from the context of an executor. +/// +/// [mod]: ../index.html +pub fn run<F>(future: F) +where F: Future<Item = (), Error = ()> + Send + 'static, +{ + // Check enter before creating a new Runtime... + let mut entered = enter().expect("nested tokio::run"); + let mut runtime = Runtime::new().expect("failed to start new Runtime"); + runtime.spawn(future); + entered + .block_on(runtime.shutdown_on_idle()) + .expect("shutdown cannot error") +} + +impl Runtime { + /// Create a new runtime instance with default configuration values. + /// + /// This results in a reactor, thread pool, and timer being initialized. The + /// thread pool will not spawn any worker threads until it needs to, i.e. + /// tasks are scheduled to run. + /// + /// Most users will not need to call this function directly, instead they + /// will use [`tokio::run`](fn.run.html). + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// Creating a new `Runtime` with default configuration values. + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::prelude::*; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// + /// // Shutdown the runtime + /// rt.shutdown_now() + /// .wait().unwrap(); + /// ``` + /// + /// [mod]: index.html + pub fn new() -> io::Result<Self> { + Builder::new().build() + } + + #[deprecated(since = "0.1.5", note = "use `reactor` instead")] + #[doc(hidden)] + pub fn handle(&self) -> &Handle { + #[allow(deprecated)] + self.reactor() + } + + /// Return a reference to the reactor handle for this runtime instance. + /// + /// The returned handle reference can be cloned in order to get an owned + /// value of the handle. This handle can be used to initialize I/O resources + /// (like TCP or UDP sockets) that will not be used on the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// let reactor_handle = rt.reactor().clone(); + /// + /// // use `reactor_handle` + /// ``` + #[deprecated(since = "0.1.11", note = "there is now a reactor per worker thread")] + pub fn reactor(&self) -> &Handle { + let mut reactor = self.inner().reactor.lock().unwrap(); + if let Some(reactor) = reactor.take() { + if let Ok(background) = reactor.background() { + background.forget(); + } + } + + &self.inner().reactor_handle + } + + /// Return a handle to the runtime's executor. + /// + /// The returned handle can be used to spawn tasks that run on this runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// let executor_handle = rt.executor(); + /// + /// // use `executor_handle` + /// ``` + pub fn executor(&self) -> TaskExecutor { + let inner = self.inner().pool.sender().clone(); + TaskExecutor { inner } + } + + /// Spawn a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio; + /// # extern crate futures; + /// # use futures::{future, Future, Stream}; + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let mut rt = Runtime::new().unwrap(); + /// + /// // Spawn a future onto the runtime + /// rt.spawn(future::lazy(|| { + /// println!("now running on a worker thread"); + /// Ok(()) + /// })); + /// # } + /// # pub fn main() {} + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the executor + /// is currently at capacity and is unable to spawn a new future. + pub fn spawn<F>(&mut self, future: F) -> &mut Self + where F: Future<Item = (), Error = ()> + Send + 'static, + { + self.inner_mut().pool.sender().spawn(future).unwrap(); + self + } + + /// Run a future to completion on the Tokio runtime. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers which + /// the future spawns internally will be executed on the runtime. + /// + /// This method should not be called from an asynchronous context. + /// + /// # Panics + /// + /// This function panics if the executor is at capacity, if the provided + /// future panics, or if called within an asynchronous execution context. + pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E> + where + F: Send + 'static + Future<Item = R, Error = E>, + R: Send + 'static, + E: Send + 'static, + { + let mut entered = enter().expect("nested block_on"); + let (tx, rx) = futures::sync::oneshot::channel(); + self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); + entered.block_on(rx).unwrap() + } + + /// Run a future to completion on the Tokio runtime, then wait for all + /// background futures to complete too. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, waiting for background futures to complete, and yielding + /// its resolved result. Any tasks or timers which the future spawns + /// internally will be executed on the runtime and waited for completion. + /// + /// This method should not be called from an asynchronous context. + /// + /// # Panics + /// + /// This function panics if the executor is at capacity, if the provided + /// future panics, or if called within an asynchronous execution context. + pub fn block_on_all<F, R, E>(mut self, future: F) -> Result<R, E> + where + F: Send + 'static + Future<Item = R, Error = E>, + R: Send + 'static, + E: Send + 'static, + { + let mut entered = enter().expect("nested block_on_all"); + let (tx, rx) = futures::sync::oneshot::channel(); + self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); + let block = rx + .map_err(|_| unreachable!()) + .and_then(move |r| { + self.shutdown_on_idle() + .map(move |()| r) + }); + entered.block_on(block).unwrap() + } + + /// Signals the runtime to shutdown once it becomes idle. + /// + /// Returns a future that completes once the shutdown operation has + /// completed. + /// + /// This function can be used to perform a graceful shutdown of the runtime. + /// + /// The runtime enters an idle state once **all** of the following occur. + /// + /// * The thread pool has no tasks to execute, i.e., all tasks that were + /// spawned have completed. + /// * The reactor is not managing any I/O resources. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::prelude::*; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// + /// // Shutdown the runtime + /// rt.shutdown_on_idle() + /// .wait().unwrap(); + /// ``` + /// + /// [mod]: index.html + pub fn shutdown_on_idle(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + let inner = inner.pool.shutdown_on_idle(); + Shutdown { inner } + } + + /// Signals the runtime to shutdown immediately. + /// + /// Returns a future that completes once the shutdown operation has + /// completed. + /// + /// This function will forcibly shutdown the runtime, causing any + /// in-progress work to become canceled. The shutdown steps are: + /// + /// * Drain any scheduled work queues. + /// * Drop any futures that have not yet completed. + /// * Drop the reactor. + /// + /// Once the reactor has dropped, any outstanding I/O resources bound to + /// that reactor will no longer function. Calling any method on them will + /// result in an error. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::prelude::*; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// + /// // Shutdown the runtime + /// rt.shutdown_now() + /// .wait().unwrap(); + /// ``` + /// + /// [mod]: index.html + pub fn shutdown_now(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + Shutdown::shutdown_now(inner) + } + + fn inner(&self) -> &Inner { + self.inner.as_ref().unwrap() |