diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-21 23:28:39 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-21 23:28:39 -0800 |
commit | 8546ff826db8dba1e39b4119ad909fb6cab2492a (patch) | |
tree | 0c1cdd36aaf9d732079a4ff7a71e5c6b138e7d42 /tokio/src/runtime/blocking | |
parent | 6866fe426cfab0e4da3e88c673f7bef141259bb6 (diff) |
runtime: cleanup and add config options (#1807)
* runtime: cleanup and add config options
This patch finishes the cleanup as part of the transition to Tokio 0.2.
A number of changes were made to take advantage of having all Tokio
types in a single crate. Also, fixes using Tokio types from
`spawn_blocking`.
* Many threads, one resource driver
Previously, in the threaded scheduler, a resource driver (mio::Poll /
timer combo) was created per thread. This was more or less fine, except
it required balancing across the available drivers. When using a
resource driver from **outside** of the thread pool, balancing is
tricky. The change was original done to avoid having a dedicated driver
thread.
Now, instead of creating many resource drivers, a single resource driver
is used. Each scheduler thread will attempt to "lock" the resource
driver before parking on it. If the resource driver is already locked,
the thread uses a condition variable to park. Contention should remain
low as, under load, the scheduler avoids using the drivers.
* Add configuration options to enable I/O / time
New configuration options are added to `runtime::Builder` to allow
enabling I/O and time drivers on a runtime instance basis. This is
useful when wanting to create lightweight runtime instances to execute
compute only tasks.
* Bug fixes
The condition variable parker is updated to the same algorithm used in
`std`. This is motivated by some potential deadlock cases discovered by
`loom`.
The basic scheduler is fixed to fairly schedule tasks. `push_front` was
accidentally used instead of `push_back`.
I/O, time, and spawning now work from within `spawn_blocking` closures.
* Misc cleanup
The threaded scheduler is no longer generic over `P :Park`. Instead, it
is hard coded to a specific parker. Tests, including loom tests, are
updated to use `Runtime` directly. This provides greater coverage.
The `blocking` module is moved back into `runtime` as all usage is
within `runtime` itself.
Diffstat (limited to 'tokio/src/runtime/blocking')
-rw-r--r-- | tokio/src/runtime/blocking/mod.rs | 63 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/pool.rs | 354 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/schedule.rs | 18 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/shutdown.rs | 44 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/task.rs | 32 |
5 files changed, 511 insertions, 0 deletions
diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs new file mode 100644 index 00000000..63c54b74 --- /dev/null +++ b/tokio/src/runtime/blocking/mod.rs @@ -0,0 +1,63 @@ +//! Abstracts out the APIs necessary to `Runtime` for integrating the blocking +//! pool. When the `blocking` feature flag is **not** enabled, these APIs are +//! shells. This isolates the complexity of dealing with conditional +//! compilation. + +cfg_blocking_impl! { + mod pool; + pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner}; + + mod schedule; + mod shutdown; + mod task; + + use crate::runtime::{self, Builder, io, time}; + + pub(crate) fn create_blocking_pool( + builder: &Builder, + spawner: &runtime::Spawner, + io: &io::Handle, + time: &time::Handle, + clock: &time::Clock, + ) -> BlockingPool { + BlockingPool::new( + builder, + spawner, + io, + time, + clock) + + } +} + +cfg_not_blocking_impl! { + use crate::runtime::{self, io, time, Builder}; + + #[derive(Debug, Clone)] + pub(crate) struct BlockingPool {} + + pub(crate) use BlockingPool as Spawner; + + pub(crate) fn create_blocking_pool( + _builder: &Builder, + _spawner: &runtime::Spawner, + _io: &io::Handle, + _time: &time::Handle, + _clock: &time::Clock, + ) -> BlockingPool { + BlockingPool {} + } + + impl BlockingPool { + pub(crate) fn spawner(&self) -> &BlockingPool { + self + } + + pub(crate) fn enter<F, R>(&self, f: F) -> R + where + F: FnOnce() -> R, + { + f() + } + } +} diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs new file mode 100644 index 00000000..052d361a --- /dev/null +++ b/tokio/src/runtime/blocking/pool.rs @@ -0,0 +1,354 @@ +//! Thread pool for blocking operations + +use crate::loom::sync::{Arc, Condvar, Mutex}; +use crate::loom::thread; +use crate::runtime::{self, io, time, Builder, Callback}; +use crate::runtime::blocking::shutdown; +use crate::runtime::blocking::schedule::NoopSchedule; +use crate::runtime::blocking::task::BlockingTask; +use crate::task::{self, JoinHandle}; + +use std::cell::Cell; +use std::collections::VecDeque; +use std::fmt; +use std::time::Duration; + +pub(crate) struct BlockingPool { + spawner: Spawner, + shutdown_rx: shutdown::Receiver, +} + +#[derive(Clone)] +pub(crate) struct Spawner { + inner: Arc<Inner>, +} + +struct Inner { + /// State shared between worker threads + shared: Mutex<Shared>, + + /// Pool threads wait on this. + condvar: Condvar, + + /// Spawned threads use this name + thread_name: String, + + /// Spawned thread stack size + stack_size: Option<usize>, + + /// Call after a thread starts + after_start: Option<Callback>, + + /// Call before a thread stops + before_stop: Option<Callback>, + + /// Spawns async tasks + spawner: runtime::Spawner, + + /// Runtime I/O driver handle + io_handle: io::Handle, + + /// Runtime time driver handle + time_handle: time::Handle, + + /// Source of `Instant::now()` + clock: time::Clock, + +} + +struct Shared { + queue: VecDeque<Task>, + num_th: u32, + num_idle: u32, + num_notify: u32, + shutdown: bool, + shutdown_tx: Option<shutdown::Sender>, +} + +type Task = task::Task<NoopSchedule>; + +thread_local! { + /// Thread-local tracking the current executor + static BLOCKING: Cell<Option<*const Spawner>> = Cell::new(None) +} + +const MAX_THREADS: u32 = 1_000; +const KEEP_ALIVE: Duration = Duration::from_secs(10); + +/// Run the provided function on an executor dedicated to blocking operations. +pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R> +where + F: FnOnce() -> R + Send + 'static, +{ + BLOCKING.with(|cell| { + let schedule = match cell.get() { + Some(ptr) => unsafe { &*ptr }, + None => panic!("not currently running on the Tokio runtime."), + }; + + let (task, handle) = task::joinable(BlockingTask::new(func)); + schedule.schedule(task); + handle + }) +} + +// ===== impl BlockingPool ===== + +impl BlockingPool { + pub(crate) fn new( + builder: &Builder, + spawner: &runtime::Spawner, + io: &io::Handle, + time: &time::Handle, + clock: &time::Clock, + ) -> BlockingPool { + let (shutdown_tx, shutdown_rx) = shutdown::channel(); + + BlockingPool { + spawner: Spawner { + inner: Arc::new(Inner { + shared: Mutex::new(Shared { + queue: VecDeque::new(), + num_th: 0, + num_idle: 0, + num_notify: 0, + shutdown: false, + shutdown_tx: Some(shutdown_tx), + }), + condvar: Condvar::new(), + thread_name: builder.thread_name.clone(), + stack_size: builder.thread_stack_size, + after_start: builder.after_start.clone(), + before_stop: builder.before_stop.clone(), + spawner: spawner.clone(), + io_handle: io.clone(), + time_handle: time.clone(), + clock: clock.clone(), + }), + }, + shutdown_rx, + } + } + + pub(crate) fn spawner(&self) -> &Spawner { + &self.spawner + } +} + +impl Drop for BlockingPool { + fn drop(&mut self) { + let mut shared = self.spawner.inner.shared.lock().unwrap(); + + shared.shutdown = true; + shared.shutdown_tx = None; + self.spawner.inner.condvar.notify_all(); + + drop(shared); + + self.shutdown_rx.wait(); + } +} + +impl fmt::Debug for BlockingPool { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("BlockingPool").finish() + } +} + +// ===== impl Spawner ===== + +impl Spawner { + /// Set the blocking pool for the duration of the closure + /// + /// If a blocking pool is already set, it will be restored when the closure + /// returns or if it panics. + pub(crate) fn enter<F, R>(&self, f: F) -> R + where + F: FnOnce() -> R, + { + // While scary, this is safe. The function takes a `&BlockingPool`, + // which guarantees that the reference lives for the duration of + // `with_pool`. + // + // Because we are always clearing the TLS value at the end of the + // function, we can cast the reference to 'static which thread-local + // cells require. + BLOCKING.with(|cell| { + let was = cell.replace(None); + + // Ensure that the pool is removed from the thread-local context + // when leaving the scope. This handles cases that involve panicking. + struct Reset<'a>(&'a Cell<Option<*const Spawner>>, Option<*const Spawner>); + + impl Drop for Reset<'_> { + fn drop(&mut self) { + self.0.set(self.1); + } + } + + let _reset = Reset(cell, was); + cell.set(Some(self as *const Spawner)); + f() + }) + } + + fn schedule(&self, task: Task) { + let shutdown_tx = { + let mut shared = self.inner.shared.lock().unwrap(); + + if shared.shutdown { + // no need to even push this task; it would never get picked up + return; + } + + shared.queue.push_back(task); + + if shared.num_idle == 0 { + // No threads are able to process the task. + + if shared.num_th == MAX_THREADS { + // At max number of threads + None + } else { + shared.num_th += 1; + assert!(shared.shutdown_tx.is_some()); + shared.shutdown_tx.clone() + } + } else { + // Notify an idle worker thread. The notification counter + // is used to count the needed amount of notifications + // exactly. Thread libraries may generate spurious + // wakeups, this counter is used to keep us in a + // consistent state. + shared.num_idle -= 1; + shared.num_notify += 1; + self.inner.condvar.notify_one(); + None + } + }; + + if let Some(shutdown_tx) = shutdown_tx { + self.spawn_thread(shutdown_tx); + } + } + + fn spawn_thread(&self, shutdown_tx: shutdown::Sender) { + let mut builder = thread::Builder::new().name(self.inner.thread_name.clone()); + + if let Some(stack_size) = self.inner.stack_size { + builder = builder.stack_size(stack_size); + } + + let inner = self.inner.clone(); + + builder + .spawn(move || { + inner.run(); + + // Make sure `inner` drops first to ensure that the shutdown_rx + // sees all refs to `Inner` are dropped when the `shutdown_rx` + // resolves. + drop(inner); + drop(shutdown_tx); + }) + .unwrap(); + } +} + +impl Inner { + fn run(&self) { + let _io = io::set_default(&self.io_handle); + + time::with_default(&self.time_handle, &self.clock, || { + self.spawner.enter(|| self.run2()); + }); + } + + fn run2(&self) { + if let Some(f) = &self.after_start { + f() + } + + let mut shared = self.shared.lock().unwrap(); + + 'main: loop { + // BUSY + while let Some(task) = shared.queue.pop_front() { + drop(shared); + run_task(task); + + shared = self.shared.lock().unwrap(); + if shared.shutdown { + break; // Need to increment idle before we exit + } + } + + // IDLE + shared.num_idle += 1; + + while !shared.shutdown { + let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap(); + + shared = lock_result.0; + let timeout_result = lock_result.1; + + if shared.num_notify != 0 { + // We have received a legitimate wakeup, + // acknowledge it by decrementing the counter + // and transition to the BUSY state. + shared.num_notify -= 1; + break; + } + + if timeout_result.timed_out() { + break 'main; + } + + // Spurious wakeup detected, go back to sleep. + } + + if shared.shutdown { + // Work was produced, and we "took" it (by decrementing num_notify). + // This means that num_idle was decremented once for our wakeup. + // But, since we are exiting, we need to "undo" that, as we'll stay idle. + shared.num_idle += 1; + // NOTE: Technically we should also do num_notify++ and notify again, + // but since we're shutting down anyway, that won't be necessary. + break; + } + } + + // Thread exit + shared.num_th -= 1; + + // num_idle should now be tracked exactly, panic + // with a descriptive message if it is not the + // case. + shared.num_idle = shared + .num_idle + .checked_sub(1) + .expect("num_idle underflowed on thread exit"); + + if shared.shutdown && shared.num_th == 0 { + self.condvar.notify_one(); + } + + drop(shared); + + if let Some(f) = &self.before_stop { + f() + } + } +} + +impl fmt::Debug for Spawner { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("blocking::Spawner").finish() + } +} + +fn run_task(f: Task) { + let scheduler: &'static NoopSchedule = &NoopSchedule; + let res = f.run(|| Some(scheduler.into())); + assert!(res.is_none()); +} diff --git a/tokio/src/runtime/blocking/schedule.rs b/tokio/src/runtime/blocking/schedule.rs new file mode 100644 index 00000000..461b12c3 --- /dev/null +++ b/tokio/src/runtime/blocking/schedule.rs @@ -0,0 +1,18 @@ +use crate::task::{Schedule, Task}; + +/// `task::Schedule` implementation that does nothing. This is unique to the +/// blocking scheduler as tasks scheduled are not really futures but blocking +/// operations. +pub(super) struct NoopSchedule; + +impl Schedule for NoopSchedule { + fn bind(&self, _task: &Task<Self>) {} + + fn release(&self, _task: Task<Self>) {} + + fn release_local(&self, _task: &Task<Self>) {} + + fn schedule(&self, _task: Task<Self>) { + unreachable!(); + } +} diff --git a/tokio/src/runtime/blocking/shutdown.rs b/tokio/src/runtime/blocking/shutdown.rs new file mode 100644 index 00000000..d9f5eb0f --- /dev/null +++ b/tokio/src/runtime/blocking/shutdown.rs @@ -0,0 +1,44 @@ +//! A shutdown channel. +//! +//! Each worker holds the `Sender` half. When all the `Sender` halves are +//! dropped, the `Receiver` receives a notification. + +use crate::loom::sync::Arc; +use crate::sync::oneshot; + +#[derive(Debug, Clone)] +pub(super) struct Sender { + tx: Arc<oneshot::Sender<()>>, +} + +#[derive(Debug)] +pub(super) struct Receiver { + rx: oneshot::Receiver<()>, +} + +pub(super) fn channel() -> (Sender, Receiver) { + let (tx, rx) = oneshot::channel(); + let tx = Sender { tx: Arc::new(tx) }; + let rx = Receiver { rx }; + + (tx, rx) +} + +impl Receiver { + /// Block the current thread until all `Sender` handles drop. + pub(crate) fn wait(&mut self) { + use crate::runtime::enter::{enter, try_enter}; + + let mut e = if std::thread::panicking() { + match try_enter() { + Some(enter) => enter, + _ => return, + } + } else { + enter() + }; + + // The oneshot completes with an Err + let _ = e.block_on(&mut self.rx); + } +} diff --git a/tokio/src/runtime/blocking/task.rs b/tokio/src/runtime/blocking/task.rs new file mode 100644 index 00000000..8ea3bace --- /dev/null +++ b/tokio/src/runtime/blocking/task.rs @@ -0,0 +1,32 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Converts a function to a future that completes on poll +pub(super) struct BlockingTask<T> { + func: Option<T>, +} + +impl<T> BlockingTask<T> { + /// Initialize a new blocking task from the given function + pub(super) fn new(func: T) -> BlockingTask<T> { + BlockingTask { func: Some(func) } + } +} + +impl<T, R> Future for BlockingTask<T> +where + T: FnOnce() -> R, +{ + type Output = R; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<R> { + let me = unsafe { self.get_unchecked_mut() }; + let func = me + .func + .take() + .expect("[internal exception] blocking task ran twice."); + + Poll::Ready(func()) + } +} |