summaryrefslogtreecommitdiffstats
path: root/tokio/src/task
diff options
context:
space:
mode:
authorLucio Franco <luciofranco14@gmail.com>2020-01-17 14:42:52 -0500
committerGitHub <noreply@github.com>2020-01-17 14:42:52 -0500
commit619d730d610bbfd0a13285178d6bf3d89a29d6a3 (patch)
tree07d9f9e6b1e58f694b0090722e9351690c3dd88e /tokio/src/task
parent476bf0084a7abb86ed2338095da4c7297453f00c (diff)
task: Introduce a new pattern for task-local storage (#2126)
This PR introduces a new pattern for task-local storage. It allows for storage and retrieval of data in an asynchronous context. It does so using a new pattern based on past experience. A quick example: ```rust tokio::task_local! { static FOO: u32; } FOO.scope(1, async move { some_async_fn().await; assert_eq!(FOO.get(), 1); }).await; ``` ## Background of task-local storage The goal for task-local storage is to be able to provide some ambiant context in an asynchronous context. One primary use case is for distributed tracing style systems where a request identifier is made available during the context of a request / response exchange. In a synchronous context, thread-local storage would be used for this. However, with asynchronous Rust, logic is run in a "task", which is decoupled from an underlying thread. A task may run on many threads and many tasks may be multiplexed on a single thread. This hints at the need for task-local storage. ### Early attempt Futures 0.1 included a [task-local storage][01] strategy. This was based around using the "runtime task" (more on this later) as the scope. When a task was spawned with `tokio::spawn`, a task-local map would be created and assigned with that task. Any task-local value that was stored would be stored in this map. Whenever the runtime polled the task, it would set the task context enabling access to find the value. There are two main problems with this strategy which ultimetly lead to the removal of runtime task-local storage: 1) In asynchronous Rust, a "task" is not a clear-cut thing. 2) The implementation did not leverage the significant optimizations that the compiler provides for thread-local storage. ### What is a "task"? With synchronous Rust, a "thread" is a clear concept: the construct you get with `thread::spawn`. With asynchronous Rust, there is no strict definition of a "task". A task is most commonly the construct you get when calling `tokio::spawn`. The construct obtained with `tokio::spawn` will be referred to as the "runtime task". However, it is also possible to multiplex asynchronous logic within the context of a runtime task. APIs such as [`task::LocalSet`][local-set] , [`FuturesUnordered`][futures-unordered], [`select!`][select], and [`join!`][join] provide the ability to embed a mini scheduler within a single runtime task. Revisiting the primary use case, setting a request identifier for the duration of a request response exchange, here is a scenario in which using the "runtime task" as the scope for task-local storage would fail: ```rust task_local!(static REQUEST_ID: Cell<u64> = Cell::new(0)); let request1 = get_request().await; let request2 = get_request().await; let (response1, response2) = join!{ async { REQUEST_ID.with(|cell| cell.set(request1.identifier())); process(request1) }, async { REQUEST_ID.with(|cell| cell.set(request2.identifier())); process(request2) }, }; ``` `join!` multiplexes the execution of both branches on the same runtime task. Given this, if `REQUEST_ID` is scoped by the runtime task, the request ID would leak across the request / response exchange processing. This is not a theoretical problem, but was hit repeatedly in practice. For example, Hyper's HTTP/2.0 implementation multiplexes many request / response exchanges on the same runtime task. ### Compiler thread-local optimizations A second smaller problem with the original task-local storage strategy is that it required re-implementing "thread-local storage" like constructs but without being able to get the compiler to help optimize. A discussion of how the compiler optimizes thread-local storage is out of scope for this PR description, but suffice to say a task-local storage implementation should be able to leverage thread-locals as much as possible. ## A new task-local strategy Introduced in this PR is a new strategy for dealing with task-local storage. Instead of using the runtime task as the thread-local scope, the proposed task-local API allows the user to define any arbitrary scope. This solves the problem of binding task-locals to the runtime task: ```rust tokio::task_local!(static FOO: u32); FOO.scope(1, async move { some_async_fn().await; assert_eq!(FOO.get(), 1); }).await; ``` The `scope` function establishes a task-local scope for the `FOO` variable. It takes a value to initialize `FOO` with and an async block. The `FOO` task-local is then available for the duration of the provided block. `scope` returns a new future that must then be awaited on. `tokio::task_local` will define a new thread-local. The future returned from `scope` will set this thread-local at the start of `poll` and unset it at the end of `poll`. `FOO.get` is a simple thread-local access with no special logic. This strategy solves both problems. Task-locals can be scoped at any level and can leverage thread-local compiler optimizations. Going back to the previous example: ```rust task_local! { static REQUEST_ID: u64; } let request1 = get_request().await; let request2 = get_request().await; let (response1, response2) = join!{ async { let identifier = request1.identifier(); REQUEST_ID.scope(identifier, async { process(request1).await }).await }, async { let identifier = request2.identifier(); REQUEST_ID.scope(identifier, async { process(request2).await }).await }, }; ``` There is no longer a problem with request identifiers leaking. ## Disadvantages The primary disadvantage of this strategy is that the "set and forget" pattern with thread-locals is not possible. ```rust thread_local! { static FOO: Cell<usize> = Cell::new(0); } thread::spawn(|| { FOO.with(|cell| cell.set(123)); do_work(); }); ``` In this example, `FOO` is set at the start of the thread and automatically cleared when the thread terminates. While this is nice in some cases, it only really logically makes sense because the scope of a "thread" is clear (the thread). A similar pattern can be done with the proposed stratgy but would require an explicit setting of the scope at the root of `tokio::spawn`. Additionally, one should only do this if the runtime task is the appropriate scope for the specific task-local variable. Another disadvantage is that this new method does not support lazy initialization but requires an explicit `LocalKey::scope` call to set the task-local value. In this case since task-local's are different from thread-locals it is fine. [01]: https://docs.rs/futures/0.1.29/futures/task/struct.LocalKey.html [local-set]: # [futures-unordered]: https://docs.rs/futures/0.3.1/futures/stream/struct.FuturesUnordered.html [select]: https://docs.rs/futures/0.3.1/futures/macro.select.html [join]: https://docs.rs/futures/0.3.1/futures/macro.join.html
Diffstat (limited to 'tokio/src/task')
-rw-r--r--tokio/src/task/mod.rs3
-rw-r--r--tokio/src/task/task_local.rs240
2 files changed, 243 insertions, 0 deletions
diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs
index f762a561..efeb5f0e 100644
--- a/tokio/src/task/mod.rs
+++ b/tokio/src/task/mod.rs
@@ -257,6 +257,9 @@ cfg_rt_core! {
cfg_rt_util! {
mod local;
pub use local::{spawn_local, LocalSet};
+
+ mod task_local;
+ pub use task_local::LocalKey;
}
cfg_rt_core! {
diff --git a/tokio/src/task/task_local.rs b/tokio/src/task/task_local.rs
new file mode 100644
index 00000000..6423175b
--- /dev/null
+++ b/tokio/src/task/task_local.rs
@@ -0,0 +1,240 @@
+use pin_project_lite::pin_project;
+use std::cell::RefCell;
+use std::error::Error;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::{fmt, thread};
+
+/// Declare a new task local storage key of type [`tokio::task::LocalKey`].
+///
+/// # Syntax
+///
+/// The macro wraps any number of static declarations and makes them task locals.
+/// Publicity and attributes for each static are allowed.
+///
+/// # Examples
+///
+/// ```
+/// # use tokio::task_local;
+/// task_local! {
+/// pub static FOO: u32;
+///
+/// #[allow(unused)]
+/// static BAR: f32;
+/// }
+/// # fn main() {}
+/// ```
+///
+/// See [LocalKey documentation][`tokio::task::LocalKey`] for more
+/// information.
+///
+/// [`tokio::task::LocalKey`]: ../tokio/task/struct.LocalKey.html
+#[macro_export]
+macro_rules! task_local {
+ // empty (base case for the recursion)
+ () => {};
+
+ ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty; $($rest:tt)*) => {
+ $crate::__task_local_inner!($(#[$attr])* $vis $name, $t);
+ $crate::task_local!($($rest)*);
+ };
+
+ ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty) => {
+ $crate::__task_local_inner!($(#[$attr])* $vis $name, $t);
+ }
+}
+
+#[doc(hidden)]
+#[macro_export]
+macro_rules! __task_local_inner {
+ ($(#[$attr:meta])* $vis:vis $name:ident, $t:ty) => {
+ static $name: $crate::task::LocalKey<$t> = {
+ std::thread_local! {
+ static __KEY: std::cell::RefCell<Option<$t>> = std::cell::RefCell::new(None);
+ }
+
+ $crate::task::LocalKey { inner: __KEY }
+ };
+ };
+}
+
+/// A key for task-local data.
+///
+/// This type is generated by `task_local!` macro and unlike `thread_local!` it has
+/// no concept of lazily initialization. Instead, it is designed to provide task local
+/// storage the future that is passed to `set`.
+///
+/// # Initialization and Destruction
+///
+/// Initialization is done via `set` which is an `async fn` that wraps another
+/// [`std::future::Future`] and will set the value on each `Future::poll` call.
+/// Once the `set` future is dropped the corresponding task local value is also
+/// dropped.
+///
+/// # Examples
+///
+/// ```
+/// # async fn dox() {
+/// tokio::task_local! {
+/// static FOO: u32;
+/// }
+///
+/// FOO.scope(1, async move {
+/// assert_eq!(FOO.get(), 1);
+/// }).await;
+///
+/// FOO.scope(2, async move {
+/// assert_eq!(FOO.get(), 2);
+///
+/// FOO.scope(3, async move {
+/// assert_eq!(FOO.get(), 3);
+/// }).await;
+/// }).await;
+/// # }
+/// ```
+pub struct LocalKey<T: 'static> {
+ #[doc(hidden)]
+ pub inner: thread::LocalKey<RefCell<Option<T>>>,
+}
+
+impl<T: 'static> LocalKey<T> {
+ /// Sets a value `T` as the task local value for the future `F`.
+ ///
+ /// This will run the provided future to completion and set the
+ /// provided value as the task local under this key. Once the returned
+ /// future is dropped so will the value passed be dropped.
+ ///
+ /// # async fn dox() {
+ /// tokio::task_local! {
+ /// static FOO: u32;
+ /// }
+ ///
+ /// FOO.scope(1, async move {
+ /// println!("task local value: {}", FOO.get());
+ /// }).await;
+ /// # }
+ pub async fn scope<F>(&'static self, value: T, f: F) -> F::Output
+ where
+ F: Future,
+ {
+ TaskLocalFuture {
+ local: &self,
+ slot: Some(value),
+ future: f,
+ }
+ .await
+ }
+
+ /// Access this task-local key, running the provided closure with a reference
+ /// passed to the value.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if not called within a future that has not been
+ /// set via `LocalKey::set`.
+ pub fn with<F, R>(&'static self, f: F) -> R
+ where
+ F: FnOnce(&T) -> R,
+ {
+ self.try_with(f).expect(
+ "cannot access a Task Local Storage value \
+ without setting it via `LocalKey::set`",
+ )
+ }
+
+ /// Access this task-local key, running the provided closure with a reference
+ /// passed to the value. Unlike `with` this function will return a `Result<R, AccessError>`
+ /// instead of panicking.
+ pub fn try_with<F, R>(&'static self, f: F) -> Result<R, AccessError>
+ where
+ F: FnOnce(&T) -> R,
+ {
+ self.inner.with(|v| {
+ if let Some(val) = v.borrow().as_ref() {
+ Ok(f(val))
+ } else {
+ Err(AccessError { _private: () })
+ }
+ })
+ }
+}
+
+impl<T: Copy + 'static> LocalKey<T> {
+ /// Get a copy of the task-local value if it implements
+ /// the `Copy` trait.
+ pub fn get(&'static self) -> T {
+ self.with(|v| *v)
+ }
+}
+
+impl<T: 'static> fmt::Debug for LocalKey<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("LocalKey { .. }")
+ }
+}
+
+pin_project! {
+ struct TaskLocalFuture<T: StaticLifetime, F> {
+ local: &'static LocalKey<T>,
+ slot: Option<T>,
+ #[pin]
+ future: F,
+ }
+}
+
+impl<T: 'static, F: Future> Future for TaskLocalFuture<T, F> {
+ type Output = F::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ struct Guard<'a, T: 'static> {
+ local: &'static LocalKey<T>,
+ slot: &'a mut Option<T>,
+ prev: Option<T>,
+ }
+
+ impl<T> Drop for Guard<'_, T> {
+ fn drop(&mut self) {
+ let value = self.local.inner.with(|c| c.replace(self.prev.take()));
+ *self.slot = value;
+ }
+ }
+
+ let mut project = self.project();
+ let val = project.slot.take();
+
+ let prev = project.local.inner.with(|c| c.replace(val));
+
+ let _guard = Guard {
+ prev,
+ slot: &mut project.slot,
+ local: *project.local,
+ };
+
+ project.future.poll(cx)
+ }
+}
+
+// Required to make `pin_project` happy.
+trait StaticLifetime: 'static {}
+impl<T: 'static> StaticLifetime for T {}
+
+/// An error returned by [`LocalKey::try_with`](struct.LocalKey.html#method.try_with).
+#[derive(Clone, Copy, Eq, PartialEq)]
+pub struct AccessError {
+ _private: (),
+}
+
+impl fmt::Debug for AccessError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("AccessError").finish()
+ }
+}
+
+impl fmt::Display for AccessError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Display::fmt("task-local value not set", f)
+ }
+}
+
+impl Error for AccessError {}