diff options
author | Kevin Leimkuhler <kleimkuhler@icloud.com> | 2019-11-20 12:24:41 -0800 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2019-11-20 12:24:41 -0800 |
commit | 3e643c7b81736a4c2b11387a6f71aba99450270b (patch) | |
tree | 2be18c72374efcf8716c3a7d90783027231f909a | |
parent | bc150cd0b56cf6dcb7c4feab64e83b9938faa186 (diff) |
time: Eagerly bind delays to timer (#1800)
## Motivation
Similar to #1666, it is no longer necessary to lazily register delays with the
executions default timer. All delays are expected to be created from within a
runtime, and should panic if not done so.
## Solution
`tokio::time` now assumes there to be a `CURRENT_TIMER` set when creating a
delay; this can be assumed if called within a tokio runtime. If there is no
current timer, the application will panic with a "no current timer" message.
## Follow-up
Similar to #1666, `HandlePriv` can probably be removed, but this mainly prepares
for 0.2 API changes. Because it is not in the public API, this can be done in a
following change.
Signed-off-by: Kevin Leimkuhler <kleimkuhler@icloud.com>
-rw-r--r-- | tokio-test/tests/block_on.rs | 7 | ||||
-rw-r--r-- | tokio/src/time/delay.rs | 13 | ||||
-rw-r--r-- | tokio/src/time/delay_queue.rs | 120 | ||||
-rw-r--r-- | tokio/src/time/driver/entry.rs | 112 | ||||
-rw-r--r-- | tokio/src/time/driver/handle.rs | 16 | ||||
-rw-r--r-- | tokio/src/time/driver/registration.rs | 11 | ||||
-rw-r--r-- | tokio/src/time/tests/mod.rs | 22 | ||||
-rw-r--r-- | tokio/src/time/tests/test_delay.rs | 22 |
8 files changed, 145 insertions, 178 deletions
diff --git a/tokio-test/tests/block_on.rs b/tokio-test/tests/block_on.rs index b50f9ae1..7aec82cc 100644 --- a/tokio-test/tests/block_on.rs +++ b/tokio-test/tests/block_on.rs @@ -20,5 +20,10 @@ fn async_fn() { #[test] fn test_delay() { let deadline = Instant::now() + Duration::from_millis(100); - assert_eq!((), block_on(delay_until(deadline))); + assert_eq!( + (), + block_on(async { + delay_until(deadline).await; + }) + ); } diff --git a/tokio/src/time/delay.rs b/tokio/src/time/delay.rs index e3b605e7..bae3d9c8 100644 --- a/tokio/src/time/delay.rs +++ b/tokio/src/time/delay.rs @@ -42,7 +42,7 @@ pub fn delay_for(duration: Duration) -> Delay { #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Delay { - /// The link between the `Delay` instance at the timer that drives it. + /// The link between the `Delay` instance and the timer that drives it. /// /// This also stores the `deadline` value. registration: Registration, @@ -76,21 +76,12 @@ impl Delay { pub fn reset(&mut self, deadline: Instant) { self.registration.reset(deadline); } - - /// Register the delay with the timer instance for the current execution - /// context. - fn register(&mut self) { - self.registration.register(); - } } impl Future for Delay { type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { - // Ensure the `Delay` instance is associated with a timer. - self.register(); - + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { // `poll_elapsed` can return an error in two cases: // // - AtCapacity: this is a pathlogical case where far too many diff --git a/tokio/src/time/delay_queue.rs b/tokio/src/time/delay_queue.rs index 6a7cc6b3..1fd70625 100644 --- a/tokio/src/time/delay_queue.rs +++ b/tokio/src/time/delay_queue.rs @@ -228,15 +228,19 @@ impl<T> DelayQueue<T> { /// ```rust /// # use tokio::time::DelayQueue; /// # use std::time::Duration; - /// let mut delay_queue = DelayQueue::with_capacity(10); /// - /// // These insertions are done without further allocation - /// for i in 0..10 { - /// delay_queue.insert(i, Duration::from_secs(i)); - /// } + /// # #[tokio::main] + /// # async fn main() { + /// let mut delay_queue = DelayQueue::with_capacity(10); /// - /// // This will make the queue allocate additional storage - /// delay_queue.insert(11, Duration::from_secs(11)); + /// // These insertions are done without further allocation + /// for i in 0..10 { + /// delay_queue.insert(i, Duration::from_secs(i)); + /// } + /// + /// // This will make the queue allocate additional storage + /// delay_queue.insert(11, Duration::from_secs(11)); + /// # } /// ``` pub fn with_capacity(capacity: usize) -> DelayQueue<T> { DelayQueue { @@ -279,13 +283,16 @@ impl<T> DelayQueue<T> { /// ```rust /// use tokio::time::{DelayQueue, Duration, Instant}; /// - /// let mut delay_queue = DelayQueue::new(); - /// let key = delay_queue.insert_at( - /// "foo", Instant::now() + Duration::from_secs(5)); + /// # #[tokio::main] + /// # async fn main() { + /// let mut delay_queue = DelayQueue::new(); + /// let key = delay_queue.insert_at( + /// "foo", Instant::now() + Duration::from_secs(5)); /// - /// // Remove the entry - /// let item = delay_queue.remove(&key); - /// assert_eq!(*item.get_ref(), "foo"); + /// // Remove the entry + /// let item = delay_queue.remove(&key); + /// assert_eq!(*item.get_ref(), "foo"); + /// # } /// ``` /// /// [`poll`]: #method.poll @@ -380,12 +387,15 @@ impl<T> DelayQueue<T> { /// use tokio::time::DelayQueue; /// use std::time::Duration; /// - /// let mut delay_queue = DelayQueue::new(); - /// let key = delay_queue.insert("foo", Duration::from_secs(5)); + /// # #[tokio::main] + /// # async fn main() { + /// let mut delay_queue = DelayQueue::new(); + /// let key = delay_queue.insert("foo", Duration::from_secs(5)); /// - /// // Remove the entry - /// let item = delay_queue.remove(&key); - /// assert_eq!(*item.get_ref(), "foo"); + /// // Remove the entry + /// let item = delay_queue.remove(&key); + /// assert_eq!(*item.get_ref(), "foo"); + /// # } /// ``` /// /// [`poll`]: #method.poll @@ -430,12 +440,15 @@ impl<T> DelayQueue<T> { /// use tokio::time::DelayQueue; /// use std::time::Duration; /// - /// let mut delay_queue = DelayQueue::new(); - /// let key = delay_queue.insert("foo", Duration::from_secs(5)); + /// # #[tokio::main] + /// # async fn main() { + /// let mut delay_queue = DelayQueue::new(); + /// let key = delay_queue.insert("foo", Duration::from_secs(5)); /// - /// // Remove the entry - /// let item = delay_queue.remove(&key); - /// assert_eq!(*item.get_ref(), "foo"); + /// // Remove the entry + /// let item = delay_queue.remove(&key); + /// assert_eq!(*item.get_ref(), "foo"); + /// # } /// ``` pub fn remove(&mut self, key: &Key) -> Expired<T> { use crate::time::wheel::Stack; @@ -477,14 +490,17 @@ impl<T> DelayQueue<T> { /// ```rust /// use tokio::time::{DelayQueue, Duration, Instant}; /// - /// let mut delay_queue = DelayQueue::new(); - /// let key = delay_queue.insert("foo", Duration::from_secs(5)); + /// # #[tokio::main] + /// # async fn main() { + /// let mut delay_queue = DelayQueue::new(); + /// let key = delay_queue.insert("foo", Duration::from_secs(5)); /// - /// // "foo" is scheduled to be returned in 5 seconds + /// // "foo" is scheduled to be returned in 5 seconds /// - /// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10)); + /// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10)); /// - /// // "foo"is now scheduled to be returned in 10 seconds + /// // "foo"is now scheduled to be returned in 10 seconds + /// # } /// ``` pub fn reset_at(&mut self, key: &Key, when: Instant) { self.wheel.remove(&key.index, &mut self.slab); @@ -531,14 +547,17 @@ impl<T> DelayQueue<T> { /// use tokio::time::DelayQueue; /// use std::time::Duration; /// - /// let mut delay_queue = DelayQueue::new(); - /// let key = delay_queue.insert("foo", Duration::from_secs(5)); + /// # #[tokio::main] + /// # async fn main() { + /// let mut delay_queue = DelayQueue::new(); + /// let key = delay_queue.insert("foo", Duration::from_secs(5)); /// - /// // "foo" is scheduled to be returned in 5 seconds + /// // "foo" is scheduled to be returned in 5 seconds /// - /// delay_queue.reset(&key, Duration::from_secs(10)); + /// delay_queue.reset(&key, Duration::from_secs(10)); /// - /// // "foo"is now scheduled to be returned in 10 seconds + /// // "foo"is now scheduled to be returned in 10 seconds + /// # } /// ``` pub fn reset(&mut self, key: &Key, timeout: Duration) { self.reset_at(key, Instant::now() + timeout); @@ -558,15 +577,18 @@ impl<T> DelayQueue<T> { /// use tokio::time::DelayQueue; /// use std::time::Duration; /// - /// let mut delay_queue = DelayQueue::new(); + /// # #[tokio::main] + /// # async fn main() { + /// let mut delay_queue = DelayQueue::new(); /// - /// delay_queue.insert("foo", Duration::from_secs(5)); + /// delay_queue.insert("foo", Duration::from_secs(5)); /// - /// assert!(!delay_queue.is_empty()); + /// assert!(!delay_queue.is_empty()); /// - /// delay_queue.clear(); + /// delay_queue.clear(); /// - /// assert!(delay_queue.is_empty()); + /// assert!(delay_queue.is_empty()); + /// # } /// ``` pub fn clear(&mut self) { self.slab.clear(); @@ -612,12 +634,15 @@ impl<T> DelayQueue<T> { /// use tokio::time::DelayQueue; /// use std::time::Duration; /// - /// let mut delay_queue = DelayQueue::new(); + /// # #[tokio::main] + /// # async fn main() { + /// let mut delay_queue = DelayQueue::new(); /// - /// delay_queue.insert("hello", Duration::from_secs(10)); - /// delay_queue.reserve(10); + /// delay_queue.insert("hello", Duration::from_secs(10)); + /// delay_queue.reserve(10); /// - /// assert!(delay_queue.capacity() >= 11); + /// assert!(delay_queue.capacity() >= 11); + /// # } /// ``` pub fn reserve(&mut self, additional: usize) { self.slab.reserve(additional); @@ -634,11 +659,14 @@ impl<T> DelayQueue<T> { /// use tokio::time::DelayQueue; /// use std::time::Duration; /// - /// let mut delay_queue = DelayQueue::new(); - /// assert!(delay_queue.is_empty()); + /// # #[tokio::main] + /// # async fn main() { + /// let mut delay_queue = DelayQueue::new(); + /// assert!(delay_queue.is_empty()); /// - /// delay_queue.insert("hello", Duration::from_secs(5)); - /// assert!(!delay_queue.is_empty()); + /// delay_queue.insert("hello", Duration::from_secs(5)); + /// assert!(!delay_queue.is_empty()); + /// # } /// ``` pub fn is_empty(&self) -> bool { self.slab.is_empty() diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs index 97ce34de..708b6cfb 100644 --- a/tokio/src/time/driver/entry.rs +++ b/tokio/src/time/driver/entry.rs @@ -6,7 +6,7 @@ use crate::time::{Duration, Error, Instant}; use std::cell::UnsafeCell; use std::ptr; use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering::{Relaxed, SeqCst}; +use std::sync::atomic::Ordering::SeqCst; use std::sync::{Arc, Weak}; use std::task::{self, Poll}; use std::u64; @@ -31,7 +31,7 @@ pub(crate) struct Entry { /// without all `Delay` instances having completed. /// /// When `None`, the entry has not yet been linked with a timer instance. - inner: Option<Weak<Inner>>, + inner: Weak<Inner>, /// Tracks the entry state. This value contains the following information: /// @@ -104,18 +104,41 @@ const ERROR: u64 = u64::MAX; // ===== impl Entry ===== impl Entry { - pub(crate) fn new(deadline: Instant, duration: Duration) -> Entry { - Entry { + pub(crate) fn new(deadline: Instant, duration: Duration) -> Arc<Entry> { + let handle_priv = HandlePriv::current(); + let handle = handle_priv.inner().unwrap(); + + // Increment the number of active timeouts + if handle.increment().is_err() { + // TODO(kleimkuhler): Transition to error state instead of + // panicking? + panic!("failed to add entry; timer at capacity"); + }; + + let when = handle.normalize_deadline(deadline); + let state = if when <= handle.elapsed() { + ELAPSED + } else { + when + }; + + let entry = Arc::new(Entry { time: CachePadded(UnsafeCell::new(Time { deadline, duration })), - inner: None, + inner: handle_priv.into_inner(), waker: AtomicWaker::new(), - state: AtomicU64::new(0), + state: AtomicU64::new(state), queued: AtomicBool::new(false), next_atomic: UnsafeCell::new(ptr::null_mut()), when: UnsafeCell::new(None), next_stack: UnsafeCell::new(None), prev_stack: UnsafeCell::new(ptr::null_mut()), + }); + + if handle.queue(&entry).is_err() { + entry.error(); } + + entry } /// Only called by `Registration` @@ -129,77 +152,6 @@ impl Entry { &mut *self.time.0.get() } - /// Returns `true` if the `Entry` is currently associated with a timer - /// instance. - pub(crate) fn is_registered(&self) -> bool { - self.inner.is_some() - } - - /// Only called by `Registration` - pub(crate) fn register(me: &mut Arc<Self>) { - let handle = match HandlePriv::try_current() { - Ok(handle) => handle, - Err(_) => { - // Could not associate the entry with a timer, transition the - // state to error - Arc::get_mut(me).unwrap().transition_to_error(); - - return; - } - }; - - Entry::register_with(me, handle) - } - - /// Only called by `Registration` - pub(crate) fn register_with(me: &mut Arc<Self>, handle: HandlePriv) { - assert!(!me.is_registered(), "only register an entry once"); - - let deadline = me.time_ref().deadline; - - let inner = match handle.inner() { - Some(inner) => inner, - None => { - // Could not associate the entry with a timer, transition the - // state to error - Arc::get_mut(me).unwrap().transition_to_error(); - - return; - } - }; - - // Increment the number of active timeouts - if inner.increment().is_err() { - Arc::get_mut(me).unwrap().transition_to_error(); - - return; - } - - // Associate the entry with the timer - Arc::get_mut(me).unwrap().inner = Some(handle.into_inner()); - - let when = inner.normalize_deadline(deadline); - - // Relaxed OK: At this point, there are no other threads that have - // access to this entry. - if when <= inner.elapsed() { - me.state.store(ELAPSED, Relaxed); - return; - } else { - me.state.store(when, Relaxed); - } - - if inner.queue(me).is_err() { - // The timer has shutdown, transition the entry to the error state. - me.error(); - } - } - - fn transition_to_error(&mut self) { - self.inner = Some(Weak::new()); - self.state = AtomicU64::new(ERROR); - } - /// The current entry state as known by the timer. This is not the value of /// `state`, but lets the timer know how to converge its state to `state`. pub(crate) fn when_internal(&self) -> Option<u64> { @@ -317,10 +269,6 @@ impl Entry { /// Only called by `Registration` pub(crate) fn reset(entry: &mut Arc<Entry>) { - if !entry.is_registered() { - return; - } - let inner = match entry.upgrade_inner() { Some(inner) => inner, None => return, @@ -367,7 +315,7 @@ impl Entry { } fn upgrade_inner(&self) -> Option<Arc<Inner>> { - self.inner.as_ref().and_then(|inner| inner.upgrade()) + self.inner.upgrade() } } diff --git a/tokio/src/time/driver/handle.rs b/tokio/src/time/driver/handle.rs index 5d2e6b9b..86ff904e 100644 --- a/tokio/src/time/driver/handle.rs +++ b/tokio/src/time/driver/handle.rs @@ -1,6 +1,4 @@ use crate::time::driver::Inner; -use crate::time::Error; - use std::cell::RefCell; use std::fmt; use std::marker::PhantomData; @@ -24,7 +22,7 @@ thread_local! { } #[derive(Debug)] -///Unsets default timer handler on drop. +/// Guard that unsets the current default timer on drop. pub(crate) struct DefaultGuard<'a> { prev: Option<HandlePriv>, _lifetime: PhantomData<&'a u8>, @@ -39,7 +37,7 @@ impl Drop for DefaultGuard<'_> { } } -///Sets handle to default timer, returning guard that unsets it on drop. +/// Sets handle to default timer, returning guard that unsets it on drop. /// /// # Panics /// @@ -82,11 +80,13 @@ impl Default for Handle { impl HandlePriv { /// Try to get a handle to the current timer. /// - /// Returns `Err` if no handle is found. - pub(crate) fn try_current() -> Result<HandlePriv, Error> { + /// # Panics + /// + /// This function panics if there is no current timer set. + pub(crate) fn current() -> Self { CURRENT_TIMER.with(|current| match *current.borrow() { - Some(ref handle) => Ok(handle.clone()), - None => Err(Error::shutdown()), + Some(ref handle) => handle.clone(), + None => panic!("no current timer"), }) } diff --git a/tokio/src/time/driver/registration.rs b/tokio/src/time/driver/registration.rs index 27b4c1cc..728d2993 100644 --- a/tokio/src/time/driver/registration.rs +++ b/tokio/src/time/driver/registration.rs @@ -15,11 +15,8 @@ pub(crate) struct Registration { impl Registration { pub(crate) fn new(deadline: Instant, duration: Duration) -> Registration { - fn is_send<T: Send + Sync>() {} - is_send::<Registration>(); - Registration { - entry: Arc::new(Entry::new(deadline, duration)), + entry: Entry::new(deadline, duration), } } @@ -27,12 +24,6 @@ impl Registration { self.entry.time_ref().deadline } - pub(crate) fn register(&mut self) { - if !self.entry.is_registered() { - Entry::register(&mut self.entry) - } - } - pub(crate) fn reset(&mut self, deadline: Instant) { unsafe { self.entry.time_mut().deadline = deadline; diff --git a/tokio/src/time/tests/mod.rs b/tokio/src/time/tests/mod.rs index 0aead136..6a9f25da 100644 --- a/tokio/src/time/tests/mod.rs +++ b/tokio/src/time/tests/mod.rs @@ -1,4 +1,24 @@ mod mock_clock; - mod test_delay; mod test_queue; + +use crate::time::{self, Instant}; +use std::time::Duration; + +fn assert_send<T: Send>() {} +fn assert_sync<T: Sync>() {} + +#[test] +fn registration_is_send_and_sync() { + use crate::time::driver::Registration; + + assert_send::<Registration>(); + assert_sync::<Registration>(); +} + +#[test] +#[should_panic] +fn delay_is_eager() { + let when = Instant::now() + Duration::from_millis(100); + let _ = time::delay_until(when); +} diff --git a/tokio/src/time/tests/test_delay.rs b/tokio/src/time/tests/test_delay.rs index 8b52e0a3..43bfe379 100644 --- a/tokio/src/time/tests/test_delay.rs +++ b/tokio/src/time/tests/test_delay.rs @@ -181,29 +181,13 @@ fn delayed_delay_level_1() { } #[test] +#[should_panic] fn creating_delay_outside_of_context() { let now = Instant::now(); // This creates a delay outside of the context of a mock timer. This tests - // that it will still expire. - let mut fut = task::spawn(delay_until(now + ms(500))); - - mock(|clock| { - // This registers the delay with the timer - assert_pending!(fut.poll()); - - // Wait some time... the timer is cascading - clock.turn_for(ms(1000)); - assert_eq!(clock.advanced(), ms(448)); - - assert_pending!(fut.poll()); - - clock.turn_for(ms(1000)); - assert_eq!(clock.advanced(), ms(500)); - - // The delay has elapsed - assert_ready!(fut.poll()); - }); + // that it will panic. + let _fut = task::spawn(delay_until(now + ms(500))); } #[test] |