diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-06 21:29:10 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-06 21:29:10 -0800 |
commit | 4dbe6af0a1a1c8e579b92ec8ffc1d419244e0944 (patch) | |
tree | 4ee02032f615464af50aa6858440c5cecccec4cf /tokio/src/runtime/builder.rs | |
parent | 9bec094150e869caae5105d7080f0ae54757b2d9 (diff) |
runtime: misc pool cleanup (#1743)
- Remove builders for internal types
- Avoid duplicating the blocking pool when using the concurrent
scheduler.
- misc smaller cleanup
Diffstat (limited to 'tokio/src/runtime/builder.rs')
-rw-r--r-- | tokio/src/runtime/builder.rs | 104 |
1 files changed, 56 insertions, 48 deletions
diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 92fdfa1a..6081c10e 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,13 +1,9 @@ +use crate::loom::sync::Arc; #[cfg(feature = "blocking")] -use crate::runtime::blocking::{Pool, PoolWaiter}; -#[cfg(feature = "rt-current-thread")] -use crate::runtime::current_thread::CurrentThread; -#[cfg(feature = "rt-full")] -use crate::runtime::thread_pool; +use crate::runtime::blocking; use crate::runtime::{io, timer, Runtime}; use std::fmt; -use std::sync::Arc; /// Builds Tokio Runtime with custom configuration values. /// @@ -57,10 +53,10 @@ pub struct Builder { thread_stack_size: Option<usize>, /// Callback to run after each thread starts. - after_start: Option<Arc<dyn Fn() + Send + Sync>>, + after_start: Option<Callback>, /// To run before each worker thread stops - before_stop: Option<Arc<dyn Fn() + Send + Sync>>, + before_stop: Option<Callback>, /// The clock to use clock: timer::Clock, @@ -75,6 +71,12 @@ enum Kind { ThreadPool, } +#[cfg(not(loom))] +type Callback = Arc<dyn Fn() + Send + Sync>; + +#[cfg(loom)] +type Callback = Arc<Box<dyn Fn() + Send + Sync>>; + impl Builder { /// Returns a new runtime builder initialized with default configuration /// values. @@ -206,6 +208,7 @@ impl Builder { /// .build(); /// # } /// ``` + #[cfg(not(loom))] pub fn after_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static, @@ -231,6 +234,7 @@ impl Builder { /// .build(); /// # } /// ``` + #[cfg(not(loom))] pub fn before_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static, @@ -285,13 +289,13 @@ impl Builder { net_handles, timer_handles, #[cfg(feature = "blocking")] - blocking_pool: PoolWaiter::from(Pool::default()), + blocking_pool: self.build_blocking_pool().into(), }) } #[cfg(feature = "rt-current-thread")] fn build_current_thread(&mut self) -> io::Result<Runtime> { - use crate::runtime::Kind; + use crate::runtime::{CurrentThread, Kind}; // Create network driver let (net, handle) = io::create()?; @@ -307,19 +311,19 @@ impl Builder { let executor = CurrentThread::new(timer); // Blocking pool - let blocking_pool = PoolWaiter::from(Pool::default()); + let blocking_pool = self.build_blocking_pool(); Ok(Runtime { kind: Kind::CurrentThread(executor), net_handles, timer_handles, - blocking_pool, + blocking_pool: blocking_pool.into(), }) } #[cfg(feature = "rt-full")] fn build_threadpool(&mut self) -> io::Result<Runtime> { - use crate::runtime::Kind; + use crate::runtime::{Kind, ThreadPool}; use crate::timer::clock; use std::sync::Mutex; @@ -341,8 +345,8 @@ impl Builder { // Get a handle to the clock for the runtime. let clock = self.clock.clone(); - // Blocking pool - let blocking_pool = PoolWaiter::from(Pool::default()); + // Create the blocking pool + let blocking_pool = self.build_blocking_pool(); let pool = { let net_handles = net_handles.clone(); @@ -351,48 +355,52 @@ impl Builder { let after_start = self.after_start.clone(); let before_stop = self.before_stop.clone(); - let mut builder = thread_pool::Builder::new(); - builder.num_threads(self.num_threads); - builder.name(&self.thread_name); - - if let Some(stack_size) = self.thread_stack_size { - builder.stack_size(stack_size); - } - - builder - .around_worker(move |index, next| { - // Configure the network driver - let _net = io::set_default(&net_handles[index]); - - // Configure the clock - clock::with_default(&clock, || { - // Configure the timer - let _timer = timer::set_default(&timer_handles[index]); - - // Call the start callback - if let Some(after_start) = after_start.as_ref() { - after_start(); - } - - // Run the worker - next(); - - // Call the after call back - if let Some(before_stop) = before_stop.as_ref() { - before_stop(); - } - }) + let around_worker = Arc::new(Box::new(move |index, next: &mut dyn FnMut()| { + // Configure the network driver + let _net = io::set_default(&net_handles[index]); + + // Configure the clock + clock::with_default(&clock, || { + // Configure the timer + let _timer = timer::set_default(&timer_handles[index]); + + // Call the start callback + if let Some(after_start) = after_start.as_ref() { + after_start(); + } + + // Run the worker + next(); + + // Call the after call back + if let Some(before_stop) = before_stop.as_ref() { + before_stop(); + } }) - .build(move |index| timers[index].lock().unwrap().take().unwrap()) + }) + as Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>); + + ThreadPool::new( + self.num_threads, + blocking_pool.clone(), + around_worker, + move |index| timers[index].lock().unwrap().take().unwrap(), + ) }; Ok(Runtime { kind: Kind::ThreadPool(pool), net_handles, timer_handles, - blocking_pool, + blocking_pool: blocking_pool.into(), }) } + + #[cfg(feature = "blocking")] + fn build_blocking_pool(&self) -> Arc<blocking::Pool> { + // Create the blocking pool + blocking::Pool::new(self.thread_name.clone(), self.thread_stack_size) + } } impl Default for Builder { |