summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKevin Leimkuhler <kleimkuhler@icloud.com>2019-11-20 12:24:41 -0800
committerCarl Lerche <me@carllerche.com>2019-11-20 12:24:41 -0800
commit3e643c7b81736a4c2b11387a6f71aba99450270b (patch)
tree2be18c72374efcf8716c3a7d90783027231f909a
parentbc150cd0b56cf6dcb7c4feab64e83b9938faa186 (diff)
time: Eagerly bind delays to timer (#1800)
## Motivation Similar to #1666, it is no longer necessary to lazily register delays with the executions default timer. All delays are expected to be created from within a runtime, and should panic if not done so. ## Solution `tokio::time` now assumes there to be a `CURRENT_TIMER` set when creating a delay; this can be assumed if called within a tokio runtime. If there is no current timer, the application will panic with a "no current timer" message. ## Follow-up Similar to #1666, `HandlePriv` can probably be removed, but this mainly prepares for 0.2 API changes. Because it is not in the public API, this can be done in a following change. Signed-off-by: Kevin Leimkuhler <kleimkuhler@icloud.com>
-rw-r--r--tokio-test/tests/block_on.rs7
-rw-r--r--tokio/src/time/delay.rs13
-rw-r--r--tokio/src/time/delay_queue.rs120
-rw-r--r--tokio/src/time/driver/entry.rs112
-rw-r--r--tokio/src/time/driver/handle.rs16
-rw-r--r--tokio/src/time/driver/registration.rs11
-rw-r--r--tokio/src/time/tests/mod.rs22
-rw-r--r--tokio/src/time/tests/test_delay.rs22
8 files changed, 145 insertions, 178 deletions
diff --git a/tokio-test/tests/block_on.rs b/tokio-test/tests/block_on.rs
index b50f9ae1..7aec82cc 100644
--- a/tokio-test/tests/block_on.rs
+++ b/tokio-test/tests/block_on.rs
@@ -20,5 +20,10 @@ fn async_fn() {
#[test]
fn test_delay() {
let deadline = Instant::now() + Duration::from_millis(100);
- assert_eq!((), block_on(delay_until(deadline)));
+ assert_eq!(
+ (),
+ block_on(async {
+ delay_until(deadline).await;
+ })
+ );
}
diff --git a/tokio/src/time/delay.rs b/tokio/src/time/delay.rs
index e3b605e7..bae3d9c8 100644
--- a/tokio/src/time/delay.rs
+++ b/tokio/src/time/delay.rs
@@ -42,7 +42,7 @@ pub fn delay_for(duration: Duration) -> Delay {
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Delay {
- /// The link between the `Delay` instance at the timer that drives it.
+ /// The link between the `Delay` instance and the timer that drives it.
///
/// This also stores the `deadline` value.
registration: Registration,
@@ -76,21 +76,12 @@ impl Delay {
pub fn reset(&mut self, deadline: Instant) {
self.registration.reset(deadline);
}
-
- /// Register the delay with the timer instance for the current execution
- /// context.
- fn register(&mut self) {
- self.registration.register();
- }
}
impl Future for Delay {
type Output = ();
- fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
- // Ensure the `Delay` instance is associated with a timer.
- self.register();
-
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
// `poll_elapsed` can return an error in two cases:
//
// - AtCapacity: this is a pathlogical case where far too many
diff --git a/tokio/src/time/delay_queue.rs b/tokio/src/time/delay_queue.rs
index 6a7cc6b3..1fd70625 100644
--- a/tokio/src/time/delay_queue.rs
+++ b/tokio/src/time/delay_queue.rs
@@ -228,15 +228,19 @@ impl<T> DelayQueue<T> {
/// ```rust
/// # use tokio::time::DelayQueue;
/// # use std::time::Duration;
- /// let mut delay_queue = DelayQueue::with_capacity(10);
///
- /// // These insertions are done without further allocation
- /// for i in 0..10 {
- /// delay_queue.insert(i, Duration::from_secs(i));
- /// }
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// let mut delay_queue = DelayQueue::with_capacity(10);
///
- /// // This will make the queue allocate additional storage
- /// delay_queue.insert(11, Duration::from_secs(11));
+ /// // These insertions are done without further allocation
+ /// for i in 0..10 {
+ /// delay_queue.insert(i, Duration::from_secs(i));
+ /// }
+ ///
+ /// // This will make the queue allocate additional storage
+ /// delay_queue.insert(11, Duration::from_secs(11));
+ /// # }
/// ```
pub fn with_capacity(capacity: usize) -> DelayQueue<T> {
DelayQueue {
@@ -279,13 +283,16 @@ impl<T> DelayQueue<T> {
/// ```rust
/// use tokio::time::{DelayQueue, Duration, Instant};
///
- /// let mut delay_queue = DelayQueue::new();
- /// let key = delay_queue.insert_at(
- /// "foo", Instant::now() + Duration::from_secs(5));
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// let mut delay_queue = DelayQueue::new();
+ /// let key = delay_queue.insert_at(
+ /// "foo", Instant::now() + Duration::from_secs(5));
///
- /// // Remove the entry
- /// let item = delay_queue.remove(&key);
- /// assert_eq!(*item.get_ref(), "foo");
+ /// // Remove the entry
+ /// let item = delay_queue.remove(&key);
+ /// assert_eq!(*item.get_ref(), "foo");
+ /// # }
/// ```
///
/// [`poll`]: #method.poll
@@ -380,12 +387,15 @@ impl<T> DelayQueue<T> {
/// use tokio::time::DelayQueue;
/// use std::time::Duration;
///
- /// let mut delay_queue = DelayQueue::new();
- /// let key = delay_queue.insert("foo", Duration::from_secs(5));
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// let mut delay_queue = DelayQueue::new();
+ /// let key = delay_queue.insert("foo", Duration::from_secs(5));
///
- /// // Remove the entry
- /// let item = delay_queue.remove(&key);
- /// assert_eq!(*item.get_ref(), "foo");
+ /// // Remove the entry
+ /// let item = delay_queue.remove(&key);
+ /// assert_eq!(*item.get_ref(), "foo");
+ /// # }
/// ```
///
/// [`poll`]: #method.poll
@@ -430,12 +440,15 @@ impl<T> DelayQueue<T> {
/// use tokio::time::DelayQueue;
/// use std::time::Duration;
///
- /// let mut delay_queue = DelayQueue::new();
- /// let key = delay_queue.insert("foo", Duration::from_secs(5));
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// let mut delay_queue = DelayQueue::new();
+ /// let key = delay_queue.insert("foo", Duration::from_secs(5));
///
- /// // Remove the entry
- /// let item = delay_queue.remove(&key);
- /// assert_eq!(*item.get_ref(), "foo");
+ /// // Remove the entry
+ /// let item = delay_queue.remove(&key);
+ /// assert_eq!(*item.get_ref(), "foo");
+ /// # }
/// ```
pub fn remove(&mut self, key: &Key) -> Expired<T> {
use crate::time::wheel::Stack;
@@ -477,14 +490,17 @@ impl<T> DelayQueue<T> {
/// ```rust
/// use tokio::time::{DelayQueue, Duration, Instant};
///
- /// let mut delay_queue = DelayQueue::new();
- /// let key = delay_queue.insert("foo", Duration::from_secs(5));
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// let mut delay_queue = DelayQueue::new();
+ /// let key = delay_queue.insert("foo", Duration::from_secs(5));
///
- /// // "foo" is scheduled to be returned in 5 seconds
+ /// // "foo" is scheduled to be returned in 5 seconds
///
- /// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10));
+ /// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10));
///
- /// // "foo"is now scheduled to be returned in 10 seconds
+ /// // "foo"is now scheduled to be returned in 10 seconds
+ /// # }
/// ```
pub fn reset_at(&mut self, key: &Key, when: Instant) {
self.wheel.remove(&key.index, &mut self.slab);
@@ -531,14 +547,17 @@ impl<T> DelayQueue<T> {
/// use tokio::time::DelayQueue;
/// use std::time::Duration;
///
- /// let mut delay_queue = DelayQueue::new();
- /// let key = delay_queue.insert("foo", Duration::from_secs(5));
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// let mut delay_queue = DelayQueue::new();
+ /// let key = delay_queue.insert("foo", Duration::from_secs(5));
///
- /// // "foo" is scheduled to be returned in 5 seconds
+ /// // "foo" is scheduled to be returned in 5 seconds
///
- /// delay_queue.reset(&key, Duration::from_secs(10));
+ /// delay_queue.reset(&key, Duration::from_secs(10));
///
- /// // "foo"is now scheduled to be returned in 10 seconds
+ /// // "foo"is now scheduled to be returned in 10 seconds
+ /// # }
/// ```
pub fn reset(&mut self, key: &Key, timeout: Duration) {
self.reset_at(key, Instant::now() + timeout);
@@ -558,15 +577,18 @@ impl<T> DelayQueue<T> {
/// use tokio::time::DelayQueue;
/// use std::time::Duration;
///
- /// let mut delay_queue = DelayQueue::new();
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// let mut delay_queue = DelayQueue::new();
///
- /// delay_queue.insert("foo", Duration::from_secs(5));
+ /// delay_queue.insert("foo", Duration::from_secs(5));
///
- /// assert!(!delay_queue.is_empty());
+ /// assert!(!delay_queue.is_empty());
///
- /// delay_queue.clear();
+ /// delay_queue.clear();
///
- /// assert!(delay_queue.is_empty());
+ /// assert!(delay_queue.is_empty());
+ /// # }
/// ```
pub fn clear(&mut self) {
self.slab.clear();
@@ -612,12 +634,15 @@ impl<T> DelayQueue<T> {
/// use tokio::time::DelayQueue;
/// use std::time::Duration;
///
- /// let mut delay_queue = DelayQueue::new();
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// let mut delay_queue = DelayQueue::new();
///
- /// delay_queue.insert("hello", Duration::from_secs(10));
- /// delay_queue.reserve(10);
+ /// delay_queue.insert("hello", Duration::from_secs(10));
+ /// delay_queue.reserve(10);
///
- /// assert!(delay_queue.capacity() >= 11);
+ /// assert!(delay_queue.capacity() >= 11);
+ /// # }
/// ```
pub fn reserve(&mut self, additional: usize) {
self.slab.reserve(additional);
@@ -634,11 +659,14 @@ impl<T> DelayQueue<T> {
/// use tokio::time::DelayQueue;
/// use std::time::Duration;
///
- /// let mut delay_queue = DelayQueue::new();
- /// assert!(delay_queue.is_empty());
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// let mut delay_queue = DelayQueue::new();
+ /// assert!(delay_queue.is_empty());
///
- /// delay_queue.insert("hello", Duration::from_secs(5));
- /// assert!(!delay_queue.is_empty());
+ /// delay_queue.insert("hello", Duration::from_secs(5));
+ /// assert!(!delay_queue.is_empty());
+ /// # }
/// ```
pub fn is_empty(&self) -> bool {
self.slab.is_empty()
diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs
index 97ce34de..708b6cfb 100644
--- a/tokio/src/time/driver/entry.rs
+++ b/tokio/src/time/driver/entry.rs
@@ -6,7 +6,7 @@ use crate::time::{Duration, Error, Instant};
use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::AtomicBool;
-use std::sync::atomic::Ordering::{Relaxed, SeqCst};
+use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Weak};
use std::task::{self, Poll};
use std::u64;
@@ -31,7 +31,7 @@ pub(crate) struct Entry {
/// without all `Delay` instances having completed.
///
/// When `None`, the entry has not yet been linked with a timer instance.
- inner: Option<Weak<Inner>>,
+ inner: Weak<Inner>,
/// Tracks the entry state. This value contains the following information:
///
@@ -104,18 +104,41 @@ const ERROR: u64 = u64::MAX;
// ===== impl Entry =====
impl Entry {
- pub(crate) fn new(deadline: Instant, duration: Duration) -> Entry {
- Entry {
+ pub(crate) fn new(deadline: Instant, duration: Duration) -> Arc<Entry> {
+ let handle_priv = HandlePriv::current();
+ let handle = handle_priv.inner().unwrap();
+
+ // Increment the number of active timeouts
+ if handle.increment().is_err() {
+ // TODO(kleimkuhler): Transition to error state instead of
+ // panicking?
+ panic!("failed to add entry; timer at capacity");
+ };
+
+ let when = handle.normalize_deadline(deadline);
+ let state = if when <= handle.elapsed() {
+ ELAPSED
+ } else {
+ when
+ };
+
+ let entry = Arc::new(Entry {
time: CachePadded(UnsafeCell::new(Time { deadline, duration })),
- inner: None,
+ inner: handle_priv.into_inner(),
waker: AtomicWaker::new(),
- state: AtomicU64::new(0),
+ state: AtomicU64::new(state),
queued: AtomicBool::new(false),
next_atomic: UnsafeCell::new(ptr::null_mut()),
when: UnsafeCell::new(None),
next_stack: UnsafeCell::new(None),
prev_stack: UnsafeCell::new(ptr::null_mut()),
+ });
+
+ if handle.queue(&entry).is_err() {
+ entry.error();
}
+
+ entry
}
/// Only called by `Registration`
@@ -129,77 +152,6 @@ impl Entry {
&mut *self.time.0.get()
}
- /// Returns `true` if the `Entry` is currently associated with a timer
- /// instance.
- pub(crate) fn is_registered(&self) -> bool {
- self.inner.is_some()
- }
-
- /// Only called by `Registration`
- pub(crate) fn register(me: &mut Arc<Self>) {
- let handle = match HandlePriv::try_current() {
- Ok(handle) => handle,
- Err(_) => {
- // Could not associate the entry with a timer, transition the
- // state to error
- Arc::get_mut(me).unwrap().transition_to_error();
-
- return;
- }
- };
-
- Entry::register_with(me, handle)
- }
-
- /// Only called by `Registration`
- pub(crate) fn register_with(me: &mut Arc<Self>, handle: HandlePriv) {
- assert!(!me.is_registered(), "only register an entry once");
-
- let deadline = me.time_ref().deadline;
-
- let inner = match handle.inner() {
- Some(inner) => inner,
- None => {
- // Could not associate the entry with a timer, transition the
- // state to error
- Arc::get_mut(me).unwrap().transition_to_error();
-
- return;
- }
- };
-
- // Increment the number of active timeouts
- if inner.increment().is_err() {
- Arc::get_mut(me).unwrap().transition_to_error();
-
- return;
- }
-
- // Associate the entry with the timer
- Arc::get_mut(me).unwrap().inner = Some(handle.into_inner());
-
- let when = inner.normalize_deadline(deadline);
-
- // Relaxed OK: At this point, there are no other threads that have
- // access to this entry.
- if when <= inner.elapsed() {
- me.state.store(ELAPSED, Relaxed);
- return;
- } else {
- me.state.store(when, Relaxed);
- }
-
- if inner.queue(me).is_err() {
- // The timer has shutdown, transition the entry to the error state.
- me.error();
- }
- }
-
- fn transition_to_error(&mut self) {
- self.inner = Some(Weak::new());
- self.state = AtomicU64::new(ERROR);
- }
-
/// The current entry state as known by the timer. This is not the value of
/// `state`, but lets the timer know how to converge its state to `state`.
pub(crate) fn when_internal(&self) -> Option<u64> {
@@ -317,10 +269,6 @@ impl Entry {
/// Only called by `Registration`
pub(crate) fn reset(entry: &mut Arc<Entry>) {
- if !entry.is_registered() {
- return;
- }
-
let inner = match entry.upgrade_inner() {
Some(inner) => inner,
None => return,
@@ -367,7 +315,7 @@ impl Entry {
}
fn upgrade_inner(&self) -> Option<Arc<Inner>> {
- self.inner.as_ref().and_then(|inner| inner.upgrade())
+ self.inner.upgrade()
}
}
diff --git a/tokio/src/time/driver/handle.rs b/tokio/src/time/driver/handle.rs
index 5d2e6b9b..86ff904e 100644
--- a/tokio/src/time/driver/handle.rs
+++ b/tokio/src/time/driver/handle.rs
@@ -1,6 +1,4 @@
use crate::time::driver::Inner;
-use crate::time::Error;
-
use std::cell::RefCell;
use std::fmt;
use std::marker::PhantomData;
@@ -24,7 +22,7 @@ thread_local! {
}
#[derive(Debug)]
-///Unsets default timer handler on drop.
+/// Guard that unsets the current default timer on drop.
pub(crate) struct DefaultGuard<'a> {
prev: Option<HandlePriv>,
_lifetime: PhantomData<&'a u8>,
@@ -39,7 +37,7 @@ impl Drop for DefaultGuard<'_> {
}
}
-///Sets handle to default timer, returning guard that unsets it on drop.
+/// Sets handle to default timer, returning guard that unsets it on drop.
///
/// # Panics
///
@@ -82,11 +80,13 @@ impl Default for Handle {
impl HandlePriv {
/// Try to get a handle to the current timer.
///
- /// Returns `Err` if no handle is found.
- pub(crate) fn try_current() -> Result<HandlePriv, Error> {
+ /// # Panics
+ ///
+ /// This function panics if there is no current timer set.
+ pub(crate) fn current() -> Self {
CURRENT_TIMER.with(|current| match *current.borrow() {
- Some(ref handle) => Ok(handle.clone()),
- None => Err(Error::shutdown()),
+ Some(ref handle) => handle.clone(),
+ None => panic!("no current timer"),
})
}
diff --git a/tokio/src/time/driver/registration.rs b/tokio/src/time/driver/registration.rs
index 27b4c1cc..728d2993 100644
--- a/tokio/src/time/driver/registration.rs
+++ b/tokio/src/time/driver/registration.rs
@@ -15,11 +15,8 @@ pub(crate) struct Registration {
impl Registration {
pub(crate) fn new(deadline: Instant, duration: Duration) -> Registration {
- fn is_send<T: Send + Sync>() {}
- is_send::<Registration>();
-
Registration {
- entry: Arc::new(Entry::new(deadline, duration)),
+ entry: Entry::new(deadline, duration),
}
}
@@ -27,12 +24,6 @@ impl Registration {
self.entry.time_ref().deadline
}
- pub(crate) fn register(&mut self) {
- if !self.entry.is_registered() {
- Entry::register(&mut self.entry)
- }
- }
-
pub(crate) fn reset(&mut self, deadline: Instant) {
unsafe {
self.entry.time_mut().deadline = deadline;
diff --git a/tokio/src/time/tests/mod.rs b/tokio/src/time/tests/mod.rs
index 0aead136..6a9f25da 100644
--- a/tokio/src/time/tests/mod.rs
+++ b/tokio/src/time/tests/mod.rs
@@ -1,4 +1,24 @@
mod mock_clock;
-
mod test_delay;
mod test_queue;
+
+use crate::time::{self, Instant};
+use std::time::Duration;
+
+fn assert_send<T: Send>() {}
+fn assert_sync<T: Sync>() {}
+
+#[test]
+fn registration_is_send_and_sync() {
+ use crate::time::driver::Registration;
+
+ assert_send::<Registration>();
+ assert_sync::<Registration>();
+}
+
+#[test]
+#[should_panic]
+fn delay_is_eager() {
+ let when = Instant::now() + Duration::from_millis(100);
+ let _ = time::delay_until(when);
+}
diff --git a/tokio/src/time/tests/test_delay.rs b/tokio/src/time/tests/test_delay.rs
index 8b52e0a3..43bfe379 100644
--- a/tokio/src/time/tests/test_delay.rs
+++ b/tokio/src/time/tests/test_delay.rs
@@ -181,29 +181,13 @@ fn delayed_delay_level_1() {
}
#[test]
+#[should_panic]
fn creating_delay_outside_of_context() {
let now = Instant::now();
// This creates a delay outside of the context of a mock timer. This tests
- // that it will still expire.
- let mut fut = task::spawn(delay_until(now + ms(500)));
-
- mock(|clock| {
- // This registers the delay with the timer
- assert_pending!(fut.poll());
-
- // Wait some time... the timer is cascading
- clock.turn_for(ms(1000));
- assert_eq!(clock.advanced(), ms(448));
-
- assert_pending!(fut.poll());
-
- clock.turn_for(ms(1000));
- assert_eq!(clock.advanced(), ms(500));
-
- // The delay has elapsed
- assert_ready!(fut.poll());
- });
+ // that it will panic.
+ let _fut = task::spawn(delay_until(now + ms(500)));
}
#[test]