summaryrefslogtreecommitdiffstats
path: root/tokio-threadpool
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-02-21 11:56:15 -0800
committerGitHub <noreply@github.com>2019-02-21 11:56:15 -0800
commit80162306e71c8561873a9c9496d65f2c1387d119 (patch)
tree83327ca8d9d1326d54e3c679e1fb4eb16775d4be /tokio-threadpool
parentab595d08253dd7ee0422144f8dafffa382700976 (diff)
chore: apply rustfmt to all crates (#917)
Diffstat (limited to 'tokio-threadpool')
-rw-r--r--tokio-threadpool/benches/basic.rs22
-rw-r--r--tokio-threadpool/benches/blocking.rs36
-rw-r--r--tokio-threadpool/benches/depth.rs22
-rw-r--r--tokio-threadpool/examples/depth.rs8
-rw-r--r--tokio-threadpool/examples/hello.rs17
-rw-r--r--tokio-threadpool/src/blocking.rs6
-rw-r--r--tokio-threadpool/src/builder.rs32
-rw-r--r--tokio-threadpool/src/callback.rs3
-rw-r--r--tokio-threadpool/src/lib.rs2
-rw-r--r--tokio-threadpool/src/park/boxed.rs23
-rw-r--r--tokio-threadpool/src/pool/backup.rs32
-rw-r--r--tokio-threadpool/src/pool/backup_stack.rs20
-rw-r--r--tokio-threadpool/src/pool/mod.rs36
-rw-r--r--tokio-threadpool/src/pool/state.rs10
-rw-r--r--tokio-threadpool/src/sender.rs33
-rw-r--r--tokio-threadpool/src/shutdown.rs2
-rw-r--r--tokio-threadpool/src/task/blocking.rs31
-rw-r--r--tokio-threadpool/src/task/mod.rs55
-rw-r--r--tokio-threadpool/src/task/state.rs6
-rw-r--r--tokio-threadpool/src/thread_pool.rs34
-rw-r--r--tokio-threadpool/src/worker/entry.rs21
-rw-r--r--tokio-threadpool/src/worker/mod.rs60
-rw-r--r--tokio-threadpool/src/worker/stack.rs33
-rw-r--r--tokio-threadpool/src/worker/state.rs21
-rw-r--r--tokio-threadpool/tests/blocking.rs99
-rw-r--r--tokio-threadpool/tests/hammer.rs32
-rw-r--r--tokio-threadpool/tests/threadpool.rs67
27 files changed, 393 insertions, 370 deletions
diff --git a/tokio-threadpool/benches/basic.rs b/tokio-threadpool/benches/basic.rs
index e2d43bbd..70dee74f 100644
--- a/tokio-threadpool/benches/basic.rs
+++ b/tokio-threadpool/benches/basic.rs
@@ -1,11 +1,11 @@
#![feature(test)]
#![deny(warnings)]
-extern crate tokio_threadpool;
extern crate futures;
extern crate futures_cpupool;
extern crate num_cpus;
extern crate test;
+extern crate tokio_threadpool;
const NUM_SPAWN: usize = 10_000;
const NUM_YIELD: usize = 1_000;
@@ -13,12 +13,12 @@ const TASKS_PER_CPU: usize = 50;
mod threadpool {
use futures::{future, task, Async};
- use tokio_threadpool::*;
use num_cpus;
- use test;
- use std::sync::{mpsc, Arc};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
+ use std::sync::{mpsc, Arc};
+ use test;
+ use tokio_threadpool::*;
#[bench]
fn spawn_many(b: &mut test::Bencher) {
@@ -90,14 +90,14 @@ mod threadpool {
// See rust-lang-nursery/futures-rs#617
//
mod cpupool {
- use futures::{task, Async};
use futures::future::{self, Executor};
+ use futures::{task, Async};
use futures_cpupool::*;
use num_cpus;
- use test;
- use std::sync::{mpsc, Arc};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
+ use std::sync::{mpsc, Arc};
+ use test;
#[bench]
fn spawn_many(b: &mut test::Bencher) {
@@ -119,7 +119,9 @@ mod cpupool {
}
Ok(())
- })).ok().unwrap();
+ }))
+ .ok()
+ .unwrap();
}
let _ = rx.recv().unwrap();
@@ -151,7 +153,9 @@ mod cpupool {
// Not ready
Ok(Async::NotReady)
}
- })).ok().unwrap();
+ }))
+ .ok()
+ .unwrap();
}
for _ in 0..tasks {
diff --git a/tokio-threadpool/benches/blocking.rs b/tokio-threadpool/benches/blocking.rs
index ea432c88..8ea900ea 100644
--- a/tokio-threadpool/benches/blocking.rs
+++ b/tokio-threadpool/benches/blocking.rs
@@ -3,9 +3,9 @@
extern crate futures;
extern crate rand;
-extern crate tokio_threadpool;
-extern crate threadpool;
extern crate test;
+extern crate threadpool;
+extern crate tokio_threadpool;
const ITER: usize = 1_000;
@@ -13,14 +13,11 @@ mod blocking {
use super::*;
use futures::future::*;
- use tokio_threadpool::{Builder, blocking};
+ use tokio_threadpool::{blocking, Builder};
#[bench]
fn cpu_bound(b: &mut test::Bencher) {
- let pool = Builder::new()
- .pool_size(2)
- .max_blocking(20)
- .build();
+ let pool = Builder::new().pool_size(2).max_blocking(20).build();
b.iter(|| {
let count_down = Arc::new(CountDown::new(::ITER));
@@ -29,17 +26,12 @@ mod blocking {
let count_down = count_down.clone();
pool.spawn(lazy(move || {
- poll_fn(|| {
- blocking(|| {
- perform_complex_computation()
+ poll_fn(|| blocking(|| perform_complex_computation()).map_err(|_| panic!()))
+ .and_then(move |_| {
+ // Do something with the value
+ count_down.dec();
+ Ok(())
})
- .map_err(|_| panic!())
- })
- .and_then(move |_| {
- // Do something with the value
- count_down.dec();
- Ok(())
- })
}));
}
@@ -57,10 +49,7 @@ mod message_passing {
#[bench]
fn cpu_bound(b: &mut test::Bencher) {
- let pool = Builder::new()
- .pool_size(2)
- .max_blocking(20)
- .build();
+ let pool = Builder::new().pool_size(2).max_blocking(20).build();
let blocking = threadpool::ThreadPool::new(20);
@@ -85,7 +74,8 @@ mod message_passing {
rx.and_then(move |_| {
count_down.dec();
Ok(())
- }).map_err(|_| panic!())
+ })
+ .map_err(|_| panic!())
}));
}
@@ -104,9 +94,9 @@ fn perform_complex_computation() -> usize {
// Util for waiting until the tasks complete
-use std::sync::*;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::*;
+use std::sync::*;
struct CountDown {
rem: AtomicUsize,
diff --git a/tokio-threadpool/benches/depth.rs b/tokio-threadpool/benches/depth.rs
index d500ad4a..2d89ac90 100644
--- a/tokio-threadpool/benches/depth.rs
+++ b/tokio-threadpool/benches/depth.rs
@@ -1,19 +1,19 @@
#![feature(test)]
#![deny(warnings)]
-extern crate tokio_threadpool;
extern crate futures;
extern crate futures_cpupool;
extern crate num_cpus;
extern crate test;
+extern crate tokio_threadpool;
const ITER: usize = 20_000;
mod us {
- use tokio_threadpool::*;
use futures::future;
- use test;
use std::sync::mpsc;
+ use test;
+ use tokio_threadpool::*;
#[bench]
fn chained_spawn(b: &mut test::Bencher) {
@@ -24,10 +24,12 @@ mod us {
res_tx.send(()).unwrap();
} else {
let pool_tx2 = pool_tx.clone();
- pool_tx.spawn(future::lazy(move || {
- spawn(pool_tx2, res_tx, n - 1);
- Ok(())
- })).unwrap();
+ pool_tx
+ .spawn(future::lazy(move || {
+ spawn(pool_tx2, res_tx, n - 1);
+ Ok(())
+ }))
+ .unwrap();
}
}
@@ -44,8 +46,8 @@ mod cpupool {
use futures::future::{self, Executor};
use futures_cpupool::*;
use num_cpus;
- use test;
use std::sync::mpsc;
+ use test;
#[bench]
fn chained_spawn(b: &mut test::Bencher) {
@@ -59,7 +61,9 @@ mod cpupool {
pool.execute(future::lazy(move || {
spawn(pool2, res_tx, n - 1);
Ok(())
- })).ok().unwrap();
+ }))
+ .ok()
+ .unwrap();
}
}
diff --git a/tokio-threadpool/examples/depth.rs b/tokio-threadpool/examples/depth.rs
index 7957f09e..3d376dd3 100644
--- a/tokio-threadpool/examples/depth.rs
+++ b/tokio-threadpool/examples/depth.rs
@@ -1,9 +1,9 @@
+extern crate env_logger;
extern crate futures;
extern crate tokio_threadpool;
-extern crate env_logger;
-use tokio_threadpool::*;
use futures::future::{self, Executor};
+use tokio_threadpool::*;
use std::sync::mpsc;
@@ -22,7 +22,9 @@ fn chained_spawn() {
tx.execute(future::lazy(move || {
spawn(tx2, res_tx, n - 1);
Ok(())
- })).ok().unwrap();
+ }))
+ .ok()
+ .unwrap();
}
}
diff --git a/tokio-threadpool/examples/hello.rs b/tokio-threadpool/examples/hello.rs
index 3324f862..87eb688c 100644
--- a/tokio-threadpool/examples/hello.rs
+++ b/tokio-threadpool/examples/hello.rs
@@ -1,10 +1,10 @@
+extern crate env_logger;
extern crate futures;
extern crate tokio_threadpool;
-extern crate env_logger;
-use tokio_threadpool::*;
-use futures::*;
use futures::sync::oneshot;
+use futures::*;
+use tokio_threadpool::*;
pub fn main() {
let _ = ::env_logger::init();
@@ -12,10 +12,13 @@ pub fn main() {
let pool = ThreadPool::new();
let tx = pool.sender().clone();
- let res = oneshot::spawn(future::lazy(|| {
- println!("Running on the pool");
- Ok::<_, ()>("complete")
- }), &tx);
+ let res = oneshot::spawn(
+ future::lazy(|| {
+ println!("Running on the pool");
+ Ok::<_, ()>("complete")
+ }),
+ &tx,
+ );
println!("Result: {:?}", res.wait());
}
diff --git a/tokio-threadpool/src/blocking.rs b/tokio-threadpool/src/blocking.rs
index 88cdd15f..9f91234b 100644
--- a/tokio-threadpool/src/blocking.rs
+++ b/tokio-threadpool/src/blocking.rs
@@ -122,7 +122,8 @@ pub struct BlockingError {
/// }
/// ```
pub fn blocking<F, T>(f: F) -> Poll<T, BlockingError>
-where F: FnOnce() -> T,
+where
+ F: FnOnce() -> T,
{
let res = Worker::with_current(|worker| {
let worker = match worker {
@@ -148,8 +149,7 @@ where F: FnOnce() -> T,
// back ownership of the worker if the worker handoff didn't complete yet.
Worker::with_current(|worker| {
// Worker must be set since it was above.
- worker.unwrap()
- .transition_from_blocking();
+ worker.unwrap().transition_from_blocking();
});
// Return the result
diff --git a/tokio-threadpool/src/builder.rs b/tokio-threadpool/src/builder.rs
index 82c7cac6..1cb2ec52 100644
--- a/tokio-threadpool/src/builder.rs
+++ b/tokio-threadpool/src/builder.rs
@@ -1,21 +1,21 @@
use callback::Callback;
use config::{Config, MAX_WORKERS};
use park::{BoxPark, BoxedPark, DefaultPark};
-use shutdown::ShutdownTrigger;
use pool::{Pool, MAX_BACKUP};
+use shutdown::ShutdownTrigger;
use thread_pool::ThreadPool;
use worker::{self, Worker, WorkerId};
+use std::cmp::max;
use std::error::Error;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
-use std::cmp::max;
use crossbeam_deque::Injector;
use num_cpus;
-use tokio_executor::Enter;
use tokio_executor::park::Park;
+use tokio_executor::Enter;
/// Builds a thread pool with custom configuration values.
///
@@ -93,10 +93,8 @@ impl Builder {
pub fn new() -> Builder {
let num_cpus = max(1, num_cpus::get());
- let new_park = Box::new(|_: &WorkerId| {
- Box::new(BoxedPark::new(DefaultPark::new()))
- as BoxPark
- });
+ let new_park =
+ Box::new(|_: &WorkerId| Box::new(BoxedPark::new(DefaultPark::new())) as BoxPark);
Builder {
pool_size: num_cpus,
@@ -280,7 +278,8 @@ impl Builder {
///
/// [`Worker::run`]: struct.Worker.html#method.run
pub fn around_worker<F>(&mut self, f: F) -> &mut Self
- where F: Fn(&Worker, &mut Enter) + Send + Sync + 'static
+ where
+ F: Fn(&Worker, &mut Enter) + Send + Sync + 'static,
{
self.config.around_worker = Some(Callback::new(f));
self
@@ -307,7 +306,8 @@ impl Builder {
/// # }
/// ```
pub fn after_start<F>(&mut self, f: F) -> &mut Self
- where F: Fn() + Send + Sync + 'static
+ where
+ F: Fn() + Send + Sync + 'static,
{
self.config.after_start = Some(Arc::new(f));
self
@@ -333,7 +333,8 @@ impl Builder {
/// # }
/// ```
pub fn before_stop<F>(&mut self, f: F) -> &mut Self
- where F: Fn() + Send + Sync + 'static
+ where
+ F: Fn() + Send + Sync + 'static,
{
self.config.before_stop = Some(Arc::new(f));
self
@@ -369,13 +370,12 @@ impl Builder {
/// # }
/// ```
pub fn custom_park<F, P>(&mut self, f: F) -> &mut Self
- where F: Fn(&WorkerId) -> P + 'static,
- P: Park + Send + 'static,
- P::Error: Error,
+ where
+ F: Fn(&WorkerId) -> P + 'static,
+ P: Park + Send + 'static,
+ P::Error: Error,
{
- self.new_park = Box::new(move |id| {
- Box::new(BoxedPark::new(f(id)))
- });
+ self.new_park = Box::new(move |id| Box::new(BoxedPark::new(f(id))));
self
}
diff --git a/tokio-threadpool/src/callback.rs b/tokio-threadpool/src/callback.rs
index e269872a..aabf876f 100644
--- a/tokio-threadpool/src/callback.rs
+++ b/tokio-threadpool/src/callback.rs
@@ -12,7 +12,8 @@ pub(crate) struct Callback {
impl Callback {
pub fn new<F>(f: F) -> Self
- where F: Fn(&Worker, &mut Enter) + Send + Sync + 'static
+ where
+ F: Fn(&Worker, &mut Enter) + Send + Sync + 'static,
{
Callback { f: Arc::new(f) }
}
diff --git a/tokio-threadpool/src/lib.rs b/tokio-threadpool/src/lib.rs
index 94d86700..4ea3d6a9 100644
--- a/tokio-threadpool/src/lib.rs
+++ b/tokio-threadpool/src/lib.rs
@@ -159,5 +159,5 @@ pub use blocking::{blocking, BlockingError};
pub use builder::Builder;
pub use sender::Sender;
pub use shutdown::Shutdown;
-pub use thread_pool::{ThreadPool, SpawnHandle};
+pub use thread_pool::{SpawnHandle, ThreadPool};
pub use worker::{Worker, WorkerId};
diff --git a/tokio-threadpool/src/park/boxed.rs b/tokio-threadpool/src/park/boxed.rs
index bd3671d4..8beaa0bb 100644
--- a/tokio-threadpool/src/park/boxed.rs
+++ b/tokio-threadpool/src/park/boxed.rs
@@ -15,7 +15,8 @@ impl<T> BoxedPark<T> {
}
impl<T: Park + Send> Park for BoxedPark<T>
-where T::Error: Error,
+where
+ T::Error: Error,
{
type Unpark = BoxUnpark;
type Error = ();
@@ -25,16 +26,20 @@ where T::Error: Error,
}
fn park(&mut self) -> Result<(), Self::Error> {
- self.0.park()
- .map_err(|e| {
- warn!("calling `park` on worker thread errored -- shutting down thread: {}", e);
- })
+ self.0.park().map_err(|e| {
+ warn!(
+ "calling `park` on worker thread errored -- shutting down thread: {}",
+ e
+ );
+ })
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
- self.0.park_timeout(duration)
- .map_err(|e| {
- warn!("calling `park` on worker thread errored -- shutting down thread: {}", e);
- })
+ self.0.park_timeout(duration).map_err(|e| {
+ warn!(
+ "calling `park` on worker thread errored -- shutting down thread: {}",
+ e
+ );
+ })
}
}
diff --git a/tokio-threadpool/src/pool/backup.rs b/tokio-threadpool/src/pool/backup.rs
index feaff306..e94e95d6 100644
--- a/tokio-threadpool/src/pool/backup.rs
+++ b/tokio-threadpool/src/pool/backup.rs
@@ -1,10 +1,10 @@
use park::DefaultPark;
-use worker::{WorkerId};
+use worker::WorkerId;
use std::cell::UnsafeCell;
use std::fmt;
use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering::{self, Acquire, AcqRel, Relaxed};
+use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed};
use std::time::{Duration, Instant};
/// State associated with a thread in the thread pool.
@@ -100,9 +100,11 @@ impl Backup {
});
// The handoff value is equal to `worker_id`
- debug_assert_eq!(unsafe { (*self.handoff.get()).as_ref() }, Some(worker_id));
+ debug_assert_eq!(unsafe { (*self.handoff.get()).as_ref() }, Some(worker_id));
- unsafe { *self.handoff.get() = None; }
+ unsafe {
+ *self.handoff.get() = None;
+ }
}
pub fn is_running(&self) -> bool {
@@ -167,10 +169,7 @@ impl Backup {
return Handoff::Terminated;
}
- let worker_id = unsafe {
- (*self.handoff.get()).take()
- .expect("no worker handoff")
- };
+ let worker_id = unsafe { (*self.handoff.get()).take().expect("no worker handoff") };
return Handoff::Worker(worker_id);
}
@@ -192,10 +191,10 @@ impl Backup {
let mut next = state;
next.unset_running();
- let actual = self.state.compare_and_swap(
- state.into(),
- next.into(),
- AcqRel).into();
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
if actual == state {
debug_assert!(!next.is_running());
@@ -226,7 +225,9 @@ impl Backup {
#[inline]
pub fn set_next_sleeper(&self, val: BackupId) {
- unsafe { *self.next_sleeper.get() = val; }
+ unsafe {
+ *self.next_sleeper.get() = val;
+ }
}
}
@@ -271,8 +272,9 @@ impl State {
next.set_running();
next.unset_pushed();
- let actual = state.compare_and_swap(
- curr.into(), next.into(), AcqRel).into();
+ let actual = state
+ .compare_and_swap(curr.into(), next.into(), AcqRel)
+ .into();
if actual == curr {
return curr;
diff --git a/tokio-threadpool/src/pool/backup_stack.rs b/tokio-threadpool/src/pool/backup_stack.rs
index aa69e143..b9a46d08 100644
--- a/tokio-threadpool/src/pool/backup_stack.rs
+++ b/tokio-threadpool/src/pool/backup_stack.rs
@@ -1,7 +1,7 @@
use pool::{Backup, BackupId};
use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering::{Acquire, AcqRel};
+use std::sync::atomic::Ordering::{AcqRel, Acquire};
#[derive(Debug)]
pub(crate) struct BackupStack {
@@ -65,8 +65,10 @@ impl BackupStack {
entries[id.0].set_next_sleeper(head);
next.set_head(id);
- let actual = self.state.compare_and_swap(
- state.into(), next.into(), AcqRel).into();
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
if state == actual {
return Ok(());
@@ -110,8 +112,10 @@ impl BackupStack {
return Ok(None);
}
- let actual = self.state.compare_and_swap(
- state.into(), next.into(), AcqRel).into();
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
if actual != state {
state = actual;
@@ -138,8 +142,10 @@ impl BackupStack {
next.set_head(next_head);
}
- let actual = self.state.compare_and_swap(
- state.into(), next.into(), AcqRel).into();
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
if actual == state {
debug_assert!(entries[head.0].is_pushed());
diff --git a/tokio-threadpool/src/pool/mod.rs b/tokio-threadpool/src/pool/mod.rs
index 2326fca0..92917835 100644
--- a/tokio-threadpool/src/pool/mod.rs
+++ b/tokio-threadpool/src/pool/mod.rs
@@ -4,11 +4,7 @@ mod state;
pub(crate) use self::backup::{Backup, BackupId};
pub(crate) use self::backup_stack::MAX_BACKUP;
-pub(crate) use self::state::{
- State,
- Lifecycle,
- MAX_FUTURES,
-};
+pub(crate) use self::state::{Lifecycle, State, MAX_FUTURES};
use self::backup::Handoff;
use self::backup_stack::BackupStack;
@@ -22,8 +18,8 @@ use futures::Poll;
use std::cell::Cell;
use std::num::Wrapping;
-use std::sync::atomic::Ordering::{Acquire, AcqRel};
use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{AcqRel, Acquire};
use std::sync::{Arc, Weak};
use std::thread;
@@ -100,15 +96,15 @@ impl Pool {
//
// This is `backup + pool_size` because the core thread pool running the
// workers is spawned from backup as well.
- let backup = (0..total_size).map(|_| {
- Backup::new()
- }).collect::<Vec<_>>().into_boxed_slice();
+ let backup = (0..total_size)
+ .map(|_| Backup::new())
+ .collect::<Vec<_>>()
+ .into_boxed_slice();
let backup_stack = BackupStack::new();
for i in (0..backup.len()).rev() {
- backup_stack.push(&backup, BackupId(i))
- .unwrap();
+ backup_stack.push(&backup, BackupId(i)).unwrap();
}
// Initialize the blocking state
@@ -174,8 +170,10 @@ impl Pool {
}
}
- let actual = self.state.compare_and_swap(
- state.into(), next.into(), AcqRel).into();
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
if state == actual {
state = next;
@@ -299,8 +297,7 @@ impl Pool {
}
};
- let need_spawn = self.backup[backup_id.0]
- .worker_handoff(id.clone());
+ let need_spawn = self.backup[backup_id.0].worker_handoff(id.clone());
if !need_spawn {
return;
@@ -355,8 +352,7 @@ impl Pool {
// available for future handoffs.
//
// This **must** happen before notifying the task.
- let res = pool.backup_stack
- .push(&pool.backup, backup_id);
+ let res = pool.backup_stack.push(&pool.backup, backup_id);
if res.is_err() {
// The pool is being shutdown.
@@ -370,8 +366,7 @@ impl Pool {
debug_assert!(pool.backup[backup_id.0].is_running());
// Wait for a handoff
- let handoff = pool.backup[backup_id.0]
- .wait_for_handoff(pool.config.keep_alive);
+ let handoff = pool.backup[backup_id.0].wait_for_handoff(pool.config.keep_alive);
match handoff {
Handoff::Worker(id) => {
@@ -407,7 +402,8 @@ impl Pool {
debug_assert!(
worker_state.lifecycle() != Signaled,
- "actual={:?}", worker_state.lifecycle(),
+ "actual={:?}",
+ worker_state.lifecycle(),
);
trace!("signal_work -- notify; idx={}", idx);
diff --git a/tokio-threadpool/src/pool/state.rs b/tokio-threadpool/src/pool/state.rs
index e8f5d12e..5ecb514e 100644
--- a/tokio-threadpool/src/pool/state.rs
+++ b/tokio-threadpool/src/pool/state.rs
@@ -82,8 +82,7 @@ impl State {
}
pub fn is_terminated(&self) -> bool {
- self.lifecycle() == Lifecycle::ShutdownNow &&
- self.num_futures() == 0
+ self.lifecycle() == Lifecycle::ShutdownNow && self.num_futures() == 0
}
}
@@ -115,9 +114,10 @@ impl From<usize> for Lifecycle {
use self::Lifecycle::*;
debug_assert!(
- src == Running as usize ||
- src == ShutdownOnIdle as usize ||
- src == ShutdownNow as usize);
+ src == Running as usize
+ || src == ShutdownOnIdle as usize
+ || src == ShutdownNow as usize
+ );
unsafe { ::std::mem::transmute(src) }
}
diff --git a/tokio-threadpool/src/sender.rs b/tokio-threadpool/src/sender.rs
index de5f0e07..15befd43 100644
--- a/tokio-threadpool/src/sender.rs
+++ b/tokio-threadpool/src/sender.rs
@@ -1,11 +1,11 @@
-use pool::{self, Pool, Lifecycle, MAX_FUTURES};
+use pool::{self, Lifecycle, Pool, MAX_FUTURES};
use task::Task;
-use std::sync::Arc;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
+use std::sync::Arc;
-use tokio_executor::{self, SpawnError};
use futures::{future, Future};
+use tokio_executor::{self, SpawnError};
/// Submit futures to the associated thread pool for execution.
///
@@ -77,7 +77,8 @@ impl Sender {
/// # }
/// ```
pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError>
- where F: Future<Item = (), Error = ()> + Send + 'static,
+ where
+ F: Future<Item = (), Error = ()> + Send + 'static,
{
let mut s = self;
tokio_executor::Executor::spawn(&mut s, Box::new(future))
@@ -104,8 +105,11 @@ impl Sender {
next.inc_num_futures();
- let actual = self.pool.state.compare_and_swap(
- state.into(), next.into(), AcqRel).into();
+ let actual = self
+ .pool
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
if actual == state {
trace!("execute; count={:?}", next.num_futures());
@@ -125,9 +129,10 @@ impl tokio_executor::Executor for Sender {
tokio_executor::Executor::status(&s)
}
- fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>)
- -> Result<(), SpawnError>
- {
+ fn spawn(
+ &mut self,
+ future: Box<Future<Item = (), Error = ()> + Send>,
+ ) -> Result<(), SpawnError> {
let mut s = &*self;
tokio_executor::Executor::spawn(&mut s, future)
}
@@ -150,9 +155,10 @@ impl<'a> tokio_executor::Executor for &'a Sender {
Ok(())
}
- fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>)
- -> Result<(), SpawnError>
- {
+ fn spawn(
+ &mut self,
+ future: Box<Future<Item = (), Error = ()> + Send>,
+ ) -> Result<(), SpawnError> {
self.prepare_for_spawn()?;
// At this point, the pool has accepted the future, so schedule it for
@@ -171,7 +177,8 @@ impl<'a> tokio_executor::Executor for &'a Sender {
}
impl<T> future::Executor<T> for Sender
-where T: Future<Item = (), Error = ()> + Send + 'static,
+where
+ T: Future<Item = (), Error = ()> + Send + 'static,
{
fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> {
if let Err(e) = tokio_executor::Executor::status(self) {
diff --git a/tokio-threadpool/src/shutdown.rs b/tokio-threadpool/src/shutdown.rs
index 290cb182..c3d04a00 100644
--- a/tokio-threadpool/src/shutdown.rs
+++ b/tokio-threadpool/src/shutdown.rs
@@ -2,8 +2,8 @@ use task::Task;
use worker;
use crossbeam_deque::Injector;
-use futures::{Future, Poll, Async};
use futures::task::AtomicTask;
+use futures::{Async, Future, Poll};
use std::sync::{Arc, Mutex};
diff --git a/tokio-threadpool/src/task/blocking.rs b/tokio-threadpool/src/task/blocking.rs
index cdf2ceff..ded59edf 100644
--- a/tokio-threadpool/src/task/blocking.rs
+++ b/tokio-threadpool/src/task/blocking.rs
@@ -1,14 +1,14 @@
use pool::Pool;
-use task::{Task, BlockingState};
+