path: root/tokio/src/runtime/blocking
diff options
authorCarl Lerche <>2019-11-05 19:12:30 -0800
committerGitHub <>2019-11-05 19:12:30 -0800
commitd5c1119c881c9a8b511aa9000fd26b9bda014256 (patch)
tree72e2ca6b655f29e948a91ba4573a95350cb241e0 /tokio/src/runtime/blocking
parenta6253ed05a1e0d14bc64915f5937c29092df9497 (diff)
runtime: combine `executor` and `runtime` mods (#1734)
Now, all types are under `runtime`. `executor::util` is moved to a top level `util` module.
Diffstat (limited to 'tokio/src/runtime/blocking')
2 files changed, 419 insertions, 0 deletions
diff --git a/tokio/src/runtime/blocking/ b/tokio/src/runtime/blocking/
new file mode 100644
index 00000000..59178f25
--- /dev/null
+++ b/tokio/src/runtime/blocking/
@@ -0,0 +1,58 @@
+use crate::loom::thread;
+use crate::runtime::blocking::Pool;
+use std::usize;
+/// Builds a blocking thread pool with custom configuration values.
+pub(crate) struct Builder {
+ /// Thread name
+ name: String,
+ /// Thread stack size
+ stack_size: Option<usize>,
+impl Default for Builder {
+ fn default() -> Self {
+ Builder {
+ name: "tokio-blocking-thread".to_string(),
+ stack_size: None,
+ }
+ }
+impl Builder {
+ /// Set name of threads spawned by the pool
+ ///
+ /// If this configuration is not set, then the thread will use the system
+ /// default naming scheme.
+ pub(crate) fn name<S: Into<String>>(&mut self, val: S) -> &mut Self {
+ = val.into();
+ self
+ }
+ /// Set the stack size (in bytes) for worker threads.
+ ///
+ /// The actual stack size may be greater than this value if the platform
+ /// specifies minimal stack size.
+ ///
+ /// The default stack size for spawned threads is 2 MiB, though this
+ /// particular stack size is subject to change in the future.
+ pub(crate) fn stack_size(&mut self, val: usize) -> &mut Self {
+ self.stack_size = Some(val);
+ self
+ }
+ pub(crate) fn build(self) -> Pool {
+ let mut p = Pool::default();
+ let Builder { stack_size, name } = self;
+ p.new_thread = Box::new(move || {
+ let mut b = thread::Builder::new().name(name.clone());
+ if let Some(stack_size) = stack_size {
+ b = b.stack_size(stack_size);
+ }
+ b
+ });
+ p
+ }
diff --git a/tokio/src/runtime/blocking/ b/tokio/src/runtime/blocking/
new file mode 100644
index 00000000..e634ea59
--- /dev/null
+++ b/tokio/src/runtime/blocking/
@@ -0,0 +1,361 @@
+//! Thread pool for blocking operations
+use crate::loom::sync::{Arc, Condvar, Mutex};
+use crate::loom::thread;
+#[cfg(feature = "blocking")]
+use crate::sync::oneshot;
+use std::cell::Cell;
+use std::collections::VecDeque;
+use std::fmt;
+#[cfg(feature = "blocking")]
+use std::future::Future;
+use std::ops::Deref;
+#[cfg(feature = "blocking")]
+use std::pin::Pin;
+#[cfg(feature = "blocking")]
+use std::task::{Context, Poll};
+use std::time::Duration;
+#[cfg(feature = "rt-full")]
+mod builder;
+#[cfg(feature = "rt-full")]
+pub(crate) use builder::Builder;
+#[derive(Clone, Copy)]
+enum State {
+ Empty,
+ Ready(*const Arc<Pool>),
+thread_local! {
+ /// Thread-local tracking the current executor
+ static BLOCKING: Cell<State> = Cell::new(State::Empty)
+/// Set the blocking pool for the duration of the closure
+/// If a blocking pool is already set, it will be restored when the closure returns or if it
+/// panics.
+#[allow(dead_code)] // we allow dead code since this won't be called if no executors are enabled
+pub(crate) fn with_pool<F, R>(pool: &Arc<Pool>, f: F) -> R
+ F: FnOnce() -> R,
+ // While scary, this is safe. The function takes a `&Pool`, which guarantees
+ // that the reference lives for the duration of `with_pool`.
+ //
+ // Because we are always clearing the TLS value at the end of the
+ // function, we can cast the reference to 'static which thread-local
+ // cells require.
+ BLOCKING.with(|cell| {
+ let was = cell.replace(State::Empty);
+ // Ensure that the pool is removed from the thread-local context
+ // when leaving the scope. This handles cases that involve panicking.
+ struct Reset<'a>(&'a Cell<State>, State);
+ impl Drop for Reset<'_> {
+ fn drop(&mut self) {
+ self.0.set(self.1);
+ }
+ }
+ let _reset = Reset(cell, was);
+ cell.set(State::Ready(pool as *const _));
+ f()
+ })
+pub(crate) struct Pool {
+ shared: Mutex<Shared>,
+ condvar: Condvar,
+ new_thread: Box<dyn Fn() -> thread::Builder + Send + Sync + 'static>,
+impl fmt::Debug for Pool {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Pool").finish()
+ }
+struct Shared {
+ queue: VecDeque<Box<dyn FnOnce() + Send>>,
+ num_th: u32,
+ num_idle: u32,
+ num_notify: u32,
+ shutdown: bool,
+const MAX_THREADS: u32 = 1_000;
+const KEEP_ALIVE: Duration = Duration::from_secs(10);
+/// Result of a blocking operation running on the blocking thread pool.
+#[cfg(feature = "blocking")]
+pub struct Blocking<T> {
+ rx: oneshot::Receiver<T>,
+impl Pool {
+ /// Run the provided function on an executor dedicated to blocking operations.
+ pub(crate) fn spawn(this: &Arc<Self>, f: Box<dyn FnOnce() + Send + 'static>) {
+ let should_spawn = {
+ let mut shared = this.shared.lock().unwrap();
+ if shared.shutdown {
+ // no need to even push this task; it would never get picked up
+ return;
+ }
+ shared.queue.push_back(f);
+ if shared.num_idle == 0 {
+ // No threads are able to process the task.
+ if shared.num_th == MAX_THREADS {
+ // At max number of threads
+ false
+ } else {
+ shared.num_th += 1;
+ true
+ }
+ } else {
+ // Notify an idle worker thread. The notification counter
+ // is used to count the needed amount of notifications
+ // exactly. Thread libraries may generate spurious
+ // wakeups, this counter is used to keep us in a
+ // consistent state.
+ shared.num_idle -= 1;
+ shared.num_notify += 1;
+ this.condvar.notify_one();
+ false
+ }
+ };
+ if should_spawn {
+ Pool::spawn_thread(Arc::clone(this), (this.new_thread)());
+ }
+ }
+ // NOTE: we cannot use self here w/o arbitrary_self_types since Arc is loom::Arc
+ fn spawn_thread(this: Arc<Self>, builder: thread::Builder) {
+ builder
+ .spawn(move || {
+ let mut shared = this.shared.lock().unwrap();
+ 'main: loop {
+ // BUSY
+ while let Some(task) = shared.queue.pop_front() {
+ drop(shared);
+ run_task(task);
+ shared = this.shared.lock().unwrap();
+ if shared.shutdown {
+ break; // Need to increment idle before we exit
+ }
+ }
+ // IDLE
+ shared.num_idle += 1;
+ while !shared.shutdown {
+ let lock_result = this.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap();
+ shared = lock_result.0;
+ let timeout_result = lock_result.1;
+ if shared.num_notify != 0 {
+ // We have received a legitimate wakeup,
+ // acknowledge it by decrementing the counter
+ // and transition to the BUSY state.
+ shared.num_notify -= 1;
+ break;
+ }
+ if timeout_result.timed_out() {
+ break 'main;
+ }
+ // Spurious wakeup detected, go back to sleep.
+ }
+ if shared.shutdown {
+ // Work was produced, and we "took" it (by decrementing num_notify).
+ // This means that num_idle was decremented once for our wakeup.
+ // But, since we are exiting, we need to "undo" that, as we'll stay idle.
+ shared.num_idle += 1;
+ // NOTE: Technically we should also do num_notify++ and notify again,
+ // but since we're shutting down anyway, that won't be necessary.
+ break;
+ }
+ }
+ // Thread exit
+ shared.num_th -= 1;
+ // num_idle should now be tracked exactly, panic
+ // with a descriptive message if it is not the
+ // case.
+ shared.num_idle = shared
+ .num_idle
+ .checked_sub(1)
+ .expect("num_idle underflowed on thread exit");
+ if shared.shutdown && shared.num_th == 0 {
+ this.condvar.notify_one();
+ }
+ })
+ .unwrap();
+ }
+ /// Shut down all workers in the pool the next time they are idle.
+ ///
+ /// Blocks until all threads have exited.
+ pub(crate) fn shutdown(&self) {
+ let mut shared = self.shared.lock().unwrap();
+ shared.shutdown = true;
+ self.condvar.notify_all();
+ while shared.num_th > 0 {
+ shared = self.condvar.wait(shared).unwrap();
+ }
+ }
+pub(crate) struct PoolWaiter(Arc<Pool>);
+impl From<Pool> for PoolWaiter {
+ fn from(p: Pool) -> Self {
+ Self::from(Arc::new(p))
+ }
+impl From<Arc<Pool>> for PoolWaiter {
+ fn from(p: Arc<Pool>) -> Self {
+ Self(p)
+ }
+impl Deref for PoolWaiter {
+ type Target = Arc<Pool>;
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+impl Drop for PoolWaiter {
+ fn drop(&mut self) {
+ self.0.shutdown();
+ }
+/// Run the provided blocking function without blocking the executor.
+/// In general, issuing a blocking call or performing a lot of compute in a
+/// future without yielding is not okay, as it may prevent the executor from
+/// driving other futures forward. If you run a closure through this method,
+/// the current executor thread will relegate all its executor duties to another
+/// (possibly new) thread, and only then poll the task. Note that this requires
+/// additional synchronization.
+/// # Examples
+/// ```
+/// # async fn docs() {
+/// tokio::runtime::blocking::in_place(move || {
+/// // do some compute-heavy work or call synchronous code
+/// });
+/// # }
+/// ```
+#[cfg(feature = "rt-full")]
+pub fn in_place<F, R>(f: F) -> R
+ F: FnOnce() -> R,
+ use crate::runtime::{enter, thread_pool};
+ enter::exit(|| thread_pool::blocking(f))
+/// Run the provided closure on a thread where blocking is acceptable.
+/// In general, issuing a blocking call or performing a lot of compute in a future without
+/// yielding is not okay, as it may prevent the executor from driving other futures forward.
+/// A closure that is run through this method will instead be run on a dedicated thread pool for
+/// such blocking tasks without holding up the main futures executor.
+/// # Examples
+/// ```
+/// # async fn docs() {
+/// tokio::runtime::blocking::run(move || {
+/// // do some compute-heavy work or call synchronous code
+/// }).await;
+/// # }
+/// ```
+#[cfg(feature = "blocking")]
+pub fn run<F, R>(f: F) -> Blocking<R>
+ F: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
+ let (tx, rx) = oneshot::channel();
+ BLOCKING.with(|current_pool| match current_pool.get() {
+ State::Ready(pool) => {
+ let pool = unsafe { &*pool };
+ Pool::spawn(
+ pool,
+ Box::new(move || {
+ // receiver may have gone away
+ let _ = tx.send(f());
+ }),
+ );
+ }
+ State::Empty => panic!("must be called from the context of Tokio runtime"),
+ });
+ Blocking { rx }
+#[cfg(feature = "blocking")]
+impl<T> Future for Blocking<T> {
+ type Output = T;
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ use std::task::Poll::*;
+ match Pin::new(&mut self.rx).poll(cx) {
+ Ready(Ok(v)) => Ready(v),
+ Ready(Err(_)) => panic!(
+ "the blocking operation has been dropped before completing. \
+ This should not happen and is a bug."
+ ),
+ Pending => Pending,
+ }
+ }
+fn run_task(f: Box<dyn FnOnce() + Send>) {
+ use std::panic::{catch_unwind, AssertUnwindSafe};
+ let _ = catch_unwind(AssertUnwindSafe(|| f()));
+impl Default for Pool {
+ fn default() -> Self {
+ Pool {
+ shared: Mutex::new(Shared {
+ queue: VecDeque::new(),
+ num_th: 0,
+ num_idle: 0,
+ num_notify: 0,
+ shutdown: false,
+ }),
+ condvar: Condvar::new(),
+ new_thread: Box::new(|| {
+ thread::Builder::new().name("tokio-blocking-driver".to_string())
+ }),
+ }
+ }