summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/sync')
-rw-r--r--tokio/src/sync/mpsc/chan.rs6
-rw-r--r--tokio/src/sync/mutex.rs19
-rw-r--r--tokio/src/sync/oneshot.rs6
-rw-r--r--tokio/src/sync/rwlock.rs3
-rw-r--r--tokio/src/sync/semaphore.rs11
5 files changed, 35 insertions, 10 deletions
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs
index 2fc915d0..dc02dae2 100644
--- a/tokio/src/sync/mpsc/chan.rs
+++ b/tokio/src/sync/mpsc/chan.rs
@@ -265,6 +265,9 @@ where
pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
use super::block::Read::*;
+ // Keep track of task budget
+ ready!(crate::coop::poll_proceed(cx));
+
self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
@@ -424,6 +427,9 @@ impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) {
cx: &mut Context<'_>,
permit: &mut Permit,
) -> Poll<Result<(), ClosedError>> {
+ // Keep track of task budget
+ ready!(crate::coop::poll_proceed(cx));
+
permit
.poll_acquire(cx, 1, &self.0)
.map_err(|_| ClosedError::new())
diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs
index 92218cbc..4aceb000 100644
--- a/tokio/src/sync/mutex.rs
+++ b/tokio/src/sync/mutex.rs
@@ -156,13 +156,18 @@ impl<T> Mutex<T> {
lock: self,
permit: semaphore::Permit::new(),
};
- poll_fn(|cx| guard.permit.poll_acquire(cx, 1, &self.s))
- .await
- .unwrap_or_else(|_| {
- // The semaphore was closed. but, we never explicitly close it, and we have a
- // handle to it through the Arc, which means that this can never happen.
- unreachable!()
- });
+ poll_fn(|cx| {
+ // Keep track of task budget
+ ready!(crate::coop::poll_proceed(cx));
+
+ guard.permit.poll_acquire(cx, 1, &self.s)
+ })
+ .await
+ .unwrap_or_else(|_| {
+ // The semaphore was closed. but, we never explicitly close it, and we have a
+ // handle to it through the Arc, which means that this can never happen.
+ unreachable!()
+ });
guard
}
diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs
index 6c7b97cf..163a708d 100644
--- a/tokio/src/sync/oneshot.rs
+++ b/tokio/src/sync/oneshot.rs
@@ -196,6 +196,9 @@ impl<T> Sender<T> {
#[doc(hidden)] // TODO: remove
pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+ // Keep track of task budget
+ ready!(crate::coop::poll_proceed(cx));
+
let inner = self.inner.as_ref().unwrap();
let mut state = State::load(&inner.state, Acquire);
@@ -544,6 +547,9 @@ impl<T> Inner<T> {
}
fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
+ // Keep track of task budget
+ ready!(crate::coop::poll_proceed(cx));
+
// Load the state
let mut state = State::load(&self.state, Acquire);
diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs
index bd52f6d7..ccd0e884 100644
--- a/tokio/src/sync/rwlock.rs
+++ b/tokio/src/sync/rwlock.rs
@@ -119,6 +119,9 @@ impl<'a, T> ReleasingPermit<'a, T> {
cx: &mut Context<'_>,
s: &Semaphore,
) -> Poll<Result<(), AcquireError>> {
+ // Keep track of task budget
+ ready!(crate::coop::poll_proceed(cx));
+
self.permit.poll_acquire(cx, self.num_permits, s)
}
}
diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs
index 7721e01f..ec43bc52 100644
--- a/tokio/src/sync/semaphore.rs
+++ b/tokio/src/sync/semaphore.rs
@@ -60,9 +60,14 @@ impl Semaphore {
sem: &self,
ll_permit: ll::Permit::new(),
};
- poll_fn(|cx| permit.ll_permit.poll_acquire(cx, 1, &self.ll_sem))
- .await
- .unwrap();
+ poll_fn(|cx| {
+ // Keep track of task budget
+ ready!(crate::coop::poll_proceed(cx));
+
+ permit.ll_permit.poll_acquire(cx, 1, &self.ll_sem)
+ })
+ .await
+ .unwrap();
permit
}