diff options
author | Carl Lerche <me@carllerche.com> | 2019-02-21 11:56:15 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-02-21 11:56:15 -0800 |
commit | 80162306e71c8561873a9c9496d65f2c1387d119 (patch) | |
tree | 83327ca8d9d1326d54e3c679e1fb4eb16775d4be /tokio-threadpool | |
parent | ab595d08253dd7ee0422144f8dafffa382700976 (diff) |
chore: apply rustfmt to all crates (#917)
Diffstat (limited to 'tokio-threadpool')
27 files changed, 393 insertions, 370 deletions
diff --git a/tokio-threadpool/benches/basic.rs b/tokio-threadpool/benches/basic.rs index e2d43bbd..70dee74f 100644 --- a/tokio-threadpool/benches/basic.rs +++ b/tokio-threadpool/benches/basic.rs @@ -1,11 +1,11 @@ #![feature(test)] #![deny(warnings)] -extern crate tokio_threadpool; extern crate futures; extern crate futures_cpupool; extern crate num_cpus; extern crate test; +extern crate tokio_threadpool; const NUM_SPAWN: usize = 10_000; const NUM_YIELD: usize = 1_000; @@ -13,12 +13,12 @@ const TASKS_PER_CPU: usize = 50; mod threadpool { use futures::{future, task, Async}; - use tokio_threadpool::*; use num_cpus; - use test; - use std::sync::{mpsc, Arc}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; + use std::sync::{mpsc, Arc}; + use test; + use tokio_threadpool::*; #[bench] fn spawn_many(b: &mut test::Bencher) { @@ -90,14 +90,14 @@ mod threadpool { // See rust-lang-nursery/futures-rs#617 // mod cpupool { - use futures::{task, Async}; use futures::future::{self, Executor}; + use futures::{task, Async}; use futures_cpupool::*; use num_cpus; - use test; - use std::sync::{mpsc, Arc}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; + use std::sync::{mpsc, Arc}; + use test; #[bench] fn spawn_many(b: &mut test::Bencher) { @@ -119,7 +119,9 @@ mod cpupool { } Ok(()) - })).ok().unwrap(); + })) + .ok() + .unwrap(); } let _ = rx.recv().unwrap(); @@ -151,7 +153,9 @@ mod cpupool { // Not ready Ok(Async::NotReady) } - })).ok().unwrap(); + })) + .ok() + .unwrap(); } for _ in 0..tasks { diff --git a/tokio-threadpool/benches/blocking.rs b/tokio-threadpool/benches/blocking.rs index ea432c88..8ea900ea 100644 --- a/tokio-threadpool/benches/blocking.rs +++ b/tokio-threadpool/benches/blocking.rs @@ -3,9 +3,9 @@ extern crate futures; extern crate rand; -extern crate tokio_threadpool; -extern crate threadpool; extern crate test; +extern crate threadpool; +extern crate tokio_threadpool; const ITER: usize = 1_000; @@ -13,14 +13,11 @@ mod blocking { use super::*; use futures::future::*; - use tokio_threadpool::{Builder, blocking}; + use tokio_threadpool::{blocking, Builder}; #[bench] fn cpu_bound(b: &mut test::Bencher) { - let pool = Builder::new() - .pool_size(2) - .max_blocking(20) - .build(); + let pool = Builder::new().pool_size(2).max_blocking(20).build(); b.iter(|| { let count_down = Arc::new(CountDown::new(::ITER)); @@ -29,17 +26,12 @@ mod blocking { let count_down = count_down.clone(); pool.spawn(lazy(move || { - poll_fn(|| { - blocking(|| { - perform_complex_computation() + poll_fn(|| blocking(|| perform_complex_computation()).map_err(|_| panic!())) + .and_then(move |_| { + // Do something with the value + count_down.dec(); + Ok(()) }) - .map_err(|_| panic!()) - }) - .and_then(move |_| { - // Do something with the value - count_down.dec(); - Ok(()) - }) })); } @@ -57,10 +49,7 @@ mod message_passing { #[bench] fn cpu_bound(b: &mut test::Bencher) { - let pool = Builder::new() - .pool_size(2) - .max_blocking(20) - .build(); + let pool = Builder::new().pool_size(2).max_blocking(20).build(); let blocking = threadpool::ThreadPool::new(20); @@ -85,7 +74,8 @@ mod message_passing { rx.and_then(move |_| { count_down.dec(); Ok(()) - }).map_err(|_| panic!()) + }) + .map_err(|_| panic!()) })); } @@ -104,9 +94,9 @@ fn perform_complex_computation() -> usize { // Util for waiting until the tasks complete -use std::sync::*; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::*; +use std::sync::*; struct CountDown { rem: AtomicUsize, diff --git a/tokio-threadpool/benches/depth.rs b/tokio-threadpool/benches/depth.rs index d500ad4a..2d89ac90 100644 --- a/tokio-threadpool/benches/depth.rs +++ b/tokio-threadpool/benches/depth.rs @@ -1,19 +1,19 @@ #![feature(test)] #![deny(warnings)] -extern crate tokio_threadpool; extern crate futures; extern crate futures_cpupool; extern crate num_cpus; extern crate test; +extern crate tokio_threadpool; const ITER: usize = 20_000; mod us { - use tokio_threadpool::*; use futures::future; - use test; use std::sync::mpsc; + use test; + use tokio_threadpool::*; #[bench] fn chained_spawn(b: &mut test::Bencher) { @@ -24,10 +24,12 @@ mod us { res_tx.send(()).unwrap(); } else { let pool_tx2 = pool_tx.clone(); - pool_tx.spawn(future::lazy(move || { - spawn(pool_tx2, res_tx, n - 1); - Ok(()) - })).unwrap(); + pool_tx + .spawn(future::lazy(move || { + spawn(pool_tx2, res_tx, n - 1); + Ok(()) + })) + .unwrap(); } } @@ -44,8 +46,8 @@ mod cpupool { use futures::future::{self, Executor}; use futures_cpupool::*; use num_cpus; - use test; use std::sync::mpsc; + use test; #[bench] fn chained_spawn(b: &mut test::Bencher) { @@ -59,7 +61,9 @@ mod cpupool { pool.execute(future::lazy(move || { spawn(pool2, res_tx, n - 1); Ok(()) - })).ok().unwrap(); + })) + .ok() + .unwrap(); } } diff --git a/tokio-threadpool/examples/depth.rs b/tokio-threadpool/examples/depth.rs index 7957f09e..3d376dd3 100644 --- a/tokio-threadpool/examples/depth.rs +++ b/tokio-threadpool/examples/depth.rs @@ -1,9 +1,9 @@ +extern crate env_logger; extern crate futures; extern crate tokio_threadpool; -extern crate env_logger; -use tokio_threadpool::*; use futures::future::{self, Executor}; +use tokio_threadpool::*; use std::sync::mpsc; @@ -22,7 +22,9 @@ fn chained_spawn() { tx.execute(future::lazy(move || { spawn(tx2, res_tx, n - 1); Ok(()) - })).ok().unwrap(); + })) + .ok() + .unwrap(); } } diff --git a/tokio-threadpool/examples/hello.rs b/tokio-threadpool/examples/hello.rs index 3324f862..87eb688c 100644 --- a/tokio-threadpool/examples/hello.rs +++ b/tokio-threadpool/examples/hello.rs @@ -1,10 +1,10 @@ +extern crate env_logger; extern crate futures; extern crate tokio_threadpool; -extern crate env_logger; -use tokio_threadpool::*; -use futures::*; use futures::sync::oneshot; +use futures::*; +use tokio_threadpool::*; pub fn main() { let _ = ::env_logger::init(); @@ -12,10 +12,13 @@ pub fn main() { let pool = ThreadPool::new(); let tx = pool.sender().clone(); - let res = oneshot::spawn(future::lazy(|| { - println!("Running on the pool"); - Ok::<_, ()>("complete") - }), &tx); + let res = oneshot::spawn( + future::lazy(|| { + println!("Running on the pool"); + Ok::<_, ()>("complete") + }), + &tx, + ); println!("Result: {:?}", res.wait()); } diff --git a/tokio-threadpool/src/blocking.rs b/tokio-threadpool/src/blocking.rs index 88cdd15f..9f91234b 100644 --- a/tokio-threadpool/src/blocking.rs +++ b/tokio-threadpool/src/blocking.rs @@ -122,7 +122,8 @@ pub struct BlockingError { /// } /// ``` pub fn blocking<F, T>(f: F) -> Poll<T, BlockingError> -where F: FnOnce() -> T, +where + F: FnOnce() -> T, { let res = Worker::with_current(|worker| { let worker = match worker { @@ -148,8 +149,7 @@ where F: FnOnce() -> T, // back ownership of the worker if the worker handoff didn't complete yet. Worker::with_current(|worker| { // Worker must be set since it was above. - worker.unwrap() - .transition_from_blocking(); + worker.unwrap().transition_from_blocking(); }); // Return the result diff --git a/tokio-threadpool/src/builder.rs b/tokio-threadpool/src/builder.rs index 82c7cac6..1cb2ec52 100644 --- a/tokio-threadpool/src/builder.rs +++ b/tokio-threadpool/src/builder.rs @@ -1,21 +1,21 @@ use callback::Callback; use config::{Config, MAX_WORKERS}; use park::{BoxPark, BoxedPark, DefaultPark}; -use shutdown::ShutdownTrigger; use pool::{Pool, MAX_BACKUP}; +use shutdown::ShutdownTrigger; use thread_pool::ThreadPool; use worker::{self, Worker, WorkerId}; +use std::cmp::max; use std::error::Error; use std::fmt; use std::sync::Arc; use std::time::Duration; -use std::cmp::max; use crossbeam_deque::Injector; use num_cpus; -use tokio_executor::Enter; use tokio_executor::park::Park; +use tokio_executor::Enter; /// Builds a thread pool with custom configuration values. /// @@ -93,10 +93,8 @@ impl Builder { pub fn new() -> Builder { let num_cpus = max(1, num_cpus::get()); - let new_park = Box::new(|_: &WorkerId| { - Box::new(BoxedPark::new(DefaultPark::new())) - as BoxPark - }); + let new_park = + Box::new(|_: &WorkerId| Box::new(BoxedPark::new(DefaultPark::new())) as BoxPark); Builder { pool_size: num_cpus, @@ -280,7 +278,8 @@ impl Builder { /// /// [`Worker::run`]: struct.Worker.html#method.run pub fn around_worker<F>(&mut self, f: F) -> &mut Self - where F: Fn(&Worker, &mut Enter) + Send + Sync + 'static + where + F: Fn(&Worker, &mut Enter) + Send + Sync + 'static, { self.config.around_worker = Some(Callback::new(f)); self @@ -307,7 +306,8 @@ impl Builder { /// # } /// ``` pub fn after_start<F>(&mut self, f: F) -> &mut Self - where F: Fn() + Send + Sync + 'static + where + F: Fn() + Send + Sync + 'static, { self.config.after_start = Some(Arc::new(f)); self @@ -333,7 +333,8 @@ impl Builder { /// # } /// ``` pub fn before_stop<F>(&mut self, f: F) -> &mut Self - where F: Fn() + Send + Sync + 'static + where + F: Fn() + Send + Sync + 'static, { self.config.before_stop = Some(Arc::new(f)); self @@ -369,13 +370,12 @@ impl Builder { /// # } /// ``` pub fn custom_park<F, P>(&mut self, f: F) -> &mut Self - where F: Fn(&WorkerId) -> P + 'static, - P: Park + Send + 'static, - P::Error: Error, + where + F: Fn(&WorkerId) -> P + 'static, + P: Park + Send + 'static, + P::Error: Error, { - self.new_park = Box::new(move |id| { - Box::new(BoxedPark::new(f(id))) - }); + self.new_park = Box::new(move |id| Box::new(BoxedPark::new(f(id)))); self } diff --git a/tokio-threadpool/src/callback.rs b/tokio-threadpool/src/callback.rs index e269872a..aabf876f 100644 --- a/tokio-threadpool/src/callback.rs +++ b/tokio-threadpool/src/callback.rs @@ -12,7 +12,8 @@ pub(crate) struct Callback { impl Callback { pub fn new<F>(f: F) -> Self - where F: Fn(&Worker, &mut Enter) + Send + Sync + 'static + where + F: Fn(&Worker, &mut Enter) + Send + Sync + 'static, { Callback { f: Arc::new(f) } } diff --git a/tokio-threadpool/src/lib.rs b/tokio-threadpool/src/lib.rs index 94d86700..4ea3d6a9 100644 --- a/tokio-threadpool/src/lib.rs +++ b/tokio-threadpool/src/lib.rs @@ -159,5 +159,5 @@ pub use blocking::{blocking, BlockingError}; pub use builder::Builder; pub use sender::Sender; pub use shutdown::Shutdown; -pub use thread_pool::{ThreadPool, SpawnHandle}; +pub use thread_pool::{SpawnHandle, ThreadPool}; pub use worker::{Worker, WorkerId}; diff --git a/tokio-threadpool/src/park/boxed.rs b/tokio-threadpool/src/park/boxed.rs index bd3671d4..8beaa0bb 100644 --- a/tokio-threadpool/src/park/boxed.rs +++ b/tokio-threadpool/src/park/boxed.rs @@ -15,7 +15,8 @@ impl<T> BoxedPark<T> { } impl<T: Park + Send> Park for BoxedPark<T> -where T::Error: Error, +where + T::Error: Error, { type Unpark = BoxUnpark; type Error = (); @@ -25,16 +26,20 @@ where T::Error: Error, } fn park(&mut self) -> Result<(), Self::Error> { - self.0.park() - .map_err(|e| { - warn!("calling `park` on worker thread errored -- shutting down thread: {}", e); - }) + self.0.park().map_err(|e| { + warn!( + "calling `park` on worker thread errored -- shutting down thread: {}", + e + ); + }) } fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.0.park_timeout(duration) - .map_err(|e| { - warn!("calling `park` on worker thread errored -- shutting down thread: {}", e); - }) + self.0.park_timeout(duration).map_err(|e| { + warn!( + "calling `park` on worker thread errored -- shutting down thread: {}", + e + ); + }) } } diff --git a/tokio-threadpool/src/pool/backup.rs b/tokio-threadpool/src/pool/backup.rs index feaff306..e94e95d6 100644 --- a/tokio-threadpool/src/pool/backup.rs +++ b/tokio-threadpool/src/pool/backup.rs @@ -1,10 +1,10 @@ use park::DefaultPark; -use worker::{WorkerId}; +use worker::WorkerId; use std::cell::UnsafeCell; use std::fmt; use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::{self, Acquire, AcqRel, Relaxed}; +use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed}; use std::time::{Duration, Instant}; /// State associated with a thread in the thread pool. @@ -100,9 +100,11 @@ impl Backup { }); // The handoff value is equal to `worker_id` - debug_assert_eq!(unsafe { (*self.handoff.get()).as_ref() }, Some(worker_id)); + debug_assert_eq!(unsafe { (*self.handoff.get()).as_ref() }, Some(worker_id)); - unsafe { *self.handoff.get() = None; } + unsafe { + *self.handoff.get() = None; + } } pub fn is_running(&self) -> bool { @@ -167,10 +169,7 @@ impl Backup { return Handoff::Terminated; } - let worker_id = unsafe { - (*self.handoff.get()).take() - .expect("no worker handoff") - }; + let worker_id = unsafe { (*self.handoff.get()).take().expect("no worker handoff") }; return Handoff::Worker(worker_id); } @@ -192,10 +191,10 @@ impl Backup { let mut next = state; next.unset_running(); - let actual = self.state.compare_and_swap( - state.into(), - next.into(), - AcqRel).into(); + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); if actual == state { debug_assert!(!next.is_running()); @@ -226,7 +225,9 @@ impl Backup { #[inline] pub fn set_next_sleeper(&self, val: BackupId) { - unsafe { *self.next_sleeper.get() = val; } + unsafe { + *self.next_sleeper.get() = val; + } } } @@ -271,8 +272,9 @@ impl State { next.set_running(); next.unset_pushed(); - let actual = state.compare_and_swap( - curr.into(), next.into(), AcqRel).into(); + let actual = state + .compare_and_swap(curr.into(), next.into(), AcqRel) + .into(); if actual == curr { return curr; diff --git a/tokio-threadpool/src/pool/backup_stack.rs b/tokio-threadpool/src/pool/backup_stack.rs index aa69e143..b9a46d08 100644 --- a/tokio-threadpool/src/pool/backup_stack.rs +++ b/tokio-threadpool/src/pool/backup_stack.rs @@ -1,7 +1,7 @@ use pool::{Backup, BackupId}; use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::{Acquire, AcqRel}; +use std::sync::atomic::Ordering::{AcqRel, Acquire}; #[derive(Debug)] pub(crate) struct BackupStack { @@ -65,8 +65,10 @@ impl BackupStack { entries[id.0].set_next_sleeper(head); next.set_head(id); - let actual = self.state.compare_and_swap( - state.into(), next.into(), AcqRel).into(); + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); if state == actual { return Ok(()); @@ -110,8 +112,10 @@ impl BackupStack { return Ok(None); } - let actual = self.state.compare_and_swap( - state.into(), next.into(), AcqRel).into(); + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); if actual != state { state = actual; @@ -138,8 +142,10 @@ impl BackupStack { next.set_head(next_head); } - let actual = self.state.compare_and_swap( - state.into(), next.into(), AcqRel).into(); + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); if actual == state { debug_assert!(entries[head.0].is_pushed()); diff --git a/tokio-threadpool/src/pool/mod.rs b/tokio-threadpool/src/pool/mod.rs index 2326fca0..92917835 100644 --- a/tokio-threadpool/src/pool/mod.rs +++ b/tokio-threadpool/src/pool/mod.rs @@ -4,11 +4,7 @@ mod state; pub(crate) use self::backup::{Backup, BackupId}; pub(crate) use self::backup_stack::MAX_BACKUP; -pub(crate) use self::state::{ - State, - Lifecycle, - MAX_FUTURES, -}; +pub(crate) use self::state::{Lifecycle, State, MAX_FUTURES}; use self::backup::Handoff; use self::backup_stack::BackupStack; @@ -22,8 +18,8 @@ use futures::Poll; use std::cell::Cell; use std::num::Wrapping; -use std::sync::atomic::Ordering::{Acquire, AcqRel}; use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{AcqRel, Acquire}; use std::sync::{Arc, Weak}; use std::thread; @@ -100,15 +96,15 @@ impl Pool { // // This is `backup + pool_size` because the core thread pool running the // workers is spawned from backup as well. - let backup = (0..total_size).map(|_| { - Backup::new() - }).collect::<Vec<_>>().into_boxed_slice(); + let backup = (0..total_size) + .map(|_| Backup::new()) + .collect::<Vec<_>>() + .into_boxed_slice(); let backup_stack = BackupStack::new(); for i in (0..backup.len()).rev() { - backup_stack.push(&backup, BackupId(i)) - .unwrap(); + backup_stack.push(&backup, BackupId(i)).unwrap(); } // Initialize the blocking state @@ -174,8 +170,10 @@ impl Pool { } } - let actual = self.state.compare_and_swap( - state.into(), next.into(), AcqRel).into(); + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); if state == actual { state = next; @@ -299,8 +297,7 @@ impl Pool { } }; - let need_spawn = self.backup[backup_id.0] - .worker_handoff(id.clone()); + let need_spawn = self.backup[backup_id.0].worker_handoff(id.clone()); if !need_spawn { return; @@ -355,8 +352,7 @@ impl Pool { // available for future handoffs. // // This **must** happen before notifying the task. - let res = pool.backup_stack - .push(&pool.backup, backup_id); + let res = pool.backup_stack.push(&pool.backup, backup_id); if res.is_err() { // The pool is being shutdown. @@ -370,8 +366,7 @@ impl Pool { debug_assert!(pool.backup[backup_id.0].is_running()); // Wait for a handoff - let handoff = pool.backup[backup_id.0] - .wait_for_handoff(pool.config.keep_alive); + let handoff = pool.backup[backup_id.0].wait_for_handoff(pool.config.keep_alive); match handoff { Handoff::Worker(id) => { @@ -407,7 +402,8 @@ impl Pool { debug_assert!( worker_state.lifecycle() != Signaled, - "actual={:?}", worker_state.lifecycle(), + "actual={:?}", + worker_state.lifecycle(), ); trace!("signal_work -- notify; idx={}", idx); diff --git a/tokio-threadpool/src/pool/state.rs b/tokio-threadpool/src/pool/state.rs index e8f5d12e..5ecb514e 100644 --- a/tokio-threadpool/src/pool/state.rs +++ b/tokio-threadpool/src/pool/state.rs @@ -82,8 +82,7 @@ impl State { } pub fn is_terminated(&self) -> bool { - self.lifecycle() == Lifecycle::ShutdownNow && - self.num_futures() == 0 + self.lifecycle() == Lifecycle::ShutdownNow && self.num_futures() == 0 } } @@ -115,9 +114,10 @@ impl From<usize> for Lifecycle { use self::Lifecycle::*; debug_assert!( - src == Running as usize || - src == ShutdownOnIdle as usize || - src == ShutdownNow as usize); + src == Running as usize + || src == ShutdownOnIdle as usize + || src == ShutdownNow as usize + ); unsafe { ::std::mem::transmute(src) } } diff --git a/tokio-threadpool/src/sender.rs b/tokio-threadpool/src/sender.rs index de5f0e07..15befd43 100644 --- a/tokio-threadpool/src/sender.rs +++ b/tokio-threadpool/src/sender.rs @@ -1,11 +1,11 @@ -use pool::{self, Pool, Lifecycle, MAX_FUTURES}; +use pool::{self, Lifecycle, Pool, MAX_FUTURES}; use task::Task; -use std::sync::Arc; use std::sync::atomic::Ordering::{AcqRel, Acquire}; +use std::sync::Arc; -use tokio_executor::{self, SpawnError}; use futures::{future, Future}; +use tokio_executor::{self, SpawnError}; /// Submit futures to the associated thread pool for execution. /// @@ -77,7 +77,8 @@ impl Sender { /// # } /// ``` pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError> - where F: Future<Item = (), Error = ()> + Send + 'static, + where + F: Future<Item = (), Error = ()> + Send + 'static, { let mut s = self; tokio_executor::Executor::spawn(&mut s, Box::new(future)) @@ -104,8 +105,11 @@ impl Sender { next.inc_num_futures(); - let actual = self.pool.state.compare_and_swap( - state.into(), next.into(), AcqRel).into(); + let actual = self + .pool + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); if actual == state { trace!("execute; count={:?}", next.num_futures()); @@ -125,9 +129,10 @@ impl tokio_executor::Executor for Sender { tokio_executor::Executor::status(&s) } - fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>) - -> Result<(), SpawnError> - { + fn spawn( + &mut self, + future: Box<Future<Item = (), Error = ()> + Send>, + ) -> Result<(), SpawnError> { let mut s = &*self; tokio_executor::Executor::spawn(&mut s, future) } @@ -150,9 +155,10 @@ impl<'a> tokio_executor::Executor for &'a Sender { Ok(()) } - fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>) - -> Result<(), SpawnError> - { + fn spawn( + &mut self, + future: Box<Future<Item = (), Error = ()> + Send>, + ) -> Result<(), SpawnError> { self.prepare_for_spawn()?; // At this point, the pool has accepted the future, so schedule it for @@ -171,7 +177,8 @@ impl<'a> tokio_executor::Executor for &'a Sender { } impl<T> future::Executor<T> for Sender -where T: Future<Item = (), Error = ()> + Send + 'static, +where + T: Future<Item = (), Error = ()> + Send + 'static, { fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> { if let Err(e) = tokio_executor::Executor::status(self) { diff --git a/tokio-threadpool/src/shutdown.rs b/tokio-threadpool/src/shutdown.rs index 290cb182..c3d04a00 100644 --- a/tokio-threadpool/src/shutdown.rs +++ b/tokio-threadpool/src/shutdown.rs @@ -2,8 +2,8 @@ use task::Task; use worker; use crossbeam_deque::Injector; -use futures::{Future, Poll, Async}; use futures::task::AtomicTask; +use futures::{Async, Future, Poll}; use std::sync::{Arc, Mutex}; diff --git a/tokio-threadpool/src/task/blocking.rs b/tokio-threadpool/src/task/blocking.rs index cdf2ceff..ded59edf 100644 --- a/tokio-threadpool/src/task/blocking.rs +++ b/tokio-threadpool/src/task/blocking.rs @@ -1,14 +1,14 @@ use pool::Pool; -use task::{Task, BlockingState}; + |