summaryrefslogtreecommitdiffstats
path: root/tokio/src/executor
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/executor')
-rw-r--r--tokio/src/executor/blocking/builder.rs58
-rw-r--r--tokio/src/executor/blocking/mod.rs332
-rw-r--r--tokio/src/executor/current_thread/mod.rs873
-rw-r--r--tokio/src/executor/current_thread/scheduler.rs808
-rw-r--r--tokio/src/executor/enter.rs139
-rw-r--r--tokio/src/executor/error.rs49
-rw-r--r--tokio/src/executor/executor.rs181
-rw-r--r--tokio/src/executor/global.rs233
-rw-r--r--tokio/src/executor/loom/mod.rs27
-rw-r--r--tokio/src/executor/loom/std/atomic_u32.rs44
-rw-r--r--tokio/src/executor/loom/std/atomic_usize.rs45
-rw-r--r--tokio/src/executor/loom/std/causal_cell.rs48
-rw-r--r--tokio/src/executor/loom/std/mod.rs77
-rw-r--r--tokio/src/executor/mod.rs84
-rw-r--r--tokio/src/executor/park/mod.rs140
-rw-r--r--tokio/src/executor/park/thread.rs263
-rw-r--r--tokio/src/executor/task/core.rs153
-rw-r--r--tokio/src/executor/task/error.rs48
-rw-r--r--tokio/src/executor/task/harness.rs546
-rw-r--r--tokio/src/executor/task/join.rs74
-rw-r--r--tokio/src/executor/task/list.rs70
-rw-r--r--tokio/src/executor/task/mod.rs130
-rw-r--r--tokio/src/executor/task/raw.rs190
-rw-r--r--tokio/src/executor/task/stack.rs85
-rw-r--r--tokio/src/executor/task/state.rs502
-rw-r--r--tokio/src/executor/task/tests/loom.rs277
-rw-r--r--tokio/src/executor/task/tests/mod.rs5
-rw-r--r--tokio/src/executor/task/tests/task.rs643
-rw-r--r--tokio/src/executor/task/waker.rs107
-rw-r--r--tokio/src/executor/tests/backoff.rs32
-rw-r--r--tokio/src/executor/tests/loom_oneshot.rs49
-rw-r--r--tokio/src/executor/tests/loom_schedule.rs51
-rw-r--r--tokio/src/executor/tests/mock_park.rs66
-rw-r--r--tokio/src/executor/tests/mock_schedule.rs131
-rw-r--r--tokio/src/executor/tests/mod.rs40
-rw-r--r--tokio/src/executor/tests/track_drop.rs57
-rw-r--r--tokio/src/executor/thread_pool/builder.rs259
-rw-r--r--tokio/src/executor/thread_pool/current.rs85
-rw-r--r--tokio/src/executor/thread_pool/idle.rs229
-rw-r--r--tokio/src/executor/thread_pool/join.rs42
-rw-r--r--tokio/src/executor/thread_pool/mod.rs58
-rw-r--r--tokio/src/executor/thread_pool/owned.rs77
-rw-r--r--tokio/src/executor/thread_pool/park.rs182
-rw-r--r--tokio/src/executor/thread_pool/pool.rs111
-rw-r--r--tokio/src/executor/thread_pool/queue/global.rs195
-rw-r--r--tokio/src/executor/thread_pool/queue/inject.rs36
-rw-r--r--tokio/src/executor/thread_pool/queue/local.rs298
-rw-r--r--tokio/src/executor/thread_pool/queue/mod.rs41
-rw-r--r--tokio/src/executor/thread_pool/queue/worker.rs127
-rw-r--r--tokio/src/executor/thread_pool/set.rs209
-rw-r--r--tokio/src/executor/thread_pool/shared.rs104
-rw-r--r--tokio/src/executor/thread_pool/shutdown.rs48
-rw-r--r--tokio/src/executor/thread_pool/spawner.rs61
-rw-r--r--tokio/src/executor/thread_pool/tests/loom_pool.rs138
-rw-r--r--tokio/src/executor/thread_pool/tests/loom_queue.rs68
-rw-r--r--tokio/src/executor/thread_pool/tests/mod.rs11
-rw-r--r--tokio/src/executor/thread_pool/tests/queue.rs281
-rw-r--r--tokio/src/executor/thread_pool/tests/worker.rs68
-rw-r--r--tokio/src/executor/thread_pool/worker.rs415
-rw-r--r--tokio/src/executor/typed.rs178
-rw-r--r--tokio/src/executor/util/mod.rs5
-rw-r--r--tokio/src/executor/util/pad.rs52
-rw-r--r--tokio/src/executor/util/rand.rs52
63 files changed, 10087 insertions, 0 deletions
diff --git a/tokio/src/executor/blocking/builder.rs b/tokio/src/executor/blocking/builder.rs
new file mode 100644
index 00000000..05ea28c2
--- /dev/null
+++ b/tokio/src/executor/blocking/builder.rs
@@ -0,0 +1,58 @@
+use crate::executor::blocking::Pool;
+use crate::executor::loom::thread;
+
+use std::usize;
+
+/// Builds a blocking thread pool with custom configuration values.
+pub(crate) struct Builder {
+ /// Thread name
+ name: String,
+
+ /// Thread stack size
+ stack_size: Option<usize>,
+}
+
+impl Default for Builder {
+ fn default() -> Self {
+ Builder {
+ name: "tokio-blocking-thread".to_string(),
+ stack_size: None,
+ }
+ }
+}
+
+impl Builder {
+ /// Set name of threads spawned by the pool
+ ///
+ /// If this configuration is not set, then the thread will use the system
+ /// default naming scheme.
+ pub(crate) fn name<S: Into<String>>(&mut self, val: S) -> &mut Self {
+ self.name = val.into();
+ self
+ }
+
+ /// Set the stack size (in bytes) for worker threads.
+ ///
+ /// The actual stack size may be greater than this value if the platform
+ /// specifies minimal stack size.
+ ///
+ /// The default stack size for spawned threads is 2 MiB, though this
+ /// particular stack size is subject to change in the future.
+ pub(crate) fn stack_size(&mut self, val: usize) -> &mut Self {
+ self.stack_size = Some(val);
+ self
+ }
+
+ pub(crate) fn build(self) -> Pool {
+ let mut p = Pool::default();
+ let Builder { stack_size, name } = self;
+ p.new_thread = Box::new(move || {
+ let mut b = thread::Builder::new().name(name.clone());
+ if let Some(stack_size) = stack_size {
+ b = b.stack_size(stack_size);
+ }
+ b
+ });
+ p
+ }
+}
diff --git a/tokio/src/executor/blocking/mod.rs b/tokio/src/executor/blocking/mod.rs
new file mode 100644
index 00000000..16faa03e
--- /dev/null
+++ b/tokio/src/executor/blocking/mod.rs
@@ -0,0 +1,332 @@
+//! Thread pool for blocking operations
+
+use crate::executor::loom::sync::{Arc, Condvar, Mutex};
+use crate::executor::loom::thread;
+#[cfg(feature = "blocking")]
+use tokio_sync::oneshot;
+
+use std::cell::Cell;
+use std::collections::VecDeque;
+use std::fmt;
+#[cfg(feature = "blocking")]
+use std::future::Future;
+use std::ops::Deref;
+#[cfg(feature = "blocking")]
+use std::pin::Pin;
+#[cfg(feature = "blocking")]
+use std::task::{Context, Poll};
+use std::time::Duration;
+
+#[cfg(feature = "rt-full")]
+mod builder;
+
+#[cfg(feature = "rt-full")]
+pub(crate) use builder::Builder;
+
+#[derive(Clone, Copy)]
+enum State {
+ Empty,
+ Ready(*const Arc<Pool>),
+}
+
+thread_local! {
+ /// Thread-local tracking the current executor
+ static BLOCKING: Cell<State> = Cell::new(State::Empty)
+}
+
+/// Set the blocking pool for the duration of the closure
+///
+/// If a blocking pool is already set, it will be restored when the closure returns or if it
+/// panics.
+#[allow(dead_code)] // we allow dead code since this won't be called if no executors are enabled
+pub(crate) fn with_pool<F, R>(pool: &Arc<Pool>, f: F) -> R
+where
+ F: FnOnce() -> R,
+{
+ // While scary, this is safe. The function takes a `&Pool`, which guarantees
+ // that the reference lives for the duration of `with_pool`.
+ //
+ // Because we are always clearing the TLS value at the end of the
+ // function, we can cast the reference to 'static which thread-local
+ // cells require.
+ BLOCKING.with(|cell| {
+ let was = cell.replace(State::Empty);
+
+ // Ensure that the pool is removed from the thread-local context
+ // when leaving the scope. This handles cases that involve panicking.
+ struct Reset<'a>(&'a Cell<State>, State);
+
+ impl Drop for Reset<'_> {
+ fn drop(&mut self) {
+ self.0.set(self.1);
+ }
+ }
+
+ let _reset = Reset(cell, was);
+ cell.set(State::Ready(pool as *const _));
+ f()
+ })
+}
+
+pub(crate) struct Pool {
+ shared: Mutex<Shared>,
+ condvar: Condvar,
+ new_thread: Box<dyn Fn() -> thread::Builder + Send + Sync + 'static>,
+}
+
+impl fmt::Debug for Pool {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Pool").finish()
+ }
+}
+
+struct Shared {
+ queue: VecDeque<Box<dyn FnOnce() + Send>>,
+ num_th: u32,
+ num_idle: u32,
+ num_notify: u32,
+ shutdown: bool,
+}
+
+const MAX_THREADS: u32 = 1_000;
+const KEEP_ALIVE: Duration = Duration::from_secs(10);
+
+/// Result of a blocking operation running on the blocking thread pool.
+#[cfg(feature = "blocking")]
+#[derive(Debug)]
+pub struct Blocking<T> {
+ rx: oneshot::Receiver<T>,
+}
+
+impl Pool {
+ /// Run the provided function on an executor dedicated to blocking operations.
+ pub(crate) fn spawn(this: &Arc<Self>, f: Box<dyn FnOnce() + Send + 'static>) {
+ let should_spawn = {
+ let mut shared = this.shared.lock().unwrap();
+
+ if shared.shutdown {
+ // no need to even push this task; it would never get picked up
+ return;
+ }
+
+ shared.queue.push_back(f);
+
+ if shared.num_idle == 0 {
+ // No threads are able to process the task.
+
+ if shared.num_th == MAX_THREADS {
+ // At max number of threads
+ false
+ } else {
+ shared.num_th += 1;
+ true
+ }
+ } else {
+ // Notify an idle worker thread. The notification counter
+ // is used to count the needed amount of notifications
+ // exactly. Thread libraries may generate spurious
+ // wakeups, this counter is used to keep us in a
+ // consistent state.
+ shared.num_idle -= 1;
+ shared.num_notify += 1;
+ this.condvar.notify_one();
+ false
+ }
+ };
+
+ if should_spawn {
+ Pool::spawn_thread(Arc::clone(this), (this.new_thread)());
+ }
+ }
+
+ // NOTE: we cannot use self here w/o arbitrary_self_types since Arc is loom::Arc
+ fn spawn_thread(this: Arc<Self>, builder: thread::Builder) {
+ builder
+ .spawn(move || {
+ let mut shared = this.shared.lock().unwrap();
+ 'main: loop {
+ // BUSY
+ while let Some(task) = shared.queue.pop_front() {
+ drop(shared);
+ run_task(task);
+ shared = this.shared.lock().unwrap();
+ if shared.shutdown {
+ break; // Need to increment idle before we exit
+ }
+ }
+
+ // IDLE
+ shared.num_idle += 1;
+
+ while !shared.shutdown {
+ let lock_result = this.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap();
+ shared = lock_result.0;
+ let timeout_result = lock_result.1;
+
+ if shared.num_notify != 0 {
+ // We have received a legitimate wakeup,
+ // acknowledge it by decrementing the counter
+ // and transition to the BUSY state.
+ shared.num_notify -= 1;
+ break;
+ }
+
+ if timeout_result.timed_out() {
+ break 'main;
+ }
+
+ // Spurious wakeup detected, go back to sleep.
+ }
+
+ if shared.shutdown {
+ // Work was produced, and we "took" it (by decrementing num_notify).
+ // This means that num_idle was decremented once for our wakeup.
+ // But, since we are exiting, we need to "undo" that, as we'll stay idle.
+ shared.num_idle += 1;
+ // NOTE: Technically we should also do num_notify++ and notify again,
+ // but since we're shutting down anyway, that won't be necessary.
+ break;
+ }
+ }
+
+ // Thread exit
+ shared.num_th -= 1;
+
+ // num_idle should now be tracked exactly, panic
+ // with a descriptive message if it is not the
+ // case.
+ shared.num_idle = shared
+ .num_idle
+ .checked_sub(1)
+ .expect("num_idle underflowed on thread exit");
+
+ if shared.shutdown && shared.num_th == 0 {
+ this.condvar.notify_one();
+ }
+ })
+ .unwrap();
+ }
+
+ /// Shut down all workers in the pool the next time they are idle.
+ ///
+ /// Blocks until all threads have exited.
+ pub(crate) fn shutdown(&self) {
+ let mut shared = self.shared.lock().unwrap();
+ shared.shutdown = true;
+ self.condvar.notify_all();
+
+ while shared.num_th > 0 {
+ shared = self.condvar.wait(shared).unwrap();
+ }
+ }
+}
+
+pub(crate) struct PoolWaiter(Arc<Pool>);
+
+impl From<Pool> for PoolWaiter {
+ fn from(p: Pool) -> Self {
+ Self::from(Arc::new(p))
+ }
+}
+
+impl From<Arc<Pool>> for PoolWaiter {
+ fn from(p: Arc<Pool>) -> Self {
+ Self(p)
+ }
+}
+
+impl Deref for PoolWaiter {
+ type Target = Arc<Pool>;
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl Drop for PoolWaiter {
+ fn drop(&mut self) {
+ self.0.shutdown();
+ }
+}
+
+/// Run the provided closure on a thread where blocking is acceptable.
+///
+/// In general, issuing a blocking call or performing a lot of compute in a future without
+/// yielding is not okay, as it may prevent the executor from driving other futures forward.
+/// A closure that is run through this method will instead be run on a dedicated thread pool for
+/// such blocking tasks without holding up the main futures executor.
+///
+/// # Examples
+///
+/// ```
+/// # async fn docs() {
+/// tokio::executor::blocking::run(move || {
+/// // do some compute-heavy work or call synchronous code
+/// }).await;
+/// # }
+/// ```
+#[cfg(feature = "blocking")]
+pub fn run<F, R>(f: F) -> Blocking<R>
+where
+ F: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
+{
+ let (tx, rx) = oneshot::channel();
+
+ BLOCKING.with(|current_pool| match current_pool.get() {
+ State::Ready(pool) => {
+ let pool = unsafe { &*pool };
+ Pool::spawn(
+ pool,
+ Box::new(move || {
+ // receiver may have gone away
+ let _ = tx.send(f());
+ }),
+ );
+ }
+ State::Empty => panic!("must be called from the context of Tokio runtime"),
+ });
+
+ Blocking { rx }
+}
+
+#[cfg(feature = "blocking")]
+impl<T> Future for Blocking<T> {
+ type Output = T;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ use std::task::Poll::*;
+
+ match Pin::new(&mut self.rx).poll(cx) {
+ Ready(Ok(v)) => Ready(v),
+ Ready(Err(_)) => panic!(
+ "the blocking operation has been dropped before completing. \
+ This should not happen and is a bug."
+ ),
+ Pending => Pending,
+ }
+ }
+}
+
+fn run_task(f: Box<dyn FnOnce() + Send>) {
+ use std::panic::{catch_unwind, AssertUnwindSafe};
+
+ let _ = catch_unwind(AssertUnwindSafe(|| f()));
+}
+
+impl Default for Pool {
+ fn default() -> Self {
+ Pool {
+ shared: Mutex::new(Shared {
+ queue: VecDeque::new(),
+ num_th: 0,
+ num_idle: 0,
+ num_notify: 0,
+ shutdown: false,
+ }),
+ condvar: Condvar::new(),
+ new_thread: Box::new(|| {
+ thread::Builder::new().name("tokio-blocking-driver".to_string())
+ }),
+ }
+ }
+}
diff --git a/tokio/src/executor/current_thread/mod.rs b/tokio/src/executor/current_thread/mod.rs
new file mode 100644
index 00000000..dcc9c51a
--- /dev/null
+++ b/tokio/src/executor/current_thread/mod.rs
@@ -0,0 +1,873 @@
+//! A single-threaded executor which executes tasks on the same thread from which
+//! they are spawned.
+//!
+//! [`CurrentThread`] is the main type of this crate. It executes tasks on the
+//! current thread. The easiest way to start a new [`CurrentThread`] executor
+//! is to call [`block_on_all`] with an initial task to seed the executor. All
+//! tasks that are being managed by a [`CurrentThread`] executor are able to
+//! spawn additional tasks by calling [`spawn`].
+//!
+//! Application authors will not use this crate directly. Instead, they will use
+//! the `tokio` crate. Library authors should only depend on
+//! `tokio-current-thread` if they are building a custom task executor.
+//!
+//! [`CurrentThread`]: struct.CurrentThread.html
+//! [`spawn`]: fn.spawn.html
+//! [`block_on_all`]: fn.block_on_all.html
+
+mod scheduler;
+use self::scheduler::{Scheduler, TickArgs};
+
+use crate::executor::{EnterError, Executor, SpawnError, TypedExecutor};
+#[cfg(feature = "blocking")]
+use crate::executor::blocking::{Pool, PoolWaiter};
+use crate::executor::park::{Park, ParkThread, Unpark};
+
+use std::cell::Cell;
+use std::error::Error;
+use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::sync::{atomic, Arc};
+use std::task::{Context, Poll, Waker};
+use std::thread;
+use std::time::{Duration, Instant};
+
+/// Executes tasks on the current thread
+pub struct CurrentThread<P: Park = ParkThread> {
+ /// Execute futures and receive unpark notifications.
+ scheduler: Scheduler<P::Unpark>,
+
+ /// Current number of futures being executed.
+ ///
+ /// The LSB is used to indicate that the runtime is preparing to shut down.
+ /// Thus, to get the actual number of pending futures, `>>1`.
+ num_futures: Arc<atomic::AtomicUsize>,
+
+ /// Thread park handle
+ park: P,
+
+ /// Handle for spawning new futures from other threads
+ spawn_handle: Handle,
+
+ /// Receiver for futures spawned from other threads
+ spawn_receiver: crossbeam_channel::Receiver<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
+
+ /// Handle to pool for handling blocking tasks
+ #[cfg(feature = "blocking")]
+ blocking: PoolWaiter,
+
+ /// The thread-local ID assigned to this executor.
+ id: u64,
+}
+
+/// Executes futures on the current thread.
+///
+/// All futures executed using this executor will be executed on the current
+/// thread. As such, `run` will wait for these futures to complete before
+/// returning.
+///
+/// For more details, see the [module level](index.html) documentation.
+#[derive(Debug, Clone)]
+pub struct TaskExecutor {
+ // Prevent the handle from moving across threads.
+ _p: ::std::marker::PhantomData<Rc<()>>,
+}
+
+/// Returned by the `turn` function.
+#[derive(Debug)]
+pub struct Turn {
+ polled: bool,
+}
+
+impl Turn {
+ /// `true` if any futures were polled at all and `false` otherwise.
+ pub fn has_polled(&self) -> bool {
+ self.polled
+ }
+}
+
+/// A `CurrentThread` instance bound to a supplied execution context.
+pub struct Entered<'a, P: Park> {
+ executor: &'a mut CurrentThread<P>,
+}
+
+/// Error returned by the `run` function.
+#[derive(Debug)]
+pub struct RunError {
+ _p: (),
+}
+
+impl fmt::Display for RunError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "Run error")
+ }
+}
+
+impl Error for RunError {}
+
+/// Error returned by the `run_timeout` function.
+#[derive(Debug)]
+pub struct RunTimeoutError {
+ timeout: bool,
+}
+
+impl fmt::Display for RunTimeoutError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let descr = if self.timeout {
+ "Run timeout error (timeout)"
+ } else {
+ "Run timeout error (not timeout)"
+ };
+ write!(fmt, "{}", descr)
+ }
+}
+
+impl Error for RunTimeoutError {}
+
+/// Error returned by the `turn` function.
+#[derive(Debug)]
+pub struct TurnError {
+ _p: (),
+}
+
+impl fmt::Display for TurnError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "Turn error")
+ }
+}
+
+impl Error for TurnError {}
+
+/// Error returned by the `block_on` function.
+#[derive(Debug)]
+pub struct BlockError<T> {
+ inner: Option<T>,
+}
+
+impl<T> fmt::Display for BlockError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "Block error")
+ }
+}
+
+impl<T: fmt::Debug> Error for BlockError<T> {}
+
+/// This is mostly split out to make the borrow checker happy.
+struct Borrow<'a, U> {
+ spawner: BorrowSpawner<'a, U>,
+ #[cfg(feature = "blocking")]
+ blocking: &'a PoolWaiter,
+}
+
+/// As is this.
+struct BorrowSpawner<'a, U> {
+ id: u64,
+ num_futures: &'a atomic::AtomicUsize,
+ scheduler: &'a mut Scheduler<U>,
+}
+
+trait SpawnLocal {
+ fn spawn_local(&mut self, future: Pin<Box<dyn Future<Output = ()>>>, already_counted: bool);
+}
+
+struct CurrentRunner {
+ spawn: Cell<Option<*mut dyn SpawnLocal>>,
+ id: Cell<Option<u64>>,
+}
+
+thread_local! {
+ /// Current thread's task runner. This is set in `TaskRunner::with`
+ static CURRENT: CurrentRunner = CurrentRunner {
+ spawn: Cell::new(None),
+ id: Cell::new(None),
+ }
+}
+
+thread_local! {
+ /// Unique ID to assign to each new executor launched on this thread.
+ ///
+ /// The unique ID is used to determine if the currently running executor matches the one
+ /// referred to by a `Handle` so that direct task dispatch can be used.
+ static EXECUTOR_ID: Cell<u64> = Cell::new(0)
+}
+
+/// Run the executor bootstrapping the execution with the provided future.
+///
+/// This creates a new [`CurrentThread`] executor, spawns the provided future,
+/// and blocks the current thread until the provided future and **all**
+/// subsequently spawned futures complete. In other words:
+///
+/// * If the provided bootstrap future does **not** spawn any additional tasks,
+/// `block_on_all` returns once `future` completes.
+/// * If the provided bootstrap future **does** spawn additional tasks, then
+/// `block_on_all` returns once **all** spawned futures complete.
+///
+/// See [module level][mod] documentation for more details.
+///
+/// [`CurrentThread`]: struct.CurrentThread.html
+/// [mod]: index.html