summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorArtem Vorotnikov <artem@vorotnikov.me>2019-12-21 23:28:57 +0300
committerCarl Lerche <me@carllerche.com>2019-12-21 12:28:57 -0800
commit8656b7b8eb6f3635ec40694eb71f14fb84211e05 (patch)
treee2fd1f95216660edeaadebbce87144e95ccfffde /tokio
parentf309b295bb0bdee5862a0ab8359a5f0622a588b9 (diff)
chore: fix formatting, remove old rustfmt.toml (#2007)
`cargo fmt` has a bug where it does not format modules scoped with feature flags.
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/future/pending.rs8
-rw-r--r--tokio/src/io/driver/mod.rs2
-rw-r--r--tokio/src/io/driver/scheduled_io.rs12
-rw-r--r--tokio/src/io/poll_evented.rs2
-rw-r--r--tokio/src/io/registration.rs4
-rw-r--r--tokio/src/io/util/async_read_ext.rs4
-rw-r--r--tokio/src/io/util/async_seek_ext.rs2
-rw-r--r--tokio/src/io/util/async_write_ext.rs4
-rw-r--r--tokio/src/io/util/read_int.rs3
-rw-r--r--tokio/src/io/util/write_int.rs8
-rw-r--r--tokio/src/net/tcp/stream.rs84
-rw-r--r--tokio/src/net/udp/mod.rs2
-rw-r--r--tokio/src/process/mod.rs2
-rw-r--r--tokio/src/runtime/blocking/pool.rs5
-rw-r--r--tokio/src/runtime/park.rs37
-rw-r--r--tokio/src/runtime/thread_pool/mod.rs14
-rw-r--r--tokio/src/runtime/thread_pool/slice.rs2
-rw-r--r--tokio/src/runtime/thread_pool/tests/loom_pool.rs2
-rw-r--r--tokio/src/runtime/thread_pool/worker.rs21
-rw-r--r--tokio/src/sync/broadcast.rs18
-rw-r--r--tokio/src/sync/semaphore.rs10
-rw-r--r--tokio/src/sync/tests/loom_broadcast.rs8
-rw-r--r--tokio/src/time/mod.rs2
-rw-r--r--tokio/src/time/throttle.rs9
-rw-r--r--tokio/src/util/bit.rs16
-rw-r--r--tokio/src/util/slab/addr.rs9
-rw-r--r--tokio/src/util/slab/mod.rs4
-rw-r--r--tokio/src/util/slab/shard.rs7
-rw-r--r--tokio/src/util/slab/slot.rs2
-rw-r--r--tokio/src/util/try_lock.rs12
30 files changed, 171 insertions, 144 deletions
diff --git a/tokio/src/future/pending.rs b/tokio/src/future/pending.rs
index c844ebc3..287e836f 100644
--- a/tokio/src/future/pending.rs
+++ b/tokio/src/future/pending.rs
@@ -1,6 +1,6 @@
+use sdt::pin::Pin;
use std::future::Future;
use std::marker;
-use sdt::pin::Pin;
use std::task::{Context, Poll};
/// Future for the [`pending()`] function.
@@ -29,7 +29,8 @@ struct Pending<T> {
pub async fn pending() -> ! {
Pending {
_data: marker::PhantomData,
- }.await
+ }
+ .await
}
impl<T> Future for Pending<T> {
@@ -40,5 +41,4 @@ impl<T> Future for Pending<T> {
}
}
-impl<T> Unpin for Pending<T> {
-}
+impl<T> Unpin for Pending<T> {}
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs
index bb784541..58ce5124 100644
--- a/tokio/src/io/driver/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -12,8 +12,8 @@ use std::cell::RefCell;
use std::fmt;
use std::io;
use std::marker::PhantomData;
-use std::sync::{Arc, Weak};
use std::sync::atomic::Ordering::SeqCst;
+use std::sync::{Arc, Weak};
use std::task::Waker;
use std::time::Duration;
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs
index 1eb6624c..e26a3588 100644
--- a/tokio/src/io/driver/scheduled_io.rs
+++ b/tokio/src/io/driver/scheduled_io.rs
@@ -3,7 +3,7 @@ use crate::loom::sync::atomic::AtomicUsize;
use crate::util::bit;
use crate::util::slab::{Address, Entry, Generation};
-use std::sync::atomic::Ordering::{Acquire, AcqRel, SeqCst};
+use std::sync::atomic::Ordering::{AcqRel, Acquire, SeqCst};
#[derive(Debug)]
pub(crate) struct ScheduledIo {
@@ -29,12 +29,10 @@ impl Entry for ScheduledIo {
let next = PACK.pack(generation.next().to_usize(), 0);
- match self.readiness.compare_exchange(
- current,
- next,
- AcqRel,
- Acquire,
- ) {
+ match self
+ .readiness
+ .compare_exchange(current, next, AcqRel, Acquire)
+ {
Ok(_) => break,
Err(actual) => current = actual,
}
diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs
index e1d0d74d..8bf9c972 100644
--- a/tokio/src/io/poll_evented.rs
+++ b/tokio/src/io/poll_evented.rs
@@ -1,5 +1,5 @@
+use crate::io::driver::platform;
use crate::io::{AsyncRead, AsyncWrite, Registration};
-use crate::io::driver::{platform};
use mio::event::Evented;
use std::fmt;
diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs
index c7f3d2a0..e2ce2cd3 100644
--- a/tokio/src/io/registration.rs
+++ b/tokio/src/io/registration.rs
@@ -1,9 +1,9 @@
-use crate::io::driver::{Direction, Handle, platform};
+use crate::io::driver::{platform, Direction, Handle};
use crate::util::slab::Address;
use mio::{self, Evented};
-use std::task::{Context, Poll};
use std::io;
+use std::task::{Context, Poll};
cfg_io_driver! {
/// Associates an I/O resource with the reactor instance that drives it.
diff --git a/tokio/src/io/util/async_read_ext.rs b/tokio/src/io/util/async_read_ext.rs
index 04979336..4ffb769c 100644
--- a/tokio/src/io/util/async_read_ext.rs
+++ b/tokio/src/io/util/async_read_ext.rs
@@ -2,8 +2,8 @@ use crate::io::util::chain::{chain, Chain};
use crate::io::util::read::{read, Read};
use crate::io::util::read_buf::{read_buf, ReadBuf};
use crate::io::util::read_exact::{read_exact, ReadExact};
-use crate::io::util::read_int::{ReadU8, ReadU16, ReadU32, ReadU64, ReadU128};
-use crate::io::util::read_int::{ReadI8, ReadI16, ReadI32, ReadI64, ReadI128};
+use crate::io::util::read_int::{ReadI128, ReadI16, ReadI32, ReadI64, ReadI8};
+use crate::io::util::read_int::{ReadU128, ReadU16, ReadU32, ReadU64, ReadU8};
use crate::io::util::read_to_end::{read_to_end, ReadToEnd};
use crate::io::util::read_to_string::{read_to_string, ReadToString};
use crate::io::util::take::{take, Take};
diff --git a/tokio/src/io/util/async_seek_ext.rs b/tokio/src/io/util/async_seek_ext.rs
index aeae4cbd..3063884b 100644
--- a/tokio/src/io/util/async_seek_ext.rs
+++ b/tokio/src/io/util/async_seek_ext.rs
@@ -35,4 +35,4 @@ pub trait AsyncSeekExt: AsyncSeek {
}
}
-impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {} \ No newline at end of file
+impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
diff --git a/tokio/src/io/util/async_write_ext.rs b/tokio/src/io/util/async_write_ext.rs
index 13d8b745..e54501d9 100644
--- a/tokio/src/io/util/async_write_ext.rs
+++ b/tokio/src/io/util/async_write_ext.rs
@@ -3,8 +3,8 @@ use crate::io::util::shutdown::{shutdown, Shutdown};
use crate::io::util::write::{write, Write};
use crate::io::util::write_all::{write_all, WriteAll};
use crate::io::util::write_buf::{write_buf, WriteBuf};
-use crate::io::util::write_int::{WriteU8, WriteU16, WriteU32, WriteU64, WriteU128};
-use crate::io::util::write_int::{WriteI8, WriteI16, WriteI32, WriteI64, WriteI128};
+use crate::io::util::write_int::{WriteI128, WriteI16, WriteI32, WriteI64, WriteI8};
+use crate::io::util::write_int::{WriteU128, WriteU16, WriteU32, WriteU64, WriteU8};
use crate::io::AsyncWrite;
use bytes::Buf;
diff --git a/tokio/src/io/util/read_int.rs b/tokio/src/io/util/read_int.rs
index 126252dd..9dc4402f 100644
--- a/tokio/src/io/util/read_int.rs
+++ b/tokio/src/io/util/read_int.rs
@@ -48,7 +48,8 @@ macro_rules! reader {
}
while *me.read < $bytes as u8 {
- *me.read += match me.src
+ *me.read += match me
+ .src
.as_mut()
.poll_read(cx, &mut me.buf[*me.read as usize..])
{
diff --git a/tokio/src/io/util/write_int.rs b/tokio/src/io/util/write_int.rs
index eeacffdb..28add549 100644
--- a/tokio/src/io/util/write_int.rs
+++ b/tokio/src/io/util/write_int.rs
@@ -49,7 +49,8 @@ macro_rules! writer {
}
while *me.written < $bytes as u8 {
- *me.written += match me.dst
+ *me.written += match me
+ .dst
.as_mut()
.poll_write(cx, &me.buf[*me.written as usize..])
{
@@ -77,10 +78,7 @@ macro_rules! writer8 {
impl<W> $name<W> {
pub(crate) fn new(dst: W, byte: $ty) -> Self {
- Self {
- dst,
- byte,
- }
+ Self { dst, byte }
}
}
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs
index 343c6c5d..d35f1620 100644
--- a/tokio/src/net/tcp/stream.rs
+++ b/tokio/src/net/tcp/stream.rs
@@ -674,22 +674,70 @@ impl TcpStream {
// IoSlice isn't Copy, so we must expand this manually ;_;
let mut slices: [IoSlice<'_>; MAX_BUFS] = [
- IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S),
- IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S),
- IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S),
- IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S),
- IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S),
- IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S),
- IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S),
- IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S),
- IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S),
- IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S),
- IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S),
- IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S),
- IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S),
- IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S),
- IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S),
- IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
];
let cnt = buf.bytes_vectored(&mut slices);
@@ -703,11 +751,11 @@ impl TcpStream {
Ok(n) => {
buf.advance(n);
Poll::Ready(Ok(n))
- },
+ }
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_write_ready(cx)?;
Poll::Pending
- },
+ }
Err(e) => Poll::Ready(Err(e)),
}
}
diff --git a/tokio/src/net/udp/mod.rs b/tokio/src/net/udp/mod.rs
index a616f4d5..d43121a1 100644
--- a/tokio/src/net/udp/mod.rs
+++ b/tokio/src/net/udp/mod.rs
@@ -4,4 +4,4 @@ pub(crate) mod socket;
pub(crate) use socket::UdpSocket;
mod split;
-pub use split::{RecvHalf, SendHalf, ReuniteError};
+pub use split::{RecvHalf, ReuniteError, SendHalf};
diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs
index 60e854dd..8dc2133c 100644
--- a/tokio/src/process/mod.rs
+++ b/tokio/src/process/mod.rs
@@ -556,7 +556,7 @@ impl Command {
imp::spawn_child(&mut self.std).map(|spawned_child| Child {
child: ChildDropGuard {
inner: spawned_child.child,
- kill_on_drop: self.kill_on_drop
+ kill_on_drop: self.kill_on_drop,
},
stdin: spawned_child.stdin.map(|inner| ChildStdin { inner }),
stdout: spawned_child.stdout.map(|inner| ChildStdout { inner }),
diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
index 02b3d19b..6c23daec 100644
--- a/tokio/src/runtime/blocking/pool.rs
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -2,10 +2,10 @@
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
-use crate::runtime::{self, io, time, Builder, Callback};
-use crate::runtime::blocking::shutdown;
use crate::runtime::blocking::schedule::NoopSchedule;
+use crate::runtime::blocking::shutdown;
use crate::runtime::blocking::task::BlockingTask;
+use crate::runtime::{self, io, time, Builder, Callback};
use crate::task::{self, JoinHandle};
use std::cell::Cell;
@@ -55,7 +55,6 @@ struct Inner {
clock: time::Clock,
thread_cap: usize,
-
}
struct Shared {
diff --git a/tokio/src/runtime/park.rs b/tokio/src/runtime/park.rs
index 1543bcb1..c3bbe9c0 100644
--- a/tokio/src/runtime/park.rs
+++ b/tokio/src/runtime/park.rs
@@ -2,8 +2,8 @@
//!
//! A combination of the various resource driver park handles.
-use crate::loom::sync::{Arc, Mutex, Condvar};
use crate::loom::sync::atomic::AtomicUsize;
+use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
use crate::park::{Park, Unpark};
use crate::runtime::time;
@@ -84,7 +84,9 @@ impl Park for Parker {
type Error = ();
fn unpark(&self) -> Unparker {
- Unparker { inner: self.inner.clone() }
+ Unparker {
+ inner: self.inner.clone(),
+ }
}
fn park(&mut self) -> Result<(), Self::Error> {
@@ -97,8 +99,7 @@ impl Park for Parker {
assert_eq!(duration, Duration::from_millis(0));
if let Some(mut driver) = self.inner.shared.driver.try_lock() {
- driver.park_timeout(duration)
- .map_err(|_| ())
+ driver.park_timeout(duration).map_err(|_| ())
} else {
Ok(())
}
@@ -117,7 +118,11 @@ impl Inner {
for _ in 0..3 {
// If we were previously notified then we consume this notification and
// return quickly.
- if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
+ if self
+ .state
+ .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
+ .is_ok()
+ {
return;
}
@@ -135,7 +140,10 @@ impl Inner {
// Otherwise we need to coordinate going to sleep
let mut m = self.mutex.lock().unwrap();
- match self.state.compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst) {
+ match self
+ .state
+ .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst)
+ {
Ok(_) => {}
Err(NOTIFIED) => {
// We must read here, even though we know it will be `NOTIFIED`.
@@ -155,7 +163,11 @@ impl Inner {
loop {
m = self.condvar.wait(m).unwrap();
- if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
+ if self
+ .state
+ .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
+ .is_ok()
+ {
// got a notification
return;
}
@@ -165,7 +177,10 @@ impl Inner {
}
fn park_driver(&self, driver: &mut time::Driver) {
- match self.state.compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) {
+ match self
+ .state
+ .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
+ {
Ok(_) => {}
Err(NOTIFIED) => {
// We must read here, even though we know it will be `NOTIFIED`.
@@ -186,7 +201,7 @@ impl Inner {
driver.park().unwrap();
match self.state.swap(EMPTY, SeqCst) {
- NOTIFIED => {} // got a notification, hurray!
+ NOTIFIED => {} // got a notification, hurray!
PARKED_DRIVER => {} // no notification, alas
n => panic!("inconsistent park_timeout state: {}", n),
}
@@ -199,8 +214,8 @@ impl Inner {
// is already `NOTIFIED`. That is why this must be a swap rather than a
// compare-and-swap that returns if it reads `NOTIFIED` on failure.
match self.state.swap(NOTIFIED, SeqCst) {
- EMPTY => {}, // no one was waiting
- NOTIFIED => {}, // already unparked
+ EMPTY => {} // no one was waiting
+ NOTIFIED => {} // already unparked
PARKED_CONDVAR => self.unpark_condvar(),
PARKED_DRIVER => self.unpark_driver(),
actual => panic!("inconsistent state in unpark; actual = {}", actual),
diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs
index 3d795fa4..3d4c69c2 100644
--- a/tokio/src/runtime/thread_pool/mod.rs
+++ b/tokio/src/runtime/thread_pool/mod.rs
@@ -54,20 +54,12 @@ pub(crate) struct Workers {
}
impl ThreadPool {
- pub(crate) fn new(
- pool_size: usize,
- parker: Parker,
- ) -> (ThreadPool, Workers) {
- let (pool, workers) = worker::create_set(
- pool_size,
- parker,
- );
+ pub(crate) fn new(pool_size: usize, parker: Parker) -> (ThreadPool, Workers) {
+ let (pool, workers) = worker::create_set(pool_size, parker);
let spawner = Spawner::new(pool);
- let pool = ThreadPool {
- spawner,
- };
+ let pool = ThreadPool { spawner };
(pool, Workers { workers })
}
diff --git a/tokio/src/runtime/thread_pool/slice.rs b/tokio/src/runtime/thread_pool/slice.rs
index aa521a15..05380329 100644
--- a/tokio/src/runtime/thread_pool/slice.rs
+++ b/tokio/src/runtime/thread_pool/slice.rs
@@ -4,8 +4,8 @@
use crate::loom::rand::seed;
use crate::park::Park;
-use crate::runtime::Parker;
use crate::runtime::thread_pool::{current, queue, Idle, Owned, Shared};
+use crate::runtime::Parker;
use crate::task::{self, JoinHandle, Task};
use crate::util::{CachePadded, FastRand};
diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs
index c85ff591..aee66e7f 100644
--- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs
+++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs
@@ -1,5 +1,5 @@
-use crate::runtime::{self, Runtime};
use crate::runtime::tests::loom_oneshot as oneshot;
+use crate::runtime::{self, Runtime};
use crate::spawn;
use loom::sync::atomic::{AtomicBool, AtomicUsize};
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs
index 92f3cfbd..18c0db1f 100644
--- a/tokio/src/runtime/thread_pool/worker.rs
+++ b/tokio/src/runtime/thread_pool/worker.rs
@@ -1,9 +1,9 @@
use crate::loom::cell::CausalCell;
use crate::loom::sync::Arc;
use crate::park::Park;
-use crate::runtime::{self, blocking};
use crate::runtime::park::Parker;
use crate::runtime::thread_pool::{current, slice, Owned, Shared, Spawner};
+use crate::runtime::{self, blocking};
use crate::task::Task;
use std::cell::Cell;
@@ -78,10 +78,7 @@ struct GenerationGuard<'a> {
struct WorkerGone;
// TODO: Move into slices
-pub(super) fn create_set(
- pool_size: usize,
- parker: Parker,
-) -> (Arc<slice::Set>, Vec<Worker>) {
+pub(super) fn create_set(pool_size: usize, parker: Parker) -> (Arc<slice::Set>, Vec<Worker>) {
// Create the parks...
let parkers: Vec<_> = (0..pool_size).map(|_| parker.clone()).collect();
@@ -95,13 +92,7 @@ pub(super) fn create_set(
let workers = parkers
.into_iter()
.enumerate()
- .map(|(index, parker)| {
- Worker::new(
- slices.clone(),
- index,
- parker,
- )
- })
+ .map(|(index, parker)| Worker::new(slices.clone(), index, parker))
.collect();
(slices, workers)
@@ -116,11 +107,7 @@ const GLOBAL_POLL_INTERVAL: u16 = 61;
impl Worker {
// Safe as aquiring a lock is required before doing anything potentially
// dangerous.
- pub(super) fn new(
- slices: Arc<slice::Set>,
- index: usize,
- park: Parker,
- ) -> Self {
+ pub(super) fn new(slices: Arc<slice::Set>, index: usize, park: Parker) -> Self {
Worker {
inner: Arc::new(Inner {
park: CausalCell::new(park),
diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs
index 093aa1f3..fd9029a7 100644
--- a/tokio/src/sync/broadcast.rs
+++ b/tokio/src/sync/broadcast.rs
@@ -109,8 +109,8 @@
use crate::loom::cell::CausalCell;
use crate::loom::future::AtomicWaker;
-use crate::loom::sync::{Mutex, Arc, Condvar};
-use crate::loom::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, spin_loop_hint};
+use crate::loom::sync::atomic::{spin_loop_hint, AtomicBool, AtomicPtr, AtomicUsize};
+use crate::loom::sync::{Arc, Condvar, Mutex};
use std::fmt;
use std::ptr;
@@ -387,10 +387,7 @@ pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
let shared = Arc::new(Shared {
buffer: buffer.into_boxed_slice(),
mask: capacity - 1,
- tail: Mutex::new(Tail {
- pos: 0,
- rx_cnt: 1,
- }),
+ tail: Mutex::new(Tail { pos: 0, rx_cnt: 1 }),
condvar: Condvar::new(),
wait_stack: AtomicPtr::new(ptr::null_mut()),
num_tx: AtomicUsize::new(1),
@@ -406,9 +403,7 @@ pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
}),
};
- let tx = Sender {
- shared,
- };
+ let tx = Sender { shared };
(tx, rx)
}
@@ -852,7 +847,10 @@ where
// access to `self.wait.next`.
self.wait.next.with_mut(|ptr| unsafe { *ptr = curr });
- let res = self.shared.wait_stack.compare_exchange(curr, node, SeqCst, SeqCst);
+ let res = self
+ .shared
+ .wait_stack
+ .compare_exchange(curr, node, SeqCst, SeqCst);
match res {
Ok(_) => return,
diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs
index c1e9ef3e..2cfb5d34 100644
--- a/tokio/src/sync/semaphore.rs
+++ b/tokio/src/sync/semaphore.rs
@@ -60,7 +60,9 @@ impl Semaphore {
sem: &self,
ll_permit: ll::Permit::new(),
};
- poll_fn(|cx| permit.ll_permit.poll_acquire(cx, &self.ll_sem)).await.unwrap();
+ poll_fn(|cx| permit.ll_permit.poll_acquire(cx, &self.ll_sem))
+ .await
+ .unwrap();
permit
}
@@ -68,10 +70,12 @@ impl Semaphore {
pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
let mut ll_permit = ll::Permit::new();
match ll_permit.try_acquire(&self.ll_sem) {
- Ok(_) => Ok(SemaphorePermit { sem: self, ll_permit }),
+ Ok(_) => Ok(SemaphorePermit {
+ sem: self,
+ ll_permit,
+ }),
Err(_) => Err(TryAcquireError(())),
}
-
}
}
diff --git a/tokio/src/sync/tests/loom_broadcast.rs b/tokio/src/sync/tests/loom_broadcast.rs
index 53f76a25..da61563b 100644
--- a/tokio/src/sync/tests/loom_broadcast.rs
+++ b/tokio/src/sync/tests/loom_broadcast.rs
@@ -66,9 +66,7 @@ fn broadcast_wrap() {
match rx1.recv().await {
Ok(_) => num += 1,
Err(Closed) => break,
- Err(Lagged(n)) => {
- num += n as usize
- },
+ Err(Lagged(n)) => num += n as usize,
}
}
@@ -84,9 +82,7 @@ fn broadcast_wrap() {
match rx2.recv().await {
Ok(_) => num += 1,
Err(Closed) => break,
- Err(Lagged(n)) => {
- num += n as usize
- }
+ Err(Lagged(n)) => num += n as usize,
}
}
diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs
index 20e8dc80..4193dc15 100644
--- a/tokio/src/time/mod.rs
+++ b/tokio/src/time/mod.rs
@@ -94,7 +94,7 @@ pub use interval::{interval, interval_at, Interval};
mod timeout;
#[doc(inline)]
-pub use timeout::{timeout, timeout_at, Timeout, Elapsed};
+pub use timeout::{timeout, timeout_at, Elapsed, Timeout};
cfg_stream! {
mod throttle;
diff --git a/tokio/src/time/throttle.rs b/tokio/src/time/throttle.rs
index ccb28ad8..07e38628 100644
--- a/tokio/src/time/throttle.rs
+++ b/tokio/src/time/throttle.rs
@@ -97,16 +97,11 @@ impl<T: Stream> Stream for Throttle<T> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
if !self.has_delayed && self.delay.is_some() {
- ready!(Pin::new(self.as_mut()
- .project().delay.as_mut().unwrap())
- .poll(cx));
+ ready!(Pin::new(self.as_mut().project().delay.as_mut().unwrap()).poll(cx));
*self.as_mut().project().has_delayed = true;
}
- let value = ready!(self
- .as_mut()
- .project().stream
- .poll_next(cx));
+ let value = ready!(self.as_mut().project().stream.poll_next(cx));
if value.is_some() {
let dur = self.duration;
diff --git a/tokio/src/util/bit.rs b/tokio/src/util/bit.rs
index 03fb0cfb..d18298f7 100644
--- a/tokio/src/util/bit.rs
+++ b/tokio/src/util/bit.rs
@@ -21,10 +21,7 @@ impl Pack {
pub(crate) const fn least_significant(width: u32) -> Pack {
let mask = mask_for(width);
- Pack {
- mask,
- shift: 0,
- }
+ Pack { mask, shift: 0 }
}
/// Value is packed in the `width` more-significant bits.
@@ -32,10 +29,7 @@ impl Pack {
let shift = pointer_width() - self.mask.leading_zeros();
let mask = mask_for(width) << shift;
- Pack {
- mask,
- shift,
- }
+ Pack { mask, shift }
}
/// Mask used to unpack value
@@ -65,7 +59,11 @@ impl Pack {
impl fmt::Debug for Pack {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "Pack {{ mask: {:b}, shift: {} }}", self.mask, self.shift)
+ write!(
+ fmt,
+ "Pack {{ mask: {:b}, shift: {} }}",
+ self.mask, self.shift
+ )
}
}
diff --git a/tokio/src/util/slab/addr.rs b/tokio/src/util/slab/addr.rs
index 2efe93ef..c14e32e9 100644
--- a/tokio/src/util/slab/addr.rs
+++ b/tokio/src/util/slab/addr.rs
@@ -50,7 +50,7 @@
//! ```
use crate::util::bit;
-use crate::util::slab::{Generation, MAX_PAGES, MAX_THREADS, INITIAL_PAGE_SIZE};
+use crate::util::slab::{Generation, INITIAL_PAGE_SIZE, MAX_PAGES, MAX_THREADS};
use std::usize;
@@ -61,