summaryrefslogtreecommitdiffstats
path: root/tokio/src/executor/thread_pool/queue/worker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/executor/thread_pool/queue/worker.rs')
-rw-r--r--tokio/src/executor/thread_pool/queue/worker.rs127
1 files changed, 127 insertions, 0 deletions
diff --git a/tokio/src/executor/thread_pool/queue/worker.rs b/tokio/src/executor/thread_pool/queue/worker.rs
new file mode 100644
index 00000000..2cd34a97
--- /dev/null
+++ b/tokio/src/executor/thread_pool/queue/worker.rs
@@ -0,0 +1,127 @@
+use crate::executor::loom::sync::Arc;
+use crate::executor::task::Task;
+use crate::executor::thread_pool::queue::{local, Cluster, Inject};
+
+use std::cell::Cell;
+use std::fmt;
+
+pub(crate) struct Worker<T: 'static> {
+ cluster: Arc<Cluster<T>>,
+ index: u16,
+ /// Task to pop next
+ next: Cell<Option<Task<T>>>,
+}
+
+impl<T: 'static> Worker<T> {
+ pub(super) fn new(cluster: Arc<Cluster<T>>, index: usize) -> Worker<T> {
+ Worker {
+ cluster,
+ index: index as u16,
+ next: Cell::new(None),
+ }
+ }
+
+ pub(crate) fn injector(&self) -> Inject<T> {
+ Inject::new(self.cluster.clone())
+ }
+
+ /// Returns `true` if the queue is closed
+ pub(crate) fn is_closed(&self) -> bool {
+ self.cluster.global.is_closed()
+ }
+
+ /// Push to the local queue.
+ ///
+ /// If the local queue is full, the task is pushed onto the global queue.
+ ///
+ /// # Return
+ ///
+ /// Returns `true` if the pushed task can be stolen by another worker.
+ pub(crate) fn push(&self, task: Task<T>) -> bool {
+ let prev = self.next.take();
+ let ret = prev.is_some();
+
+ if let Some(prev) = prev {
+ // safety: we guarantee that only one thread pushes to this local
+ // queue at a time.
+ unsafe {
+ self.local().push(prev, &self.cluster.global);
+ }
+ }
+
+ self.next.set(Some(task));
+
+ ret
+ }
+
+ pub(crate) fn push_yield(&self, task: Task<T>) {
+ unsafe { self.local().push(task, &self.cluster.global) }
+ }
+
+ /// Pop a task checking the local queue first.
+ pub(crate) fn pop_local_first(&self) -> Option<Task<T>> {
+ self.local_pop().or_else(|| self.cluster.global.pop())
+ }
+
+ /// Pop a task checking the global queue first.
+ pub(crate) fn pop_global_first(&self) -> Option<Task<T>> {
+ self.cluster.global.pop().or_else(|| self.local_pop())
+ }
+
+ /// Steal from other local queues.
+ ///
+ /// `start` specifies the queue from which to start stealing.
+ pub(crate) fn steal(&self, start: usize) -> Option<Task<T>> {
+ let num_queues = self.cluster.local.len();
+
+ for i in 0..num_queues {
+ let i = (start + i) % num_queues;
+
+ if i == self.index as usize {
+ continue;
+ }
+
+ // safety: we own the dst queue
+ let ret = unsafe { self.cluster.local[i].steal(self.local()) };
+
+ if ret.is_some() {
+ return ret;
+ }
+ }
+
+ None
+ }
+
+ /// An approximation of whether or not the queue is empty.
+ pub(crate) fn is_empty(&self) -> bool {
+ for local_queue in &self.cluster.local[..] {
+ if !local_queue.is_empty() {
+ return false;
+ }
+ }
+
+ self.cluster.global.is_empty()
+ }
+
+ fn local_pop(&self) -> Option<Task<T>> {
+ if let Some(task) = self.next.take() {
+ return Some(task);
+ }
+ // safety: we guarantee that only one thread pushes to this local queue
+ // at a time.
+ unsafe { self.local().pop() }
+ }
+
+ fn local(&self) -> &local::Queue<T> {
+ &self.cluster.local[self.index as usize]
+ }
+}
+
+impl<T: 'static> fmt::Debug for Worker<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("queue::Worker")
+ .field("cluster", &"...")
+ .field("index", &self.index)
+ .finish()
+ }
+}