summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime/blocking
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-21 23:28:39 -0800
committerGitHub <noreply@github.com>2019-11-21 23:28:39 -0800
commit8546ff826db8dba1e39b4119ad909fb6cab2492a (patch)
tree0c1cdd36aaf9d732079a4ff7a71e5c6b138e7d42 /tokio/src/runtime/blocking
parent6866fe426cfab0e4da3e88c673f7bef141259bb6 (diff)
runtime: cleanup and add config options (#1807)
* runtime: cleanup and add config options This patch finishes the cleanup as part of the transition to Tokio 0.2. A number of changes were made to take advantage of having all Tokio types in a single crate. Also, fixes using Tokio types from `spawn_blocking`. * Many threads, one resource driver Previously, in the threaded scheduler, a resource driver (mio::Poll / timer combo) was created per thread. This was more or less fine, except it required balancing across the available drivers. When using a resource driver from **outside** of the thread pool, balancing is tricky. The change was original done to avoid having a dedicated driver thread. Now, instead of creating many resource drivers, a single resource driver is used. Each scheduler thread will attempt to "lock" the resource driver before parking on it. If the resource driver is already locked, the thread uses a condition variable to park. Contention should remain low as, under load, the scheduler avoids using the drivers. * Add configuration options to enable I/O / time New configuration options are added to `runtime::Builder` to allow enabling I/O and time drivers on a runtime instance basis. This is useful when wanting to create lightweight runtime instances to execute compute only tasks. * Bug fixes The condition variable parker is updated to the same algorithm used in `std`. This is motivated by some potential deadlock cases discovered by `loom`. The basic scheduler is fixed to fairly schedule tasks. `push_front` was accidentally used instead of `push_back`. I/O, time, and spawning now work from within `spawn_blocking` closures. * Misc cleanup The threaded scheduler is no longer generic over `P :Park`. Instead, it is hard coded to a specific parker. Tests, including loom tests, are updated to use `Runtime` directly. This provides greater coverage. The `blocking` module is moved back into `runtime` as all usage is within `runtime` itself.
Diffstat (limited to 'tokio/src/runtime/blocking')
-rw-r--r--tokio/src/runtime/blocking/mod.rs63
-rw-r--r--tokio/src/runtime/blocking/pool.rs354
-rw-r--r--tokio/src/runtime/blocking/schedule.rs18
-rw-r--r--tokio/src/runtime/blocking/shutdown.rs44
-rw-r--r--tokio/src/runtime/blocking/task.rs32
5 files changed, 511 insertions, 0 deletions
diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs
new file mode 100644
index 00000000..63c54b74
--- /dev/null
+++ b/tokio/src/runtime/blocking/mod.rs
@@ -0,0 +1,63 @@
+//! Abstracts out the APIs necessary to `Runtime` for integrating the blocking
+//! pool. When the `blocking` feature flag is **not** enabled, these APIs are
+//! shells. This isolates the complexity of dealing with conditional
+//! compilation.
+
+cfg_blocking_impl! {
+ mod pool;
+ pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner};
+
+ mod schedule;
+ mod shutdown;
+ mod task;
+
+ use crate::runtime::{self, Builder, io, time};
+
+ pub(crate) fn create_blocking_pool(
+ builder: &Builder,
+ spawner: &runtime::Spawner,
+ io: &io::Handle,
+ time: &time::Handle,
+ clock: &time::Clock,
+ ) -> BlockingPool {
+ BlockingPool::new(
+ builder,
+ spawner,
+ io,
+ time,
+ clock)
+
+ }
+}
+
+cfg_not_blocking_impl! {
+ use crate::runtime::{self, io, time, Builder};
+
+ #[derive(Debug, Clone)]
+ pub(crate) struct BlockingPool {}
+
+ pub(crate) use BlockingPool as Spawner;
+
+ pub(crate) fn create_blocking_pool(
+ _builder: &Builder,
+ _spawner: &runtime::Spawner,
+ _io: &io::Handle,
+ _time: &time::Handle,
+ _clock: &time::Clock,
+ ) -> BlockingPool {
+ BlockingPool {}
+ }
+
+ impl BlockingPool {
+ pub(crate) fn spawner(&self) -> &BlockingPool {
+ self
+ }
+
+ pub(crate) fn enter<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce() -> R,
+ {
+ f()
+ }
+ }
+}
diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
new file mode 100644
index 00000000..052d361a
--- /dev/null
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -0,0 +1,354 @@
+//! Thread pool for blocking operations
+
+use crate::loom::sync::{Arc, Condvar, Mutex};
+use crate::loom::thread;
+use crate::runtime::{self, io, time, Builder, Callback};
+use crate::runtime::blocking::shutdown;
+use crate::runtime::blocking::schedule::NoopSchedule;
+use crate::runtime::blocking::task::BlockingTask;
+use crate::task::{self, JoinHandle};
+
+use std::cell::Cell;
+use std::collections::VecDeque;
+use std::fmt;
+use std::time::Duration;
+
+pub(crate) struct BlockingPool {
+ spawner: Spawner,
+ shutdown_rx: shutdown::Receiver,
+}
+
+#[derive(Clone)]
+pub(crate) struct Spawner {
+ inner: Arc<Inner>,
+}
+
+struct Inner {
+ /// State shared between worker threads
+ shared: Mutex<Shared>,
+
+ /// Pool threads wait on this.
+ condvar: Condvar,
+
+ /// Spawned threads use this name
+ thread_name: String,
+
+ /// Spawned thread stack size
+ stack_size: Option<usize>,
+
+ /// Call after a thread starts
+ after_start: Option<Callback>,
+
+ /// Call before a thread stops
+ before_stop: Option<Callback>,
+
+ /// Spawns async tasks
+ spawner: runtime::Spawner,
+
+ /// Runtime I/O driver handle
+ io_handle: io::Handle,
+
+ /// Runtime time driver handle
+ time_handle: time::Handle,
+
+ /// Source of `Instant::now()`
+ clock: time::Clock,
+
+}
+
+struct Shared {
+ queue: VecDeque<Task>,
+ num_th: u32,
+ num_idle: u32,
+ num_notify: u32,
+ shutdown: bool,
+ shutdown_tx: Option<shutdown::Sender>,
+}
+
+type Task = task::Task<NoopSchedule>;
+
+thread_local! {
+ /// Thread-local tracking the current executor
+ static BLOCKING: Cell<Option<*const Spawner>> = Cell::new(None)
+}
+
+const MAX_THREADS: u32 = 1_000;
+const KEEP_ALIVE: Duration = Duration::from_secs(10);
+
+/// Run the provided function on an executor dedicated to blocking operations.
+pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
+where
+ F: FnOnce() -> R + Send + 'static,
+{
+ BLOCKING.with(|cell| {
+ let schedule = match cell.get() {
+ Some(ptr) => unsafe { &*ptr },
+ None => panic!("not currently running on the Tokio runtime."),
+ };
+
+ let (task, handle) = task::joinable(BlockingTask::new(func));
+ schedule.schedule(task);
+ handle
+ })
+}
+
+// ===== impl BlockingPool =====
+
+impl BlockingPool {
+ pub(crate) fn new(
+ builder: &Builder,
+ spawner: &runtime::Spawner,
+ io: &io::Handle,
+ time: &time::Handle,
+ clock: &time::Clock,
+ ) -> BlockingPool {
+ let (shutdown_tx, shutdown_rx) = shutdown::channel();
+
+ BlockingPool {
+ spawner: Spawner {
+ inner: Arc::new(Inner {
+ shared: Mutex::new(Shared {
+ queue: VecDeque::new(),
+ num_th: 0,
+ num_idle: 0,
+ num_notify: 0,
+ shutdown: false,
+ shutdown_tx: Some(shutdown_tx),
+ }),
+ condvar: Condvar::new(),
+ thread_name: builder.thread_name.clone(),
+ stack_size: builder.thread_stack_size,
+ after_start: builder.after_start.clone(),
+ before_stop: builder.before_stop.clone(),
+ spawner: spawner.clone(),
+ io_handle: io.clone(),
+ time_handle: time.clone(),
+ clock: clock.clone(),
+ }),
+ },
+ shutdown_rx,
+ }
+ }
+
+ pub(crate) fn spawner(&self) -> &Spawner {
+ &self.spawner
+ }
+}
+
+impl Drop for BlockingPool {
+ fn drop(&mut self) {
+ let mut shared = self.spawner.inner.shared.lock().unwrap();
+
+ shared.shutdown = true;
+ shared.shutdown_tx = None;
+ self.spawner.inner.condvar.notify_all();
+
+ drop(shared);
+
+ self.shutdown_rx.wait();
+ }
+}
+
+impl fmt::Debug for BlockingPool {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("BlockingPool").finish()
+ }
+}
+
+// ===== impl Spawner =====
+
+impl Spawner {
+ /// Set the blocking pool for the duration of the closure
+ ///
+ /// If a blocking pool is already set, it will be restored when the closure
+ /// returns or if it panics.
+ pub(crate) fn enter<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce() -> R,
+ {
+ // While scary, this is safe. The function takes a `&BlockingPool`,
+ // which guarantees that the reference lives for the duration of
+ // `with_pool`.
+ //
+ // Because we are always clearing the TLS value at the end of the
+ // function, we can cast the reference to 'static which thread-local
+ // cells require.
+ BLOCKING.with(|cell| {
+ let was = cell.replace(None);
+
+ // Ensure that the pool is removed from the thread-local context
+ // when leaving the scope. This handles cases that involve panicking.
+ struct Reset<'a>(&'a Cell<Option<*const Spawner>>, Option<*const Spawner>);
+
+ impl Drop for Reset<'_> {
+ fn drop(&mut self) {
+ self.0.set(self.1);
+ }
+ }
+
+ let _reset = Reset(cell, was);
+ cell.set(Some(self as *const Spawner));
+ f()
+ })
+ }
+
+ fn schedule(&self, task: Task) {
+ let shutdown_tx = {
+ let mut shared = self.inner.shared.lock().unwrap();
+
+ if shared.shutdown {
+ // no need to even push this task; it would never get picked up
+ return;
+ }
+
+ shared.queue.push_back(task);
+
+ if shared.num_idle == 0 {
+ // No threads are able to process the task.
+
+ if shared.num_th == MAX_THREADS {
+ // At max number of threads
+ None
+ } else {
+ shared.num_th += 1;
+ assert!(shared.shutdown_tx.is_some());
+ shared.shutdown_tx.clone()
+ }
+ } else {
+ // Notify an idle worker thread. The notification counter
+ // is used to count the needed amount of notifications
+ // exactly. Thread libraries may generate spurious
+ // wakeups, this counter is used to keep us in a
+ // consistent state.
+ shared.num_idle -= 1;
+ shared.num_notify += 1;
+ self.inner.condvar.notify_one();
+ None
+ }
+ };
+
+ if let Some(shutdown_tx) = shutdown_tx {
+ self.spawn_thread(shutdown_tx);
+ }
+ }
+
+ fn spawn_thread(&self, shutdown_tx: shutdown::Sender) {
+ let mut builder = thread::Builder::new().name(self.inner.thread_name.clone());
+
+ if let Some(stack_size) = self.inner.stack_size {
+ builder = builder.stack_size(stack_size);
+ }
+
+ let inner = self.inner.clone();
+
+ builder
+ .spawn(move || {
+ inner.run();
+
+ // Make sure `inner` drops first to ensure that the shutdown_rx
+ // sees all refs to `Inner` are dropped when the `shutdown_rx`
+ // resolves.
+ drop(inner);
+ drop(shutdown_tx);
+ })
+ .unwrap();
+ }
+}
+
+impl Inner {
+ fn run(&self) {
+ let _io = io::set_default(&self.io_handle);
+
+ time::with_default(&self.time_handle, &self.clock, || {
+ self.spawner.enter(|| self.run2());
+ });
+ }
+
+ fn run2(&self) {
+ if let Some(f) = &self.after_start {
+ f()
+ }
+
+ let mut shared = self.shared.lock().unwrap();
+
+ 'main: loop {
+ // BUSY
+ while let Some(task) = shared.queue.pop_front() {
+ drop(shared);
+ run_task(task);
+
+ shared = self.shared.lock().unwrap();
+ if shared.shutdown {
+ break; // Need to increment idle before we exit
+ }
+ }
+
+ // IDLE
+ shared.num_idle += 1;
+
+ while !shared.shutdown {
+ let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap();
+
+ shared = lock_result.0;
+ let timeout_result = lock_result.1;
+
+ if shared.num_notify != 0 {
+ // We have received a legitimate wakeup,
+ // acknowledge it by decrementing the counter
+ // and transition to the BUSY state.
+ shared.num_notify -= 1;
+ break;
+ }
+
+ if timeout_result.timed_out() {
+ break 'main;
+ }
+
+ // Spurious wakeup detected, go back to sleep.
+ }
+
+ if shared.shutdown {
+ // Work was produced, and we "took" it (by decrementing num_notify).
+ // This means that num_idle was decremented once for our wakeup.
+ // But, since we are exiting, we need to "undo" that, as we'll stay idle.
+ shared.num_idle += 1;
+ // NOTE: Technically we should also do num_notify++ and notify again,
+ // but since we're shutting down anyway, that won't be necessary.
+ break;
+ }
+ }
+
+ // Thread exit
+ shared.num_th -= 1;
+
+ // num_idle should now be tracked exactly, panic
+ // with a descriptive message if it is not the
+ // case.
+ shared.num_idle = shared
+ .num_idle
+ .checked_sub(1)
+ .expect("num_idle underflowed on thread exit");
+
+ if shared.shutdown && shared.num_th == 0 {
+ self.condvar.notify_one();
+ }
+
+ drop(shared);
+
+ if let Some(f) = &self.before_stop {
+ f()
+ }
+ }
+}
+
+impl fmt::Debug for Spawner {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("blocking::Spawner").finish()
+ }
+}
+
+fn run_task(f: Task) {
+ let scheduler: &'static NoopSchedule = &NoopSchedule;
+ let res = f.run(|| Some(scheduler.into()));
+ assert!(res.is_none());
+}
diff --git a/tokio/src/runtime/blocking/schedule.rs b/tokio/src/runtime/blocking/schedule.rs
new file mode 100644
index 00000000..461b12c3
--- /dev/null
+++ b/tokio/src/runtime/blocking/schedule.rs
@@ -0,0 +1,18 @@
+use crate::task::{Schedule, Task};
+
+/// `task::Schedule` implementation that does nothing. This is unique to the
+/// blocking scheduler as tasks scheduled are not really futures but blocking
+/// operations.
+pub(super) struct NoopSchedule;
+
+impl Schedule for NoopSchedule {
+ fn bind(&self, _task: &Task<Self>) {}
+
+ fn release(&self, _task: Task<Self>) {}
+
+ fn release_local(&self, _task: &Task<Self>) {}
+
+ fn schedule(&self, _task: Task<Self>) {
+ unreachable!();
+ }
+}
diff --git a/tokio/src/runtime/blocking/shutdown.rs b/tokio/src/runtime/blocking/shutdown.rs
new file mode 100644
index 00000000..d9f5eb0f
--- /dev/null
+++ b/tokio/src/runtime/blocking/shutdown.rs
@@ -0,0 +1,44 @@
+//! A shutdown channel.
+//!
+//! Each worker holds the `Sender` half. When all the `Sender` halves are
+//! dropped, the `Receiver` receives a notification.
+
+use crate::loom::sync::Arc;
+use crate::sync::oneshot;
+
+#[derive(Debug, Clone)]
+pub(super) struct Sender {
+ tx: Arc<oneshot::Sender<()>>,
+}
+
+#[derive(Debug)]
+pub(super) struct Receiver {
+ rx: oneshot::Receiver<()>,
+}
+
+pub(super) fn channel() -> (Sender, Receiver) {
+ let (tx, rx) = oneshot::channel();
+ let tx = Sender { tx: Arc::new(tx) };
+ let rx = Receiver { rx };
+
+ (tx, rx)
+}
+
+impl Receiver {
+ /// Block the current thread until all `Sender` handles drop.
+ pub(crate) fn wait(&mut self) {
+ use crate::runtime::enter::{enter, try_enter};
+
+ let mut e = if std::thread::panicking() {
+ match try_enter() {
+ Some(enter) => enter,
+ _ => return,
+ }
+ } else {
+ enter()
+ };
+
+ // The oneshot completes with an Err
+ let _ = e.block_on(&mut self.rx);
+ }
+}
diff --git a/tokio/src/runtime/blocking/task.rs b/tokio/src/runtime/blocking/task.rs
new file mode 100644
index 00000000..8ea3bace
--- /dev/null
+++ b/tokio/src/runtime/blocking/task.rs
@@ -0,0 +1,32 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Converts a function to a future that completes on poll
+pub(super) struct BlockingTask<T> {
+ func: Option<T>,
+}
+
+impl<T> BlockingTask<T> {
+ /// Initialize a new blocking task from the given function
+ pub(super) fn new(func: T) -> BlockingTask<T> {
+ BlockingTask { func: Some(func) }
+ }
+}
+
+impl<T, R> Future for BlockingTask<T>
+where
+ T: FnOnce() -> R,
+{
+ type Output = R;
+
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<R> {
+ let me = unsafe { self.get_unchecked_mut() };
+ let func = me
+ .func
+ .take()
+ .expect("[internal exception] blocking task ran twice.");
+
+ Poll::Ready(func())
+ }
+}