Age | Commit message | Author |
|
Currently, when `RwLockWriteGuard::downgrade` is called, `MAX_READS - 1`
permits are added to the semaphore. When `RwLockWriteGuard::drop` is
then invoked, however, another `MAX_READS` permits are added. This
results in releasing more permits than were actually acquired when
downgrading a write lock to a read lock. This is why we need to call
`mem::forget` on the `RwLockWriteGuard` in order to avoid
invoking the destructor.
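The permit accounting can be illustrated with a toy model. This is a minimal sketch, not the Tokio implementation: `ToySemaphore`, `WriteGuard`, `ReadGuard`, and the value of `MAX_READS` are assumptions made up for illustration, and the semaphore only counts permits without ever blocking.

```rust
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative value only; stands in for the real constant.
const MAX_READS: usize = 32;

// Hypothetical toy semaphore: counts available permits, never blocks.
struct ToySemaphore {
    permits: AtomicUsize,
}

impl ToySemaphore {
    fn release(&self, n: usize) {
        self.permits.fetch_add(n, Ordering::Release);
    }

    fn available(&self) -> usize {
        self.permits.load(Ordering::Acquire)
    }
}

struct WriteGuard<'a> {
    sem: &'a ToySemaphore,
}

struct ReadGuard<'a> {
    sem: &'a ToySemaphore,
}

impl Drop for WriteGuard<'_> {
    // A write guard holds every permit, so dropping it returns all of them.
    fn drop(&mut self) {
        self.sem.release(MAX_READS);
    }
}

impl Drop for ReadGuard<'_> {
    // A read guard holds exactly one permit.
    fn drop(&mut self) {
        self.sem.release(1);
    }
}

impl<'a> WriteGuard<'a> {
    fn downgrade(self) -> ReadGuard<'a> {
        let sem = self.sem;
        // Keep one permit for the read guard, return the rest.
        sem.release(MAX_READS - 1);
        // Without this, `WriteGuard::drop` would release MAX_READS more.
        mem::forget(self);
        ReadGuard { sem }
    }
}

// Runs one write -> downgrade -> drop cycle and reports the final count.
fn permits_after_downgrade_cycle() -> usize {
    // Start in the write-locked state: every permit is taken.
    let sem = ToySemaphore { permits: AtomicUsize::new(0) };
    let write = WriteGuard { sem: &sem };
    let read = write.downgrade();
    drop(read);
    sem.available()
}
```

With the `mem::forget` call in place, the cycle ends with exactly `MAX_READS` permits; removing it would leave the count at `2 * MAX_READS` in this model.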
Fixes: #2941
|
|
|
|
* Add const constructors to `RwLock`, `Notify`, and `Semaphore` in
`tokio::sync`.
Also add `const` to `new` for the remaining atomic integers in `src/loom` and for `UnsafeCell`.
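As a rough illustration of what a const constructor enables, the sketch below uses a hypothetical `ToySemaphore` (not the `tokio::sync` type) whose `new` is a `const fn`, so a value can be placed directly in a `static`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical toy type, not the `tokio::sync::Semaphore` implementation.
struct ToySemaphore {
    permits: AtomicUsize,
}

impl ToySemaphore {
    // `AtomicUsize::new` is itself `const`, so this constructor can be too.
    const fn new(permits: usize) -> Self {
        Self { permits: AtomicUsize::new(permits) }
    }

    fn available_permits(&self) -> usize {
        self.permits.load(Ordering::Acquire)
    }
}

// A `const fn` constructor allows initialization in a `static` without a
// lazy-initialization wrapper.
static GLOBAL_LIMIT: ToySemaphore = ToySemaphore::new(4);
```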
Builds upon previous work in #2790
Closes #2756
|
|
|
|
The previous docs appear to have been based on the docs for
`insert_at`. This changes the variable names referred to, and the
explanation of when the value will be returned and under what
condition it will be immediately available, so that they make sense
for a `Duration` argument instead of an `Instant`.
|
|
Generally, this mimics the way `MappedRwLock*Guard`s are implemented in
`parking_lot`: the guards themselves store a raw pointer referencing the
mapped data and maintain type invariants through `PhantomData`. I didn't
try to think too much about this, so if someone has objections I'd love
to hear them.
I've also dropped the internal use of `ReleasingPermit`, since it made
the guards unnecessarily large. The number of permits that need to be
released is already known by the guards themselves, and is instead
governed directly in the relevant `Drop` impls. This has the benefit of
making the guards as small as possible; for the non-mapped variants, this
means a single reference is enough.
`fmt::Debug` impls have been adjusted to behave exactly like the
delegating impls in `parking_lot`. `fmt::Display` impls have been added
for all guard types which behave the same. This does change the format
of debug impls, for which I'm not sure if we provide any guarantees.
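The raw-pointer-plus-`PhantomData` layout can be sketched roughly as follows. Everything here is an assumption for illustration: `ToyLock`, `Guard`, and `MappedGuard` are hypothetical names, and the lock is a simple try-only flag rather than a semaphore.

```rust
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical toy exclusive lock: `try_lock` only, no waiting.
struct ToyLock<T> {
    locked: AtomicBool,
    data: UnsafeCell<T>,
}

// Non-mapped guard: a single reference is enough.
struct Guard<'a, T> {
    lock: &'a ToyLock<T>,
}

// Mapped guard: a raw pointer to the projected data, plus `PhantomData`
// so the type behaves like it holds a `&'a mut U`.
struct MappedGuard<'a, U> {
    locked: &'a AtomicBool,
    data: *mut U,
    marker: PhantomData<&'a mut U>,
}

impl<T> ToyLock<T> {
    fn new(value: T) -> Self {
        Self { locked: AtomicBool::new(false), data: UnsafeCell::new(value) }
    }

    fn try_lock(&self) -> Option<Guard<'_, T>> {
        self.locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .ok()
            .map(|_| Guard { lock: self })
    }
}

impl<'a, T> Guard<'a, T> {
    fn map<U>(this: Self, f: impl FnOnce(&mut T) -> &mut U) -> MappedGuard<'a, U> {
        // Safety: `this` proves the lock is held exclusively.
        let data = f(unsafe { &mut *this.lock.data.get() }) as *mut U;
        let lock: &'a ToyLock<T> = this.lock;
        // Skip `Guard::drop`; the mapped guard now owns the unlock.
        mem::forget(this);
        MappedGuard { locked: &lock.locked, data, marker: PhantomData }
    }
}

impl<T> Drop for Guard<'_, T> {
    fn drop(&mut self) {
        self.lock.locked.store(false, Ordering::Release);
    }
}

impl<U> Drop for MappedGuard<'_, U> {
    fn drop(&mut self) {
        self.locked.store(false, Ordering::Release);
    }
}

impl<U> Deref for MappedGuard<'_, U> {
    type Target = U;
    fn deref(&self) -> &U {
        // Safety: the pointer is valid while the lock is held.
        unsafe { &*self.data }
    }
}

impl<U> DerefMut for MappedGuard<'_, U> {
    fn deref_mut(&mut self) -> &mut U {
        // Safety: as above, and `&mut self` guarantees uniqueness.
        unsafe { &mut *self.data }
    }
}

// Maps a guard to the first tuple field, mutates it, and reports whether
// the lock stayed held while the mapped guard was alive.
fn demo() -> (u32, bool) {
    let lock = ToyLock::new((1u32, String::from("a")));
    let guard = lock.try_lock().expect("lock is free");
    let mut first = Guard::map(guard, |pair| &mut pair.0);
    *first += 41;
    let still_locked = lock.try_lock().is_none();
    let value = *first;
    drop(first);
    (value, still_locked)
}
```

In this sketch the non-mapped `Guard` is exactly one pointer wide, which matches the size argument made above.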
|
|
|
|
|
|
|
|
Simplifies the coop implementation. Prunes unused code and creates a
`Budget` type to track the current budget.
|
|
Included changes:
- all simple references like `<type>.<name>.html` for these types
  - enum
  - fn
  - struct
  - trait
  - type
- simple references for methods, like struct.DelayQueue.html#method.poll
Refs: #1473
|
|
Previously, the `Mutex::lock`, `RwLock::{read, write}`, and
`Semaphore::acquire` futures in `tokio::sync` implemented `Send + Sync`
automatically. This was by virtue of being implemented using a `poll_fn`
that only closed over `Send + Sync` types. However, this broke in
PR #2325, which rewrote those types using the new `batch_semaphore`.
Now, they await an `Acquire` future, which contains a `Waiter`, which
internally contains an `UnsafeCell`, and thus does not implement `Sync`.
Since removing previously implemented traits breaks existing code, this
inadvertently caused a breaking change. There were tests ensuring that
the `Mutex`, `RwLock`, and `Semaphore` types themselves were `Send +
Sync`, but no tests that the _futures they return_ implemented those
traits.
I've fixed this by adding an explicit impl of `Sync` for the
`batch_semaphore::Acquire` future. Since the `Waiter` type held by this
struct is only accessed when borrowed mutably, it is safe for it to
implement `Sync`.
Additionally, I've extended the bounds checks for the affected
`tokio::sync` types to ensure that returned futures continue to
implement `Send + Sync` in the future.
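A compile-time bounds check of this kind can be sketched as below. The `Waiter` and `Acquire` types here are hypothetical stand-ins, not the `batch_semaphore` types; the point is only the shape of the `unsafe impl Sync` and of the assertion helpers.

```rust
use std::cell::UnsafeCell;

// Hypothetical stand-in for a waiter node with interior-mutable state.
struct Waiter {
    state: UnsafeCell<usize>,
}

// Hypothetical stand-in for the future returned by an acquire operation.
struct Acquire {
    node: Waiter,
}

impl Acquire {
    // The waiter is only ever touched through `&mut self`...
    fn poll_step(&mut self) -> usize {
        let state = self.node.state.get_mut();
        *state += 1;
        *state
    }
}

// ...so a shared `&Acquire` gives no access to the cell, and sharing it
// across threads cannot race.
unsafe impl Sync for Acquire {}

// These helpers only type-check if the bound holds for `T`.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

// Compiles only while `Acquire` remains `Send + Sync`.
fn bounds_hold() -> bool {
    assert_send::<Acquire>();
    assert_sync::<Acquire>();
    true
}
```

Removing the `unsafe impl Sync` line makes `bounds_hold` fail to compile, which is the regression such a check is meant to catch.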
|
|
## Motivation
Many of Tokio's synchronization primitives (`RwLock`, `Mutex`,
`Semaphore`, and the bounded MPSC channel) are based on the internal
semaphore implementation, called `semaphore_ll`. This semaphore type
provides a lower-level internal API for the semaphore implementation
than the public `Semaphore` type, and supports "batch" operations, where
waiters may acquire more than one permit at a time, and batches of
permits may be released back to the semaphore.
Currently, `semaphore_ll` uses an atomic singly-linked list for the
waiter queue. The linked list implementation is specific to the
semaphore. This implementation therefore requires a heap allocation for
every waiter in the queue. These allocations are owned by the semaphore,
rather than by the task awaiting permits from the semaphore. Critically,
they are only _deallocated_ when permits are released back to the
semaphore, at which point it dequeues as many waiters from the front of
the queue as can be satisfied with the released permits. If a task
attempts to acquire permits from the semaphore and is cancelled (such as
by timing out), its waiter node remains in the list until it is
dequeued while releasing permits. In cases where large numbers of tasks
are cancelled while waiting for permits, this results in extremely high
memory use for the semaphore (see #2237).
## Solution
@Matthias247 has proposed that Tokio adopt the approach used in his
`futures-intrusive` crate: using an _intrusive_ linked list to store the
wakers of tasks waiting on a synchronization primitive. In an intrusive
list, each list node is stored as part of the entry that node
represents, rather than in a heap allocation that owns the entry.
Because futures must be pinned in order to be polled, the necessary
invariant of such a list --- that entries may not move while in the list
--- may be upheld by making the waiter node `!Unpin`. In this approach,
the waiter node can be stored inline in the future, rather than
requiring separate heap allocation, and cancelled futures may remove
their nodes from the list.
This branch adds a new semaphore implementation that uses the intrusive
list added to Tokio in #2210. The implementation is essentially a hybrid
of the old `semaphore_ll` and the semaphore used in `futures-intrusive`:
while a `Mutex` around the wait list is necessary, since the intrusive
list is not thread-safe, the permit state is stored outside of the mutex
and updated atomically.
The mutex is acquired only when accessing the wait list — if a task
can acquire sufficient permits without waiting, it does not need to
acquire the lock. When releasing permits, we iterate over the wait
list from the end of the queue until we run out of permits to release,
and split off all the nodes that received enough permits to wake up
into a separate list. Then, we can drain the new list and notify those
wakers *after* releasing the lock. Because the split operation only
modifies the pointers on the head node of the split-off list and the
new tail node of the old list, it is O(1) and does not require an
allocation to return a variable length number of waiters to notify.
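The release path described above can be modelled with a simplified sketch. It is not the `batch_semaphore` code: `ToyWaitList` and `Waiter` are hypothetical, a `VecDeque` with the oldest waiter at the front stands in for the intrusive list (so the split is not O(1) here), and waking is represented by returning the ids of the waiters that would be notified.

```rust
use std::collections::VecDeque;
use std::mem;
use std::sync::Mutex;

// Hypothetical waiter: an id plus the number of permits it still needs.
struct Waiter {
    id: u32,
    needed: usize,
}

// Hypothetical wait list; the oldest waiter sits at the front.
struct ToyWaitList {
    queue: Mutex<VecDeque<Waiter>>,
}

impl ToyWaitList {
    // Hands `permits` to waiters in order. Returns the ids of the waiters
    // that became ready, plus any permits left over.
    fn release(&self, mut permits: usize) -> (Vec<u32>, usize) {
        let mut queue = self.queue.lock().unwrap();
        let mut satisfied = 0;
        for waiter in queue.iter_mut() {
            let given = permits.min(waiter.needed);
            waiter.needed -= given;
            permits -= given;
            if waiter.needed > 0 {
                // Out of permits; this waiter keeps its place in line.
                break;
            }
            satisfied += 1;
        }
        // Split the satisfied prefix off into its own list.
        let rest = queue.split_off(satisfied);
        let ready = mem::replace(&mut *queue, rest);
        // Release the lock before notifying anyone.
        drop(queue);
        let woken: Vec<u32> = ready.into_iter().map(|w| w.id).collect();
        (woken, permits)
    }
}
```

In a real semaphore the leftover permits would be added back to the atomically updated permit count rather than returned to the caller.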
Because of the intrusive list invariants, the API provided by the new
`batch_semaphore` is somewhat different than that of `semaphore_ll`. In
particular, the `Permit` type has been removed. This type was primarily
intended to allow the reuse of a wait list node allocated on the heap.
Since the intrusive list means we can avoid heap-allocating waiters,
this is no longer necessary. Instead, acquiring permits is done by
polling an `Acquire` future returned by the `Semaphore` type. The use of
a future here ensures that the waiter node is always pinned while
waiting to acquire permits, and that a reference to the semaphore is
available to remove the waiter if the future is cancelled.
Unfortunately, the current implementation of the bounded MPSC requires a
`poll_acquire` operation, and has methods that call it while outside of
a pinned context. Therefore, I've left the old `semaphore_ll`
implementation in place to be used by the bounded MPSC, and updated the
`Mutex`, `RwLock`, and `Semaphore` APIs to use the new implementation.
Hopefully, a subsequent change can update the bounded MPSC to use the
new semaphore as well.
Fixes #2237
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
|
|
Add RwLock::into_inner method that consumes the lock and returns
the wrapped value.
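The shape of such a method can be sketched on a toy wrapper. `ToyRwLock` is a hypothetical type for illustration; the real `RwLock` holds more than the value itself.

```rust
use std::cell::UnsafeCell;

// Hypothetical toy wrapper holding only the protected value.
struct ToyRwLock<T> {
    value: UnsafeCell<T>,
}

impl<T> ToyRwLock<T> {
    fn new(value: T) -> Self {
        Self { value: UnsafeCell::new(value) }
    }

    // Taking `self` by value means no guard can still borrow the lock,
    // so the value can be moved out without acquiring anything.
    fn into_inner(self) -> T {
        self.value.into_inner()
    }
}
```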
Fixes: #2320
|
|
A single call to `poll` on a top-level task may potentially do a lot of
work before it returns `Poll::Pending`. If a task runs for a long period
of time without yielding back to the executor, it can starve other tasks
waiting on that executor to execute them, or drive underlying resources.
See for example rust-lang/futures-rs#2047, rust-lang/futures-rs#1957,
and rust-lang/futures-rs#869. Since Rust does not have a runtime, it is
difficult to forcibly preempt a long-running task.
Consider a future like this one:
```rust
use tokio::stream::{Stream, StreamExt};

// Drains the stream, discarding every item.
async fn drop_all<I: Stream + Unpin>(mut input: I) {
    while let Some(_) = input.next().await {}
}
```
It may look harmless, but consider what happens under heavy load if the
input stream is _always_ ready. If we spawn `drop_all`, the task will
never yield, and will starve other tasks and resources on the same
executor.
This patch adds a `coop` module that provides an opt-in mechanism for
futures to cooperate with the executor to avoid starvation. This
alleviates the problem above:
```rust
use tokio::stream::{Stream, StreamExt};

async fn drop_all<I: Stream + Unpin>(mut input: I) {
    while let Some(_) = input.next().await {
        // Cooperation point: may yield once the budget is spent.
        tokio::coop::proceed().await;
    }
}
```
The call to [`proceed`] will coordinate with the executor to make sure
that every so often control is yielded back to the executor so it can
run other tasks.
The implementation uses a thread-local counter that simply counts how
many "cooperation points" we have passed since the task was first
polled. Once the "budget" has been spent, any subsequent points will
return `Poll::Pending`, eventually making the top-level task yield. When
it finally does yield, the executor resets the budget before
running the next task.
The budget per task poll is currently hard-coded to 128. Eventually, we
may want to make it dynamic as more cooperation points are added. The
number 128 was chosen more or less arbitrarily to balance the cost of
yielding unnecessarily against the time an executor may be "held up".
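The counter mechanism can be sketched with a plain thread-local. This is a simplified model rather than the `coop` module itself: `REMAINING`, `poll_proceed`, and `reset_budget` are hypothetical names, and the sketch omits waking the task when a point returns `Poll::Pending`.

```rust
use std::cell::Cell;
use std::task::Poll;

// Budget per task poll, as described above.
const BUDGET_PER_POLL: usize = 128;

thread_local! {
    // Cooperation points left before the current task must yield.
    static REMAINING: Cell<usize> = Cell::new(BUDGET_PER_POLL);
}

// A cooperation point: consumes one unit of budget, or reports `Pending`
// once the budget is spent. (A real implementation would also arrange
// for the task to be woken again.)
fn poll_proceed() -> Poll<()> {
    REMAINING.with(|cell| {
        let left = cell.get();
        if left == 0 {
            Poll::Pending
        } else {
            cell.set(left - 1);
            Poll::Ready(())
        }
    })
}

// What the executor would do before polling the next task.
fn reset_budget() {
    REMAINING.with(|cell| cell.set(BUDGET_PER_POLL));
}

// Counts how many cooperation points pass before the first `Pending`.
fn points_until_yield() -> usize {
    reset_budget();
    let mut passed = 0;
    while poll_proceed().is_ready() {
        passed += 1;
    }
    passed
}
```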
At the moment, all the tokio leaf futures ("resources") call into coop,
but external futures have no way of doing so. We probably want to
continue limiting coop points to leaf futures in the future, but may
want to also enable third-party leaf futures to cooperate to benefit the
ecosystem as a whole. This is reflected in the methods marked as `pub`
in `mod coop` (even though the module is only `pub(crate)`). We will
likely also eventually want to expose `coop::limit`, which enables
sub-executors and manual `impl Future` blocks to avoid one sub-task
spending all of their poll budget.
Benchmarks (see tokio-rs/tokio#2160) suggest that the overhead of `coop`
is marginal.
|
|
|
|
Currently, the documentation for `tokio::sync::RwLock` states that it
has an unspecified priority policy dependent on the operating system.
This is incorrect: Tokio's `RwLock` is fairly queued. The incorrect
documentation appears to have been copied from the `std::sync::RwLock`
docs, for which this *is* the case.
This commit corrects the documentation to describe the actual priority
policy.
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
|
|
|
|
Provides a `RwLock` based on a semaphore. The semaphore is initialized
with 32 permits. A read acquires a single permit and a write acquires all 32
permits. This ensures that reads (up to 32) may happen concurrently and
writes happen exclusively.
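The permit scheme can be modelled with a single atomic counter. This is a minimal sketch with try-only operations, assuming a hypothetical `ToyRwLock` type; it does not queue waiters or return guards the way the real lock does.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Total permits, as in the description above.
const MAX_READS: usize = 32;

// Hypothetical toy lock: just the permit counter.
struct ToyRwLock {
    permits: AtomicUsize,
}

impl ToyRwLock {
    fn new() -> Self {
        Self { permits: AtomicUsize::new(MAX_READS) }
    }

    // Takes `n` permits atomically, or fails without taking any.
    fn try_acquire(&self, n: usize) -> bool {
        let mut current = self.permits.load(Ordering::Acquire);
        loop {
            if current < n {
                return false;
            }
            match self.permits.compare_exchange_weak(
                current,
                current - n,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return true,
                Err(actual) => current = actual,
            }
        }
    }

    // A read needs a single permit.
    fn try_read(&self) -> bool {
        self.try_acquire(1)
    }

    // A write needs every permit, which excludes all readers and writers.
    fn try_write(&self) -> bool {
        self.try_acquire(MAX_READS)
    }

    fn release(&self, n: usize) {
        self.permits.fetch_add(n, Ordering::Release);
    }
}
```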
|