More-or-less a half-rewrite of the current time driver, supporting the
use of intrusive futures for timer registration.
Fixes: #3028, #3069
|
|
## Motivation
Currently, the per-task `tracing` spans generated by tokio's `tracing`
feature flag include the `std::any::type_name` of the future that was
spawned. When future combinators and/or libraries like Tower are in use,
these future names can get _quite_ long. Furthermore, when formatting
the `tracing` spans with their parent spans as context, any other task
spans in the span context where the future was spawned from can _also_
include extremely long future names.
In some cases, this can result in extremely high memory use just to
store the future names. For example, in Linkerd, when we enable
`tokio=trace` to enable the task spans, there's a spawned task whose
future name is _232990 characters long_. A proxy with only 14 spawned
tasks generates a task list that's over 690 KB. Enabling task spans
under load results in the process getting OOM killed very quickly.
## Solution
This branch removes future type names from the spans generated by
`spawn`. As a replacement, to allow identifying which `spawn` call a
span corresponds to, the task span now contains the source code location
where `spawn` was called, when the compiler supports the
`#[track_caller]` attribute. Since `track_caller` was stabilized in Rust
1.46.0, and our minimum supported Rust version is 1.45.0, we can't
assume that `#[track_caller]` is always available. Instead, we have a
RUSTFLAGS cfg, `tokio_track_caller`, that guards whether or not we use
it. I've also added a `build.rs` that detects the compiler minor
version, and sets the cfg flag automatically if the current compiler
version is >= 1.46. This means users shouldn't have to enable
`tokio_track_caller` manually.
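A minimal sketch of what such a version probe can look like in `build.rs` (the actual script may differ in details):
```rust
use std::env;
use std::process::Command;

fn main() {
    // Ask the compiler selected by Cargo for its version string,
    // e.g. "rustc 1.46.0 (04488afe3 2020-08-24)".
    let rustc = env::var("RUSTC").unwrap_or_else(|_| "rustc".to_string());
    let output = Command::new(rustc)
        .arg("--version")
        .output()
        .expect("failed to run rustc");
    let version = String::from_utf8(output.stdout).expect("non-utf8 rustc output");

    // Extract the minor version ("46" in the example above).
    let minor: u32 = version
        .split('.')
        .nth(1)
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);

    // `#[track_caller]` was stabilized in 1.46, so enable the cfg there.
    if minor >= 46 {
        println!("cargo:rustc-cfg=tokio_track_caller");
    }
}
```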
Here's the trace output from the `chat` example, before this change:
![Screenshot_20201030_110157](https://user-images.githubusercontent.com/2796466/97741071-6d408800-1a9f-11eb-9ed6-b25e72f58c7b.png)
...and after:
![Screenshot_20201030_110303](https://user-images.githubusercontent.com/2796466/97741112-7e899480-1a9f-11eb-9197-c5a3f9ea1c05.png)
Closes #3073
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
|
|
An off-by-one bug resulted in freeing the incorrect page. This change
fixes the bug and also adds an `asan` CI job.
Fixes: #3014
|
|
tokio:
merge rt-core and rt-util as rt
rename rt-threaded to rt-multi-thread
tokio-util:
rename rt-core to rt
Closes #2942
|
|
|
|
|
|
Co-authored-by: Alice Ryhl <alice@ryhl.io>
Co-authored-by: Carl Lerche <me@carllerche.com>
|
|
Changes inherent methods to take `&self` instead of `&mut self`. This
brings the API in line with `std`.
This patch is implemented by using a `tokio::sync::Mutex` to guard the
internal `File` state. This is not an ideal implementation strategy, but
its overhead doesn't make a big impact compared to having to dispatch
operations to a background thread followed by a blocking syscall.
In the future, the implementation can be improved as we explore async
file-system APIs provided by the operating-system (iocp / io_uring).
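A minimal sketch of the strategy (not Tokio's actual internals; `FileState` is a stand-in for the real state machine):
```rust
use tokio::sync::Mutex;

// Stand-in for the real per-file state (position, in-flight operation,
// handle used by the blocking pool, etc.).
struct FileState {
    pos: u64,
}

pub struct File {
    state: Mutex<FileState>,
}

impl File {
    // Inherent methods can now take `&self`: they serialize access by
    // locking the async mutex internally instead of requiring `&mut self`.
    pub async fn advance(&self, n: u64) -> u64 {
        let mut state = self.state.lock().await;
        state.pos += n;
        state.pos
    }
}
```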
Closes #2927
|
|
As tokio does not rely on poisoning, we can
avoid unwrapping at every lock site by handling
the `PoisonError` in the `Mutex` shim.
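A sketch of the idea behind the shim (assuming a thin wrapper around `std::sync::Mutex`):
```rust
use std::sync::{Mutex as StdMutex, MutexGuard, PoisonError};

// Thin wrapper around the std mutex used internally.
pub(crate) struct Mutex<T>(StdMutex<T>);

impl<T> Mutex<T> {
    pub(crate) fn new(t: T) -> Self {
        Mutex(StdMutex::new(t))
    }

    // Poisoning only signals that a panic occurred while the lock was
    // held; since tokio does not rely on it, recover the guard instead
    // of propagating a panic at every call site.
    pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
        self.0.lock().unwrap_or_else(PoisonError::into_inner)
    }
}
```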
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
|
|
Updates the mpsc channel to use the intrusive waker based semaphore.
This enables using `Sender` with `&self`.
Instead of using `Sender::poll_ready` to ensure capacity and updating
the `Sender` state, `async fn Sender::reserve()` is added. This function
returns a `Permit` value representing the reserved capacity.
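A usage sketch (method names as they exist in today's public API):
```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    // Wait for capacity up front; the permit holds one reserved slot.
    let permit = tx.reserve().await.unwrap();

    // Sending through the permit cannot fail or wait: capacity is
    // already reserved.
    permit.send("hello");

    assert_eq!(rx.recv().await, Some("hello"));
}
```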
Fixes: #2637
Refs: #2718 (intrusive waiters)
|
|
|
|
This refactors I/O registration in a few ways:
- Cleans up the cached readiness in `PollEvented`. This cache used to
be helpful when readiness was a linked list of `*mut Node`s in
`Registration`. Previous refactors have turned `Registration` into just
an `AtomicUsize` holding the current readiness, so the cache is just
extra work and complexity. Gone.
- Polling the `Registration` for readiness now gives a `ReadyEvent`,
which includes the driver tick. This event must be passed back into
`clear_readiness`, so that the readiness is only cleared from `Registration`
if the tick hasn't changed. Previously, it was possible to clear the
readiness even though another thread had *just* polled the driver and
found the socket ready again.
- Registration now also contains an `async fn readiness`, which stores
wakers in an intrusive linked list. This allows an unbounded number
of tasks to register for readiness (previously, only 1 per direction (read
and write)). By using the intrusive linked list, there is no concern of
leaking the storage of the wakers, since they are stored inside the `async fn`
and released when the future is dropped.
- Registration retains a `poll_readiness(Direction)` method, to support
`AsyncRead` and `AsyncWrite`. They aren't able to use `async fn`s, and
so there are 2 reserved slots for those methods.
- IO types where it makes sense to have multiple tasks waiting on them
now take advantage of this new `async fn readiness`, such as `UdpSocket`
and `UnixDatagram`.
Additionally, this makes the `io-driver` "feature" internal-only (no longer
documented, not part of the public API), and adds a second internal-only
feature, `io-readiness`, to group together the linked-list part of
registration that is only used by some of the I/O types.
After a bit of discussion, changing stream-based transports (like
`TcpStream`) to have `async fn read(&self)` is punted, since that
is likely too easy of a footgun to activate.
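The usage pattern this enables looks roughly like the following, shown here with the public readiness API as it exists on `TcpStream` today (the internal `Registration` version differs in details):
```rust
use tokio::io::Interest;
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let stream = TcpStream::connect("127.0.0.1:8080").await?;
    let mut buf = [0u8; 1024];

    loop {
        // Any number of tasks may await readiness on the same resource;
        // the wakers live in the intrusive list inside each future.
        let ready = stream.ready(Interest::READABLE).await?;

        if ready.is_readable() {
            match stream.try_read(&mut buf) {
                Ok(n) => {
                    println!("read {} bytes", n);
                    return Ok(());
                }
                // False positive: readiness was cleared (tick-checked)
                // by another poll, so go back to awaiting readiness.
                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
                Err(e) => return Err(e),
            }
        }
    }
}
```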
Refs: #2779, #2728
|
|
|
|
|
|
Co-authored-by: Mikail Bagishov <bagishov.mikail@yandex.ru>
Co-authored-by: Eliza Weisman <eliza@buoyant.io>
Co-authored-by: Alice Ryhl <alice@ryhl.io>
|
|
|
|
* sync: move CancellationToken to tokio-util
The `CancellationToken` utility is only available with the
`tokio_unstable` flag. This was done as the API is not final, but it
adds friction for users.
This patch moves `CancellationToken` to tokio-util where it is generally
available. The tokio-util crate does not have any constraints on
breaking change releases.
* fix clippy
* clippy again
|
|
The I/O driver uses a slab to store per-resource state. Doing this
provides two benefits. First, allocating state is streamlined. Second,
resources may be safely indexed using a `usize` type. The `usize` is
passed to the OS's selector when registering to receive events.
The original slab implementation used a `Vec` backed by `RwLock`. This
primarily caused contention when reading state. This implementation also
only **grew** the slab capacity but never shrank. In #1625, the slab was
rewritten to use a lock-free strategy. The lock contention was removed
but this implementation was still grow-only.
This change adds the ability to release memory. Similar to the previous
implementation, it structures the slab to use a vector of pages. This
enables growing the slab without having to move any previous entries. It
also adds the ability to release pages. This is done by introducing a
lock when allocating/releasing slab entries. This does not impact
benchmarks, primarily because the existing implementation was not fully
lock-free either, also having a lock around allocating and releasing.
A `Slab::compact()` function is added. Pages are iterated. When a page
is found with no slots in use, the page is freed. The `compact()`
function is called occasionally by the I/O driver.
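A self-contained sketch of the page-release idea (not Tokio's actual slab, which has a more involved page layout):
```rust
// Simplified model of "vector of pages; free a page when none of its
// slots are in use".
struct Page<T> {
    slots: Vec<Option<T>>,
    used: usize,
}

struct Slab<T> {
    pages: Vec<Option<Box<Page<T>>>>,
}

impl<T> Slab<T> {
    // Called occasionally by the owner (the I/O driver, in Tokio's
    // case) to return memory from fully-empty pages.
    fn compact(&mut self) {
        for page in self.pages.iter_mut() {
            // A page with no slots in use can be freed outright.
            if page.as_ref().map_or(false, |p| p.used == 0) {
                *page = None;
            }
        }
    }
}
```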
Fixes #2505
|
|
## Motivation
When debugging asynchronous systems, it can be very valuable to inspect
what tasks are currently active (see #2510). The [`tracing` crate] and
related libraries provide an interface for Rust libraries and
applications to emit and consume structured, contextual, and async-aware
diagnostic information. Because this diagnostic information is
structured and machine-readable, it is a better fit for the
task-tracking use case than textual logging — `tracing` spans can be
consumed to generate metrics ranging from a simple counter of active
tasks to histograms of poll durations, idle durations, and total task
lifetimes. This information is potentially valuable to both Tokio users
*and* to maintainers.
Additionally, `tracing` is maintained by the Tokio project and is
becoming widely adopted by other libraries in the "Tokio stack", such as
[`hyper`], [`h2`], and [`tonic`] and in [other] [parts] of the broader Rust
ecosystem. Therefore, it is suitable for use in Tokio itself.
[`tracing` crate]: https://github.com/tokio-rs/tracing
[`hyper`]: https://github.com/hyperium/hyper/pull/2204
[`h2`]: https://github.com/hyperium/h2/pull/475
[`tonic`]: https://github.com/hyperium/tonic/blob/570c606397e47406ec148fe1763586e87a8f5298/tonic/Cargo.toml#L48
[other]: https://github.com/rust-lang/chalk/pull/525
[parts]: https://github.com/rust-lang/compiler-team/issues/331
## Solution
This PR is an MVP for instrumenting Tokio with `tracing` spans. When the
"tracing" optional dependency is enabled, every spawned future will be
instrumented with a `tracing` span.
The generated spans are at the `TRACE` verbosity level, and have the
target "tokio::task", which may be used by consumers to filter whether
they should be recorded. They include fields for the type name of the
spawned future and for what kind of task the span corresponds to (a
standard `spawn`ed task, a local task spawned by `spawn_local`, or a
`blocking` task spawned by `spawn_blocking`). Because `tracing` has
separate concepts of "opening/closing" and "entering/exiting" a span, we
enter these spans every time the spawned task is polled. This allows
collecting data such as:
- the total lifetime of the task from `spawn` to `drop`
- the number of times the task was polled before it completed
- the duration of each individual time that the span was polled (and
therefore, aggregated metrics like histograms or averages of poll
durations)
- the total time a span was actively being polled, and the total time
it was alive but **not** being polled
- the time between when the task was `spawn`ed and the first poll
As an example, here is the output of a version of the `chat` example
instrumented with `tracing`:
![image](https://user-images.githubusercontent.com/2796466/87231927-e50f6900-c36f-11ea-8a90-6da9b93b9601.png)
And, with multiple connections actually sending messages:
![trace_example_1](https://user-images.githubusercontent.com/2796466/87231876-8d70fd80-c36f-11ea-91f1-0ad1a5b3112f.png)
I haven't added any `tracing` spans in the example, only converted the
existing `println!`s to `tracing::info` and `tracing::error` for
consistency. The span durations in the above output are generated by
`tracing-subscriber`. Of course, a Tokio-specific subscriber could
generate even more detailed statistics, but that's follow-up work once
basic tracing support has been added.
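For example, a consumer can opt into just these spans by filtering on the target (a sketch using `tracing-subscriber` with its `env-filter` feature enabled):
```rust
use tracing_subscriber::EnvFilter;

fn main() {
    // Record only tokio's task spans, at TRACE verbosity.
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::new("tokio::task=trace"))
        .init();

    // ... start the runtime and spawn tasks as usual ...
}
```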
Note that the `Instrumented` type from `tracing-futures`, which attaches
a `tracing` span to a future, was reimplemented inside of Tokio to avoid
a dependency on that crate. `tracing-futures` has a feature flag that
enables an optional dependency on Tokio, and I believe that if another
crate in a dependency graph enables that feature while Tokio's `tracing`
support is also enabled, it would create a circular dependency that
Cargo wouldn't be able to handle. Also, it avoids a dependency for a
very small amount of code that is unlikely to ever change.
There is, of course, room for plenty of future work here. This might
include:
- instrumenting other parts of `tokio`, such as I/O resources and
channels (possibly via waker instrumentation)
- instrumenting the threadpool so that the state of worker threads
can be inspected
- writing `tracing-subscriber` `Layer`s to collect and display
Tokio-specific data from these traces
- using `track_caller` (when it's stable) to record _where_ a task
was `spawn`ed from
However, this is intended as an MVP to get us started on that path.
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
|
|
As a first step towards structured concurrency, this change adds a
CancellationToken for graceful cancellation of tasks.
The token can be awaited by an arbitrary number of tasks thanks to the
use of an intrusive list.
The token can be cloned. In addition, child tokens can be derived.
When the parent token gets cancelled, all child tokens will also get
cancelled.
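A usage sketch (shown with the API as it is exposed today in `tokio-util`):
```rust
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let child = token.child_token();

    let task = tokio::spawn(async move {
        tokio::select! {
            _ = child.cancelled() => "cancelled",
            _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => "finished",
        }
    });

    // Cancelling the parent also cancels every derived child token.
    token.cancel();
    assert_eq!(task.await.unwrap(), "cancelled");
}
```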
|
|
## Motivation
When cancelling futures which are waiting to acquire semaphore permits,
there is a possible dangling pointer if notified futures are dropped
after the notified wakers have been split into a separate list. Because
these futures' wait queue nodes are no longer in the main list guarded
by the lock, their `Drop` impls will complete immediately, and they may
be dropped while still in the list of tasks to notify.
## Solution
This branch fixes this by popping from the wait list inside the lock.
The wakers of popped nodes are temporarily stored in a stack array,
so that they can be notified after the lock is released. Since the
size of the stack array is fixed, we may in some cases have to loop
multiple times, acquiring and releasing the lock, until all permits
have been released. This may also have the possible side advantage of
preventing a thread releasing a very large number of permits from
starving other threads that need to enqueue waiters.
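A self-contained sketch of the pattern (not the actual semaphore code; the wait list is modeled here as a plain `VecDeque` of wakers):
```rust
use std::collections::VecDeque;
use std::sync::Mutex;
use std::task::Waker;

const NUM_WAKERS: usize = 32;

// Pop waiters while holding the lock, stage their wakers in a
// fixed-size stack buffer, and only wake them after the lock is
// released. If the buffer fills up, loop and take the lock again.
fn notify_all(waiters: &Mutex<VecDeque<Waker>>) {
    loop {
        let mut staged: [Option<Waker>; NUM_WAKERS] = Default::default();
        let mut count = 0;

        {
            let mut queue = waiters.lock().unwrap();
            while count < NUM_WAKERS {
                match queue.pop_front() {
                    Some(waker) => {
                        staged[count] = Some(waker);
                        count += 1;
                    }
                    None => break,
                }
            }
        } // lock released here, before any wakers run

        for waker in staged.iter_mut().filter_map(Option::take) {
            waker.wake();
        }

        if count < NUM_WAKERS {
            return; // the queue was drained
        }
    }
}
```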
I've also added a loom test that can reliably reproduce a segfault
on master, but passes on this branch (after a lot of iterations).
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
|
|
Loom is having a big refresh to improve performance and tighten up the
concurrency model. This diff tracks those changes.
Included in the changes is the removal of `CausalCell` deferred checks.
This is due to it technically being undefined behavior in the C++11
memory model. To address this, the work-stealing queue is updated to
avoid needing this behavior. This is done by limiting the queue to have
one concurrent stealer.
|
|
## Motivation
Many of Tokio's synchronization primitives (`RwLock`, `Mutex`,
`Semaphore`, and the bounded MPSC channel) are based on the internal
semaphore implementation, called `semaphore_ll`. This semaphore type
provides a lower-level internal API for the semaphore implementation
than the public `Semaphore` type, and supports "batch" operations, where
waiters may acquire more than one permit at a time, and batches of
permits may be released back to the semaphore.
Currently, `semaphore_ll` uses an atomic singly-linked list for the
waiter queue. The linked list implementation is specific to the
semaphore. This implementation therefore requires a heap allocation for
every waiter in the queue. These allocations are owned by the semaphore,
rather than by the task awaiting permits from the semaphore. Critically,
they are only _deallocated_ when permits are released back to the
semaphore, at which point it dequeues as many waiters from the front of
the queue as can be satisfied with the released permits. If a task
attempts to acquire permits from the semaphore and is cancelled (such as
by timing out), their waiter nodes remain in the list until they are
dequeued while releasing permits. In cases where large numbers of tasks
are cancelled while waiting for permits, this results in extremely high
memory use for the semaphore (see #2237).
## Solution
@Matthias247 has proposed that Tokio adopt the approach used in his
`futures-intrusive` crate: using an _intrusive_ linked list to store the
wakers of tasks waiting on a synchronization primitive. In an intrusive
list, each list node is stored as part of the entry that node
represents, rather than in a heap allocation that owns the entry.
Because futures must be pinned in order to be polled, the necessary
invariant of such a list — that entries may not move while in the list
— may be upheld by making the waiter node `!Unpin`. In this approach,
the waiter node can be stored inline in the future, rather than
requiring separate heap allocation, and cancelled futures may remove
their nodes from the list.
This branch adds a new semaphore implementation that uses the intrusive
list added to Tokio in #2210. The implementation is essentially a hybrid
of the old `semaphore_ll` and the semaphore used in `futures-intrusive`:
while a `Mutex` around the wait list is necessary, since the intrusive
list is not thread-safe, the permit state is stored outside of the mutex
and updated atomically.
The mutex is acquired only when accessing the wait list — if a task
can acquire sufficient permits without waiting, it does not need to
acquire the lock. When releasing permits, we iterate over the wait
list from the end of the queue until we run out of permits to release,
and split off all the nodes that received enough permits to wake up
into a separate list. Then, we can drain the new list and notify those
wakers *after* releasing the lock. Because the split operation only
modifies the pointers on the head node of the split-off list and the
new tail node of the old list, it is O(1) and does not require an
allocation to return a variable length number of waiters to notify.
Because of the intrusive list invariants, the API provided by the new
`batch_semaphore` is somewhat different than that of `semaphore_ll`. In
particular, the `Permit` type has been removed. This type was primarily
intended to allow the reuse of a wait-list node allocated on the heap.
Since the intrusive list means we can avoid heap-allocating waiters,
this is no longer necessary. Instead, acquiring permits is done by
polling an `Acquire` future returned by the `Semaphore` type. The use of
a future here ensures that the waiter node is always pinned while
waiting to acquire permits, and that a reference to the semaphore is
available to remove the waiter if the future is cancelled.
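A usage sketch against the public `Semaphore` type (method signatures as they exist today):
```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(2));

    // `acquire` returns a future. The waiter node lives inside that
    // future, so it stays pinned while linked into the intrusive wait
    // list, and is unlinked if the future is dropped before completing.
    let permit = semaphore.acquire().await.unwrap();

    // Dropping the permit releases it back to the semaphore.
    drop(permit);
}
```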
Unfortunately, the current implementation of the bounded MPSC requires a
`poll_acquire` operation, and has methods that call it while outside of
a pinned context. Therefore, I've left the old `semaphore_ll`
implementation in place to be used by the bounded MPSC, and updated the
`Mutex`, `RwLock`, and `Semaphore` APIs to use the new implementation.
Hopefully, a subsequent change can update the bounded MPSC to use the
new semaphore as well.
Fixes #2237
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
|
|
Since the original shell runtime was implemented, utilities have been
added to encapsulate `unsafe`. The shell runtime is now able to use
those utilities and not include its own `unsafe` code.
|
|
A refactor of the scheduler internals focusing on simplifying and
reducing unsafety. There are no fundamental logic changes.
* The state transitions of the core task component are refined and
reduced.
* `basic_scheduler` has most unsafety removed.
* `local_set` has most unsafety removed.
* `threaded_scheduler` limits most unsafety to its queue implementation.
|
|
Allow storing the intrusive linked-list pointers in an arbitrary
location in the node. This is in preparation for using the linked list
in the scheduler.
In order to make using the intrusive linked list more flexible, a trait
is introduced to abstract over obtaining an entry's raw pointer and the
location of its next/prev pointers. This also pushes more unsafety onto
the user.
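A sketch of the shape of that trait (approximating Tokio's internal `linked_list` module; not public API):
```rust
use core::ptr::NonNull;

// The prev/next pointers embedded somewhere inside the entry.
pub struct Pointers<T> {
    prev: Option<NonNull<T>>,
    next: Option<NonNull<T>>,
}

// Implementors tell the list how to convert between an owned handle
// and a raw pointer, and where inside the entry the pointers live.
// Unsafe to implement: the entry must not move (or be freed) while it
// is linked into a list.
pub unsafe trait Link {
    type Handle;
    type Target;

    fn as_raw(handle: &Self::Handle) -> NonNull<Self::Target>;
    unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle;
    unsafe fn pointers(target: NonNull<Self::Target>) -> NonNull<Pointers<Self::Target>>;
}
```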
|
|
`Notify` provides a synchronization primitive similar to thread park /
unpark, except for tasks.
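A usage sketch (method names as in today's API):
```rust
use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let waiter = notify.clone();

    let handle = tokio::spawn(async move {
        // "Park" the task until another task notifies us.
        waiter.notified().await;
        println!("received notification");
    });

    // "Unpark" one waiting task.
    notify.notify_one();
    handle.await.unwrap();
}
```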
|
|
See discussion in #2222. This wake/notify call has been there in one
form or another since the very early days of tokio. Currently though, it
is not clear that it is needed; the contract for polling is that you
must keep polling until you get `Pending`, so doing a wakeup when we are
about to return `Ready` is premature.
|
|
`StreamMap` is similar to `StreamExt::merge` in that it combines source
streams into a single merged stream that yields values in the order that
they arrive from the source streams. However, `StreamMap` has a lot more
flexibility in usage patterns.
`StreamMap` can:
- Merge an arbitrary number of streams.
- Track which source stream the value was received from.
- Handle inserting and removing streams from the set of managed streams
at any point during iteration.
All source streams held by `StreamMap` are indexed using a key. This key
is included with the value when a source stream yields a value. The key
is also used to remove the stream from the `StreamMap` before the stream
has completed streaming.
Because the `StreamMap` API moves streams during runtime, both streams
and keys must be `Unpin`. In order to insert a `!Unpin` stream into a
`StreamMap`, use `pin!` to pin the stream to the stack or `Box::pin` to
pin the stream on the heap.
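A usage sketch (shown with today's `tokio-stream` crate, where `StreamMap` now lives):
```rust
use tokio_stream::{StreamExt, StreamMap};

#[tokio::main]
async fn main() {
    let mut map = StreamMap::new();

    // Streams are indexed by a key; both key and stream must be `Unpin`.
    map.insert("evens", tokio_stream::iter(vec![0, 2, 4]));
    map.insert("odds", tokio_stream::iter(vec![1, 3, 5]));

    // Values are yielded together with the key of their source stream.
    while let Some((key, value)) = map.next().await {
        println!("{} => {}", key, value);
    }
}
```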
|
|
Provides a `select!` macro for concurrently waiting on multiple async
expressions. The macro has similar goals and syntax as the one provided
by the `futures` crate, but differs significantly in implementation.
First, this implementation does not require special traits to be
implemented on futures or streams (i.e., no `FusedFuture`). A design goal
is to be able to pass a "plain" async fn result into the `select!` macro.
Even without `FusedFuture`, this `select!` implementation is able to
handle all cases the `futures::select!` macro can handle. It does this
by supporting pre-poll conditions on branches and result pattern
matching. For pre-conditions, each branch is able to include a condition
that disables the branch if it evaluates to false. This allows the user
to guard futures that have already been polled, preventing double
polling. Pattern matching can be used to disable streams that complete.
A second big difference is that the macro is implemented almost entirely as a
declarative macro. The biggest advantage to using this strategy is that
the user will not need to alter the rustc recursion limit except in the
most extreme cases.
The resulting future also tends to be smaller in many cases.
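A small syntax sketch showing a branch precondition:
```rust
async fn compute() -> i32 {
    42
}

#[tokio::main]
async fn main() {
    let enabled = true;

    tokio::select! {
        // The `, if` precondition disables the branch when false,
        // guarding against polling a future that must not be polled.
        v = compute(), if enabled => {
            println!("computed {}", v);
        }
        _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
            println!("timed out");
        }
    }
}
```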
|
|
`cargo fmt` has a bug where it does not format modules scoped with
feature flags.
|
|
|
|
* runtime: cleanup and add config options
This patch finishes the cleanup as part of the transition to Tokio 0.2.
A number of changes were made to take advantage of having all Tokio
types in a single crate. It also fixes using Tokio types from
`spawn_blocking`.
* Many threads, one resource driver
Previously, in the threaded scheduler, a resource driver (mio::Poll /
timer combo) was created per thread. This was more or less fine, except
it required balancing across the available drivers. When using a
resource driver from **outside** of the thread pool, balancing is
tricky. The change was originally done to avoid having a dedicated driver
thread.
Now, instead of creating many resource drivers, a single resource driver
is used. Each scheduler thread will attempt to "lock" the resource
driver before parking on it. If the resource driver is already locked,
the thread uses a condition variable to park. Contention should remain
low as, under load, the scheduler avoids using the drivers.
* Add configuration options to enable I/O / time
New configuration options are added to `runtime::Builder` to allow
enabling I/O and time drivers on a runtime instance basis. This is
useful when wanting to create lightweight runtime instances to execute
compute only tasks.
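For example (using today's `Builder` API; at the time of this change the constructor names differed):
```rust
use tokio::runtime::Builder;

fn main() {
    // Enable the drivers explicitly; leaving these calls out yields a
    // lighter-weight runtime suitable for compute-only tasks.
    let rt = Builder::new_multi_thread()
        .enable_io()
        .enable_time()
        .build()
        .unwrap();

    rt.block_on(async {
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    });
}
```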
* Bug fixes
The condition variable parker is updated to the same algorithm used in
`std`. This is motivated by some potential deadlock cases discovered by
`loom`.
The basic scheduler is fixed to fairly schedule tasks. `push_front` was
accidentally used instead of `push_back`.
I/O, time, and spawning now work from within `spawn_blocking` closures.
* Misc cleanup
The threaded scheduler is no longer generic over `P: Park`. Instead, it
is hard coded to a specific parker. Tests, including loom tests, are
updated to use `Runtime` directly. This provides greater coverage.
The `blocking` module is moved back into `runtime` as all usage is
within `runtime` itself.
|
|
The I/O driver is made private and moved to `tokio::io::driver`. `Registration` is
moved to `tokio::io::Registration` and `PollEvented` is moved to `tokio::io::PollEvented`.
Additionally, the concurrent slab used by the I/O driver is cleaned up and extracted to
`tokio::util::slab`, allowing it to eventually be used by other types.
|
|
Now, all types are under `runtime`. `executor::util` is moved to a top
level `util` module.
|
|
|
|
|
|
This also brings back the timer tests in the tokio crate.
|
|
|
|
- Move `tokio` into its own directory.
- Remove `path` dependencies.
- Run tests once with the crates.io dep and once with the patched dep.
|