summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync
diff options
context:
space:
mode:
authorMichael P. Jung <michael.jung@terreon.de>2019-12-10 17:01:23 +0100
committerCarl Lerche <me@carllerche.com>2019-12-10 08:01:23 -0800
commit975576952f33c64e4faaa616f67ae9d6b596e4aa (patch)
tree879c74c5fcf04e17fffe0b33b9b98a15bf1d464a /tokio/src/sync
parent5d5755dca4586f80d591e5d61de3f245bf8c4506 (diff)
Add Mutex::try_lock and (Unbounded)Receiver::try_recv (#1939)
Diffstat (limited to 'tokio/src/sync')
-rw-r--r--tokio/src/sync/mpsc/bounded.rs17
-rw-r--r--tokio/src/sync/mpsc/chan.rs18
-rw-r--r--tokio/src/sync/mpsc/error.rs29
-rw-r--r--tokio/src/sync/mpsc/unbounded.rs17
-rw-r--r--tokio/src/sync/mutex.rs34
5 files changed, 112 insertions, 3 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
index b2b2df9f..5cca1596 100644
--- a/tokio/src/sync/mpsc/bounded.rs
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -1,5 +1,5 @@
use crate::sync::mpsc::chan;
-use crate::sync::mpsc::error::{ClosedError, SendError, TrySendError};
+use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError};
use crate::sync::semaphore;
use std::fmt;
@@ -150,6 +150,21 @@ impl<T> Receiver<T> {
self.chan.recv(cx)
}
+ /// Attempts to return a pending value on this receiver without blocking.
+ ///
+ /// This method will never block the caller in order to wait for data to
+ /// become available. Instead, this will always return immediately with
+ /// a possible option of pending data on the channel.
+ ///
+ /// This is useful for a flavor of "optimistic check" before deciding to
+ /// block on a receiver.
+ ///
+ /// Compared with recv, this function has two failure cases instead of
+ /// one (one for disconnection, one for an empty buffer).
+ pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
+ self.chan.try_recv()
+ }
+
/// Closes the receiving half of a channel, without dropping it.
///
/// This prevents any further messages from being sent on the channel while
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs
index 2cf0d055..b6e94d5a 100644
--- a/tokio/src/sync/mpsc/chan.rs
+++ b/tokio/src/sync/mpsc/chan.rs
@@ -2,7 +2,7 @@ use crate::loom::cell::CausalCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
-use crate::sync::mpsc::error::ClosedError;
+use crate::sync::mpsc::error::{ClosedError, TryRecvError};
use crate::sync::mpsc::{error, list};
use std::fmt;
@@ -306,6 +306,22 @@ where
}
})
}
+
+ /// Receive the next value without blocking
+ pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
+ use super::block::Read::*;
+ self.inner.rx_fields.with_mut(|rx_fields_ptr| {
+ let rx_fields = unsafe { &mut *rx_fields_ptr };
+ match rx_fields.list.pop(&self.inner.tx) {
+ Some(Value(value)) => {
+ self.inner.semaphore.add_permit();
+ Ok(value)
+ }
+ Some(Closed) => Err(TryRecvError::Closed),
+ None => Err(TryRecvError::Empty),
+ }
+ })
+ }
}
impl<T, S> Drop for Rx<T, S>
diff --git a/tokio/src/sync/mpsc/error.rs b/tokio/src/sync/mpsc/error.rs
index 6238f854..4b8d9054 100644
--- a/tokio/src/sync/mpsc/error.rs
+++ b/tokio/src/sync/mpsc/error.rs
@@ -65,6 +65,35 @@ impl fmt::Display for RecvError {
impl Error for RecvError {}
+// ===== TryRecvError =====
+
+/// This enumeration is the list of the possible reasons that try_recv
+/// could not return data when called.
+#[derive(Debug)]
+pub enum TryRecvError {
+ /// This channel is currently empty, but the Sender(s) have not yet
+ /// disconnected, so data may yet become available.
+ Empty,
+ /// The channel's sending half has been closed, and there will
+ /// never be any more data received on it.
+ Closed,
+}
+
+impl fmt::Display for TryRecvError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ fmt,
+ "{}",
+ match self {
+ TryRecvError::Empty => "channel empty",
+ TryRecvError::Closed => "channel closed",
+ }
+ )
+ }
+}
+
+impl Error for TryRecvError {}
+
// ===== ClosedError =====
/// Erorr returned by [`Sender::poll_ready`](super::Sender::poll_ready)].
diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs
index 07a173c2..4a6ba8ee 100644
--- a/tokio/src/sync/mpsc/unbounded.rs
+++ b/tokio/src/sync/mpsc/unbounded.rs
@@ -1,6 +1,6 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::sync::mpsc::chan;
-use crate::sync::mpsc::error::SendError;
+use crate::sync::mpsc::error::{SendError, TryRecvError};
use std::fmt;
use std::task::{Context, Poll};
@@ -123,6 +123,21 @@ impl<T> UnboundedReceiver<T> {
poll_fn(|cx| self.poll_recv(cx)).await
}
+ /// Attempts to return a pending value on this receiver without blocking.
+ ///
+ /// This method will never block the caller in order to wait for data to
+ /// become available. Instead, this will always return immediately with
+ /// a possible option of pending data on the channel.
+ ///
+ /// This is useful for a flavor of "optimistic check" before deciding to
+ /// block on a receiver.
+ ///
+ /// Compared with recv, this function has two failure cases instead of
+ /// one (one for disconnection, one for an empty buffer).
+ pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
+ self.chan.try_recv()
+ }
+
/// Closes the receiving half of a channel, without dropping it.
///
/// This prevents any further messages from being sent on the channel while
diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs
index 07a5a63f..35e13300 100644
--- a/tokio/src/sync/mutex.rs
+++ b/tokio/src/sync/mutex.rs
@@ -38,6 +38,7 @@ use crate::future::poll_fn;
use crate::sync::semaphore;
use std::cell::UnsafeCell;
+use std::error::Error;
use std::fmt;
use std::ops::{Deref, DerefMut};
@@ -73,6 +74,30 @@ unsafe impl<T> Send for Mutex<T> where T: Send {}
unsafe impl<T> Sync for Mutex<T> where T: Send {}
unsafe impl<'a, T> Sync for MutexGuard<'a, T> where T: Send + Sync {}
+/// An enumeration of possible errors associated with a `TryLockResult`
+/// which can occur while trying to aquire a lock from the `try_lock`
+/// method on a `Mutex`.
+#[derive(Debug)]
+pub enum TryLockError {
+ /// The lock could not be acquired at this time because the operation
+ /// would otherwise block.
+ WouldBlock,
+}
+
+impl fmt::Display for TryLockError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ fmt,
+ "{}",
+ match self {
+ TryLockError::WouldBlock => "operation would block"
+ }
+ )
+ }
+}
+
+impl Error for TryLockError {}
+
#[test]
#[cfg(not(loom))]
fn bounds() {
@@ -104,6 +129,15 @@ impl<T> Mutex<T> {
});
guard
}
+
+ /// Try to aquire the lock
+ pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> {
+ let mut permit = semaphore::Permit::new();
+ match permit.try_acquire(&self.s) {
+ Ok(_) => Ok(MutexGuard { lock: self, permit }),
+ Err(_) => Err(TryLockError::WouldBlock),
+ }
+ }
}
impl<'a, T> Drop for MutexGuard<'a, T> {