summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorMichael P. Jung <michael.jung@terreon.de>2019-12-10 17:01:23 +0100
committerCarl Lerche <me@carllerche.com>2019-12-10 08:01:23 -0800
commit975576952f33c64e4faaa616f67ae9d6b596e4aa (patch)
tree879c74c5fcf04e17fffe0b33b9b98a15bf1d464a /tokio
parent5d5755dca4586f80d591e5d61de3f245bf8c4506 (diff)
Add Mutex::try_lock and (Unbounded)Receiver::try_recv (#1939)
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/sync/mpsc/bounded.rs17
-rw-r--r--tokio/src/sync/mpsc/chan.rs18
-rw-r--r--tokio/src/sync/mpsc/error.rs29
-rw-r--r--tokio/src/sync/mpsc/unbounded.rs17
-rw-r--r--tokio/src/sync/mutex.rs34
-rw-r--r--tokio/tests/sync_mpsc.rs40
-rw-r--r--tokio/tests/sync_mutex.rs13
7 files changed, 164 insertions, 4 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
index b2b2df9f..5cca1596 100644
--- a/tokio/src/sync/mpsc/bounded.rs
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -1,5 +1,5 @@
use crate::sync::mpsc::chan;
-use crate::sync::mpsc::error::{ClosedError, SendError, TrySendError};
+use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError};
use crate::sync::semaphore;
use std::fmt;
@@ -150,6 +150,21 @@ impl<T> Receiver<T> {
self.chan.recv(cx)
}
+ /// Attempts to return a pending value on this receiver without blocking.
+ ///
+ /// This method will never block the caller in order to wait for data to
+ /// become available. Instead, this will always return immediately with
+ /// a possible option of pending data on the channel.
+ ///
+ /// This is useful for a flavor of "optimistic check" before deciding to
+ /// block on a receiver.
+ ///
+ /// Compared with recv, this function has two failure cases instead of
+ /// one (one for disconnection, one for an empty buffer).
+ pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
+ self.chan.try_recv()
+ }
+
/// Closes the receiving half of a channel, without dropping it.
///
/// This prevents any further messages from being sent on the channel while
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs
index 2cf0d055..b6e94d5a 100644
--- a/tokio/src/sync/mpsc/chan.rs
+++ b/tokio/src/sync/mpsc/chan.rs
@@ -2,7 +2,7 @@ use crate::loom::cell::CausalCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
-use crate::sync::mpsc::error::ClosedError;
+use crate::sync::mpsc::error::{ClosedError, TryRecvError};
use crate::sync::mpsc::{error, list};
use std::fmt;
@@ -306,6 +306,22 @@ where
}
})
}
+
+ /// Receive the next value without blocking
+ pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
+ use super::block::Read::*;
+ self.inner.rx_fields.with_mut(|rx_fields_ptr| {
+ let rx_fields = unsafe { &mut *rx_fields_ptr };
+ match rx_fields.list.pop(&self.inner.tx) {
+ Some(Value(value)) => {
+ self.inner.semaphore.add_permit();
+ Ok(value)
+ }
+ Some(Closed) => Err(TryRecvError::Closed),
+ None => Err(TryRecvError::Empty),
+ }
+ })
+ }
}
impl<T, S> Drop for Rx<T, S>
diff --git a/tokio/src/sync/mpsc/error.rs b/tokio/src/sync/mpsc/error.rs
index 6238f854..4b8d9054 100644
--- a/tokio/src/sync/mpsc/error.rs
+++ b/tokio/src/sync/mpsc/error.rs
@@ -65,6 +65,35 @@ impl fmt::Display for RecvError {
impl Error for RecvError {}
+// ===== TryRecvError =====
+
+/// This enumeration is the list of the possible reasons that try_recv
+/// could not return data when called.
+#[derive(Debug)]
+pub enum TryRecvError {
+ /// This channel is currently empty, but the Sender(s) have not yet
+ /// disconnected, so data may yet become available.
+ Empty,
+ /// The channel's sending half has been closed, and there will
+ /// never be any more data received on it.
+ Closed,
+}
+
+impl fmt::Display for TryRecvError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ fmt,
+ "{}",
+ match self {
+ TryRecvError::Empty => "channel empty",
+ TryRecvError::Closed => "channel closed",
+ }
+ )
+ }
+}
+
+impl Error for TryRecvError {}
+
// ===== ClosedError =====
/// Erorr returned by [`Sender::poll_ready`](super::Sender::poll_ready)].
diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs
index 07a173c2..4a6ba8ee 100644
--- a/tokio/src/sync/mpsc/unbounded.rs
+++ b/tokio/src/sync/mpsc/unbounded.rs
@@ -1,6 +1,6 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::sync::mpsc::chan;
-use crate::sync::mpsc::error::SendError;
+use crate::sync::mpsc::error::{SendError, TryRecvError};
use std::fmt;
use std::task::{Context, Poll};
@@ -123,6 +123,21 @@ impl<T> UnboundedReceiver<T> {
poll_fn(|cx| self.poll_recv(cx)).await
}
+ /// Attempts to return a pending value on this receiver without blocking.
+ ///
+ /// This method will never block the caller in order to wait for data to
+ /// become available. Instead, this will always return immediately with
+ /// a possible option of pending data on the channel.
+ ///
+ /// This is useful for a flavor of "optimistic check" before deciding to
+ /// block on a receiver.
+ ///
+ /// Compared with recv, this function has two failure cases instead of
+ /// one (one for disconnection, one for an empty buffer).
+ pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
+ self.chan.try_recv()
+ }
+
/// Closes the receiving half of a channel, without dropping it.
///
/// This prevents any further messages from being sent on the channel while
diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs
index 07a5a63f..35e13300 100644
--- a/tokio/src/sync/mutex.rs
+++ b/tokio/src/sync/mutex.rs
@@ -38,6 +38,7 @@ use crate::future::poll_fn;
use crate::sync::semaphore;
use std::cell::UnsafeCell;
+use std::error::Error;
use std::fmt;
use std::ops::{Deref, DerefMut};
@@ -73,6 +74,30 @@ unsafe impl<T> Send for Mutex<T> where T: Send {}
unsafe impl<T> Sync for Mutex<T> where T: Send {}
unsafe impl<'a, T> Sync for MutexGuard<'a, T> where T: Send + Sync {}
+/// An enumeration of possible errors associated with a `TryLockResult`
+/// which can occur while trying to aquire a lock from the `try_lock`
+/// method on a `Mutex`.
+#[derive(Debug)]
+pub enum TryLockError {
+ /// The lock could not be acquired at this time because the operation
+ /// would otherwise block.
+ WouldBlock,
+}
+
+impl fmt::Display for TryLockError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ fmt,
+ "{}",
+ match self {
+ TryLockError::WouldBlock => "operation would block"
+ }
+ )
+ }
+}
+
+impl Error for TryLockError {}
+
#[test]
#[cfg(not(loom))]
fn bounds() {
@@ -104,6 +129,15 @@ impl<T> Mutex<T> {
});
guard
}
+
+ /// Try to aquire the lock
+ pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> {
+ let mut permit = semaphore::Permit::new();
+ match permit.try_acquire(&self.s) {
+ Ok(_) => Ok(MutexGuard { lock: self, permit }),
+ Err(_) => Err(TryLockError::WouldBlock),
+ }
+ }
}
impl<'a, T> Drop for MutexGuard<'a, T> {
diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs
index 127e7572..f906d357 100644
--- a/tokio/tests/sync_mpsc.rs
+++ b/tokio/tests/sync_mpsc.rs
@@ -2,7 +2,7 @@
#![cfg(feature = "full")]
use tokio::sync::mpsc;
-use tokio::sync::mpsc::error::TrySendError;
+use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
use tokio_test::task;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
@@ -413,3 +413,41 @@ fn unconsumed_messages_are_dropped() {
assert_eq!(1, Arc::strong_count(&msg));
}
+
+#[test]
+fn try_recv() {
+ let (mut tx, mut rx) = mpsc::channel(1);
+ match rx.try_recv() {
+ Err(TryRecvError::Empty) => {}
+ _ => panic!(),
+ }
+ tx.try_send(42).unwrap();
+ match rx.try_recv() {
+ Ok(42) => {}
+ _ => panic!(),
+ }
+ drop(tx);
+ match rx.try_recv() {
+ Err(TryRecvError::Closed) => {}
+ _ => panic!(),
+ }
+}
+
+#[test]
+fn try_recv_unbounded() {
+ let (tx, mut rx) = mpsc::unbounded_channel();
+ match rx.try_recv() {
+ Err(TryRecvError::Empty) => {}
+ _ => panic!(),
+ }
+ tx.send(42).unwrap();
+ match rx.try_recv() {
+ Ok(42) => {}
+ _ => panic!(),
+ }
+ drop(tx);
+ match rx.try_recv() {
+ Err(TryRecvError::Closed) => {}
+ _ => panic!(),
+ }
+}
diff --git a/tokio/tests/sync_mutex.rs b/tokio/tests/sync_mutex.rs
index daa6f1e9..d803bd59 100644
--- a/tokio/tests/sync_mutex.rs
+++ b/tokio/tests/sync_mutex.rs
@@ -134,3 +134,16 @@ async fn aborted_future_2() {
.await
.expect("Mutex is locked");
}
+
+#[test]
+fn try_lock() {
+ let m: Mutex<usize> = Mutex::new(0);
+ {
+ let g1 = m.try_lock();
+ assert_eq!(g1.is_ok(), true);
+ let g2 = m.try_lock();
+ assert_eq!(g2.is_ok(), false);
+ }
+ let g3 = m.try_lock();
+ assert_eq!(g3.is_ok(), true);
+}