summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime/builder.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-06 21:29:10 -0800
committerGitHub <noreply@github.com>2019-11-06 21:29:10 -0800
commit4dbe6af0a1a1c8e579b92ec8ffc1d419244e0944 (patch)
tree4ee02032f615464af50aa6858440c5cecccec4cf /tokio/src/runtime/builder.rs
parent9bec094150e869caae5105d7080f0ae54757b2d9 (diff)
runtime: misc pool cleanup (#1743)
- Remove builders for internal types - Avoid duplicating the blocking pool when using the concurrent scheduler. - misc smaller cleanup
Diffstat (limited to 'tokio/src/runtime/builder.rs')
-rw-r--r--tokio/src/runtime/builder.rs104
1 files changed, 56 insertions, 48 deletions
diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs
index 92fdfa1a..6081c10e 100644
--- a/tokio/src/runtime/builder.rs
+++ b/tokio/src/runtime/builder.rs
@@ -1,13 +1,9 @@
+use crate::loom::sync::Arc;
#[cfg(feature = "blocking")]
-use crate::runtime::blocking::{Pool, PoolWaiter};
-#[cfg(feature = "rt-current-thread")]
-use crate::runtime::current_thread::CurrentThread;
-#[cfg(feature = "rt-full")]
-use crate::runtime::thread_pool;
+use crate::runtime::blocking;
use crate::runtime::{io, timer, Runtime};
use std::fmt;
-use std::sync::Arc;
/// Builds Tokio Runtime with custom configuration values.
///
@@ -57,10 +53,10 @@ pub struct Builder {
thread_stack_size: Option<usize>,
/// Callback to run after each thread starts.
- after_start: Option<Arc<dyn Fn() + Send + Sync>>,
+ after_start: Option<Callback>,
/// To run before each worker thread stops
- before_stop: Option<Arc<dyn Fn() + Send + Sync>>,
+ before_stop: Option<Callback>,
/// The clock to use
clock: timer::Clock,
@@ -75,6 +71,12 @@ enum Kind {
ThreadPool,
}
+#[cfg(not(loom))]
+type Callback = Arc<dyn Fn() + Send + Sync>;
+
+#[cfg(loom)]
+type Callback = Arc<Box<dyn Fn() + Send + Sync>>;
+
impl Builder {
/// Returns a new runtime builder initialized with default configuration
/// values.
@@ -206,6 +208,7 @@ impl Builder {
/// .build();
/// # }
/// ```
+ #[cfg(not(loom))]
pub fn after_start<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
@@ -231,6 +234,7 @@ impl Builder {
/// .build();
/// # }
/// ```
+ #[cfg(not(loom))]
pub fn before_stop<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
@@ -285,13 +289,13 @@ impl Builder {
net_handles,
timer_handles,
#[cfg(feature = "blocking")]
- blocking_pool: PoolWaiter::from(Pool::default()),
+ blocking_pool: self.build_blocking_pool().into(),
})
}
#[cfg(feature = "rt-current-thread")]
fn build_current_thread(&mut self) -> io::Result<Runtime> {
- use crate::runtime::Kind;
+ use crate::runtime::{CurrentThread, Kind};
// Create network driver
let (net, handle) = io::create()?;
@@ -307,19 +311,19 @@ impl Builder {
let executor = CurrentThread::new(timer);
// Blocking pool
- let blocking_pool = PoolWaiter::from(Pool::default());
+ let blocking_pool = self.build_blocking_pool();
Ok(Runtime {
kind: Kind::CurrentThread(executor),
net_handles,
timer_handles,
- blocking_pool,
+ blocking_pool: blocking_pool.into(),
})
}
#[cfg(feature = "rt-full")]
fn build_threadpool(&mut self) -> io::Result<Runtime> {
- use crate::runtime::Kind;
+ use crate::runtime::{Kind, ThreadPool};
use crate::timer::clock;
use std::sync::Mutex;
@@ -341,8 +345,8 @@ impl Builder {
// Get a handle to the clock for the runtime.
let clock = self.clock.clone();
- // Blocking pool
- let blocking_pool = PoolWaiter::from(Pool::default());
+ // Create the blocking pool
+ let blocking_pool = self.build_blocking_pool();
let pool = {
let net_handles = net_handles.clone();
@@ -351,48 +355,52 @@ impl Builder {
let after_start = self.after_start.clone();
let before_stop = self.before_stop.clone();
- let mut builder = thread_pool::Builder::new();
- builder.num_threads(self.num_threads);
- builder.name(&self.thread_name);
-
- if let Some(stack_size) = self.thread_stack_size {
- builder.stack_size(stack_size);
- }
-
- builder
- .around_worker(move |index, next| {
- // Configure the network driver
- let _net = io::set_default(&net_handles[index]);
-
- // Configure the clock
- clock::with_default(&clock, || {
- // Configure the timer
- let _timer = timer::set_default(&timer_handles[index]);
-
- // Call the start callback
- if let Some(after_start) = after_start.as_ref() {
- after_start();
- }
-
- // Run the worker
- next();
-
- // Call the after call back
- if let Some(before_stop) = before_stop.as_ref() {
- before_stop();
- }
- })
+ let around_worker = Arc::new(Box::new(move |index, next: &mut dyn FnMut()| {
+ // Configure the network driver
+ let _net = io::set_default(&net_handles[index]);
+
+ // Configure the clock
+ clock::with_default(&clock, || {
+ // Configure the timer
+ let _timer = timer::set_default(&timer_handles[index]);
+
+ // Call the start callback
+ if let Some(after_start) = after_start.as_ref() {
+ after_start();
+ }
+
+ // Run the worker
+ next();
+
+ // Call the after call back
+ if let Some(before_stop) = before_stop.as_ref() {
+ before_stop();
+ }
})
- .build(move |index| timers[index].lock().unwrap().take().unwrap())
+ })
+ as Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>);
+
+ ThreadPool::new(
+ self.num_threads,
+ blocking_pool.clone(),
+ around_worker,
+ move |index| timers[index].lock().unwrap().take().unwrap(),
+ )
};
Ok(Runtime {
kind: Kind::ThreadPool(pool),
net_handles,
timer_handles,
- blocking_pool,
+ blocking_pool: blocking_pool.into(),
})
}
+
+ #[cfg(feature = "blocking")]
+ fn build_blocking_pool(&self) -> Arc<blocking::Pool> {
+ // Create the blocking pool
+ blocking::Pool::new(self.thread_name.clone(), self.thread_stack_size)
+ }
}
impl Default for Builder {