summaryrefslogtreecommitdiffstats
path: root/tokio-sync/src/semaphore.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-sync/src/semaphore.rs')
-rw-r--r--tokio-sync/src/semaphore.rs73
1 files changed, 38 insertions, 35 deletions
diff --git a/tokio-sync/src/semaphore.rs b/tokio-sync/src/semaphore.rs
index 43e89c37..33fd2318 100644
--- a/tokio-sync/src/semaphore.rs
+++ b/tokio-sync/src/semaphore.rs
@@ -6,21 +6,23 @@
//! Before accessing the shared resource, callers acquire a permit from the
//! semaphore. Once the permit is acquired, the caller then enters the critical
//! section. If no permits are available, then acquiring the semaphore returns
-//! `NotReady`. The task is notified once a permit becomes available.
+//! `Pending`. The task is woken once a permit becomes available.
use crate::loom::{
- futures::AtomicTask,
+ futures::AtomicWaker,
sync::{
atomic::{AtomicPtr, AtomicUsize},
CausalCell,
},
yield_now,
};
-use futures::Poll;
+
use std::fmt;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release};
use std::sync::Arc;
+use std::task::Poll::{Pending, Ready};
+use std::task::{Context, Poll};
use std::usize;
/// Futures-aware semaphore.
@@ -80,8 +82,8 @@ struct WaiterNode {
/// See `NodeState` for more details.
state: AtomicUsize,
- /// Task to notify when a permit is made available.
- task: AtomicTask,
+ /// Task to wake when a permit is made available.
+ waker: AtomicWaker,
/// Next pointer in the queue of waiting senders.
next: AtomicPtr<WaiterNode>,
@@ -174,9 +176,10 @@ impl Semaphore {
}
/// Poll for a permit
- fn poll_permit(&self, mut permit: Option<&mut Permit>) -> Poll<(), AcquireError> {
- use futures::Async::*;
-
+ fn poll_permit(
+ &self,
+ mut permit: Option<(&mut Context<'_>, &mut Permit)>,
+ ) -> Poll<Result<(), AcquireError>> {
// Load the current state
let mut curr = SemState::load(&self.state, Acquire);
@@ -205,7 +208,7 @@ impl Semaphore {
if curr.is_closed() {
undo_strong!();
- return Err(AcquireError::closed());
+ return Ready(Err(AcquireError::closed()));
}
if !next.acquire_permit(&self.stub) {
@@ -214,13 +217,13 @@ impl Semaphore {
debug_assert!(curr.waiter().is_some());
if maybe_strong.is_none() {
- if let Some(ref mut permit) = permit {
+ if let Some((ref mut cx, ref mut permit)) = permit {
// Get the Sender's waiter node, or initialize one
let waiter = permit
.waiter
.get_or_insert_with(|| Arc::new(WaiterNode::new()));
- waiter.register();
+ waiter.register(cx);
debug!(" + poll_permit -- to_queued_waiting");
@@ -228,14 +231,14 @@ impl Semaphore {
debug!(" + poll_permit; waiter already queued");
// The node is alrady queued, there is no further work
// to do.
- return Ok(NotReady);
+ return Pending;
}
maybe_strong = Some(WaiterNode::into_non_null(waiter.clone()));
} else {
// If no `waiter`, then the task is not registered and there
// is no further work to do.
- return Ok(NotReady);
+ return Pending;
}
}
@@ -261,14 +264,14 @@ impl Semaphore {
debug!(" + poll_permit -- waiter pushed");
- return Ok(NotReady);
+ return Pending;
}
None => {
debug!(" + poll_permit -- permit acquired");
undo_strong!();
- return Ok(Ready(()));
+ return Ready(Ok(()));
}
}
}
@@ -571,42 +574,42 @@ impl Permit {
/// Try to acquire the permit. If no permits are available, the current task
/// is notified once a new permit becomes available.
- pub fn poll_acquire(&mut self, semaphore: &Semaphore) -> Poll<(), AcquireError> {
- use futures::Async::*;
-
+ pub fn poll_acquire(
+ &mut self,
+ cx: &mut Context<'_>,
+ semaphore: &Semaphore,
+ ) -> Poll<Result<(), AcquireError>> {
match self.state {
PermitState::Idle => {}
PermitState::Waiting => {
let waiter = self.waiter.as_ref().unwrap();
- if waiter.acquire()? {
+ if waiter.acquire(cx)? {
self.state = PermitState::Acquired;
- return Ok(Ready(()));
+ return Ready(Ok(()));
} else {
- return Ok(NotReady);
+ return Pending;
}
}
PermitState::Acquired => {
- return Ok(Ready(()));
+ return Ready(Ok(()));
}
}
- match semaphore.poll_permit(Some(self))? {
+ match semaphore.poll_permit(Some((cx, self)))? {
Ready(v) => {
self.state = PermitState::Acquired;
- Ok(Ready(v))
+ Ready(Ok(v))
}
- NotReady => {
+ Pending => {
self.state = PermitState::Waiting;
- Ok(NotReady)
+ Pending
}
}
}
/// Try to acquire the permit.
pub fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> {
- use futures::Async::*;
-
match self.state {
PermitState::Idle => {}
PermitState::Waiting => {
@@ -629,7 +632,7 @@ impl Permit {
self.state = PermitState::Acquired;
Ok(())
}
- NotReady => Err(TryAcquireError::no_permits()),
+ Pending => Err(TryAcquireError::no_permits()),
}
}
@@ -748,17 +751,17 @@ impl WaiterNode {
fn new() -> WaiterNode {
WaiterNode {
state: AtomicUsize::new(NodeState::new().to_usize()),
- task: AtomicTask::new(),
+ waker: AtomicWaker::new(),
next: AtomicPtr::new(ptr::null_mut()),
}
}
- fn acquire(&self) -> Result<bool, AcquireError> {
+ fn acquire(&self, cx: &mut Context<'_>) -> Result<bool, AcquireError> {
if self.acquire2()? {
return Ok(true);
}
- self.task.register();
+ self.waker.register_by_ref(cx.waker());
self.acquire2()
}
@@ -773,8 +776,8 @@ impl WaiterNode {
}
}
- fn register(&self) {
- self.task.register()
+ fn register(&self, cx: &mut Context<'_>) {
+ self.waker.register_by_ref(cx.waker())
}
/// Returns `true` if the permit has been acquired
@@ -860,7 +863,7 @@ impl WaiterNode {
Ok(_) => match curr {
QueuedWaiting => {
debug!(" + notify -- task notified");
- self.task.notify();
+ self.waker.wake();
return true;
}
other => {