summaryrefslogtreecommitdiffstats
path: root/tokio/src
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src')
-rw-r--r--tokio/src/executor.rs104
-rw-r--r--tokio/src/executor/blocking/builder.rs58
-rw-r--r--tokio/src/executor/blocking/mod.rs332
-rw-r--r--tokio/src/executor/current_thread/mod.rs873
-rw-r--r--tokio/src/executor/current_thread/scheduler.rs808
-rw-r--r--tokio/src/executor/enter.rs139
-rw-r--r--tokio/src/executor/error.rs49
-rw-r--r--tokio/src/executor/executor.rs181
-rw-r--r--tokio/src/executor/global.rs233
-rw-r--r--tokio/src/executor/loom/mod.rs27
-rw-r--r--tokio/src/executor/loom/std/atomic_u32.rs44
-rw-r--r--tokio/src/executor/loom/std/atomic_usize.rs45
-rw-r--r--tokio/src/executor/loom/std/causal_cell.rs48
-rw-r--r--tokio/src/executor/loom/std/mod.rs77
-rw-r--r--tokio/src/executor/mod.rs84
-rw-r--r--tokio/src/executor/park/mod.rs140
-rw-r--r--tokio/src/executor/park/thread.rs263
-rw-r--r--tokio/src/executor/task/core.rs153
-rw-r--r--tokio/src/executor/task/error.rs48
-rw-r--r--tokio/src/executor/task/harness.rs546
-rw-r--r--tokio/src/executor/task/join.rs74
-rw-r--r--tokio/src/executor/task/list.rs70
-rw-r--r--tokio/src/executor/task/mod.rs130
-rw-r--r--tokio/src/executor/task/raw.rs190
-rw-r--r--tokio/src/executor/task/stack.rs85
-rw-r--r--tokio/src/executor/task/state.rs502
-rw-r--r--tokio/src/executor/task/tests/loom.rs277
-rw-r--r--tokio/src/executor/task/tests/mod.rs5
-rw-r--r--tokio/src/executor/task/tests/task.rs643
-rw-r--r--tokio/src/executor/task/waker.rs107
-rw-r--r--tokio/src/executor/tests/backoff.rs32
-rw-r--r--tokio/src/executor/tests/loom_oneshot.rs49
-rw-r--r--tokio/src/executor/tests/loom_schedule.rs51
-rw-r--r--tokio/src/executor/tests/mock_park.rs66
-rw-r--r--tokio/src/executor/tests/mock_schedule.rs131
-rw-r--r--tokio/src/executor/tests/mod.rs40
-rw-r--r--tokio/src/executor/tests/track_drop.rs57
-rw-r--r--tokio/src/executor/thread_pool/builder.rs259
-rw-r--r--tokio/src/executor/thread_pool/current.rs85
-rw-r--r--tokio/src/executor/thread_pool/idle.rs229
-rw-r--r--tokio/src/executor/thread_pool/join.rs42
-rw-r--r--tokio/src/executor/thread_pool/mod.rs58
-rw-r--r--tokio/src/executor/thread_pool/owned.rs77
-rw-r--r--tokio/src/executor/thread_pool/park.rs182
-rw-r--r--tokio/src/executor/thread_pool/pool.rs111
-rw-r--r--tokio/src/executor/thread_pool/queue/global.rs195
-rw-r--r--tokio/src/executor/thread_pool/queue/inject.rs36
-rw-r--r--tokio/src/executor/thread_pool/queue/local.rs298
-rw-r--r--tokio/src/executor/thread_pool/queue/mod.rs41
-rw-r--r--tokio/src/executor/thread_pool/queue/worker.rs127
-rw-r--r--tokio/src/executor/thread_pool/set.rs209
-rw-r--r--tokio/src/executor/thread_pool/shared.rs104
-rw-r--r--tokio/src/executor/thread_pool/shutdown.rs48
-rw-r--r--tokio/src/executor/thread_pool/spawner.rs61
-rw-r--r--tokio/src/executor/thread_pool/tests/loom_pool.rs138
-rw-r--r--tokio/src/executor/thread_pool/tests/loom_queue.rs68
-rw-r--r--tokio/src/executor/thread_pool/tests/mod.rs11
-rw-r--r--tokio/src/executor/thread_pool/tests/queue.rs281
-rw-r--r--tokio/src/executor/thread_pool/tests/worker.rs68
-rw-r--r--tokio/src/executor/thread_pool/worker.rs415
-rw-r--r--tokio/src/executor/typed.rs178
-rw-r--r--tokio/src/executor/util/mod.rs5
-rw-r--r--tokio/src/executor/util/pad.rs52
-rw-r--r--tokio/src/executor/util/rand.rs52
-rw-r--r--tokio/src/fs/mod.rs7
-rw-r--r--tokio/src/lib.rs8
-rw-r--r--tokio/src/net/addr.rs4
-rw-r--r--tokio/src/net/driver/mod.rs4
-rw-r--r--tokio/src/net/driver/reactor/mod.rs2
-rw-r--r--tokio/src/runtime/current_thread/builder.rs3
-rw-r--r--tokio/src/runtime/current_thread/mod.rs4
-rw-r--r--tokio/src/runtime/current_thread/runtime.rs11
-rw-r--r--tokio/src/runtime/mod.rs4
-rw-r--r--tokio/src/runtime/threadpool/builder.rs3
-rw-r--r--tokio/src/runtime/threadpool/mod.rs4
-rw-r--r--tokio/src/runtime/threadpool/spawner.rs3
-rw-r--r--tokio/src/timer/timer/mod.rs3
77 files changed, 10116 insertions, 135 deletions
diff --git a/tokio/src/executor.rs b/tokio/src/executor.rs
deleted file mode 100644
index 710ed2b9..00000000
--- a/tokio/src/executor.rs
+++ /dev/null
@@ -1,104 +0,0 @@
-//! Task execution utilities.
-//!
-//! In the Tokio execution model, futures are lazy. When a future is created, no
-//! work is performed. In order for the work defined by the future to happen,
-//! the future must be submitted to an executor. A future that is submitted to
-//! an executor is called a "task".
-//!
-//! The executor is responsible for ensuring that [`Future::poll`] is
-//! called whenever the task is [notified]. Notification happens when the
-//! internal state of a task transitions from "not ready" to ready. For
-//! example, a socket might have received data and a call to `read` will now be
-//! able to succeed.
-//!
-//! The specific strategy used to manage the tasks is left up to the
-//! executor. There are two main flavors of executors: single-threaded and
-//! multi-threaded. Tokio provides implementation for both of these in the
-//! [`runtime`] module.
-//!
-//! # `Executor` trait.
-//!
-//! This module provides the [`Executor`] trait (re-exported from
-//! [`tokio-executor`]), which describes the API that all executors must
-//! implement.
-//!
-//! A free [`spawn`] function is provided that allows spawning futures onto the
-//! default executor (tracked via a thread-local variable) without referencing a
-//! handle. It is expected that all executors will set a value for the default
-//! executor. This value will often be set to the executor itself, but it is
-//! possible that the default executor might be set to a different executor.
-//!
-//! For example, a single threaded executor might set the default executor to a
-//! thread pool instead of itself, allowing futures to spawn new tasks onto the
-//! thread pool when those tasks are `Send`.
-//!
-//! [`Future::poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll
-//! [notified]: https://docs.rs/futures/0.1/futures/executor/trait.Notify.html#tymethod.notify
-//! [`runtime`]: ../runtime/index.html
-//! [`tokio-executor`]: https://docs.rs/tokio-executor/0.1
-//! [`Executor`]: trait.Executor.html
-//! [`spawn`]: fn.spawn.html
-
-use std::future::Future;
-pub use tokio_executor::{DefaultExecutor, Executor, SpawnError, TypedExecutor};
-
-/// Return value from the `spawn` function.
-///
-/// Currently this value doesn't actually provide any functionality. However, it
-/// provides a way to add functionality later without breaking backwards
-/// compatibility.
-///
-/// See [`spawn`] for more details.
-///
-/// [`spawn`]: fn.spawn.html
-#[derive(Debug)]
-pub struct Spawn(());
-
-/// Spawns a future on the default executor.
-///
-/// In order for a future to do work, it must be spawned on an executor. The
-/// `spawn` function is the easiest way to do this. It spawns a future on the
-/// [default executor] for the current execution context (tracked using a
-/// thread-local variable).
-///
-/// The default executor is **usually** a thread pool.
-///
-/// # Examples
-///
-/// In this example, a server is started and `spawn` is used to start a new task
-/// that processes each received connection.
-///
-/// ```
-/// use tokio::net::TcpListener;
-///
-/// # async fn process<T>(_t: T) {}
-/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
-/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
-///
-/// loop {
-/// let (socket, _) = listener.accept().await?;
-///
-/// tokio::spawn(async move {
-/// // Process each socket concurrently.
-/// process(socket).await
-/// });
-/// }
-/// # }
-/// ```
-///
-/// [default executor]: struct.DefaultExecutor.html
-///
-/// # Panics
-///
-/// This function will panic if the default executor is not set or if spawning
-/// onto the default executor returns an error. To avoid the panic, use
-/// [`DefaultExecutor`].
-///
-/// [`DefaultExecutor`]: struct.DefaultExecutor.html
-pub fn spawn<F>(f: F) -> Spawn
-where
- F: Future<Output = ()> + 'static + Send,
-{
- ::tokio_executor::spawn(f);
- Spawn(())
-}
diff --git a/tokio/src/executor/blocking/builder.rs b/tokio/src/executor/blocking/builder.rs
new file mode 100644
index 00000000..05ea28c2
--- /dev/null
+++ b/tokio/src/executor/blocking/builder.rs
@@ -0,0 +1,58 @@
+use crate::executor::blocking::Pool;
+use crate::executor::loom::thread;
+
+use std::usize;
+
+/// Builds a blocking thread pool with custom configuration values.
+pub(crate) struct Builder {
+ /// Thread name
+ name: String,
+
+ /// Thread stack size
+ stack_size: Option<usize>,
+}
+
+impl Default for Builder {
+ fn default() -> Self {
+ Builder {
+ name: "tokio-blocking-thread".to_string(),
+ stack_size: None,
+ }
+ }
+}
+
+impl Builder {
+ /// Set name of threads spawned by the pool
+ ///
+ /// If this configuration is not set, then the thread will use the system
+ /// default naming scheme.
+ pub(crate) fn name<S: Into<String>>(&mut self, val: S) -> &mut Self {
+ self.name = val.into();
+ self
+ }
+
+ /// Set the stack size (in bytes) for worker threads.
+ ///
+ /// The actual stack size may be greater than this value if the platform
+ /// specifies minimal stack size.
+ ///
+ /// The default stack size for spawned threads is 2 MiB, though this
+ /// particular stack size is subject to change in the future.
+ pub(crate) fn stack_size(&mut self, val: usize) -> &mut Self {
+ self.stack_size = Some(val);
+ self
+ }
+
+ pub(crate) fn build(self) -> Pool {
+ let mut p = Pool::default();
+ let Builder { stack_size, name } = self;
+ p.new_thread = Box::new(move || {
+ let mut b = thread::Builder::new().name(name.clone());
+ if let Some(stack_size) = stack_size {
+ b = b.stack_size(stack_size);
+ }
+ b
+ });
+ p
+ }
+}
diff --git a/tokio/src/executor/blocking/mod.rs b/tokio/src/executor/blocking/mod.rs
new file mode 100644
index 00000000..16faa03e
--- /dev/null
+++ b/tokio/src/executor/blocking/mod.rs
@@ -0,0 +1,332 @@
+//! Thread pool for blocking operations
+
+use crate::executor::loom::sync::{Arc, Condvar, Mutex};
+use crate::executor::loom::thread;
+#[cfg(feature = "blocking")]
+use tokio_sync::oneshot;
+
+use std::cell::Cell;
+use std::collections::VecDeque;
+use std::fmt;
+#[cfg(feature = "blocking")]
+use std::future::Future;
+use std::ops::Deref;
+#[cfg(feature = "blocking")]
+use std::pin::Pin;
+#[cfg(feature = "blocking")]
+use std::task::{Context, Poll};
+use std::time::Duration;
+
+#[cfg(feature = "rt-full")]
+mod builder;
+
+#[cfg(feature = "rt-full")]
+pub(crate) use builder::Builder;
+
+#[derive(Clone, Copy)]
+enum State {
+ Empty,
+ Ready(*const Arc<Pool>),
+}
+
+thread_local! {
+ /// Thread-local tracking the current executor
+ static BLOCKING: Cell<State> = Cell::new(State::Empty)
+}
+
+/// Set the blocking pool for the duration of the closure
+///
+/// If a blocking pool is already set, it will be restored when the closure returns or if it
+/// panics.
+#[allow(dead_code)] // we allow dead code since this won't be called if no executors are enabled
+pub(crate) fn with_pool<F, R>(pool: &Arc<Pool>, f: F) -> R
+where
+ F: FnOnce() -> R,
+{
+ // While scary, this is safe. The function takes a `&Pool`, which guarantees
+ // that the reference lives for the duration of `with_pool`.
+ //
+ // Because we are always clearing the TLS value at the end of the
+ // function, we can cast the reference to 'static which thread-local
+ // cells require.
+ BLOCKING.with(|cell| {
+ let was = cell.replace(State::Empty);
+
+ // Ensure that the pool is removed from the thread-local context
+ // when leaving the scope. This handles cases that involve panicking.
+ struct Reset<'a>(&'a Cell<State>, State);
+
+ impl Drop for Reset<'_> {
+ fn drop(&mut self) {
+ self.0.set(self.1);
+ }
+ }
+
+ let _reset = Reset(cell, was);
+ cell.set(State::Ready(pool as *const _));
+ f()
+ })
+}
+
+pub(crate) struct Pool {
+ shared: Mutex<Shared>,
+ condvar: Condvar,
+ new_thread: Box<dyn Fn() -> thread::Builder + Send + Sync + 'static>,
+}
+
+impl fmt::Debug for Pool {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Pool").finish()
+ }
+}
+
+struct Shared {
+ queue: VecDeque<Box<dyn FnOnce() + Send>>,
+ num_th: u32,
+ num_idle: u32,
+ num_notify: u32,
+ shutdown: bool,
+}
+
+const MAX_THREADS: u32 = 1_000;
+const KEEP_ALIVE: Duration = Duration::from_secs(10);
+
+/// Result of a blocking operation running on the blocking thread pool.
+#[cfg(feature = "blocking")]
+#[derive(Debug)]
+pub struct Blocking<T> {
+ rx: oneshot::Receiver<T>,
+}
+
+impl Pool {
+ /// Run the provided function on an executor dedicated to blocking operations.
+ pub(crate) fn spawn(this: &Arc<Self>, f: Box<dyn FnOnce() + Send + 'static>) {
+ let should_spawn = {
+ let mut shared = this.shared.lock().unwrap();
+
+ if shared.shutdown {
+ // no need to even push this task; it would never get picked up
+ return;
+ }
+
+ shared.queue.push_back(f);
+
+ if shared.num_idle == 0 {
+ // No threads are able to process the task.
+
+ if shared.num_th == MAX_THREADS {
+ // At max number of threads
+ false
+ } else {
+ shared.num_th += 1;
+ true
+ }
+ } else {
+ // Notify an idle worker thread. The notification counter
+ // is used to count the needed amount of notifications
+ // exactly. Thread libraries may generate spurious
+ // wakeups, this counter is used to keep us in a
+ // consistent state.
+ shared.num_idle -= 1;
+ shared.num_notify += 1;
+ this.condvar.notify_one();
+ false
+ }
+ };
+
+ if should_spawn {
+ Pool::spawn_thread(Arc::clone(this), (this.new_thread)());
+ }
+ }
+
+ // NOTE: we cannot use self here w/o arbitrary_self_types since Arc is loom::Arc
+ fn spawn_thread(this: Arc<Self>, builder: thread::Builder) {
+ builder
+ .spawn(move || {
+ let mut shared = this.shared.lock().unwrap();
+ 'main: loop {
+ // BUSY
+ while let Some(task) = shared.queue.pop_front() {
+ drop(shared);
+ run_task(task);
+ shared = this.shared.lock().unwrap();
+ if shared.shutdown {
+ break; // Need to increment idle before we exit
+ }
+ }
+
+ // IDLE
+ shared.num_idle += 1;
+
+ while !shared.shutdown {
+ let lock_result = this.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap();
+ shared = lock_result.0;
+ let timeout_result = lock_result.1;
+
+ if shared.num_notify != 0 {
+ // We have received a legitimate wakeup,
+ // acknowledge it by decrementing the counter
+ // and transition to the BUSY state.
+ shared.num_notify -= 1;
+ break;
+ }
+
+ if timeout_result.timed_out() {
+ break 'main;
+ }
+
+ // Spurious wakeup detected, go back to sleep.
+ }
+
+ if shared.shutdown {
+ // Work was produced, and we "took" it (by decrementing num_notify).
+ // This means that num_idle was decremented once for our wakeup.
+ // But, since we are exiting, we need to "undo" that, as we'll stay idle.
+ shared.num_idle += 1;
+ // NOTE: Technically we should also do num_notify++ and notify again,
+ // but since we're shutting down anyway, that won't be necessary.
+ break;
+ }
+ }
+
+ // Thread exit
+ shared.num_th -= 1;
+
+ // num_idle should now be tracked exactly, panic
+ // with a descriptive message if it is not the
+ // case.
+ shared.num_idle = shared
+ .num_idle
+ .checked_sub(1)
+ .expect("num_idle underflowed on thread exit");
+
+ if shared.shutdown && shared.num_th == 0 {
+ this.condvar.notify_one();
+ }
+ })
+ .unwrap();
+ }
+
+ /// Shut down all workers in the pool the next time they are idle.
+ ///
+ /// Blocks until all threads have exited.
+ pub(crate) fn shutdown(&self) {
+ let mut shared = self.shared.lock().unwrap();
+ shared.shutdown = true;
+ self.condvar.notify_all();
+
+ while shared.num_th > 0 {
+ shared = self.condvar.wait(shared).unwrap();
+ }
+ }
+}
+
+pub(crate) struct PoolWaiter(Arc<Pool>);
+
+impl From<Pool> for PoolWaiter {
+ fn from(p: Pool) -> Self {
+ Self::from(Arc::new(p))
+ }
+}
+
+impl From<Arc<Pool>> for PoolWaiter {
+ fn from(p: Arc<Pool>) -> Self {
+ Self(p)
+ }
+}
+
+impl Deref for PoolWaiter {
+ type Target = Arc<Pool>;
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl Drop for PoolWaiter {
+ fn drop(&mut self) {
+ self.0.shutdown();
+ }
+}
+
+/// Run the provided closure on a thread where blocking is acceptable.
+///
+/// In general, issuing a blocking call or performing a lot of compute in a future without
+/// yielding is not okay, as it may prevent the executor from driving other futures forward.
+/// A closure that is run through this method will instead be run on a dedicated thread pool for
+/// such blocking tasks without holding up the main futures executor.
+///
+/// # Examples
+///
+/// ```
+/// # async fn docs() {
+/// tokio::executor::blocking::run(move || {
+/// // do some compute-heavy work or call synchronous code
+/// }).await;
+/// # }
+/// ```
+#[cfg(feature = "blocking")]
+pub fn run<F, R>(f: F) -> Blocking<R>
+where
+ F: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
+{
+ let (tx, rx) = oneshot::channel();
+
+ BLOCKING.with(|current_pool| match current_pool.get() {
+ State::Ready(pool) => {
+ let pool = unsafe { &*pool };
+ Pool::spawn(
+ pool,
+ Box::new(move || {
+ // receiver may have gone away
+ let _ = tx.send(f());
+ }),
+ );
+ }
+ State::Empty => panic!("must be called from the context of Tokio runtime"),
+ });
+
+ Blocking { rx }
+}
+
+#[cfg(feature = "blocking")]
+impl<T> Future for Blocking<T> {
+ type Output = T;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ use std::task::Poll::*;
+
+ match Pin::new(&mut self.rx).poll(cx) {
+ Ready(Ok(v)) => Ready(v),
+ Ready(Err(_)) => panic!(
+ "the blocking operation has been dropped before completing. \
+ This should not happen and is a bug."
+ ),
+ Pending => Pending,
+ }
+ }
+}
+
+fn run_task(f: Box<dyn FnOnce() + Send>) {
+ use std::panic::{catch_unwind, AssertUnwindSafe};
+
+ let _ = catch_unwind(AssertUnwindSafe(|| f()));
+}
+
+impl Default for Pool {
+ fn default() -> Self {
+ Pool {
+ shared: Mutex::new(Shared {