diff options
-rw-r--r-- | tokio/src/io/driver/scheduled_io.rs | 10 | ||||
-rw-r--r-- | tokio/src/io/util/mem.rs | 10 | ||||
-rw-r--r-- | tokio/src/loom/mocked.rs | 27 | ||||
-rw-r--r-- | tokio/src/loom/std/mod.rs | 8 | ||||
-rw-r--r-- | tokio/src/loom/std/mutex.rs | 31 | ||||
-rw-r--r-- | tokio/src/loom/std/parking_lot.rs | 13 | ||||
-rw-r--r-- | tokio/src/park/thread.rs | 6 | ||||
-rw-r--r-- | tokio/src/runtime/basic_scheduler.rs | 25 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/pool.rs | 12 | ||||
-rw-r--r-- | tokio/src/runtime/park.rs | 4 | ||||
-rw-r--r-- | tokio/src/runtime/queue.rs | 10 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/idle.rs | 8 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/worker.rs | 2 | ||||
-rw-r--r-- | tokio/src/sync/batch_semaphore.rs | 19 | ||||
-rw-r--r-- | tokio/src/sync/broadcast.rs | 12 | ||||
-rw-r--r-- | tokio/src/sync/notify.rs | 10 | ||||
-rw-r--r-- | tokio/src/util/slab.rs | 8 |
17 files changed, 127 insertions, 88 deletions
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 3e7d9570..88daeb2d 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -193,7 +193,7 @@ impl ScheduledIo { } pub(super) fn wake(&self, ready: mio::Ready) { - let mut waiters = self.waiters.lock().unwrap(); + let mut waiters = self.waiters.lock(); // check for AsyncRead slot if !(ready & (!mio::Ready::writable())).is_empty() { @@ -241,7 +241,7 @@ impl ScheduledIo { if ready.is_empty() { // Update the task info - let mut waiters = self.waiters.lock().unwrap(); + let mut waiters = self.waiters.lock(); let slot = match direction { Direction::Read => &mut waiters.reader, Direction::Write => &mut waiters.writer, @@ -375,7 +375,7 @@ cfg_io_readiness! { } // Wasn't ready, take the lock (and check again while locked). - let mut waiters = scheduled_io.waiters.lock().unwrap(); + let mut waiters = scheduled_io.waiters.lock(); let curr = scheduled_io.readiness.load(SeqCst); let readiness = mio::Ready::from_usize(READINESS.unpack(curr)); @@ -408,7 +408,7 @@ cfg_io_readiness! { // `notify.waiters`). In order to access the waker fields, // we must hold the lock. - let waiters = scheduled_io.waiters.lock().unwrap(); + let waiters = scheduled_io.waiters.lock(); // Safety: called while locked let w = unsafe { &mut *waiter.get() }; @@ -450,7 +450,7 @@ cfg_io_readiness! { impl Drop for Readiness<'_> { fn drop(&mut self) { - let mut waiters = self.scheduled_io.waiters.lock().unwrap(); + let mut waiters = self.scheduled_io.waiters.lock(); // Safety: `waiter` is only ever stored in `waiters` unsafe { diff --git a/tokio/src/io/util/mem.rs b/tokio/src/io/util/mem.rs index 1b9b37b7..0dd6ad77 100644 --- a/tokio/src/io/util/mem.rs +++ b/tokio/src/io/util/mem.rs @@ -100,7 +100,7 @@ impl AsyncRead for DuplexStream { cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>> { - Pin::new(&mut *self.read.lock().unwrap()).poll_read(cx, buf) + Pin::new(&mut *self.read.lock()).poll_read(cx, buf) } } @@ -111,7 +111,7 @@ impl AsyncWrite for DuplexStream { cx: &mut task::Context<'_>, buf: &[u8], ) -> Poll<std::io::Result<usize>> { - Pin::new(&mut *self.write.lock().unwrap()).poll_write(cx, buf) + Pin::new(&mut *self.write.lock()).poll_write(cx, buf) } #[allow(unused_mut)] @@ -119,7 +119,7 @@ impl AsyncWrite for DuplexStream { mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll<std::io::Result<()>> { - Pin::new(&mut *self.write.lock().unwrap()).poll_flush(cx) + Pin::new(&mut *self.write.lock()).poll_flush(cx) } #[allow(unused_mut)] @@ -127,14 +127,14 @@ impl AsyncWrite for DuplexStream { mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll<std::io::Result<()>> { - Pin::new(&mut *self.write.lock().unwrap()).poll_shutdown(cx) + Pin::new(&mut *self.write.lock()).poll_shutdown(cx) } } impl Drop for DuplexStream { fn drop(&mut self) { // notify the other side of the closure - self.write.lock().unwrap().close(); + self.write.lock().close(); } } diff --git a/tokio/src/loom/mocked.rs b/tokio/src/loom/mocked.rs index 78913952..367d59b4 100644 --- a/tokio/src/loom/mocked.rs +++ b/tokio/src/loom/mocked.rs @@ -1,5 +1,32 @@ pub(crate) use loom::*; +pub(crate) mod sync { + + pub(crate) use loom::sync::MutexGuard; + + #[derive(Debug)] + pub(crate) struct Mutex<T>(loom::sync::Mutex<T>); + + #[allow(dead_code)] + impl<T> Mutex<T> { + #[inline] + pub(crate) fn new(t: T) -> Mutex<T> { + Mutex(loom::sync::Mutex::new(t)) + } + + #[inline] + pub(crate) fn lock(&self) -> MutexGuard<'_, T> { + self.0.lock().unwrap() + } + + #[inline] + pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> { + self.0.try_lock().ok() + } + } + pub(crate) use loom::sync::*; +} + pub(crate) mod rand { pub(crate) fn seed() -> u64 { 1 diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs index 15a15f0a..6492848e 100644 --- a/tokio/src/loom/std/mod.rs +++ b/tokio/src/loom/std/mod.rs @@ -6,6 +6,7 @@ mod atomic_u32; mod atomic_u64; mod atomic_u8; mod atomic_usize; +mod mutex; #[cfg(feature = "parking_lot")] mod parking_lot; mod unsafe_cell; @@ -62,9 +63,10 @@ pub(crate) mod sync { #[cfg(not(feature = "parking_lot"))] #[allow(unused_imports)] - pub(crate) use std::sync::{ - Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, WaitTimeoutResult, - }; + pub(crate) use std::sync::{Condvar, MutexGuard, RwLock, RwLockReadGuard, WaitTimeoutResult}; + + #[cfg(not(feature = "parking_lot"))] + pub(crate) use crate::loom::std::mutex::Mutex; pub(crate) mod atomic { pub(crate) use crate::loom::std::atomic_ptr::AtomicPtr; diff --git a/tokio/src/loom/std/mutex.rs b/tokio/src/loom/std/mutex.rs new file mode 100644 index 00000000..bf14d624 --- /dev/null +++ b/tokio/src/loom/std/mutex.rs @@ -0,0 +1,31 @@ +use std::sync::{self, MutexGuard, TryLockError}; + +/// Adapter for `std::Mutex` that removes the poisoning aspects +// from its api +#[derive(Debug)] +pub(crate) struct Mutex<T: ?Sized>(sync::Mutex<T>); + +#[allow(dead_code)] +impl<T> Mutex<T> { + #[inline] + pub(crate) fn new(t: T) -> Mutex<T> { + Mutex(sync::Mutex::new(t)) + } + + #[inline] + pub(crate) fn lock(&self) -> MutexGuard<'_, T> { + match self.0.lock() { + Ok(guard) => guard, + Err(p_err) => p_err.into_inner(), + } + } + + #[inline] + pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> { + match self.0.try_lock() { + Ok(guard) => Some(guard), + Err(TryLockError::Poisoned(p_err)) => Some(p_err.into_inner()), + Err(TryLockError::WouldBlock) => None, + } + } +} diff --git a/tokio/src/loom/std/parking_lot.rs b/tokio/src/loom/std/parking_lot.rs index 41c47ddd..c03190fe 100644 --- a/tokio/src/loom/std/parking_lot.rs +++ b/tokio/src/loom/std/parking_lot.rs @@ -3,7 +3,7 @@ //! //! This can be extended to additional types/methods as required. -use std::sync::{LockResult, TryLockError, TryLockResult}; +use std::sync::LockResult; use std::time::Duration; // Types that do not need wrapping @@ -34,16 +34,13 @@ impl<T> Mutex<T> { } #[inline] - pub(crate) fn lock(&self) -> LockResult<MutexGuard<'_, T>> { - Ok(self.0.lock()) + pub(crate) fn lock(&self) -> MutexGuard<'_, T> { + self.0.lock() } #[inline] - pub(crate) fn try_lock(&self) -> TryLockResult<MutexGuard<'_, T>> { - match self.0.try_lock() { - Some(guard) => Ok(guard), - None => Err(TryLockError::WouldBlock), - } + pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> { + self.0.try_lock() } // Note: Additional methods `is_poisoned` and `into_inner`, can be diff --git a/tokio/src/park/thread.rs b/tokio/src/park/thread.rs index 9ed41310..494c02b4 100644 --- a/tokio/src/park/thread.rs +++ b/tokio/src/park/thread.rs @@ -87,7 +87,7 @@ impl Inner { } // Otherwise we need to coordinate going to sleep - let mut m = self.mutex.lock().unwrap(); + let mut m = self.mutex.lock(); match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { Ok(_) => {} @@ -137,7 +137,7 @@ impl Inner { return; } - let m = self.mutex.lock().unwrap(); + let m = self.mutex.lock(); match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { Ok(_) => {} @@ -188,7 +188,7 @@ impl Inner { // Releasing `lock` before the call to `notify_one` means that when the // parked thread wakes it doesn't get woken only to have to wait for us // to release `lock`. - drop(self.mutex.lock().unwrap()); + drop(self.mutex.lock()); self.condvar.notify_one() } diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 0c0e95a6..60fe92c3 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -10,7 +10,7 @@ use std::cell::RefCell; use std::collections::VecDeque; use std::fmt; use std::future::Future; -use std::sync::{Arc, PoisonError}; +use std::sync::Arc; use std::task::Poll::{Pending, Ready}; use std::time::Duration; @@ -170,7 +170,7 @@ impl<P: Park> BasicScheduler<P> { } fn take_inner(&self) -> Option<InnerGuard<'_, P>> { - let inner = self.inner.lock().unwrap().take()?; + let inner = self.inner.lock().take()?; Some(InnerGuard { inner: Some(inner), @@ -280,12 +280,7 @@ impl<P: Park> Drop for BasicScheduler<P> { // Avoid a double panic if we are currently panicking and // the lock may be poisoned. - let mut inner = match self - .inner - .lock() - .unwrap_or_else(PoisonError::into_inner) - .take() - { + let mut inner = match self.inner.lock().take() { Some(inner) => inner, None if std::thread::panicking() => return, None => panic!("Oh no! We never placed the Inner state back, this is a bug!"), @@ -309,7 +304,7 @@ impl<P: Park> Drop for BasicScheduler<P> { } // Drain remote queue - for task in scheduler.spawner.shared.queue.lock().unwrap().drain(..) { + for task in scheduler.spawner.shared.queue.lock().drain(..) { task.shutdown(); } @@ -339,7 +334,7 @@ impl Spawner { } fn pop(&self) -> Option<task::Notified<Arc<Shared>>> { - self.shared.queue.lock().unwrap().pop_front() + self.shared.queue.lock().pop_front() } fn waker_ref(&self) -> WakerRef<'_> { @@ -384,7 +379,7 @@ impl Schedule for Arc<Shared> { cx.tasks.borrow_mut().queue.push_back(task); } _ => { - self.queue.lock().unwrap().push_back(task); + self.queue.lock().push_back(task); self.unpark.unpark(); } }); @@ -423,13 +418,7 @@ impl<P: Park> InnerGuard<'_, P> { impl<P: Park> Drop for InnerGuard<'_, P> { fn drop(&mut self) { if let Some(scheduler) = self.inner.take() { - // We can ignore the poison error here since we are - // just replacing the state. - let mut lock = self - .basic_scheduler - .inner - .lock() - .unwrap_or_else(PoisonError::into_inner); + let mut lock = self.basic_scheduler.inner.lock(); // Replace old scheduler back into the state to allow // other threads to pick it up and drive it. diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 633021ed..df0175b1 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -129,7 +129,7 @@ impl BlockingPool { } pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) { - let mut shared = self.spawner.inner.shared.lock().unwrap(); + let mut shared = self.spawner.inner.shared.lock(); // The function can be called multiple times. First, by explicitly // calling `shutdown` then by the drop handler calling `shutdown`. This @@ -170,7 +170,7 @@ impl fmt::Debug for BlockingPool { impl Spawner { pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> { let shutdown_tx = { - let mut shared = self.inner.shared.lock().unwrap(); + let mut shared = self.inner.shared.lock(); if shared.shutdown { // Shutdown the task @@ -207,7 +207,7 @@ impl Spawner { }; if let Some(shutdown_tx) = shutdown_tx { - let mut shared = self.inner.shared.lock().unwrap(); + let mut shared = self.inner.shared.lock(); let entry = shared.worker_threads.vacant_entry(); let handle = self.spawn_thread(shutdown_tx, rt, entry.key()); @@ -251,7 +251,7 @@ impl Inner { f() } - let mut shared = self.shared.lock().unwrap(); + let mut shared = self.shared.lock(); 'main: loop { // BUSY @@ -259,7 +259,7 @@ impl Inner { drop(shared); task.run(); - shared = self.shared.lock().unwrap(); + shared = self.shared.lock(); } // IDLE @@ -296,7 +296,7 @@ impl Inner { drop(shared); task.shutdown(); - shared = self.shared.lock().unwrap(); + shared = self.shared.lock(); } // Work was produced, and we "took" it (by decrementing num_notify). diff --git a/tokio/src/runtime/park.rs b/tokio/src/runtime/park.rs index c994c935..033b9f20 100644 --- a/tokio/src/runtime/park.rs +++ b/tokio/src/runtime/park.rs @@ -142,7 +142,7 @@ impl Inner { fn park_condvar(&self) { // Otherwise we need to coordinate going to sleep - let mut m = self.mutex.lock().unwrap(); + let mut m = self.mutex.lock(); match self .state @@ -238,7 +238,7 @@ impl Inner { // Releasing `lock` before the call to `notify_one` means that when the // parked thread wakes it doesn't get woken only to have to wait for us // to release `lock`. - drop(self.mutex.lock().unwrap()); + drop(self.mutex.lock()); self.condvar.notify_one() } diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/queue.rs index c654514b..cdf4009c 100644 --- a/tokio/src/runtime/queue.rs +++ b/tokio/src/runtime/queue.rs @@ -481,7 +481,7 @@ impl<T: 'static> Inject<T> { /// Close the injection queue, returns `true` if the queue is open when the /// transition is made. pub(super) fn close(&self) -> bool { - let mut p = self.pointers.lock().unwrap(); + let mut p = self.pointers.lock(); if p.is_closed { return false; @@ -492,7 +492,7 @@ impl<T: 'static> Inject<T> { } pub(super) fn is_closed(&self) -> bool { - self.pointers.lock().unwrap().is_closed + self.pointers.lock().is_closed } pub(super) fn len(&self) -> usize { @@ -502,7 +502,7 @@ impl<T: 'static> Inject<T> { /// Pushes a value into the queue. pub(super) fn push(&self, task: task::Notified<T>) { // Acquire queue lock - let mut p = self.pointers.lock().unwrap(); + let mut p = self.pointers.lock(); if p.is_closed { // Drop the mutex to avoid a potential deadlock when @@ -541,7 +541,7 @@ impl<T: 'static> Inject<T> { debug_assert!(get_next(batch_tail).is_none()); - let mut p = self.pointers.lock().unwrap(); + let mut p = self.pointers.lock(); if let Some(tail) = p.tail { set_next(tail, Some(batch_head)); @@ -566,7 +566,7 @@ impl<T: 'static> Inject<T> { return None; } - let mut p = self.pointers.lock().unwrap(); + let mut p = self.pointers.lock(); // It is possible to hit null here if another thread poped the last // task between us checking `len` and acquiring the lock. diff --git a/tokio/src/runtime/thread_pool/idle.rs b/tokio/src/runtime/thread_pool/idle.rs index ae87ca4b..6e692fd8 100644 --- a/tokio/src/runtime/thread_pool/idle.rs +++ b/tokio/src/runtime/thread_pool/idle.rs @@ -55,7 +55,7 @@ impl Idle { } // Acquire the lock - let mut sleepers = self.sleepers.lock().unwrap(); + let mut sleepers = self.sleepers.lock(); // Check again, now that the lock is acquired if !self.notify_should_wakeup() { @@ -77,7 +77,7 @@ impl Idle { /// work. pub(super) fn transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool { // Acquire the lock - let mut sleepers = self.sleepers.lock().unwrap(); + let mut sleepers = self.sleepers.lock(); // Decrement the number of unparked threads let ret = State::dec_num_unparked(&self.state, is_searching); @@ -112,7 +112,7 @@ impl Idle { /// Unpark a specific worker. This happens if tasks are submitted from /// within the worker's park routine. pub(super) fn unpark_worker_by_id(&self, worker_id: usize) { - let mut sleepers = self.sleepers.lock().unwrap(); + let mut sleepers = self.sleepers.lock(); for index in 0..sleepers.len() { if sleepers[index] == worker_id { @@ -128,7 +128,7 @@ impl Idle { /// Returns `true` if `worker_id` is contained in the sleep set pub(super) fn is_parked(&self, worker_id: usize) -> bool { - let sleepers = self.sleepers.lock().unwrap(); + let sleepers = self.sleepers.lock(); sleepers.contains(&worker_id) } diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index 10e973f6..c88f9954 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -783,7 +783,7 @@ impl Shared { /// /// If all workers have reached this point, the final cleanup is performed. fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) { - let mut workers = self.shutdown_workers.lock().unwrap(); + let mut workers = self.shutdown_workers.lock(); workers.push((core, worker)); if workers.len() != self.remotes.len() { diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 9f324f9c..6305485b 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -158,7 +158,7 @@ impl Semaphore { } // Assign permits to the wait queue - self.add_permits_locked(added, self.waiters.lock().unwrap()); + self.add_permits_locked(added, self.waiters.lock()); } /// Closes the semaphore. This prevents the semaphore from issuing new @@ -166,7 +166,7 @@ impl Semaphore { // This will be used once the bounded MPSC is updated to use the new // semaphore implementation. pub(crate) fn close(&self) { - let mut waiters = self.waiters.lock().unwrap(); + let mut waiters = self.waiters.lock(); // If the semaphore's permits counter has enough permits for an // unqueued waiter to acquire all the permits it needs immediately, // it won't touch the wait list. Therefore, we have to set a bit on @@ -231,7 +231,7 @@ impl Semaphore { let mut lock = Some(waiters); let mut is_empty = false; while rem > 0 { - let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock().unwrap()); + let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock()); 'inner: for slot in &mut wakers[..] { // Was the waiter assigned enough permits to wake it? match waiters.queue.last() { @@ -324,7 +324,7 @@ impl Semaphore { // counter. Otherwise, if we subtract the permits and then // acquire the lock, we might miss additional permits being // added while waiting for the lock. - lock = Some(self.waiters.lock().unwrap()); + lock = Some(self.waiters.lock()); } match self.permits.compare_exchange(curr, next, AcqRel, Acquire) { @@ -334,7 +334,7 @@ impl Semaphore { if !queued { return Ready(Ok(())); } else if lock.is_none() { - break self.waiters.lock().unwrap(); + break self.waiters.lock(); } } break lock.expect("lock must be acquired before waiting"); @@ -484,14 +484,7 @@ impl Drop for Acquire<'_> { // This is where we ensure safety. The future is being dropped, // which means we must ensure that the waiter entry is no longer stored // in the linked list. - let mut waiters = match self.semaphore.waiters.lock() { - Ok(lock) => lock, - // Removing the node from the linked list is necessary to ensure - // safety. Even if the lock was poisoned, we need to make sure it is - // removed from the linked list before dropping it --- otherwise, - // the list will contain a dangling pointer to this node. - Err(e) => e.into_inner(), - }; + let mut waiters = self.semaphore.waiters.lock(); // remove the entry from the list let node = NonNull::from(&mut self.node); diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 9484e130..fe826290 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -529,7 +529,7 @@ impl<T> Sender<T> { pub fn subscribe(&self) -> Receiver<T> { let shared = self.shared.clone(); - let mut tail = shared.tail.lock().unwrap(); + let mut tail = shared.tail.lock(); if tail.rx_cnt == MAX_RECEIVERS { panic!("max receivers"); @@ -584,12 +584,12 @@ impl<T> Sender<T> { /// } /// ``` pub fn receiver_count(&self) -> usize { - let tail = self.shared.tail.lock().unwrap(); + let tail = self.shared.tail.lock(); tail.rx_cnt } fn send2(&self, value: Option<T>) -> Result<usize, SendError<Option<T>>> { - let mut tail = self.shared.tail.lock().unwrap(); + let mut tail = self.shared.tail.lock(); if tail.rx_cnt == 0 { return Err(SendError(value)); @@ -695,7 +695,7 @@ impl<T> Receiver<T> { // the slot lock. drop(slot); - let mut tail = self.shared.tail.lock().unwrap(); + let mut tail = self.shared.tail.lock(); // Acquire slot lock again slot = self.shared.buffer[idx].read().unwrap(); @@ -979,7 +979,7 @@ where impl<T> Drop for Receiver<T> { fn drop(&mut self) { - let mut tail = self.shared.tail.lock().unwrap(); + let mut tail = self.shared.tail.lock(); if let Some(waiter) = &self.waiter { // safety: tail lock is held @@ -1142,7 +1142,7 @@ where fn drop(&mut self) { // Acquire the tail lock. This is required for safety before accessing // the waiter node. - let mut tail = self.receiver.as_mut().shared.tail.lock().unwrap(); + let mut tail = self.receiver.as_mut().shared.tail.lock(); // safety: tail lock is held let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued }); diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index d319e8aa..17117bfe 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -306,7 +306,7 @@ impl Notify { } // There are waiters, the lock must be acquired to notify. - let mut waiters = self.waiters.lock().unwrap(); + let mut waiters = self.waiters.lock(); // The state must be reloaded while the lock is held. The state may only // transition out of WAITING while the lock is held. @@ -321,7 +321,7 @@ impl Notify { /// Notifies all waiting tasks pub(crate) fn notify_waiters(&self) { // There are waiters, the lock must be acquired to notify. - let mut waiters = self.waiters.lock().unwrap(); + let mut waiters = self.waiters.lock(); // The state must be reloaded while the lock is held. The state may only // transition out of WAITING while the lock is held. @@ -452,7 +452,7 @@ impl Future for Notified<'_> { // Acquire the lock and attempt to transition to the waiting // state. - let mut waiters = notify.waiters.lock().unwrap(); + let mut waiters = notify.waiters.lock(); // Reload the state with the lock held let mut curr = notify.state.load(SeqCst); @@ -516,7 +516,7 @@ impl Future for Notified<'_> { // `notify.waiters`). In order to access the waker fields, // we must hold the lock. - let waiters = notify.waiters.lock().unwrap(); + let waiters = notify.waiters.lock(); // Safety: called while locked let w = unsafe { &mut *waiter.get() }; @@ -564,7 +564,7 @@ impl Drop for Notified<'_> { // longer stored in the linked list. if let Waiting = *state { let mut notify_state = WAITING; - let mut waiters = notify.waiters.lock().unwrap(); + let mut waiters = notify.waiters.lock(); // `Notify.state` may be in any of the three states (Empty, Waiting, // Notified). It doesn't actually matter what the atomic is set to diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs index 854232c2..aa1e2362 100644 --- a/tokio/src/util/slab.rs +++ b/tokio/src/util/slab.rs @@ -278,7 +278,7 @@ impl<T> Slab<T> { } let mut slots = match page.slots.try_lock() { - Ok(slots) => slots, + Some(slots) => slots, // If the lock cannot be acquired due to being held by another // thread, don't try to compact the page. _ => continue, @@ -376,7 +376,7 @@ impl<T: Entry> Page<T> { } // Allocating objects requires synchronization - let mut locked = me.slots.lock().unwrap(); + let mut locked = me.slots.lock(); if locked.head < locked.slots.len() { // Re-use an already initialized slot. @@ -471,7 +471,7 @@ impl<T> Default for Page<T> { impl<T> Page<T> { /// Release a slot into the page's free list fn release(&self, value: *const Value<T>) { - let mut locked = self.slots.lock().unwrap(); + let mut locked = self.slots.lock(); let idx = locked.index_for(value); locked.slots[idx].next = locked.head as u32; @@ -485,7 +485,7 @@ impl<T> Page<T> { impl<T> CachedPage<T> { /// Refresh the cache fn refresh(&mut self, page: &Page<T>) { - let slots = page.slots.lock().unwrap(); + let slots = page.slots.lock(); self.slots = slots.slots.as_ptr(); self.init = slots.slots.len(); } |