summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime/builder.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-21 23:28:39 -0800
committerGitHub <noreply@github.com>2019-11-21 23:28:39 -0800
commit8546ff826db8dba1e39b4119ad909fb6cab2492a (patch)
tree0c1cdd36aaf9d732079a4ff7a71e5c6b138e7d42 /tokio/src/runtime/builder.rs
parent6866fe426cfab0e4da3e88c673f7bef141259bb6 (diff)
runtime: cleanup and add config options (#1807)
* runtime: cleanup and add config options This patch finishes the cleanup as part of the transition to Tokio 0.2. A number of changes were made to take advantage of having all Tokio types in a single crate. Also, fixes using Tokio types from `spawn_blocking`. * Many threads, one resource driver Previously, in the threaded scheduler, a resource driver (mio::Poll / timer combo) was created per thread. This was more or less fine, except it required balancing across the available drivers. When using a resource driver from **outside** of the thread pool, balancing is tricky. The change was original done to avoid having a dedicated driver thread. Now, instead of creating many resource drivers, a single resource driver is used. Each scheduler thread will attempt to "lock" the resource driver before parking on it. If the resource driver is already locked, the thread uses a condition variable to park. Contention should remain low as, under load, the scheduler avoids using the drivers. * Add configuration options to enable I/O / time New configuration options are added to `runtime::Builder` to allow enabling I/O and time drivers on a runtime instance basis. This is useful when wanting to create lightweight runtime instances to execute compute only tasks. * Bug fixes The condition variable parker is updated to the same algorithm used in `std`. This is motivated by some potential deadlock cases discovered by `loom`. The basic scheduler is fixed to fairly schedule tasks. `push_front` was accidentally used instead of `push_back`. I/O, time, and spawning now work from within `spawn_blocking` closures. * Misc cleanup The threaded scheduler is no longer generic over `P :Park`. Instead, it is hard coded to a specific parker. Tests, including loom tests, are updated to use `Runtime` directly. This provides greater coverage. The `blocking` module is moved back into `runtime` as all usage is within `runtime` itself.
Diffstat (limited to 'tokio/src/runtime/builder.rs')
-rw-r--r--tokio/src/runtime/builder.rs218
1 files changed, 123 insertions, 95 deletions
diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs
index 793146f7..4f36a027 100644
--- a/tokio/src/runtime/builder.rs
+++ b/tokio/src/runtime/builder.rs
@@ -1,9 +1,10 @@
-use crate::loom::sync::Arc;
-use crate::runtime::handle::{self, Handle};
+use crate::runtime::handle::Handle;
use crate::runtime::shell::Shell;
-use crate::runtime::{blocking, io, time, Runtime};
+use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};
use std::fmt;
+#[cfg(not(loom))]
+use std::sync::Arc;
/// Builds Tokio Runtime with custom configuration values.
///
@@ -39,6 +40,12 @@ pub struct Builder {
/// The task execution model to use.
kind: Kind,
+ /// Whether or not to enable the I/O driver
+ enable_io: bool,
+
+ /// Whether or not to enable the time driver
+ enable_time: bool,
+
/// The number of worker threads.
///
/// Only used when not using the current-thread executor.
@@ -51,13 +58,13 @@ pub struct Builder {
pub(super) thread_stack_size: Option<usize>,
/// Callback to run after each thread starts.
- after_start: Option<Callback>,
+ pub(super) after_start: Option<Callback>,
/// To run before each worker thread stops
- before_stop: Option<Callback>,
+ pub(super) before_stop: Option<Callback>,
}
-#[derive(Debug)]
+#[derive(Debug, Clone, Copy)]
enum Kind {
Shell,
#[cfg(feature = "rt-core")]
@@ -66,12 +73,6 @@ enum Kind {
ThreadPool,
}
-#[cfg(not(loom))]
-type Callback = Arc<dyn Fn() + Send + Sync>;
-
-#[cfg(loom)]
-type Callback = Arc<Box<dyn Fn() + Send + Sync>>;
-
impl Builder {
/// Returns a new runtime builder initialized with default configuration
/// values.
@@ -82,6 +83,12 @@ impl Builder {
// No task execution by default
kind: Kind::Shell,
+ // I/O defaults to "off"
+ enable_io: false,
+
+ // Time defaults to "off"
+ enable_time: false,
+
// Default to use an equal number of threads to number of CPU cores
num_threads: crate::loom::sys::num_cpus(),
@@ -97,6 +104,31 @@ impl Builder {
}
}
+ /// Enable both I/O and time drivers.
+ ///
+ /// Doing this is a shorthand for calling `enable_io` and `enable_time`
+ /// individually. If additional components are added to Tokio in the future,
+ /// `enable_all` will include these future components.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// let rt = runtime::Builder::new()
+ /// .enable_all()
+ /// .build()
+ /// .unwrap();
+ /// ```
+ pub fn enable_all(&mut self) -> &mut Self {
+ #[cfg(feature = "io-driver")]
+ self.enable_io();
+ #[cfg(feature = "time")]
+ self.enable_time();
+
+ self
+ }
+
/// Set the maximum number of worker threads for the `Runtime`'s thread pool.
///
/// This must be a number between 1 and 32,768 though it is advised to keep
@@ -107,14 +139,12 @@ impl Builder {
/// # Examples
///
/// ```
- /// # use tokio::runtime;
+ /// use tokio::runtime;
///
- /// # pub fn main() {
/// let rt = runtime::Builder::new()
/// .num_threads(4)
/// .build()
/// .unwrap();
- /// # }
/// ```
pub fn num_threads(&mut self, val: usize) -> &mut Self {
self.num_threads = val;
@@ -194,14 +224,14 @@ impl Builder {
///
/// # pub fn main() {
/// let runtime = runtime::Builder::new()
- /// .after_start(|| {
+ /// .on_thread_start(|| {
/// println!("thread started");
/// })
/// .build();
/// # }
/// ```
#[cfg(not(loom))]
- pub fn after_start<F>(&mut self, f: F) -> &mut Self
+ pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
@@ -220,14 +250,14 @@ impl Builder {
///
/// # pub fn main() {
/// let runtime = runtime::Builder::new()
- /// .before_stop(|| {
+ /// .on_thread_stop(|| {
/// println!("thread stopping");
/// })
/// .build();
/// # }
/// ```
#[cfg(not(loom))]
- pub fn before_stop<F>(&mut self, f: F) -> &mut Self
+ pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
@@ -266,21 +296,21 @@ impl Builder {
let clock = time::create_clock();
// Create I/O driver
- let (io_driver, handle) = io::create_driver()?;
- let io_handles = vec![handle];
+ let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
+ let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
- let (driver, handle) = time::create_driver(io_driver, clock.clone());
- let time_handles = vec![handle];
+ let spawner = Spawner::Shell;
- let blocking_pool = blocking::create_blocking_pool(self);
+ let blocking_pool =
+ blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock);
let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime {
kind: Kind::Shell(Shell::new(driver)),
handle: Handle {
- kind: handle::Kind::Shell,
- io_handles,
- time_handles,
+ spawner,
+ io_handle,
+ time_handle,
clock,
blocking_spawner,
},
@@ -289,6 +319,53 @@ impl Builder {
}
}
+cfg_io_driver! {
+ impl Builder {
+ /// Enable the I/O driver.
+ ///
+ /// Doing this enables using net, process, signal, and some I/O types on
+ /// the runtime.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// let rt = runtime::Builder::new()
+ /// .enable_io()
+ /// .build()
+ /// .unwrap();
+ /// ```
+ pub fn enable_io(&mut self) -> &mut Self {
+ self.enable_io = true;
+ self
+ }
+ }
+}
+
+cfg_time! {
+ impl Builder {
+ /// Enable the time driver.
+ ///
+ /// Doing this enables using `tokio::time` on the runtime.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// let rt = runtime::Builder::new()
+ /// .enable_time()
+ /// .build()
+ /// .unwrap();
+ /// ```
+ pub fn enable_time(&mut self) -> &mut Self {
+ self.enable_time = true;
+ self
+ }
+ }
+}
+
cfg_rt_core! {
impl Builder {
fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
@@ -297,29 +374,27 @@ cfg_rt_core! {
let clock = time::create_clock();
// Create I/O driver
- let (io_driver, handle) = io::create_driver()?;
- let io_handles = vec![handle];
+ let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
- let (driver, handle) = time::create_driver(io_driver, clock.clone());
- let time_handles = vec![handle];
+ let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
// And now put a single-threaded scheduler on top of the timer. When
// there are no futures ready to do something, it'll let the timer or
// the reactor to generate some new stimuli for the futures to continue
// in their life.
let scheduler = BasicScheduler::new(driver);
- let spawner = scheduler.spawner();
+ let spawner = Spawner::Basic(scheduler.spawner());
// Blocking pool
- let blocking_pool = blocking::create_blocking_pool(self);
+ let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock);
let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime {
kind: Kind::Basic(scheduler),
handle: Handle {
- kind: handle::Kind::Basic(spawner),
- io_handles,
- time_handles,
+ spawner,
+ io_handle,
+ time_handle,
clock,
blocking_spawner,
},
@@ -333,77 +408,30 @@ cfg_rt_threaded! {
impl Builder {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{Kind, ThreadPool};
- use std::sync::Mutex;
+ use crate::runtime::park::Parker;
let clock = time::create_clock();
- let mut io_handles = Vec::new();
- let mut time_handles = Vec::new();
- let mut drivers = Vec::new();
-
- for _ in 0..self.num_threads {
- // Create I/O driver and handle
- let (io_driver, handle) = io::create_driver()?;
- io_handles.push(handle);
-
- // Create a new timer.
- let (time_driver, handle) = time::create_driver(io_driver, clock.clone());
- time_handles.push(handle);
- drivers.push(Mutex::new(Some(time_driver)));
- }
+ let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
+ let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
+ let (scheduler, workers) = ThreadPool::new(self.num_threads, Parker::new(driver));
+ let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
// Create the blocking pool
- let blocking_pool = blocking::create_blocking_pool(self);
+ let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock);
let blocking_spawner = blocking_pool.spawner().clone();
- let scheduler = {
- let clock = clock.clone();
- let io_handles = io_handles.clone();
- let time_handles = time_handles.clone();
-
- let after_start = self.after_start.clone();
- let before_stop = self.before_stop.clone();
-
- let around_worker = Arc::new(Box::new(move |index, next: &mut dyn FnMut()| {
- // Configure the I/O driver
- let _io = io::set_default(&io_handles[index]);
-
- // Configure time
- time::with_default(&time_handles[index], &clock, || {
- // Call the start callback
- if let Some(after_start) = after_start.as_ref() {
- after_start();
- }
-
- // Run the worker
- next();
-
- // Call the after call back
- if let Some(before_stop) = before_stop.as_ref() {
- before_stop();
- }
- })
- })
- as Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>);
-
- ThreadPool::new(
- self.num_threads,
- blocking_pool.spawner().clone(),
- around_worker,
- move |index| drivers[index].lock().unwrap().take().unwrap(),
- )
- };
-
- let spawner = scheduler.spawner().clone();
+ // Spawn the thread pool workers
+ workers.spawn(&blocking_spawner);
Ok(Runtime {
kind: Kind::ThreadPool(scheduler),
handle: Handle {
- kind: handle::Kind::ThreadPool(spawner),
- io_handles,
- time_handles,
+ spawner,
+ io_handle,
+ time_handle,
clock,
- blocking_spawner,
+ blocking_spawner: blocking_spawner.clone(),
},
blocking_pool,
})