diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-21 23:28:39 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-21 23:28:39 -0800 |
commit | 8546ff826db8dba1e39b4119ad909fb6cab2492a (patch) | |
tree | 0c1cdd36aaf9d732079a4ff7a71e5c6b138e7d42 /tokio/src/runtime/park.rs | |
parent | 6866fe426cfab0e4da3e88c673f7bef141259bb6 (diff) |
runtime: cleanup and add config options (#1807)
* runtime: cleanup and add config options
This patch finishes the cleanup as part of the transition to Tokio 0.2.
A number of changes were made to take advantage of having all Tokio
types in a single crate. Also, fixes using Tokio types from
`spawn_blocking`.
* Many threads, one resource driver
Previously, in the threaded scheduler, a resource driver (mio::Poll /
timer combo) was created per thread. This was more or less fine, except
it required balancing across the available drivers. When using a
resource driver from **outside** of the thread pool, balancing is
tricky. The change was original done to avoid having a dedicated driver
thread.
Now, instead of creating many resource drivers, a single resource driver
is used. Each scheduler thread will attempt to "lock" the resource
driver before parking on it. If the resource driver is already locked,
the thread uses a condition variable to park. Contention should remain
low as, under load, the scheduler avoids using the drivers.
* Add configuration options to enable I/O / time
New configuration options are added to `runtime::Builder` to allow
enabling I/O and time drivers on a runtime instance basis. This is
useful when wanting to create lightweight runtime instances to execute
compute only tasks.
* Bug fixes
The condition variable parker is updated to the same algorithm used in
`std`. This is motivated by some potential deadlock cases discovered by
`loom`.
The basic scheduler is fixed to fairly schedule tasks. `push_front` was
accidentally used instead of `push_back`.
I/O, time, and spawning now work from within `spawn_blocking` closures.
* Misc cleanup
The threaded scheduler is no longer generic over `P :Park`. Instead, it
is hard coded to a specific parker. Tests, including loom tests, are
updated to use `Runtime` directly. This provides greater coverage.
The `blocking` module is moved back into `runtime` as all usage is
within `runtime` itself.
Diffstat (limited to 'tokio/src/runtime/park.rs')
-rw-r--r-- | tokio/src/runtime/park.rs | 225 |
1 files changed, 225 insertions, 0 deletions
diff --git a/tokio/src/runtime/park.rs b/tokio/src/runtime/park.rs new file mode 100644 index 00000000..7681e20f --- /dev/null +++ b/tokio/src/runtime/park.rs @@ -0,0 +1,225 @@ +//! Parks the runtime. +//! +//! A combination of the various resource driver park handles. + +use crate::loom::sync::{Arc, Mutex, Condvar}; +use crate::loom::sync::atomic::AtomicUsize; +use crate::park::{Park, Unpark}; +use crate::runtime::time; +use crate::util::TryLock; + +use std::sync::atomic::Ordering::SeqCst; +use std::time::Duration; + +pub(crate) struct Parker { + inner: Arc<Inner>, +} + +pub(crate) struct Unparker { + inner: Arc<Inner>, +} + +struct Inner { + /// Avoids entering the park if possible + state: AtomicUsize, + + /// Used to coordinate access to the driver / condvar + mutex: Mutex<()>, + + /// Condvar to block on if the driver is unavailable. + condvar: Condvar, + + /// Resource (I/O, time, ...) driver + shared: Arc<Shared>, +} + +const EMPTY: usize = 0; +const PARKED_CONDVAR: usize = 1; +const PARKED_DRIVER: usize = 2; +const NOTIFIED: usize = 3; + +/// Shared across multiple Parker handles +struct Shared { + /// Shared driver. Only one thread at a time can use this + driver: TryLock<time::Driver>, + + /// Unpark handle + handle: <time::Driver as Park>::Unpark, +} + +impl Parker { + pub(crate) fn new(driver: time::Driver) -> Parker { + let handle = driver.unpark(); + + Parker { + inner: Arc::new(Inner { + state: AtomicUsize::new(EMPTY), + mutex: Mutex::new(()), + condvar: Condvar::new(), + shared: Arc::new(Shared { + driver: TryLock::new(driver), + handle, + }), + }), + } + } +} + +impl Clone for Parker { + fn clone(&self) -> Parker { + Parker { + inner: Arc::new(Inner { + state: AtomicUsize::new(EMPTY), + mutex: Mutex::new(()), + condvar: Condvar::new(), + shared: self.inner.shared.clone(), + }), + } + } +} + +impl Park for Parker { + type Unpark = Unparker; + type Error = (); + + fn unpark(&self) -> Unparker { + Unparker { inner: self.inner.clone() } + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.inner.park(); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + // Only parking with zero is supported... + assert_eq!(duration, Duration::from_millis(0)); + + if let Some(mut driver) = self.inner.shared.driver.try_lock() { + driver.park_timeout(duration) + .map_err(|_| ()) + } else { + Ok(()) + } + } +} + +impl Unpark for Unparker { + fn unpark(&self) { + self.inner.unpark(); + } +} + +impl Inner { + /// Park the current thread for at most `dur`. + fn park(&self) { + // If we were previously notified then we consume this notification and + // return quickly. + if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { + return; + } + + if let Some(mut driver) = self.shared.driver.try_lock() { + self.park_driver(&mut driver); + } else { + self.park_condvar(); + } + } + + fn park_condvar(&self) { + // Otherwise we need to coordinate going to sleep + let mut m = self.mutex.lock().unwrap(); + + match self.state.compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst) { + Ok(_) => {} + Err(NOTIFIED) => { + // We must read here, even though we know it will be `NOTIFIED`. + // This is because `unpark` may have been called again since we read + // `NOTIFIED` in the `compare_exchange` above. We must perform an + // acquire operation that synchronizes with that `unpark` to observe + // any writes it made before the call to unpark. To do that we must + // read from the write it made to `state`. + let old = self.state.swap(EMPTY, SeqCst); + debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + + return; + } + Err(actual) => panic!("inconsistent park state; actual = {}", actual), + } + + loop { + m = self.condvar.wait(m).unwrap(); + + if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { + // got a notification + return; + } + + // spurious wakeup, go back to sleep + } + } + + fn park_driver(&self, driver: &mut time::Driver) { + match self.state.compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) { + Ok(_) => {} + Err(NOTIFIED) => { + // We must read here, even though we know it will be `NOTIFIED`. + // This is because `unpark` may have been called again since we read + // `NOTIFIED` in the `compare_exchange` above. We must perform an + // acquire operation that synchronizes with that `unpark` to observe + // any writes it made before the call to unpark. To do that we must + // read from the write it made to `state`. + let old = self.state.swap(EMPTY, SeqCst); + debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + + return; + } + Err(actual) => panic!("inconsistent park state; actual = {}", actual), + } + + // TODO: don't unwrap + driver.park().unwrap(); + + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED => {} // got a notification, hurray! + PARKED_DRIVER => {} // no notification, alas + n => panic!("inconsistent park_timeout state: {}", n), + } + } + + fn unpark(&self) { + // To ensure the unparked thread will observe any writes we made before + // this call, we must perform a release operation that `park` can + // synchronize with. To do that we must write `NOTIFIED` even if `state` + // is already `NOTIFIED`. That is why this must be a swap rather than a + // compare-and-swap that returns if it reads `NOTIFIED` on failure. + match self.state.swap(NOTIFIED, SeqCst) { + EMPTY => {}, // no one was waiting + NOTIFIED => {}, // already unparked + PARKED_CONDVAR => self.unpark_condvar(), + PARKED_DRIVER => self.unpark_driver(), + actual => panic!("inconsistent state in unpark; actual = {}", actual), + } + } + + fn unpark_condvar(&self) { + // There is a period between when the parked thread sets `state` to + // `PARKED` (or last checked `state` in the case of a spurious wake + // up) and when it actually waits on `cvar`. If we were to notify + // during this period it would be ignored and then when the parked + // thread went to sleep it would never wake up. Fortunately, it has + // `lock` locked at this stage so we can acquire `lock` to wait until + // it is ready to receive the notification. + // + // Releasing `lock` before the call to `notify_one` means that when the + // parked thread wakes it doesn't get woken only to have to wait for us + // to release `lock`. + drop(self.mutex.lock().unwrap()); + + self.condvar.notify_one() + } + + fn unpark_driver(&self) { + self.shared.handle.unpark(); + } +} |