summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime/thread_pool/slice.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/runtime/thread_pool/slice.rs')
-rw-r--r--tokio/src/runtime/thread_pool/slice.rs76
1 files changed, 31 insertions, 45 deletions
diff --git a/tokio/src/runtime/thread_pool/slice.rs b/tokio/src/runtime/thread_pool/slice.rs
index 4b3ef996..aa521a15 100644
--- a/tokio/src/runtime/thread_pool/slice.rs
+++ b/tokio/src/runtime/thread_pool/slice.rs
@@ -3,7 +3,8 @@
//! slice.
use crate::loom::rand::seed;
-use crate::runtime::park::Unpark;
+use crate::park::Park;
+use crate::runtime::Parker;
use crate::runtime::thread_pool::{current, queue, Idle, Owned, Shared};
use crate::task::{self, JoinHandle, Task};
use crate::util::{CachePadded, FastRand};
@@ -11,48 +12,38 @@ use crate::util::{CachePadded, FastRand};
use std::cell::UnsafeCell;
use std::future::Future;
-pub(super) struct Set<P>
-where
- P: 'static,
-{
+pub(super) struct Set {
/// Data accessible from all workers.
- shared: Box<[Shared<P>]>,
+ shared: Box<[Shared]>,
/// Data owned by the worker.
- owned: Box<[UnsafeCell<CachePadded<Owned<P>>>]>,
+ owned: Box<[UnsafeCell<CachePadded<Owned>>]>,
/// Submit work to the pool while *not* currently on a worker thread.
- inject: queue::Inject<Shared<P>>,
+ inject: queue::Inject<Shared>,
/// Coordinates idle workers
idle: Idle,
}
-unsafe impl<P: Unpark> Send for Set<P> {}
-unsafe impl<P: Unpark> Sync for Set<P> {}
+unsafe impl Send for Set {}
+unsafe impl Sync for Set {}
-impl<P> Set<P>
-where
- P: Unpark,
-{
+impl Set {
/// Create a new worker set using the provided queues.
- pub(crate) fn new<F>(num_workers: usize, mut mk_unpark: F) -> Self
- where
- F: FnMut(usize) -> P,
- {
- assert!(num_workers > 0);
+ pub(crate) fn new(parkers: &[Parker]) -> Self {
+ assert!(!parkers.is_empty());
- let queues = queue::build(num_workers);
+ let queues = queue::build(parkers.len());
let inject = queues[0].injector();
let mut shared = Vec::with_capacity(queues.len());
let mut owned = Vec::with_capacity(queues.len());
for (i, queue) in queues.into_iter().enumerate() {
- let unpark = mk_unpark(i);
let rand = FastRand::new(seed());
- shared.push(Shared::new(unpark));
+ shared.push(Shared::new(parkers[i].unpark()));
owned.push(UnsafeCell::new(CachePadded::new(Owned::new(queue, rand))));
}
@@ -60,12 +51,21 @@ where
shared: shared.into_boxed_slice(),
owned: owned.into_boxed_slice(),
inject,
- idle: Idle::new(num_workers),
- // blocking,
+ idle: Idle::new(parkers.len()),
}
}
- fn inject_task(&self, task: Task<Shared<P>>) {
+ pub(crate) fn spawn_typed<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ let (task, handle) = task::joinable(future);
+ self.schedule(task);
+ handle
+ }
+
+ fn inject_task(&self, task: Task<Shared>) {
self.inject.push(task, |res| {
if let Err(task) = res {
task.shutdown();
@@ -95,7 +95,7 @@ where
}
}
- pub(crate) fn schedule(&self, task: Task<Shared<P>>) {
+ pub(crate) fn schedule(&self, task: Task<Shared>) {
current::get(|current_worker| match current_worker.as_member(self) {
Some(worker) => {
if worker.submit_local(task) {
@@ -136,28 +136,26 @@ where
self.shared.len()
}
- pub(super) fn index_of(&self, shared: &Shared<P>) -> usize {
+ pub(super) fn index_of(&self, shared: &Shared) -> usize {
use std::mem;
- let size = mem::size_of::<Shared<P>>();
+ let size = mem::size_of::<Shared>();
((shared as *const _ as usize) - (&self.shared[0] as *const _ as usize)) / size
}
- pub(super) fn shared(&self) -> &[Shared<P>] {
+ pub(super) fn shared(&self) -> &[Shared] {
&self.shared
}
- pub(super) fn owned(&self) -> &[UnsafeCell<CachePadded<Owned<P>>>] {
+ pub(super) fn owned(&self) -> &[UnsafeCell<CachePadded<Owned>>] {
&self.owned
}
pub(super) fn idle(&self) -> &Idle {
&self.idle
}
-}
-impl<P: 'static> Set<P> {
/// Wait for all locks on the injection queue to drop.
///
/// This is done by locking w/o doing anything.
@@ -166,21 +164,9 @@ impl<P: 'static> Set<P> {
}
}
-impl<P: 'static> Drop for Set<P> {
+impl Drop for Set {
fn drop(&mut self) {
// Before proceeding, wait for all concurrent wakers to exit
self.wait_for_unlocked();
}
}
-
-impl Set<Box<dyn Unpark>> {
- pub(crate) fn spawn_typed<F>(&self, future: F) -> JoinHandle<F::Output>
- where
- F: Future + Send + 'static,
- F::Output: Send + 'static,
- {
- let (task, handle) = task::joinable(future);
- self.schedule(task);
- handle
- }
-}