diff options
Diffstat (limited to 'tokio/src/sync')
-rw-r--r-- | tokio/src/sync/mpsc/chan.rs | 6 | ||||
-rw-r--r-- | tokio/src/sync/mutex.rs | 19 | ||||
-rw-r--r-- | tokio/src/sync/oneshot.rs | 6 | ||||
-rw-r--r-- | tokio/src/sync/rwlock.rs | 3 | ||||
-rw-r--r-- | tokio/src/sync/semaphore.rs | 11 |
5 files changed, 35 insertions, 10 deletions
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 2fc915d0..dc02dae2 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -265,6 +265,9 @@ where pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { use super::block::Read::*; + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + self.inner.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; @@ -424,6 +427,9 @@ impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) { cx: &mut Context<'_>, permit: &mut Permit, ) -> Poll<Result<(), ClosedError>> { + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + permit .poll_acquire(cx, 1, &self.0) .map_err(|_| ClosedError::new()) diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index 92218cbc..4aceb000 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -156,13 +156,18 @@ impl<T> Mutex<T> { lock: self, permit: semaphore::Permit::new(), }; - poll_fn(|cx| guard.permit.poll_acquire(cx, 1, &self.s)) - .await - .unwrap_or_else(|_| { - // The semaphore was closed. but, we never explicitly close it, and we have a - // handle to it through the Arc, which means that this can never happen. - unreachable!() - }); + poll_fn(|cx| { + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + + guard.permit.poll_acquire(cx, 1, &self.s) + }) + .await + .unwrap_or_else(|_| { + // The semaphore was closed. but, we never explicitly close it, and we have a + // handle to it through the Arc, which means that this can never happen. + unreachable!() + }); guard } diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 6c7b97cf..163a708d 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -196,6 +196,9 @@ impl<T> Sender<T> { #[doc(hidden)] // TODO: remove pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> { + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + let inner = self.inner.as_ref().unwrap(); let mut state = State::load(&inner.state, Acquire); @@ -544,6 +547,9 @@ impl<T> Inner<T> { } fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> { + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + // Load the state let mut state = State::load(&self.state, Acquire); diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index bd52f6d7..ccd0e884 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -119,6 +119,9 @@ impl<'a, T> ReleasingPermit<'a, T> { cx: &mut Context<'_>, s: &Semaphore, ) -> Poll<Result<(), AcquireError>> { + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + self.permit.poll_acquire(cx, self.num_permits, s) } } diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 7721e01f..ec43bc52 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -60,9 +60,14 @@ impl Semaphore { sem: &self, ll_permit: ll::Permit::new(), }; - poll_fn(|cx| permit.ll_permit.poll_acquire(cx, 1, &self.ll_sem)) - .await - .unwrap(); + poll_fn(|cx| { + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + + permit.ll_permit.poll_acquire(cx, 1, &self.ll_sem) + }) + .await + .unwrap(); permit } |