diff options
author | Michael P. Jung <michael.jung@terreon.de> | 2019-12-10 17:01:23 +0100 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2019-12-10 08:01:23 -0800 |
commit | 975576952f33c64e4faaa616f67ae9d6b596e4aa (patch) | |
tree | 879c74c5fcf04e17fffe0b33b9b98a15bf1d464a /tokio | |
parent | 5d5755dca4586f80d591e5d61de3f245bf8c4506 (diff) |
Add Mutex::try_lock and (Unbounded)Receiver::try_recv (#1939)
Diffstat (limited to 'tokio')
-rw-r--r-- | tokio/src/sync/mpsc/bounded.rs | 17 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/chan.rs | 18 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/error.rs | 29 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/unbounded.rs | 17 | ||||
-rw-r--r-- | tokio/src/sync/mutex.rs | 34 | ||||
-rw-r--r-- | tokio/tests/sync_mpsc.rs | 40 | ||||
-rw-r--r-- | tokio/tests/sync_mutex.rs | 13 |
7 files changed, 164 insertions, 4 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index b2b2df9f..5cca1596 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1,5 +1,5 @@ use crate::sync::mpsc::chan; -use crate::sync::mpsc::error::{ClosedError, SendError, TrySendError}; +use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError}; use crate::sync::semaphore; use std::fmt; @@ -150,6 +150,21 @@ impl<T> Receiver<T> { self.chan.recv(cx) } + /// Attempts to return a pending value on this receiver without blocking. + /// + /// This method will never block the caller in order to wait for data to + /// become available. Instead, this will always return immediately with + /// a possible option of pending data on the channel. + /// + /// This is useful for a flavor of "optimistic check" before deciding to + /// block on a receiver. + /// + /// Compared with recv, this function has two failure cases instead of + /// one (one for disconnection, one for an empty buffer). + pub fn try_recv(&mut self) -> Result<T, TryRecvError> { + self.chan.try_recv() + } + /// Closes the receiving half of a channel, without dropping it. /// /// This prevents any further messages from being sent on the channel while diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 2cf0d055..b6e94d5a 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -2,7 +2,7 @@ use crate::loom::cell::CausalCell; use crate::loom::future::AtomicWaker; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; -use crate::sync::mpsc::error::ClosedError; +use crate::sync::mpsc::error::{ClosedError, TryRecvError}; use crate::sync::mpsc::{error, list}; use std::fmt; @@ -306,6 +306,22 @@ where } }) } + + /// Receive the next value without blocking + pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> { + use super::block::Read::*; + self.inner.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; + match rx_fields.list.pop(&self.inner.tx) { + Some(Value(value)) => { + self.inner.semaphore.add_permit(); + Ok(value) + } + Some(Closed) => Err(TryRecvError::Closed), + None => Err(TryRecvError::Empty), + } + }) + } } impl<T, S> Drop for Rx<T, S> diff --git a/tokio/src/sync/mpsc/error.rs b/tokio/src/sync/mpsc/error.rs index 6238f854..4b8d9054 100644 --- a/tokio/src/sync/mpsc/error.rs +++ b/tokio/src/sync/mpsc/error.rs @@ -65,6 +65,35 @@ impl fmt::Display for RecvError { impl Error for RecvError {} +// ===== TryRecvError ===== + +/// This enumeration is the list of the possible reasons that try_recv +/// could not return data when called. +#[derive(Debug)] +pub enum TryRecvError { + /// This channel is currently empty, but the Sender(s) have not yet + /// disconnected, so data may yet become available. + Empty, + /// The channel's sending half has been closed, and there will + /// never be any more data received on it. + Closed, +} + +impl fmt::Display for TryRecvError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + fmt, + "{}", + match self { + TryRecvError::Empty => "channel empty", + TryRecvError::Closed => "channel closed", + } + ) + } +} + +impl Error for TryRecvError {} + // ===== ClosedError ===== /// Erorr returned by [`Sender::poll_ready`](super::Sender::poll_ready)]. diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 07a173c2..4a6ba8ee 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -1,6 +1,6 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::sync::mpsc::chan; -use crate::sync::mpsc::error::SendError; +use crate::sync::mpsc::error::{SendError, TryRecvError}; use std::fmt; use std::task::{Context, Poll}; @@ -123,6 +123,21 @@ impl<T> UnboundedReceiver<T> { poll_fn(|cx| self.poll_recv(cx)).await } + /// Attempts to return a pending value on this receiver without blocking. + /// + /// This method will never block the caller in order to wait for data to + /// become available. Instead, this will always return immediately with + /// a possible option of pending data on the channel. + /// + /// This is useful for a flavor of "optimistic check" before deciding to + /// block on a receiver. + /// + /// Compared with recv, this function has two failure cases instead of + /// one (one for disconnection, one for an empty buffer). + pub fn try_recv(&mut self) -> Result<T, TryRecvError> { + self.chan.try_recv() + } + /// Closes the receiving half of a channel, without dropping it. /// /// This prevents any further messages from being sent on the channel while diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index 07a5a63f..35e13300 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -38,6 +38,7 @@ use crate::future::poll_fn; use crate::sync::semaphore; use std::cell::UnsafeCell; +use std::error::Error; use std::fmt; use std::ops::{Deref, DerefMut}; @@ -73,6 +74,30 @@ unsafe impl<T> Send for Mutex<T> where T: Send {} unsafe impl<T> Sync for Mutex<T> where T: Send {} unsafe impl<'a, T> Sync for MutexGuard<'a, T> where T: Send + Sync {} +/// An enumeration of possible errors associated with a `TryLockResult` +/// which can occur while trying to aquire a lock from the `try_lock` +/// method on a `Mutex`. +#[derive(Debug)] +pub enum TryLockError { + /// The lock could not be acquired at this time because the operation + /// would otherwise block. + WouldBlock, +} + +impl fmt::Display for TryLockError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + fmt, + "{}", + match self { + TryLockError::WouldBlock => "operation would block" + } + ) + } +} + +impl Error for TryLockError {} + #[test] #[cfg(not(loom))] fn bounds() { @@ -104,6 +129,15 @@ impl<T> Mutex<T> { }); guard } + + /// Try to aquire the lock + pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> { + let mut permit = semaphore::Permit::new(); + match permit.try_acquire(&self.s) { + Ok(_) => Ok(MutexGuard { lock: self, permit }), + Err(_) => Err(TryLockError::WouldBlock), + } + } } impl<'a, T> Drop for MutexGuard<'a, T> { diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 127e7572..f906d357 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -2,7 +2,7 @@ #![cfg(feature = "full")] use tokio::sync::mpsc; -use tokio::sync::mpsc::error::TrySendError; +use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; use tokio_test::task; use tokio_test::{ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, @@ -413,3 +413,41 @@ fn unconsumed_messages_are_dropped() { assert_eq!(1, Arc::strong_count(&msg)); } + +#[test] +fn try_recv() { + let (mut tx, mut rx) = mpsc::channel(1); + match rx.try_recv() { + Err(TryRecvError::Empty) => {} + _ => panic!(), + } + tx.try_send(42).unwrap(); + match rx.try_recv() { + Ok(42) => {} + _ => panic!(), + } + drop(tx); + match rx.try_recv() { + Err(TryRecvError::Closed) => {} + _ => panic!(), + } +} + +#[test] +fn try_recv_unbounded() { + let (tx, mut rx) = mpsc::unbounded_channel(); + match rx.try_recv() { + Err(TryRecvError::Empty) => {} + _ => panic!(), + } + tx.send(42).unwrap(); + match rx.try_recv() { + Ok(42) => {} + _ => panic!(), + } + drop(tx); + match rx.try_recv() { + Err(TryRecvError::Closed) => {} + _ => panic!(), + } +} diff --git a/tokio/tests/sync_mutex.rs b/tokio/tests/sync_mutex.rs index daa6f1e9..d803bd59 100644 --- a/tokio/tests/sync_mutex.rs +++ b/tokio/tests/sync_mutex.rs @@ -134,3 +134,16 @@ async fn aborted_future_2() { .await .expect("Mutex is locked"); } + +#[test] +fn try_lock() { + let m: Mutex<usize> = Mutex::new(0); + { + let g1 = m.try_lock(); + assert_eq!(g1.is_ok(), true); + let g2 = m.try_lock(); + assert_eq!(g2.is_ok(), false); + } + let g3 = m.try_lock(); + assert_eq!(g3.is_ok(), true); +} |