diff options
Diffstat (limited to 'tokio/src/runtime/thread_pool/slice.rs')
-rw-r--r-- | tokio/src/runtime/thread_pool/slice.rs | 76 |
1 files changed, 31 insertions, 45 deletions
diff --git a/tokio/src/runtime/thread_pool/slice.rs b/tokio/src/runtime/thread_pool/slice.rs index 4b3ef996..aa521a15 100644 --- a/tokio/src/runtime/thread_pool/slice.rs +++ b/tokio/src/runtime/thread_pool/slice.rs @@ -3,7 +3,8 @@ //! slice. use crate::loom::rand::seed; -use crate::runtime::park::Unpark; +use crate::park::Park; +use crate::runtime::Parker; use crate::runtime::thread_pool::{current, queue, Idle, Owned, Shared}; use crate::task::{self, JoinHandle, Task}; use crate::util::{CachePadded, FastRand}; @@ -11,48 +12,38 @@ use crate::util::{CachePadded, FastRand}; use std::cell::UnsafeCell; use std::future::Future; -pub(super) struct Set<P> -where - P: 'static, -{ +pub(super) struct Set { /// Data accessible from all workers. - shared: Box<[Shared<P>]>, + shared: Box<[Shared]>, /// Data owned by the worker. - owned: Box<[UnsafeCell<CachePadded<Owned<P>>>]>, + owned: Box<[UnsafeCell<CachePadded<Owned>>]>, /// Submit work to the pool while *not* currently on a worker thread. - inject: queue::Inject<Shared<P>>, + inject: queue::Inject<Shared>, /// Coordinates idle workers idle: Idle, } -unsafe impl<P: Unpark> Send for Set<P> {} -unsafe impl<P: Unpark> Sync for Set<P> {} +unsafe impl Send for Set {} +unsafe impl Sync for Set {} -impl<P> Set<P> -where - P: Unpark, -{ +impl Set { /// Create a new worker set using the provided queues. - pub(crate) fn new<F>(num_workers: usize, mut mk_unpark: F) -> Self - where - F: FnMut(usize) -> P, - { - assert!(num_workers > 0); + pub(crate) fn new(parkers: &[Parker]) -> Self { + assert!(!parkers.is_empty()); - let queues = queue::build(num_workers); + let queues = queue::build(parkers.len()); let inject = queues[0].injector(); let mut shared = Vec::with_capacity(queues.len()); let mut owned = Vec::with_capacity(queues.len()); for (i, queue) in queues.into_iter().enumerate() { - let unpark = mk_unpark(i); let rand = FastRand::new(seed()); - shared.push(Shared::new(unpark)); + shared.push(Shared::new(parkers[i].unpark())); owned.push(UnsafeCell::new(CachePadded::new(Owned::new(queue, rand)))); } @@ -60,12 +51,21 @@ where shared: shared.into_boxed_slice(), owned: owned.into_boxed_slice(), inject, - idle: Idle::new(num_workers), - // blocking, + idle: Idle::new(parkers.len()), } } - fn inject_task(&self, task: Task<Shared<P>>) { + pub(crate) fn spawn_typed<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let (task, handle) = task::joinable(future); + self.schedule(task); + handle + } + + fn inject_task(&self, task: Task<Shared>) { self.inject.push(task, |res| { if let Err(task) = res { task.shutdown(); @@ -95,7 +95,7 @@ where } } - pub(crate) fn schedule(&self, task: Task<Shared<P>>) { + pub(crate) fn schedule(&self, task: Task<Shared>) { current::get(|current_worker| match current_worker.as_member(self) { Some(worker) => { if worker.submit_local(task) { @@ -136,28 +136,26 @@ where self.shared.len() } - pub(super) fn index_of(&self, shared: &Shared<P>) -> usize { + pub(super) fn index_of(&self, shared: &Shared) -> usize { use std::mem; - let size = mem::size_of::<Shared<P>>(); + let size = mem::size_of::<Shared>(); ((shared as *const _ as usize) - (&self.shared[0] as *const _ as usize)) / size } - pub(super) fn shared(&self) -> &[Shared<P>] { + pub(super) fn shared(&self) -> &[Shared] { &self.shared } - pub(super) fn owned(&self) -> &[UnsafeCell<CachePadded<Owned<P>>>] { + pub(super) fn owned(&self) -> &[UnsafeCell<CachePadded<Owned>>] { &self.owned } pub(super) fn idle(&self) -> &Idle { &self.idle } -} -impl<P: 'static> Set<P> { /// Wait for all locks on the injection queue to drop. /// /// This is done by locking w/o doing anything. @@ -166,21 +164,9 @@ impl<P: 'static> Set<P> { } } -impl<P: 'static> Drop for Set<P> { +impl Drop for Set { fn drop(&mut self) { // Before proceeding, wait for all concurrent wakers to exit self.wait_for_unlocked(); } } - -impl Set<Box<dyn Unpark>> { - pub(crate) fn spawn_typed<F>(&self, future: F) -> JoinHandle<F::Output> - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let (task, handle) = task::joinable(future); - self.schedule(task); - handle - } -} |