diff options
author | Douman <douman@gmx.se> | 2019-12-18 19:31:49 +0100 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2019-12-18 10:31:49 -0800 |
commit | b24ad9fe861ce01add2c6533149f643e38537f59 (patch) | |
tree | d24cbe308eb58bffb4eb0af8e507e1889a8afa32 /tokio/src/runtime | |
parent | 7c010ed030d0084d38451bbebf727c6695c2dbac (diff) |
rt: add configuration for core threads and max threads (#1977)
`num_threads` is deprecated. Instead, `core_threads` and `max_threads` are
introduced. `core_threads` specifies the number of "always on" threads used
for the async task executor and `max_threads` specifies the maximum number
of threads that the runtime may spawn.
Diffstat (limited to 'tokio/src/runtime')
-rw-r--r-- | tokio/src/runtime/blocking/mod.rs | 5 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/pool.rs | 9 | ||||
-rw-r--r-- | tokio/src/runtime/builder.rs | 74 | ||||
-rw-r--r-- | tokio/src/runtime/tests/loom_blocking.rs | 2 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/loom_pool.rs | 2 |
5 files changed, 73 insertions, 19 deletions
diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index 63c54b74..fa4c77a1 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -19,13 +19,15 @@ cfg_blocking_impl! { io: &io::Handle, time: &time::Handle, clock: &time::Clock, + thread_cap: usize, ) -> BlockingPool { BlockingPool::new( builder, spawner, io, time, - clock) + clock, + thread_cap) } } @@ -44,6 +46,7 @@ cfg_not_blocking_impl! { _io: &io::Handle, _time: &time::Handle, _clock: &time::Clock, + _thread_cap: usize, ) -> BlockingPool { BlockingPool {} } diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index a25f1549..02b3d19b 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -54,11 +54,13 @@ struct Inner { /// Source of `Instant::now()` clock: time::Clock, + thread_cap: usize, + } struct Shared { queue: VecDeque<Task>, - num_th: u32, + num_th: usize, num_idle: u32, num_notify: u32, shutdown: bool, @@ -72,7 +74,6 @@ thread_local! { static BLOCKING: Cell<Option<*const Spawner>> = Cell::new(None) } -const MAX_THREADS: u32 = 1_000; const KEEP_ALIVE: Duration = Duration::from_secs(10); /// Run the provided function on an executor dedicated to blocking operations. @@ -101,6 +102,7 @@ impl BlockingPool { io: &io::Handle, time: &time::Handle, clock: &time::Clock, + thread_cap: usize, ) -> BlockingPool { let (shutdown_tx, shutdown_rx) = shutdown::channel(); @@ -124,6 +126,7 @@ impl BlockingPool { io_handle: io.clone(), time_handle: time.clone(), clock: clock.clone(), + thread_cap, }), }, shutdown_rx, @@ -209,7 +212,7 @@ impl Spawner { if shared.num_idle == 0 { // No threads are able to process the task. - if shared.num_th == MAX_THREADS { + if shared.num_th == self.inner.thread_cap { // At max number of threads None } else { diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 802608e5..2b354824 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -28,7 +28,7 @@ use std::sync::Arc; /// // build runtime /// let runtime = Builder::new() /// .threaded_scheduler() -/// .num_threads(4) +/// .core_threads(4) /// .thread_name("my-custom-name") /// .thread_stack_size(3 * 1024 * 1024) /// .build() @@ -47,10 +47,13 @@ pub struct Builder { /// Whether or not to enable the time driver enable_time: bool, - /// The number of worker threads. + /// The number of worker threads, used by Runtime. /// /// Only used when not using the current-thread executor. - num_threads: usize, + core_threads: usize, + + /// Cap on thread usage. + max_threads: usize, /// Name used for threads spawned by the runtime. pub(super) thread_name: String, @@ -91,7 +94,9 @@ impl Builder { enable_time: false, // Default to use an equal number of threads to number of CPU cores - num_threads: crate::loom::sys::num_cpus(), + core_threads: crate::loom::sys::num_cpus(), + + max_threads: 512, // Default thread name thread_name: "tokio-runtime-worker".into(), @@ -130,12 +135,26 @@ impl Builder { self } + #[deprecated(note = "In future will be replaced by core_threads method")] /// Set the maximum number of worker threads for the `Runtime`'s thread pool. /// /// This must be a number between 1 and 32,768 though it is advised to keep /// this value on the smaller side. /// /// The default value is the number of cores available to the system. + pub fn num_threads(&mut self, val: usize) -> &mut Self { + self.core_threads = val; + self + } + + /// Set the core number of worker threads for the `Runtime`'s thread pool. + /// + /// This must be a number between 1 and 32,768 though it is advised to keep + /// this value on the smaller side. + /// + /// The default value is the number of cores available to the system. + /// + /// These threads will be always active and running. /// /// # Examples /// @@ -143,12 +162,32 @@ impl Builder { /// use tokio::runtime; /// /// let rt = runtime::Builder::new() - /// .num_threads(4) + /// .core_threads(4) /// .build() /// .unwrap(); /// ``` - pub fn num_threads(&mut self, val: usize) -> &mut Self { - self.num_threads = val; + pub fn core_threads(&mut self, val: usize) -> &mut Self { + self.core_threads = val; + self + } + + /// Specifies limit for threads, spawned by the Runtime. + /// + /// This is number of threads to be used outside of Runtime core threads. + /// In the current implementation it is used to cap number of threads spawned when using + /// blocking annotation + /// + /// The default value is 512. + /// + /// When multi-threaded runtime is not used, will act as limit on additional threads. + /// + /// Otherwise it limits additional threads as following: `max_threads - core_threads` + /// + /// If `core_threads` is greater than `max_threads`, then core_threads is capped + /// by `max_threads` + pub fn max_threads(&mut self, val: usize) -> &mut Self { + assert_ne!(val, 0, "Thread limit cannot be zero"); + self.max_threads = val; self } @@ -285,8 +324,14 @@ impl Builder { let spawner = Spawner::Shell; - let blocking_pool = - blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock); + let blocking_pool = blocking::create_blocking_pool( + self, + &spawner, + &io_handle, + &time_handle, + &clock, + self.max_threads, + ); let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { @@ -379,7 +424,7 @@ cfg_rt_core! { let spawner = Spawner::Basic(scheduler.spawner()); // Blocking pool - let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock); + let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock, self.max_threads); let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { @@ -409,15 +454,17 @@ cfg_rt_threaded! { use crate::runtime::{Kind, ThreadPool}; use crate::runtime::park::Parker; + assert!(self.core_threads <= self.max_threads, "Core threads number cannot be above max limit"); + let clock = time::create_clock(); let (io_driver, io_handle) = io::create_driver(self.enable_io)?; let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); - let (scheduler, workers) = ThreadPool::new(self.num_threads, Parker::new(driver)); + let (scheduler, workers) = ThreadPool::new(self.core_threads, Parker::new(driver)); let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); // Create the blocking pool - let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock); + let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock, self.max_threads); let blocking_spawner = blocking_pool.spawner().clone(); // Spawn the thread pool workers @@ -448,7 +495,8 @@ impl fmt::Debug for Builder { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Builder") .field("kind", &self.kind) - .field("num_threads", &self.num_threads) + .field("core_threads", &self.core_threads) + .field("max_threads", &self.max_threads) .field("thread_name", &self.thread_name) .field("thread_stack_size", &self.thread_stack_size) .field("after_start", &self.after_start.as_ref().map(|_| "...")) diff --git a/tokio/src/runtime/tests/loom_blocking.rs b/tokio/src/runtime/tests/loom_blocking.rs index 85e6fb12..db7048e3 100644 --- a/tokio/src/runtime/tests/loom_blocking.rs +++ b/tokio/src/runtime/tests/loom_blocking.rs @@ -25,7 +25,7 @@ fn blocking_shutdown() { fn mk_runtime(num_threads: usize) -> Runtime { runtime::Builder::new() .threaded_scheduler() - .num_threads(num_threads) + .core_threads(num_threads) .build() .unwrap() } diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs index 81e292d6..c85ff591 100644 --- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs +++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs @@ -171,7 +171,7 @@ fn complete_block_on_under_load() { fn mk_pool(num_threads: usize) -> Runtime { runtime::Builder::new() .threaded_scheduler() - .num_threads(num_threads) + .core_threads(num_threads) .build() .unwrap() } |