 tokio/src/io/driver/scheduled_io.rs     | 10
 tokio/src/io/util/mem.rs                | 10
 tokio/src/loom/mocked.rs                | 27
 tokio/src/loom/std/mod.rs               |  8
 tokio/src/loom/std/mutex.rs             | 31
 tokio/src/loom/std/parking_lot.rs       | 13
 tokio/src/park/thread.rs                |  6
 tokio/src/runtime/basic_scheduler.rs    | 25
 tokio/src/runtime/blocking/pool.rs      | 12
 tokio/src/runtime/park.rs               |  4
 tokio/src/runtime/queue.rs              | 10
 tokio/src/runtime/thread_pool/idle.rs   |  8
 tokio/src/runtime/thread_pool/worker.rs |  2
 tokio/src/sync/batch_semaphore.rs       | 19
 tokio/src/sync/broadcast.rs             | 12
 tokio/src/sync/notify.rs                | 10
 tokio/src/util/slab.rs                  |  8
 17 files changed, 127 insertions(+), 88 deletions(-)
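The pattern repeated throughout this patch is mechanical: every `lock().unwrap()` becomes a bare `lock()`, with poison recovery centralized in a new adapter (see `tokio/src/loom/std/mutex.rs` below). For reference, here is a minimal standalone sketch, not part of the patch, of the std behavior being papered over: a panic while a guard is live poisons the mutex, and `PoisonError::into_inner` recovers the guard anyway.

    use std::sync::{Arc, Mutex};
    use std::thread;

    fn main() {
        let lock = Arc::new(Mutex::new(0u32));

        // Panic while the guard is live: this marks the mutex as poisoned.
        let l2 = Arc::clone(&lock);
        let _ = thread::spawn(move || {
            let _guard = l2.lock().unwrap();
            panic!("poison the lock");
        })
        .join();

        // `lock()` now returns Err(PoisonError), but the protected data is
        // untouched; `into_inner()` hands back the guard regardless.
        let value = match lock.lock() {
            Ok(guard) => *guard,
            Err(poisoned) => *poisoned.into_inner(),
        };
        assert_eq!(value, 0);
    }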
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs
index 3e7d9570..88daeb2d 100644
--- a/tokio/src/io/driver/scheduled_io.rs
+++ b/tokio/src/io/driver/scheduled_io.rs
@@ -193,7 +193,7 @@ impl ScheduledIo {
}
pub(super) fn wake(&self, ready: mio::Ready) {
- let mut waiters = self.waiters.lock().unwrap();
+ let mut waiters = self.waiters.lock();
// check for AsyncRead slot
if !(ready & (!mio::Ready::writable())).is_empty() {
@@ -241,7 +241,7 @@ impl ScheduledIo {
if ready.is_empty() {
// Update the task info
- let mut waiters = self.waiters.lock().unwrap();
+ let mut waiters = self.waiters.lock();
let slot = match direction {
Direction::Read => &mut waiters.reader,
Direction::Write => &mut waiters.writer,
@@ -375,7 +375,7 @@ cfg_io_readiness! {
}
// Wasn't ready, take the lock (and check again while locked).
- let mut waiters = scheduled_io.waiters.lock().unwrap();
+ let mut waiters = scheduled_io.waiters.lock();
let curr = scheduled_io.readiness.load(SeqCst);
let readiness = mio::Ready::from_usize(READINESS.unpack(curr));
@@ -408,7 +408,7 @@ cfg_io_readiness! {
// `notify.waiters`). In order to access the waker fields,
// we must hold the lock.
- let waiters = scheduled_io.waiters.lock().unwrap();
+ let waiters = scheduled_io.waiters.lock();
// Safety: called while locked
let w = unsafe { &mut *waiter.get() };
@@ -450,7 +450,7 @@ cfg_io_readiness! {
impl Drop for Readiness<'_> {
fn drop(&mut self) {
- let mut waiters = self.scheduled_io.waiters.lock().unwrap();
+ let mut waiters = self.scheduled_io.waiters.lock();
// Safety: `waiter` is only ever stored in `waiters`
unsafe {
diff --git a/tokio/src/io/util/mem.rs b/tokio/src/io/util/mem.rs
index 1b9b37b7..0dd6ad77 100644
--- a/tokio/src/io/util/mem.rs
+++ b/tokio/src/io/util/mem.rs
@@ -100,7 +100,7 @@ impl AsyncRead for DuplexStream {
cx: &mut task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
- Pin::new(&mut *self.read.lock().unwrap()).poll_read(cx, buf)
+ Pin::new(&mut *self.read.lock()).poll_read(cx, buf)
}
}
@@ -111,7 +111,7 @@ impl AsyncWrite for DuplexStream {
cx: &mut task::Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
- Pin::new(&mut *self.write.lock().unwrap()).poll_write(cx, buf)
+ Pin::new(&mut *self.write.lock()).poll_write(cx, buf)
}
#[allow(unused_mut)]
@@ -119,7 +119,7 @@ impl AsyncWrite for DuplexStream {
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<std::io::Result<()>> {
- Pin::new(&mut *self.write.lock().unwrap()).poll_flush(cx)
+ Pin::new(&mut *self.write.lock()).poll_flush(cx)
}
#[allow(unused_mut)]
@@ -127,14 +127,14 @@ impl AsyncWrite for DuplexStream {
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<std::io::Result<()>> {
- Pin::new(&mut *self.write.lock().unwrap()).poll_shutdown(cx)
+ Pin::new(&mut *self.write.lock()).poll_shutdown(cx)
}
}
impl Drop for DuplexStream {
fn drop(&mut self) {
// notify the other side of the closure
- self.write.lock().unwrap().close();
+ self.write.lock().close();
}
}
diff --git a/tokio/src/loom/mocked.rs b/tokio/src/loom/mocked.rs
index 78913952..367d59b4 100644
--- a/tokio/src/loom/mocked.rs
+++ b/tokio/src/loom/mocked.rs
@@ -1,5 +1,32 @@
pub(crate) use loom::*;
+pub(crate) mod sync {
+
+ pub(crate) use loom::sync::MutexGuard;
+
+ #[derive(Debug)]
+ pub(crate) struct Mutex<T>(loom::sync::Mutex<T>);
+
+ #[allow(dead_code)]
+ impl<T> Mutex<T> {
+ #[inline]
+ pub(crate) fn new(t: T) -> Mutex<T> {
+ Mutex(loom::sync::Mutex::new(t))
+ }
+
+ #[inline]
+ pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
+ self.0.lock().unwrap()
+ }
+
+ #[inline]
+ pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
+ self.0.try_lock().ok()
+ }
+ }
+ pub(crate) use loom::sync::*;
+}
+
pub(crate) mod rand {
pub(crate) fn seed() -> u64 {
1
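The mock above gives loom builds the same non-poisoning `lock()`/`try_lock()` signatures while simply unwrapping internally, which seems reasonable since a panic inside a loom model run already fails the test. Note that the locally defined `Mutex` shadows the one re-exported by the trailing `use loom::sync::*;` glob (explicit items take precedence over glob imports), which is why both can live in the same module.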
diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs
index 15a15f0a..6492848e 100644
--- a/tokio/src/loom/std/mod.rs
+++ b/tokio/src/loom/std/mod.rs
@@ -6,6 +6,7 @@ mod atomic_u32;
mod atomic_u64;
mod atomic_u8;
mod atomic_usize;
+mod mutex;
#[cfg(feature = "parking_lot")]
mod parking_lot;
mod unsafe_cell;
@@ -62,9 +63,10 @@ pub(crate) mod sync {
#[cfg(not(feature = "parking_lot"))]
#[allow(unused_imports)]
- pub(crate) use std::sync::{
- Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, WaitTimeoutResult,
- };
+ pub(crate) use std::sync::{Condvar, MutexGuard, RwLock, RwLockReadGuard, WaitTimeoutResult};
+
+ #[cfg(not(feature = "parking_lot"))]
+ pub(crate) use crate::loom::std::mutex::Mutex;
pub(crate) mod atomic {
pub(crate) use crate::loom::std::atomic_ptr::AtomicPtr;
diff --git a/tokio/src/loom/std/mutex.rs b/tokio/src/loom/std/mutex.rs
new file mode 100644
index 00000000..bf14d624
--- /dev/null
+++ b/tokio/src/loom/std/mutex.rs
@@ -0,0 +1,31 @@
+use std::sync::{self, MutexGuard, TryLockError};
+
+/// Adapter for `std::sync::Mutex` that removes the poisoning aspects
+/// from its API.
+#[derive(Debug)]
+pub(crate) struct Mutex<T: ?Sized>(sync::Mutex<T>);
+
+#[allow(dead_code)]
+impl<T> Mutex<T> {
+ #[inline]
+ pub(crate) fn new(t: T) -> Mutex<T> {
+ Mutex(sync::Mutex::new(t))
+ }
+
+ #[inline]
+ pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
+ match self.0.lock() {
+ Ok(guard) => guard,
+ Err(p_err) => p_err.into_inner(),
+ }
+ }
+
+ #[inline]
+ pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
+ match self.0.try_lock() {
+ Ok(guard) => Some(guard),
+ Err(TryLockError::Poisoned(p_err)) => Some(p_err.into_inner()),
+ Err(TryLockError::WouldBlock) => None,
+ }
+ }
+}
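One detail of the adapter worth calling out: `try_lock()` recovers a poisoned guard just like `lock()` does, so `None` now means exactly one thing, genuine contention. The std `TryLockResult`, which conflated `WouldBlock` with poisoning, no longer leaks into call sites.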
diff --git a/tokio/src/loom/std/parking_lot.rs b/tokio/src/loom/std/parking_lot.rs
index 41c47ddd..c03190fe 100644
--- a/tokio/src/loom/std/parking_lot.rs
+++ b/tokio/src/loom/std/parking_lot.rs
@@ -3,7 +3,7 @@
//!
//! This can be extended to additional types/methods as required.
-use std::sync::{LockResult, TryLockError, TryLockResult};
+use std::sync::LockResult;
use std::time::Duration;
// Types that do not need wrapping
@@ -34,16 +34,13 @@ impl<T> Mutex<T> {
}
#[inline]
- pub(crate) fn lock(&self) -> LockResult<MutexGuard<'_, T>> {
- Ok(self.0.lock())
+ pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
+ self.0.lock()
}
#[inline]
- pub(crate) fn try_lock(&self) -> TryLockResult<MutexGuard<'_, T>> {
- match self.0.try_lock() {
- Some(guard) => Ok(guard),
- None => Err(TryLockError::WouldBlock),
- }
+ pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
+ self.0.try_lock()
}
// Note: Additional methods `is_poisoned` and `into_inner` can be
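With this, the `parking_lot` wrapper and the new std adapter expose identical signatures (`lock()` returning the guard directly, `try_lock()` returning `Option`), so the call sites updated below compile unchanged whether or not the `parking_lot` feature is enabled.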
diff --git a/tokio/src/park/thread.rs b/tokio/src/park/thread.rs
index 9ed41310..494c02b4 100644
--- a/tokio/src/park/thread.rs
+++ b/tokio/src/park/thread.rs
@@ -87,7 +87,7 @@ impl Inner {
}
// Otherwise we need to coordinate going to sleep
- let mut m = self.mutex.lock().unwrap();
+ let mut m = self.mutex.lock();
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
@@ -137,7 +137,7 @@ impl Inner {
return;
}
- let m = self.mutex.lock().unwrap();
+ let m = self.mutex.lock();
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
@@ -188,7 +188,7 @@ impl Inner {
// Releasing `lock` before the call to `notify_one` means that when the
// parked thread wakes it doesn't get woken only to have to wait for us
// to release `lock`.
- drop(self.mutex.lock().unwrap());
+ drop(self.mutex.lock());
self.condvar.notify_one()
}
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs
index 0c0e95a6..60fe92c3 100644
--- a/tokio/src/runtime/basic_scheduler.rs
+++ b/tokio/src/runtime/basic_scheduler.rs
@@ -10,7 +10,7 @@ use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
-use std::sync::{Arc, PoisonError};
+use std::sync::Arc;
use std::task::Poll::{Pending, Ready};
use std::time::Duration;
@@ -170,7 +170,7 @@ impl<P: Park> BasicScheduler<P> {
}
fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
- let inner = self.inner.lock().unwrap().take()?;
+ let inner = self.inner.lock().take()?;
Some(InnerGuard {
inner: Some(inner),
@@ -280,12 +280,7 @@ impl<P: Park> Drop for BasicScheduler<P> {
// Avoid a double panic if we are currently panicking and
// the lock may be poisoned.
- let mut inner = match self
- .inner
- .lock()
- .unwrap_or_else(PoisonError::into_inner)
- .take()
- {
+ let mut inner = match self.inner.lock().take() {
Some(inner) => inner,
None if std::thread::panicking() => return,
None => panic!("Oh no! We never placed the Inner state back, this is a bug!"),
@@ -309,7 +304,7 @@ impl<P: Park> Drop for BasicScheduler<P> {
}
// Drain remote queue
- for task in scheduler.spawner.shared.queue.lock().unwrap().drain(..) {
+ for task in scheduler.spawner.shared.queue.lock().drain(..) {
task.shutdown();
}
@@ -339,7 +334,7 @@ impl Spawner {
}
fn pop(&self) -> Option<task::Notified<Arc<Shared>>> {
- self.shared.queue.lock().unwrap().pop_front()
+ self.shared.queue.lock().pop_front()
}
fn waker_ref(&self) -> WakerRef<'_> {
@@ -384,7 +379,7 @@ impl Schedule for Arc<Shared> {
cx.tasks.borrow_mut().queue.push_back(task);
}
_ => {
- self.queue.lock().unwrap().push_back(task);
+ self.queue.lock().push_back(task);
self.unpark.unpark();
}
});
@@ -423,13 +418,7 @@ impl<P: Park> InnerGuard<'_, P> {
impl<P: Park> Drop for InnerGuard<'_, P> {
fn drop(&mut self) {
if let Some(scheduler) = self.inner.take() {
- // We can ignore the poison error here since we are
- // just replacing the state.
- let mut lock = self
- .basic_scheduler
- .inner
- .lock()
- .unwrap_or_else(PoisonError::into_inner);
+ let mut lock = self.basic_scheduler.inner.lock();
// Replace old scheduler back into the state to allow
// other threads to pick it up and drive it.
diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
index 633021ed..df0175b1 100644
--- a/tokio/src/runtime/blocking/pool.rs
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -129,7 +129,7 @@ impl BlockingPool {
}
pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) {
- let mut shared = self.spawner.inner.shared.lock().unwrap();
+ let mut shared = self.spawner.inner.shared.lock();
// The function can be called multiple times. First, by explicitly
// calling `shutdown` then by the drop handler calling `shutdown`. This
@@ -170,7 +170,7 @@ impl fmt::Debug for BlockingPool {
impl Spawner {
pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
let shutdown_tx = {
- let mut shared = self.inner.shared.lock().unwrap();
+ let mut shared = self.inner.shared.lock();
if shared.shutdown {
// Shutdown the task
@@ -207,7 +207,7 @@ impl Spawner {
};
if let Some(shutdown_tx) = shutdown_tx {
- let mut shared = self.inner.shared.lock().unwrap();
+ let mut shared = self.inner.shared.lock();
let entry = shared.worker_threads.vacant_entry();
let handle = self.spawn_thread(shutdown_tx, rt, entry.key());
@@ -251,7 +251,7 @@ impl Inner {
f()
}
- let mut shared = self.shared.lock().unwrap();
+ let mut shared = self.shared.lock();
'main: loop {
// BUSY
@@ -259,7 +259,7 @@ impl Inner {
drop(shared);
task.run();
- shared = self.shared.lock().unwrap();
+ shared = self.shared.lock();
}
// IDLE
@@ -296,7 +296,7 @@ impl Inner {
drop(shared);
task.shutdown();
- shared = self.shared.lock().unwrap();
+ shared = self.shared.lock();
}
// Work was produced, and we "took" it (by decrementing num_notify).
diff --git a/tokio/src/runtime/park.rs b/tokio/src/runtime/park.rs
index c994c935..033b9f20 100644
--- a/tokio/src/runtime/park.rs
+++ b/tokio/src/runtime/park.rs
@@ -142,7 +142,7 @@ impl Inner {
fn park_condvar(&self) {
// Otherwise we need to coordinate going to sleep
- let mut m = self.mutex.lock().unwrap();
+ let mut m = self.mutex.lock();
match self
.state
@@ -238,7 +238,7 @@ impl Inner {
// Releasing `lock` before the call to `notify_one` means that when the
// parked thread wakes it doesn't get woken only to have to wait for us
// to release `lock`.
- drop(self.mutex.lock().unwrap());
+ drop(self.mutex.lock());
self.condvar.notify_one()
}
diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/queue.rs
index c654514b..cdf4009c 100644
--- a/tokio/src/runtime/queue.rs
+++ b/tokio/src/runtime/queue.rs
@@ -481,7 +481,7 @@ impl<T: 'static> Inject<T> {
/// Closes the injection queue. Returns `true` if the queue is open when the
/// transition is made.
pub(super) fn close(&self) -> bool {
- let mut p = self.pointers.lock().unwrap();
+ let mut p = self.pointers.lock();
if p.is_closed {
return false;
@@ -492,7 +492,7 @@ impl<T: 'static> Inject<T> {
}
pub(super) fn is_closed(&self) -> bool {
- self.pointers.lock().unwrap().is_closed
+ self.pointers.lock().is_closed
}
pub(super) fn len(&self) -> usize {
@@ -502,7 +502,7 @@ impl<T: 'static> Inject<T> {
/// Pushes a value into the queue.
pub(super) fn push(&self, task: task::Notified<T>) {
// Acquire queue lock
- let mut p = self.pointers.lock().unwrap();
+ let mut p = self.pointers.lock();
if p.is_closed {
// Drop the mutex to avoid a potential deadlock when
@@ -541,7 +541,7 @@ impl<T: 'static> Inject<T> {
debug_assert!(get_next(batch_tail).is_none());
- let mut p = self.pointers.lock().unwrap();
+ let mut p = self.pointers.lock();
if let Some(tail) = p.tail {
set_next(tail, Some(batch_head));
@@ -566,7 +566,7 @@ impl<T: 'static> Inject<T> {
return None;
}
- let mut p = self.pointers.lock().unwrap();
+ let mut p = self.pointers.lock();
// It is possible to hit null here if another thread popped the last
// task between us checking `len` and acquiring the lock.
diff --git a/tokio/src/runtime/thread_pool/idle.rs b/tokio/src/runtime/thread_pool/idle.rs
index ae87ca4b..6e692fd8 100644
--- a/tokio/src/runtime/thread_pool/idle.rs
+++ b/tokio/src/runtime/thread_pool/idle.rs
@@ -55,7 +55,7 @@ impl Idle {
}
// Acquire the lock
- let mut sleepers = self.sleepers.lock().unwrap();
+ let mut sleepers = self.sleepers.lock();
// Check again, now that the lock is acquired
if !self.notify_should_wakeup() {
@@ -77,7 +77,7 @@ impl Idle {
/// work.
pub(super) fn transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool {
// Acquire the lock
- let mut sleepers = self.sleepers.lock().unwrap();
+ let mut sleepers = self.sleepers.lock();
// Decrement the number of unparked threads
let ret = State::dec_num_unparked(&self.state, is_searching);
@@ -112,7 +112,7 @@ impl Idle {
/// Unpark a specific worker. This happens if tasks are submitted from
/// within the worker's park routine.
pub(super) fn unpark_worker_by_id(&self, worker_id: usize) {
- let mut sleepers = self.sleepers.lock().unwrap();
+ let mut sleepers = self.sleepers.lock();
for index in 0..sleepers.len() {
if sleepers[index] == worker_id {
@@ -128,7 +128,7 @@ impl Idle {
/// Returns `true` if `worker_id` is contained in the sleep set
pub(super) fn is_parked(&self, worker_id: usize) -> bool {
- let sleepers = self.sleepers.lock().unwrap();
+ let sleepers = self.sleepers.lock();
sleepers.contains(&worker_id)
}
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs
index 10e973f6..c88f9954 100644
--- a/tokio/src/runtime/thread_pool/worker.rs
+++ b/tokio/src/runtime/thread_pool/worker.rs
@@ -783,7 +783,7 @@ impl Shared {
///
/// If all workers have reached this point, the final cleanup is performed.
fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) {
- let mut workers = self.shutdown_workers.lock().unwrap();
+ let mut workers = self.shutdown_workers.lock();
workers.push((core, worker));
if workers.len() != self.remotes.len() {
diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs
index 9f324f9c..6305485b 100644
--- a/tokio/src/sync/batch_semaphore.rs
+++ b/tokio/src/sync/batch_semaphore.rs
@@ -158,7 +158,7 @@ impl Semaphore {
}
// Assign permits to the wait queue
- self.add_permits_locked(added, self.waiters.lock().unwrap());
+ self.add_permits_locked(added, self.waiters.lock());
}
/// Closes the semaphore. This prevents the semaphore from issuing new
@@ -166,7 +166,7 @@ impl Semaphore {
// This will be used once the bounded MPSC is updated to use the new
// semaphore implementation.
pub(crate) fn close(&self) {
- let mut waiters = self.waiters.lock().unwrap();
+ let mut waiters = self.waiters.lock();
// If the semaphore's permits counter has enough permits for an
// unqueued waiter to acquire all the permits it needs immediately,
// it won't touch the wait list. Therefore, we have to set a bit on
@@ -231,7 +231,7 @@ impl Semaphore {
let mut lock = Some(waiters);
let mut is_empty = false;
while rem > 0 {
- let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock().unwrap());
+ let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock());
'inner: for slot in &mut wakers[..] {
// Was the waiter assigned enough permits to wake it?
match waiters.queue.last() {
@@ -324,7 +324,7 @@ impl Semaphore {
// counter. Otherwise, if we subtract the permits and then
// acquire the lock, we might miss additional permits being
// added while waiting for the lock.
- lock = Some(self.waiters.lock().unwrap());
+ lock = Some(self.waiters.lock());
}
match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
@@ -334,7 +334,7 @@ impl Semaphore {
if !queued {
return Ready(Ok(()));
} else if lock.is_none() {
- break self.waiters.lock().unwrap();
+ break self.waiters.lock();
}
}
break lock.expect("lock must be acquired before waiting");
@@ -484,14 +484,7 @@ impl Drop for Acquire<'_> {
// This is where we ensure safety. The future is being dropped,
// which means we must ensure that the waiter entry is no longer stored
// in the linked list.
- let mut waiters = match self.semaphore.waiters.lock() {
- Ok(lock) => lock,
- // Removing the node from the linked list is necessary to ensure
- // safety. Even if the lock was poisoned, we need to make sure it is
- // removed from the linked list before dropping it --- otherwise,
- // the list will contain a dangling pointer to this node.
- Err(e) => e.into_inner(),
- };
+ let mut waiters = self.semaphore.waiters.lock();
// remove the entry from the list
let node = NonNull::from(&mut self.node);
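The comment deleted here described a real safety obligation, and it still holds: the waiter node lives inside the `Acquire` future, so it must be unlinked from the semaphore's intrusive list before the future's memory is released, even if another thread panicked while holding the lock. The adapter's `lock()` preserves that guarantee implicitly by returning a usable guard from a poisoned mutex instead of an error a caller could fail to handle.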
diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs
index 9484e130..fe826290 100644
--- a/tokio/src/sync/broadcast.rs
+++ b/tokio/src/sync/broadcast.rs
@@ -529,7 +529,7 @@ impl<T> Sender<T> {
pub fn subscribe(&self) -> Receiver<T> {
let shared = self.shared.clone();
- let mut tail = shared.tail.lock().unwrap();
+ let mut tail = shared.tail.lock();
if tail.rx_cnt == MAX_RECEIVERS {
panic!("max receivers");
@@ -584,12 +584,12 @@ impl<T> Sender<T> {
/// }
/// ```
pub fn receiver_count(&self) -> usize {
- let tail = self.shared.tail.lock().unwrap();
+ let tail = self.shared.tail.lock();
tail.rx_cnt
}
fn send2(&self, value: Option<T>) -> Result<usize, SendError<Option<T>>> {
- let mut tail = self.shared.tail.lock().unwrap();
+ let mut tail = self.shared.tail.lock();
if tail.rx_cnt == 0 {
return Err(SendError(value));
@@ -695,7 +695,7 @@ impl<T> Receiver<T> {
// the slot lock.
drop(slot);
- let mut tail = self.shared.tail.lock().unwrap();
+ let mut tail = self.shared.tail.lock();
// Acquire slot lock again
slot = self.shared.buffer[idx].read().unwrap();
@@ -979,7 +979,7 @@ where
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
- let mut tail = self.shared.tail.lock().unwrap();
+ let mut tail = self.shared.tail.lock();
if let Some(waiter) = &self.waiter {
// safety: tail lock is held
@@ -1142,7 +1142,7 @@ where
fn drop(&mut self) {
// Acquire the tail lock. This is required for safety before accessing
// the waiter node.
- let mut tail = self.receiver.as_mut().shared.tail.lock().unwrap();
+ let mut tail = self.receiver.as_mut().shared.tail.lock();
// safety: tail lock is held
let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued });
diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs
index d319e8aa..17117bfe 100644
--- a/tokio/src/sync/notify.rs
+++ b/tokio/src/sync/notify.rs
@@ -306,7 +306,7 @@ impl Notify {
}
// There are waiters, the lock must be acquired to notify.
- let mut waiters = self.waiters.lock().unwrap();
+ let mut waiters = self.waiters.lock();
// The state must be reloaded while the lock is held. The state may only
// transition out of WAITING while the lock is held.
@@ -321,7 +321,7 @@ impl Notify {
/// Notifies all waiting tasks
pub(crate) fn notify_waiters(&self) {
// There are waiters, the lock must be acquired to notify.
- let mut waiters = self.waiters.lock().unwrap();
+ let mut waiters = self.waiters.lock();
// The state must be reloaded while the lock is held. The state may only
// transition out of WAITING while the lock is held.
@@ -452,7 +452,7 @@ impl Future for Notified<'_> {
// Acquire the lock and attempt to transition to the waiting
// state.
- let mut waiters = notify.waiters.lock().unwrap();
+ let mut waiters = notify.waiters.lock();
// Reload the state with the lock held
let mut curr = notify.state.load(SeqCst);
@@ -516,7 +516,7 @@ impl Future for Notified<'_> {
// `notify.waiters`). In order to access the waker fields,
// we must hold the lock.
- let waiters = notify.waiters.lock().unwrap();
+ let waiters = notify.waiters.lock();
// Safety: called while locked
let w = unsafe { &mut *waiter.get() };
@@ -564,7 +564,7 @@ impl Drop for Notified<'_> {
// longer stored in the linked list.
if let Waiting = *state {
let mut notify_state = WAITING;
- let mut waiters = notify.waiters.lock().unwrap();
+ let mut waiters = notify.waiters.lock();
// `Notify.state` may be in any of the three states (Empty, Waiting,
// Notified). It doesn't actually matter what the atomic is set to
diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs
index 854232c2..aa1e2362 100644
--- a/tokio/src/util/slab.rs
+++ b/tokio/src/util/slab.rs
@@ -278,7 +278,7 @@ impl<T> Slab<T> {
}
let mut slots = match page.slots.try_lock() {
- Ok(slots) => slots,
+ Some(slots) => slots,
// If the lock cannot be acquired due to being held by another
// thread, don't try to compact the page.
_ => continue,
@@ -376,7 +376,7 @@ impl<T: Entry> Page<T> {
}
// Allocating objects requires synchronization
- let mut locked = me.slots.lock().unwrap();
+ let mut locked = me.slots.lock();
if locked.head < locked.slots.len() {
// Re-use an already initialized slot.
@@ -471,7 +471,7 @@ impl<T> Default for Page<T> {
impl<T> Page<T> {
/// Release a slot into the page's free list
fn release(&self, value: *const Value<T>) {
- let mut locked = self.slots.lock().unwrap();
+ let mut locked = self.slots.lock();
let idx = locked.index_for(value);
locked.slots[idx].next = locked.head as u32;
@@ -485,7 +485,7 @@ impl<T> Page<T> {
impl<T> CachedPage<T> {
/// Refresh the cache
fn refresh(&mut self, page: &Page<T>) {
- let slots = page.slots.lock().unwrap();
+ let slots = page.slots.lock();
self.slots = slots.slots.as_ptr();
self.init = slots.slots.len();
}
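The slab's compaction hunk above is a caller that benefits directly from `try_lock`'s new `Option` shape: it opportunistically skips any page whose lock is contended rather than blocking. A standalone sketch of that pattern, with a hypothetical free-standing `try_lock` helper standing in for the adapter method:

    use std::sync::{self, MutexGuard, TryLockError};

    // Hypothetical helper mirroring the adapter's try_lock(): recover a
    // poisoned guard, report only contention as None.
    fn try_lock<T>(m: &sync::Mutex<T>) -> Option<MutexGuard<'_, T>> {
        match m.try_lock() {
            Ok(guard) => Some(guard),
            Err(TryLockError::Poisoned(p_err)) => Some(p_err.into_inner()),
            Err(TryLockError::WouldBlock) => None,
        }
    }

    fn main() {
        let pages = vec![sync::Mutex::new(vec![0u8; 4]), sync::Mutex::new(vec![1u8; 4])];

        // Opportunistic pass in the style of the compaction loop: skip a
        // page held by another thread instead of waiting for it.
        for page in &pages {
            let slots = match try_lock(page) {
                Some(slots) => slots,
                None => continue,
            };
            println!("page has {} slots", slots.len());
        }
    }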