summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime
diff options
context:
space:
mode:
authorDouman <douman@gmx.se>2019-12-18 19:31:49 +0100
committerCarl Lerche <me@carllerche.com>2019-12-18 10:31:49 -0800
commitb24ad9fe861ce01add2c6533149f643e38537f59 (patch)
treed24cbe308eb58bffb4eb0af8e507e1889a8afa32 /tokio/src/runtime
parent7c010ed030d0084d38451bbebf727c6695c2dbac (diff)
rt: add configuration for core threads and max threads (#1977)
`num_threads` is deprecated. Instead, `core_threads` and `max_threads` are introduced. `core_threads` specifies the number of "always on" threads used for the async task executor and `max_threads` specifies the maximum number of threads that the runtime may spawn.
Diffstat (limited to 'tokio/src/runtime')
-rw-r--r--tokio/src/runtime/blocking/mod.rs5
-rw-r--r--tokio/src/runtime/blocking/pool.rs9
-rw-r--r--tokio/src/runtime/builder.rs74
-rw-r--r--tokio/src/runtime/tests/loom_blocking.rs2
-rw-r--r--tokio/src/runtime/thread_pool/tests/loom_pool.rs2
5 files changed, 73 insertions, 19 deletions
diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs
index 63c54b74..fa4c77a1 100644
--- a/tokio/src/runtime/blocking/mod.rs
+++ b/tokio/src/runtime/blocking/mod.rs
@@ -19,13 +19,15 @@ cfg_blocking_impl! {
io: &io::Handle,
time: &time::Handle,
clock: &time::Clock,
+ thread_cap: usize,
) -> BlockingPool {
BlockingPool::new(
builder,
spawner,
io,
time,
- clock)
+ clock,
+ thread_cap)
}
}
@@ -44,6 +46,7 @@ cfg_not_blocking_impl! {
_io: &io::Handle,
_time: &time::Handle,
_clock: &time::Clock,
+ _thread_cap: usize,
) -> BlockingPool {
BlockingPool {}
}
diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
index a25f1549..02b3d19b 100644
--- a/tokio/src/runtime/blocking/pool.rs
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -54,11 +54,13 @@ struct Inner {
/// Source of `Instant::now()`
clock: time::Clock,
+ thread_cap: usize,
+
}
struct Shared {
queue: VecDeque<Task>,
- num_th: u32,
+ num_th: usize,
num_idle: u32,
num_notify: u32,
shutdown: bool,
@@ -72,7 +74,6 @@ thread_local! {
static BLOCKING: Cell<Option<*const Spawner>> = Cell::new(None)
}
-const MAX_THREADS: u32 = 1_000;
const KEEP_ALIVE: Duration = Duration::from_secs(10);
/// Run the provided function on an executor dedicated to blocking operations.
@@ -101,6 +102,7 @@ impl BlockingPool {
io: &io::Handle,
time: &time::Handle,
clock: &time::Clock,
+ thread_cap: usize,
) -> BlockingPool {
let (shutdown_tx, shutdown_rx) = shutdown::channel();
@@ -124,6 +126,7 @@ impl BlockingPool {
io_handle: io.clone(),
time_handle: time.clone(),
clock: clock.clone(),
+ thread_cap,
}),
},
shutdown_rx,
@@ -209,7 +212,7 @@ impl Spawner {
if shared.num_idle == 0 {
// No threads are able to process the task.
- if shared.num_th == MAX_THREADS {
+ if shared.num_th == self.inner.thread_cap {
// At max number of threads
None
} else {
diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs
index 802608e5..2b354824 100644
--- a/tokio/src/runtime/builder.rs
+++ b/tokio/src/runtime/builder.rs
@@ -28,7 +28,7 @@ use std::sync::Arc;
/// // build runtime
/// let runtime = Builder::new()
/// .threaded_scheduler()
-/// .num_threads(4)
+/// .core_threads(4)
/// .thread_name("my-custom-name")
/// .thread_stack_size(3 * 1024 * 1024)
/// .build()
@@ -47,10 +47,13 @@ pub struct Builder {
/// Whether or not to enable the time driver
enable_time: bool,
- /// The number of worker threads.
+ /// The number of worker threads, used by Runtime.
///
/// Only used when not using the current-thread executor.
- num_threads: usize,
+ core_threads: usize,
+
+ /// Cap on thread usage.
+ max_threads: usize,
/// Name used for threads spawned by the runtime.
pub(super) thread_name: String,
@@ -91,7 +94,9 @@ impl Builder {
enable_time: false,
// Default to use an equal number of threads to number of CPU cores
- num_threads: crate::loom::sys::num_cpus(),
+ core_threads: crate::loom::sys::num_cpus(),
+
+ max_threads: 512,
// Default thread name
thread_name: "tokio-runtime-worker".into(),
@@ -130,12 +135,26 @@ impl Builder {
self
}
+ #[deprecated(note = "In future will be replaced by core_threads method")]
/// Set the maximum number of worker threads for the `Runtime`'s thread pool.
///
/// This must be a number between 1 and 32,768 though it is advised to keep
/// this value on the smaller side.
///
/// The default value is the number of cores available to the system.
+ pub fn num_threads(&mut self, val: usize) -> &mut Self {
+ self.core_threads = val;
+ self
+ }
+
+ /// Set the core number of worker threads for the `Runtime`'s thread pool.
+ ///
+ /// This must be a number between 1 and 32,768 though it is advised to keep
+ /// this value on the smaller side.
+ ///
+ /// The default value is the number of cores available to the system.
+ ///
+ /// These threads will be always active and running.
///
/// # Examples
///
@@ -143,12 +162,32 @@ impl Builder {
/// use tokio::runtime;
///
/// let rt = runtime::Builder::new()
- /// .num_threads(4)
+ /// .core_threads(4)
/// .build()
/// .unwrap();
/// ```
- pub fn num_threads(&mut self, val: usize) -> &mut Self {
- self.num_threads = val;
+ pub fn core_threads(&mut self, val: usize) -> &mut Self {
+ self.core_threads = val;
+ self
+ }
+
+ /// Specifies limit for threads, spawned by the Runtime.
+ ///
+ /// This is number of threads to be used outside of Runtime core threads.
+ /// In the current implementation it is used to cap number of threads spawned when using
+ /// blocking annotation
+ ///
+ /// The default value is 512.
+ ///
+ /// When multi-threaded runtime is not used, will act as limit on additional threads.
+ ///
+ /// Otherwise it limits additional threads as following: `max_threads - core_threads`
+ ///
+ /// If `core_threads` is greater than `max_threads`, then core_threads is capped
+ /// by `max_threads`
+ pub fn max_threads(&mut self, val: usize) -> &mut Self {
+ assert_ne!(val, 0, "Thread limit cannot be zero");
+ self.max_threads = val;
self
}
@@ -285,8 +324,14 @@ impl Builder {
let spawner = Spawner::Shell;
- let blocking_pool =
- blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock);
+ let blocking_pool = blocking::create_blocking_pool(
+ self,
+ &spawner,
+ &io_handle,
+ &time_handle,
+ &clock,
+ self.max_threads,
+ );
let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime {
@@ -379,7 +424,7 @@ cfg_rt_core! {
let spawner = Spawner::Basic(scheduler.spawner());
// Blocking pool
- let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock);
+ let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock, self.max_threads);
let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime {
@@ -409,15 +454,17 @@ cfg_rt_threaded! {
use crate::runtime::{Kind, ThreadPool};
use crate::runtime::park::Parker;
+ assert!(self.core_threads <= self.max_threads, "Core threads number cannot be above max limit");
+
let clock = time::create_clock();
let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
- let (scheduler, workers) = ThreadPool::new(self.num_threads, Parker::new(driver));
+ let (scheduler, workers) = ThreadPool::new(self.core_threads, Parker::new(driver));
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
// Create the blocking pool
- let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock);
+ let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock, self.max_threads);
let blocking_spawner = blocking_pool.spawner().clone();
// Spawn the thread pool workers
@@ -448,7 +495,8 @@ impl fmt::Debug for Builder {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Builder")
.field("kind", &self.kind)
- .field("num_threads", &self.num_threads)
+ .field("core_threads", &self.core_threads)
+ .field("max_threads", &self.max_threads)
.field("thread_name", &self.thread_name)
.field("thread_stack_size", &self.thread_stack_size)
.field("after_start", &self.after_start.as_ref().map(|_| "..."))
diff --git a/tokio/src/runtime/tests/loom_blocking.rs b/tokio/src/runtime/tests/loom_blocking.rs
index 85e6fb12..db7048e3 100644
--- a/tokio/src/runtime/tests/loom_blocking.rs
+++ b/tokio/src/runtime/tests/loom_blocking.rs
@@ -25,7 +25,7 @@ fn blocking_shutdown() {
fn mk_runtime(num_threads: usize) -> Runtime {
runtime::Builder::new()
.threaded_scheduler()
- .num_threads(num_threads)
+ .core_threads(num_threads)
.build()
.unwrap()
}
diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs
index 81e292d6..c85ff591 100644
--- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs
+++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs
@@ -171,7 +171,7 @@ fn complete_block_on_under_load() {
fn mk_pool(num_threads: usize) -> Runtime {
runtime::Builder::new()
.threaded_scheduler()
- .num_threads(num_threads)
+ .core_threads(num_threads)
.build()
.unwrap()
}