summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime/thread_pool/queue/worker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/runtime/thread_pool/queue/worker.rs')
-rw-r--r--tokio/src/runtime/thread_pool/queue/worker.rs127
1 files changed, 0 insertions, 127 deletions
diff --git a/tokio/src/runtime/thread_pool/queue/worker.rs b/tokio/src/runtime/thread_pool/queue/worker.rs
deleted file mode 100644
index 6d364896..00000000
--- a/tokio/src/runtime/thread_pool/queue/worker.rs
+++ /dev/null
@@ -1,127 +0,0 @@
-use crate::loom::sync::Arc;
-use crate::runtime::thread_pool::queue::{local, Cluster, Inject};
-use crate::task::Task;
-
-use std::cell::Cell;
-use std::fmt;
-
-pub(crate) struct Worker<T: 'static> {
- cluster: Arc<Cluster<T>>,
- index: u16,
- /// Task to pop next
- next: Cell<Option<Task<T>>>,
-}
-
-impl<T: 'static> Worker<T> {
- pub(super) fn new(cluster: Arc<Cluster<T>>, index: usize) -> Worker<T> {
- Worker {
- cluster,
- index: index as u16,
- next: Cell::new(None),
- }
- }
-
- pub(crate) fn injector(&self) -> Inject<T> {
- Inject::new(self.cluster.clone())
- }
-
- /// Returns `true` if the queue is closed
- pub(crate) fn is_closed(&self) -> bool {
- self.cluster.global.is_closed()
- }
-
- /// Pushes to the local queue.
- ///
- /// If the local queue is full, the task is pushed onto the global queue.
- ///
- /// # Return
- ///
- /// Returns `true` if the pushed task can be stolen by another worker.
- pub(crate) fn push(&self, task: Task<T>) -> bool {
- let prev = self.next.take();
- let ret = prev.is_some();
-
- if let Some(prev) = prev {
- // safety: we guarantee that only one thread pushes to this local
- // queue at a time.
- unsafe {
- self.local().push(prev, &self.cluster.global);
- }
- }
-
- self.next.set(Some(task));
-
- ret
- }
-
- pub(crate) fn push_yield(&self, task: Task<T>) {
- unsafe { self.local().push(task, &self.cluster.global) }
- }
-
- /// Pops a task checking the local queue first.
- pub(crate) fn pop_local_first(&self) -> Option<Task<T>> {
- self.local_pop().or_else(|| self.cluster.global.pop())
- }
-
- /// Pops a task checking the global queue first.
- pub(crate) fn pop_global_first(&self) -> Option<Task<T>> {
- self.cluster.global.pop().or_else(|| self.local_pop())
- }
-
- /// Steals from other local queues.
- ///
- /// `start` specifies the queue from which to start stealing.
- pub(crate) fn steal(&self, start: usize) -> Option<Task<T>> {
- let num_queues = self.cluster.local.len();
-
- for i in 0..num_queues {
- let i = (start + i) % num_queues;
-
- if i == self.index as usize {
- continue;
- }
-
- // safety: we own the dst queue
- let ret = unsafe { self.cluster.local[i].steal(self.local()) };
-
- if ret.is_some() {
- return ret;
- }
- }
-
- None
- }
-
- /// An approximation of whether or not the queue is empty.
- pub(crate) fn is_empty(&self) -> bool {
- for local_queue in &self.cluster.local[..] {
- if !local_queue.is_empty() {
- return false;
- }
- }
-
- self.cluster.global.is_empty()
- }
-
- fn local_pop(&self) -> Option<Task<T>> {
- if let Some(task) = self.next.take() {
- return Some(task);
- }
- // safety: we guarantee that only one thread pushes to this local queue
- // at a time.
- unsafe { self.local().pop() }
- }
-
- fn local(&self) -> &local::Queue<T> {
- &self.cluster.local[self.index as usize]
- }
-}
-
-impl<T: 'static> fmt::Debug for Worker<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("queue::Worker")
- .field("cluster", &"...")
- .field("index", &self.index)
- .finish()
- }
-}