diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-21 23:28:39 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-21 23:28:39 -0800 |
commit | 8546ff826db8dba1e39b4119ad909fb6cab2492a (patch) | |
tree | 0c1cdd36aaf9d732079a4ff7a71e5c6b138e7d42 /tokio/src/runtime/builder.rs | |
parent | 6866fe426cfab0e4da3e88c673f7bef141259bb6 (diff) |
runtime: cleanup and add config options (#1807)
* runtime: cleanup and add config options
This patch finishes the cleanup as part of the transition to Tokio 0.2.
A number of changes were made to take advantage of having all Tokio
types in a single crate. Also, fixes using Tokio types from
`spawn_blocking`.
* Many threads, one resource driver
Previously, in the threaded scheduler, a resource driver (mio::Poll /
timer combo) was created per thread. This was more or less fine, except
it required balancing across the available drivers. When using a
resource driver from **outside** of the thread pool, balancing is
tricky. The change was original done to avoid having a dedicated driver
thread.
Now, instead of creating many resource drivers, a single resource driver
is used. Each scheduler thread will attempt to "lock" the resource
driver before parking on it. If the resource driver is already locked,
the thread uses a condition variable to park. Contention should remain
low as, under load, the scheduler avoids using the drivers.
* Add configuration options to enable I/O / time
New configuration options are added to `runtime::Builder` to allow
enabling I/O and time drivers on a runtime instance basis. This is
useful when wanting to create lightweight runtime instances to execute
compute only tasks.
* Bug fixes
The condition variable parker is updated to the same algorithm used in
`std`. This is motivated by some potential deadlock cases discovered by
`loom`.
The basic scheduler is fixed to fairly schedule tasks. `push_front` was
accidentally used instead of `push_back`.
I/O, time, and spawning now work from within `spawn_blocking` closures.
* Misc cleanup
The threaded scheduler is no longer generic over `P :Park`. Instead, it
is hard coded to a specific parker. Tests, including loom tests, are
updated to use `Runtime` directly. This provides greater coverage.
The `blocking` module is moved back into `runtime` as all usage is
within `runtime` itself.
Diffstat (limited to 'tokio/src/runtime/builder.rs')
-rw-r--r-- | tokio/src/runtime/builder.rs | 218 |
1 files changed, 123 insertions, 95 deletions
diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 793146f7..4f36a027 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,9 +1,10 @@ -use crate::loom::sync::Arc; -use crate::runtime::handle::{self, Handle}; +use crate::runtime::handle::Handle; use crate::runtime::shell::Shell; -use crate::runtime::{blocking, io, time, Runtime}; +use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner}; use std::fmt; +#[cfg(not(loom))] +use std::sync::Arc; /// Builds Tokio Runtime with custom configuration values. /// @@ -39,6 +40,12 @@ pub struct Builder { /// The task execution model to use. kind: Kind, + /// Whether or not to enable the I/O driver + enable_io: bool, + + /// Whether or not to enable the time driver + enable_time: bool, + /// The number of worker threads. /// /// Only used when not using the current-thread executor. @@ -51,13 +58,13 @@ pub struct Builder { pub(super) thread_stack_size: Option<usize>, /// Callback to run after each thread starts. - after_start: Option<Callback>, + pub(super) after_start: Option<Callback>, /// To run before each worker thread stops - before_stop: Option<Callback>, + pub(super) before_stop: Option<Callback>, } -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] enum Kind { Shell, #[cfg(feature = "rt-core")] @@ -66,12 +73,6 @@ enum Kind { ThreadPool, } -#[cfg(not(loom))] -type Callback = Arc<dyn Fn() + Send + Sync>; - -#[cfg(loom)] -type Callback = Arc<Box<dyn Fn() + Send + Sync>>; - impl Builder { /// Returns a new runtime builder initialized with default configuration /// values. @@ -82,6 +83,12 @@ impl Builder { // No task execution by default kind: Kind::Shell, + // I/O defaults to "off" + enable_io: false, + + // Time defaults to "off" + enable_time: false, + // Default to use an equal number of threads to number of CPU cores num_threads: crate::loom::sys::num_cpus(), @@ -97,6 +104,31 @@ impl Builder { } } + /// Enable both I/O and time drivers. + /// + /// Doing this is a shorthand for calling `enable_io` and `enable_time` + /// individually. If additional components are added to Tokio in the future, + /// `enable_all` will include these future components. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new() + /// .enable_all() + /// .build() + /// .unwrap(); + /// ``` + pub fn enable_all(&mut self) -> &mut Self { + #[cfg(feature = "io-driver")] + self.enable_io(); + #[cfg(feature = "time")] + self.enable_time(); + + self + } + /// Set the maximum number of worker threads for the `Runtime`'s thread pool. /// /// This must be a number between 1 and 32,768 though it is advised to keep @@ -107,14 +139,12 @@ impl Builder { /// # Examples /// /// ``` - /// # use tokio::runtime; + /// use tokio::runtime; /// - /// # pub fn main() { /// let rt = runtime::Builder::new() /// .num_threads(4) /// .build() /// .unwrap(); - /// # } /// ``` pub fn num_threads(&mut self, val: usize) -> &mut Self { self.num_threads = val; @@ -194,14 +224,14 @@ impl Builder { /// /// # pub fn main() { /// let runtime = runtime::Builder::new() - /// .after_start(|| { + /// .on_thread_start(|| { /// println!("thread started"); /// }) /// .build(); /// # } /// ``` #[cfg(not(loom))] - pub fn after_start<F>(&mut self, f: F) -> &mut Self + pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static, { @@ -220,14 +250,14 @@ impl Builder { /// /// # pub fn main() { /// let runtime = runtime::Builder::new() - /// .before_stop(|| { + /// .on_thread_stop(|| { /// println!("thread stopping"); /// }) /// .build(); /// # } /// ``` #[cfg(not(loom))] - pub fn before_stop<F>(&mut self, f: F) -> &mut Self + pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static, { @@ -266,21 +296,21 @@ impl Builder { let clock = time::create_clock(); // Create I/O driver - let (io_driver, handle) = io::create_driver()?; - let io_handles = vec![handle]; + let (io_driver, io_handle) = io::create_driver(self.enable_io)?; + let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); - let (driver, handle) = time::create_driver(io_driver, clock.clone()); - let time_handles = vec![handle]; + let spawner = Spawner::Shell; - let blocking_pool = blocking::create_blocking_pool(self); + let blocking_pool = + blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock); let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { kind: Kind::Shell(Shell::new(driver)), handle: Handle { - kind: handle::Kind::Shell, - io_handles, - time_handles, + spawner, + io_handle, + time_handle, clock, blocking_spawner, }, @@ -289,6 +319,53 @@ impl Builder { } } +cfg_io_driver! { + impl Builder { + /// Enable the I/O driver. + /// + /// Doing this enables using net, process, signal, and some I/O types on + /// the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new() + /// .enable_io() + /// .build() + /// .unwrap(); + /// ``` + pub fn enable_io(&mut self) -> &mut Self { + self.enable_io = true; + self + } + } +} + +cfg_time! { + impl Builder { + /// Enable the time driver. + /// + /// Doing this enables using `tokio::time` on the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new() + /// .enable_time() + /// .build() + /// .unwrap(); + /// ``` + pub fn enable_time(&mut self) -> &mut Self { + self.enable_time = true; + self + } + } +} + cfg_rt_core! { impl Builder { fn build_basic_runtime(&mut self) -> io::Result<Runtime> { @@ -297,29 +374,27 @@ cfg_rt_core! { let clock = time::create_clock(); // Create I/O driver - let (io_driver, handle) = io::create_driver()?; - let io_handles = vec![handle]; + let (io_driver, io_handle) = io::create_driver(self.enable_io)?; - let (driver, handle) = time::create_driver(io_driver, clock.clone()); - let time_handles = vec![handle]; + let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); // And now put a single-threaded scheduler on top of the timer. When // there are no futures ready to do something, it'll let the timer or // the reactor to generate some new stimuli for the futures to continue // in their life. let scheduler = BasicScheduler::new(driver); - let spawner = scheduler.spawner(); + let spawner = Spawner::Basic(scheduler.spawner()); // Blocking pool - let blocking_pool = blocking::create_blocking_pool(self); + let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock); let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { kind: Kind::Basic(scheduler), handle: Handle { - kind: handle::Kind::Basic(spawner), - io_handles, - time_handles, + spawner, + io_handle, + time_handle, clock, blocking_spawner, }, @@ -333,77 +408,30 @@ cfg_rt_threaded! { impl Builder { fn build_threaded_runtime(&mut self) -> io::Result<Runtime> { use crate::runtime::{Kind, ThreadPool}; - use std::sync::Mutex; + use crate::runtime::park::Parker; let clock = time::create_clock(); - let mut io_handles = Vec::new(); - let mut time_handles = Vec::new(); - let mut drivers = Vec::new(); - - for _ in 0..self.num_threads { - // Create I/O driver and handle - let (io_driver, handle) = io::create_driver()?; - io_handles.push(handle); - - // Create a new timer. - let (time_driver, handle) = time::create_driver(io_driver, clock.clone()); - time_handles.push(handle); - drivers.push(Mutex::new(Some(time_driver))); - } + let (io_driver, io_handle) = io::create_driver(self.enable_io)?; + let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); + let (scheduler, workers) = ThreadPool::new(self.num_threads, Parker::new(driver)); + let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); // Create the blocking pool - let blocking_pool = blocking::create_blocking_pool(self); + let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock); let blocking_spawner = blocking_pool.spawner().clone(); - let scheduler = { - let clock = clock.clone(); - let io_handles = io_handles.clone(); - let time_handles = time_handles.clone(); - - let after_start = self.after_start.clone(); - let before_stop = self.before_stop.clone(); - - let around_worker = Arc::new(Box::new(move |index, next: &mut dyn FnMut()| { - // Configure the I/O driver - let _io = io::set_default(&io_handles[index]); - - // Configure time - time::with_default(&time_handles[index], &clock, || { - // Call the start callback - if let Some(after_start) = after_start.as_ref() { - after_start(); - } - - // Run the worker - next(); - - // Call the after call back - if let Some(before_stop) = before_stop.as_ref() { - before_stop(); - } - }) - }) - as Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>); - - ThreadPool::new( - self.num_threads, - blocking_pool.spawner().clone(), - around_worker, - move |index| drivers[index].lock().unwrap().take().unwrap(), - ) - }; - - let spawner = scheduler.spawner().clone(); + // Spawn the thread pool workers + workers.spawn(&blocking_spawner); Ok(Runtime { kind: Kind::ThreadPool(scheduler), handle: Handle { - kind: handle::Kind::ThreadPool(spawner), - io_handles, - time_handles, + spawner, + io_handle, + time_handle, clock, - blocking_spawner, + blocking_spawner: blocking_spawner.clone(), }, blocking_pool, }) |