path: root/tokio/src/sync/tests
2020-11-16  sync: add `Notify::notify_waiters` (#3098)  [Zahari Dichev]
This PR makes `Notify::notify_waiters` public. The method already existed, but this change alters how it is used. Previously, in order for a consumer to register interest in a notification triggered by `notify_waiters`, the `Notified` future had to be polled first. This introduced friction when using the API, as the future had to be pinned before being polled. This change introduces a counter that tracks how many times `notify_waiters` has been called. The count is loaded when the future is created; when first polled, the future compares that number with the current count stored in the `Notify` type. This avoids the need to register the waiter up front. Fixes: #3066
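To make the counter behavior concrete, here is a minimal sketch against the current tokio API: the `Notified` future is created first and only awaited after `notify_waiters` fires, so it never has to be pinned and polled up front.

```rust
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Notify::new();

    // Creating the future records the current `notify_waiters` count.
    let notified = notify.notified();

    // Wake all waiters registered (or created) before this call.
    notify.notify_waiters();

    // On first poll the future compares counts and completes immediately,
    // even though it was never polled before `notify_waiters` ran.
    notified.await;
}
```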
2020-10-23  sync: add mem::forget to RwLockWriteGuard::downgrade. (#2957)  [Zahari Dichev]
Currently, when `RwLockWriteGuard::downgrade` is called, `MAX_READS - 1` permits are added to the semaphore. When `RwLockWriteGuard::drop` is then invoked, however, another `MAX_READS` permits are added. This releases more permits than were actually acquired when downgrading a write lock to a read lock. This is why `mem::forget` must be called on the `RwLockWriteGuard`, to avoid invoking its destructor. Fixes: #2941
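For context, a short sketch of the downgrade path this fixes, assuming the current public `RwLock` API:

```rust
use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    let lock = RwLock::new(1);

    let mut write = lock.write().await;
    *write += 1;

    // Atomically trade the write guard for a read guard. Internally the
    // write guard is `mem::forget`-ten so its destructor cannot release
    // the write permits a second time.
    let read = write.downgrade();
    assert_eq!(*read, 2);

    // Other readers may now proceed concurrently with `read`.
    let another = lock.read().await;
    assert_eq!(*another, 2);
}
```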
2020-10-21  sync: revert Clone impl for broadcast::Receiver (#3020)  [Carl Lerche]
The `Receiver` handle maintains a position in the broadcast channel for itself. Cloning implies copying the state of the value, so intuitively, cloning a `broadcast::Receiver` would return a new receiver with an identical position. However, the current implementation returns a new `Receiver` positioned at the tail of the channel. This behavioral subtlety is why `new_subscriber()` is used to create `Receiver` handles. An alternate API should consider the position issue. Refs: #2933
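The position issue is easy to demonstrate with the `subscribe()`-based API that remained after the revert; a sketch:

```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel(16);

    tx.send(1).unwrap();

    // A new receiver is positioned at the tail: it only sees values sent
    // after `subscribe()` was called, not the `1` already in the channel.
    let mut rx2 = tx.subscribe();

    tx.send(2).unwrap();

    assert_eq!(rx1.recv().await.unwrap(), 1);
    assert_eq!(rx1.recv().await.unwrap(), 2);
    assert_eq!(rx2.recv().await.unwrap(), 2);
}
```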
2020-10-19  sync: implement Clone for broadcast::Receiver (#2933)  [Zephyr Shannon]
2020-10-12  sync: change chan `closed(&mut self)` to `closed(&self)` (#2939)  [Zahari Dichev]
2020-10-09  sync: move broadcast error types into broadcast::error module (#2937)  [Taiki Endo]
Refs: #2928
2020-10-01  chore: make #[doc(hidden)] apis private (#2901)  [Alice Ryhl]
2020-09-25  sync: add `mpsc::Sender::closed` future (#2840)  [Zahari Dichev]
Adding a closed future makes it possible to select over `closed` and some other work, so that the task is woken when the channel is closed and can proactively cancel itself. This adds an `mpsc::Sender::closed` future that becomes ready when the receiver is closed.
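A sketch of the select-over-closed pattern this enables; it assumes tokio's `macros` and `time` features, and the one-second work loop is purely illustrative:

```rust
use std::time::Duration;
use tokio::sync::mpsc;

async fn produce(tx: mpsc::Sender<u64>) {
    let mut value = 0;
    loop {
        tokio::select! {
            // Wake as soon as the receiver is dropped, instead of only
            // discovering the closure on the next failed `send`.
            _ = tx.closed() => {
                println!("receiver gone, cancelling producer");
                return;
            }
            _ = tokio::time::sleep(Duration::from_secs(1)) => {
                value += 1;
                let _ = tx.send(value).await;
            }
        }
    }
}
```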
2020-09-24  sync: support mpsc send with `&self` (#2861)  [Carl Lerche]
Updates the mpsc channel to use the intrusive waker based semaphore. This enables using `Sender` with `&self`. Instead of using `Sender::poll_ready` to ensure capacity and updating the `Sender` state, `async fn Sender::reserve()` is added. This function returns a `Permit` value representing the reserved capacity. Fixes: #2637 Refs: #2718 (intrusive waiters)
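A sketch of the resulting `&self` send flow, using the `reserve()`/`Permit` API as it exists in current tokio:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<i32>(1);

    // `reserve()` waits for capacity with `&self`; no `&mut` handle or
    // `poll_ready` bookkeeping is needed.
    let permit = tx.reserve().await.unwrap();

    // Sending through the permit consumes the reserved slot and never waits.
    permit.send(42);

    assert_eq!(rx.recv().await, Some(42));
}
```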
2020-09-22  sync: Remove readiness assertion in `watch::Receiver::changed()` (#2839)  [Zahari Dichev]
In `watch::Receiver::changed`, `Notified` was polled for the first time to ensure the waiter is registered, under the assumption that the first poll always returns `Pending`. However, when another instance of `Notified` is dropped without receiving its notification, that "orphaned" notification can be used to satisfy a different waiter without ever registering it. This commit accounts for that scenario.
2020-09-11  sync: tweak `watch` API (#2814)  [Carl Lerche]
Decouples getting the latest `watch` value from receiving the change notification. The `Receiver` async method becomes `Receiver::changed()`. The latest value is obtained from `Receiver::borrow()`. The implementation is updated to use `Notify`. This requires adding `Notify::notify_waiters`. This method is generally useful but is kept private for now.
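A sketch of the decoupled API, assuming the current `watch` interface:

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel("v0");

    let watcher = tokio::spawn(async move {
        // `changed()` only signals that a new value exists; the value
        // itself is read separately via `borrow()`.
        while rx.changed().await.is_ok() {
            println!("latest = {}", *rx.borrow());
        }
    });

    tx.send("v1").unwrap();
    drop(tx); // closing the channel ends the loop
    watcher.await.unwrap();
}
```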
2020-09-07  sync: rename `Notify::notify()` -> `notify_one()` (#2822)  [Juan Alvarez]
Closes: #2813
2020-08-23  sync: move CancellationToken to tokio-util (#2721)  [Carl Lerche]
* sync: move CancellationToken to tokio-util

  The `CancellationToken` utility is only available with the `tokio_unstable` flag. This was done because the API is not final, but it adds friction for users. This patch moves `CancellationToken` to tokio-util, where it is generally available. The tokio-util crate does not have any constraints on breaking-change releases.

* fix clippy
* clippy again
2020-05-31  test: fix all clippy lints in tests (#2573)  [Mikail Bagishov]
2020-05-03  sync: move CancellationToken tests (#2477)  [Carl Lerche]
In preparation for work on the `CancellationToken` internals, the tests are moved into `tests/` and updated to not depend on internals.
2020-05-02  sync: add CancellationToken (#2263)  [Matthias Einwag]
As a first step towards structured concurrency, this change adds a `CancellationToken` for graceful cancellation of tasks. The token can be awaited by an arbitrary number of tasks thanks to the use of an intrusive list. The token can be cloned, and in addition, child tokens can be derived from it. When the parent token is cancelled, all child tokens are cancelled as well.
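A sketch of the parent/child cancellation described above; per the 2020-08-23 entry, the type now lives in `tokio-util`, so this assumes a `tokio-util` dependency:

```rust
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let parent = CancellationToken::new();
    let child = parent.child_token();

    let task = tokio::spawn(async move {
        // Any number of tasks may await the same (cloned) token.
        child.cancelled().await;
        println!("child saw the parent's cancellation");
    });

    // Cancelling the parent cancels every derived child token.
    parent.cancel();
    task.await.unwrap();
}
```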
2020-03-27  sync: fix possible dangling pointer in semaphore (#2340)  [Eliza Weisman]
## Motivation

When cancelling futures which are waiting to acquire semaphore permits, there is a possible dangling pointer if notified futures are dropped after the notified wakers have been split into a separate list. Because these futures' wait queue nodes are no longer in the main list guarded by the lock, their `Drop` impls will complete immediately, and they may be dropped while still in the list of tasks to notify.

## Solution

This branch fixes this by popping from the wait list inside the lock. The wakers of popped nodes are temporarily stored in a stack array, so that they can be notified after the lock is released. Since the size of the stack array is fixed, we may in some cases have to loop multiple times, acquiring and releasing the lock, until all permits have been released. This may also have the possible side advantage of preventing a thread releasing a very large number of permits from starving other threads that need to enqueue waiters.

I've also added a loom test that can reliably reproduce a segfault on master, but passes on this branch (after a lot of iterations).

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
2020-03-23  sync: new internal semaphore based on intrusive lists (#2325)  [Eliza Weisman]
## Motivation

Many of Tokio's synchronization primitives (`RwLock`, `Mutex`, `Semaphore`, and the bounded MPSC channel) are based on the internal semaphore implementation, called `semaphore_ll`. This semaphore type provides a lower-level internal API than the public `Semaphore` type, and supports "batch" operations, where waiters may acquire more than one permit at a time and batches of permits may be released back to the semaphore.

Currently, `semaphore_ll` uses an atomic singly-linked list for the waiter queue. The linked list implementation is specific to the semaphore, and it requires a heap allocation for every waiter in the queue. These allocations are owned by the semaphore, rather than by the task awaiting permits from the semaphore. Critically, they are only _deallocated_ when permits are released back to the semaphore, at which point it dequeues as many waiters from the front of the queue as can be satisfied with the released permits. If a task attempts to acquire permits from the semaphore and is cancelled (such as by timing out), its waiter node remains in the list until it is dequeued while releasing permits. In cases where large numbers of tasks are cancelled while waiting for permits, this results in extremely high memory use for the semaphore (see #2237).

## Solution

@Matthias247 has proposed that Tokio adopt the approach used in his `futures-intrusive` crate: using an _intrusive_ linked list to store the wakers of tasks waiting on a synchronization primitive. In an intrusive list, each list node is stored as part of the entry that node represents, rather than in a heap allocation that owns the entry. Because futures must be pinned in order to be polled, the necessary invariant of such a list --- that entries may not move while in the list --- can be upheld by making the waiter node `!Unpin`. In this approach, the waiter node can be stored inline in the future, rather than requiring a separate heap allocation, and cancelled futures can remove their nodes from the list.

This branch adds a new semaphore implementation that uses the intrusive list added to Tokio in #2210. The implementation is essentially a hybrid of the old `semaphore_ll` and the semaphore used in `futures-intrusive`: while a `Mutex` around the wait list is necessary, since the intrusive list is not thread-safe, the permit state is stored outside of the mutex and updated atomically. The mutex is acquired only when accessing the wait list: if a task can acquire sufficient permits without waiting, it does not need to acquire the lock. When releasing permits, we iterate over the wait list from the end of the queue until we run out of permits to release, and split off all the nodes that received enough permits to wake up into a separate list. Then, we can drain the new list and notify those wakers *after* releasing the lock. Because the split operation only modifies the pointers on the head node of the split-off list and the new tail node of the old list, it is O(1) and does not require an allocation to return a variable-length number of waiters to notify.

Because of the intrusive list invariants, the API provided by the new `batch_semaphore` is somewhat different from that of `semaphore_ll`. In particular, the `Permit` type has been removed. This type was primarily intended to allow the reuse of a wait list node allocated on the heap. Since the intrusive list means we can avoid heap-allocating waiters, this is no longer necessary.

Instead, acquiring permits is done by polling an `Acquire` future returned by the `Semaphore` type. The use of a future here ensures that the waiter node is always pinned while waiting to acquire permits, and that a reference to the semaphore is available to remove the waiter if the future is cancelled. Unfortunately, the current implementation of the bounded MPSC requires a `poll_acquire` operation, and has methods that call it while outside of a pinned context. Therefore, I've left the old `semaphore_ll` implementation in place to be used by the bounded MPSC, and updated the `Mutex`, `RwLock`, and `Semaphore` APIs to use the new implementation. Hopefully, a subsequent change can update the bounded MPSC to use the new semaphore as well.

Fixes #2237

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
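The cancellation safety the `Acquire` future provides can be sketched with the public API. This assumes current tokio, where `acquire()` returns a `Result` and the `time` feature provides `timeout`; the timeout merely stands in for any form of cancellation:

```rust
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let semaphore = Semaphore::new(1);
    let held = semaphore.acquire().await.unwrap();

    // `acquire()` returns a future. Dropping it (here, via timeout) unlinks
    // the waiter node from the intrusive list, so a cancelled waiter leaves
    // nothing allocated or dangling behind.
    let result = timeout(Duration::from_millis(10), semaphore.acquire()).await;
    assert!(result.is_err()); // timed out; the acquisition was cancelled cleanly

    drop(held);
    assert!(semaphore.acquire().await.is_ok());
}
```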
2020-02-26  sync: adds Notify for basic task notification (#2210)  [Carl Lerche]
`Notify` provides a synchronization primitive similar to thread park / unpark, except for tasks.
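A minimal park/unpark-style sketch, using `notify_one()`, the name the method received in the 2020-09-07 rename above:

```rust
use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify2 = notify.clone();

    let waiter = tokio::spawn(async move {
        // Like `thread::park()`, but for tasks: suspend until notified.
        notify2.notified().await;
        println!("unparked");
    });

    // Like `Thread::unpark()`: wake one waiting task. If no task is
    // waiting yet, the permit is stored and the next `notified().await`
    // returns immediately.
    notify.notify_one();
    waiter.await.unwrap();
}
```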
2020-02-14  Test some more mpsc behavior with loom (#2246)  [Jon Gjengset]
2020-01-22  sync: fix broadcast bugs (#2135)  [kalcutter]
Make sure the tail mutex is acquired when `condvar` is notified; otherwise the wakeup may be lost and the sender could be left waiting. Use `notify_all()` instead of `notify_one()` to ensure that the correct sender is woken. Finally, only do any of this when there are no more readers left. Additionally, calling `send()` was buggy and could cause a panic when the slot had another pending send.
2020-01-03  sync: add RwLock (#1699)  [João Oliveira]
Provides a `RwLock` based on a semaphore. The semaphore is initialized with 32 permits. A read acquires a single permit and a write acquires all 32 permits. This ensures that reads (up to 32) may happen concurrently and writes happen exclusively.
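A sketch of the read/write semantics described above:

```rust
use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    let lock = RwLock::new(0u32);

    // Multiple readers may hold the lock at once (one permit each).
    let r1 = lock.read().await;
    let r2 = lock.read().await;
    assert_eq!(*r1 + *r2, 0);
    drop((r1, r2));

    // A writer takes every permit, excluding readers and writers alike.
    let mut w = lock.write().await;
    *w += 1;
    assert_eq!(*w, 1);
}
```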
2020-01-03  sync: add batch op support to internal semaphore (#2004)  [Carl Lerche]
Extend the internal semaphore to support batch operations. With this PR, consumers of the semaphore are able to atomically request more than one permit. This is useful for implementing an `RwLock`.
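The internal batch API is not public, but the same atomic multi-permit acquisition is visible today through `Semaphore::acquire_many`, a later public addition shown here purely for illustration:

```rust
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Semaphore::new(32);

    // Atomically wait for all 32 permits in one call (the "write lock"
    // pattern), rather than looping over single-permit acquisitions.
    let all = semaphore.acquire_many(32).await.unwrap();
    assert_eq!(semaphore.available_permits(), 0);
    drop(all); // all 32 permits are returned at once
}
```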
2019-12-21  chore: fix formatting, remove old rustfmt.toml (#2007)  [Artem Vorotnikov]
`cargo fmt` has a bug where it does not format modules scoped with feature flags.
2019-12-18  sync: add broadcast channel (#1943)  [Carl Lerche]
Adds a broadcast channel implementation. A broadcast channel is a multi-producer, multi-consumer channel where each consumer receives a clone of every value sent. This is useful for implementing pub / sub style patterns. The channel is implemented as a ring buffer: a `Vec` of the specified capacity is allocated when the channel is initialized, and values are pushed into its slots. When the channel is full, a send overwrites the oldest value. Receivers detect this and return an error on the next call to receive. This prevents unbounded buffering and keeps the channel from being vulnerable to the slowest consumer. Closes: #1585
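A sketch of the overwrite-and-lag behavior, assuming the current API (the error types live in `broadcast::error` per the 2020-10-09 entry above):

```rust
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;

#[tokio::main]
async fn main() {
    // Capacity 2: the ring buffer holds at most two values.
    let (tx, mut rx) = broadcast::channel(2);

    tx.send(1).unwrap();
    tx.send(2).unwrap();
    tx.send(3).unwrap(); // overwrites 1, the oldest value

    // The slow receiver is told it lagged (by one message) rather than
    // the channel buffering without bound.
    assert!(matches!(rx.recv().await, Err(RecvError::Lagged(1))));
    assert_eq!(rx.recv().await.unwrap(), 2);
    assert_eq!(rx.recv().await.unwrap(), 3);
}
```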
2019-12-17  sync: add Semaphore (#1973)  [Michael P. Jung]
Provide an asynchronous Semaphore implementation. This is useful for synchronizing concurrent access to a shared resource.
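A sketch of the typical use, bounding concurrent access to a shared resource; it assumes the current API, where `acquire()` returns a `Result`:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // At most 3 of the 10 tasks hold a permit at any one time.
    let semaphore = Arc::new(Semaphore::new(3));
    let mut handles = Vec::new();

    for i in 0..10 {
        let sem = semaphore.clone();
        handles.push(tokio::spawn(async move {
            let _permit = sem.acquire().await.unwrap();
            println!("task {} is inside the critical section", i);
            // The permit is returned to the semaphore when dropped.
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
}
```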
2019-11-18  chore: refine feature flags (#1785)  [Carl Lerche]
Removes dependencies between Tokio feature flags. For example, `process` should not depend on `sync` simply because it uses the `mpsc` channel. Instead, feature flags represent **public** APIs that become available with the feature enabled. When the feature is not enabled, the functionality is removed. If another Tokio component requires the functionality, it stays as `pub(crate)`.

The threaded scheduler is now exposed under `rt-threaded`. This feature flag only enables the threaded scheduler and does not include I/O, networking, or time. Those features must be explicitly enabled. A `full` feature flag is added that enables all features. `stdin`, `stdout`, `stderr` are exposed under `io-std`. Macros are used to scope code by feature flag.
2019-11-16  make AtomicWaker private (#1782)  [Carl Lerche]
2019-11-15  Limit `futures` dependency to `Stream` via feature flag (#1774)  [Carl Lerche]
In an effort to reach API stability, the `tokio` crate is shedding its _public_ dependencies on crates that either a) do not provide a stable (1.0+) release with longevity guarantees or b) do not match the `tokio` release cadence. Of course, implementing `std` traits fits the requirements. The one exception, for now, is the `Stream` trait found in `futures_core`. It is expected that this trait will not change much and will be moved into `std`. Since Tokio has not yet reached 1.0, I feel that it is acceptable to maintain a dependency on this trait given how foundational it is.

Since the `Stream` implementation is optional, types that are logically streams provide `async fn next_*` functions to obtain the next value. Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`.

Additionally, some misc cleanup is also done:

- `tokio::io::io` -> `tokio::io::util`.
- `delay` -> `delay_until`.
- `Timeout::new` -> `timeout(...)`.
- `signal::ctrl_c()` returns a future instead of a stream.
- `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait).
- `time::Throttle` is removed (due to lack of `Stream` trait).
- Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
2019-10-29  sync: move into `tokio` crate (#1705)  [Carl Lerche]
A step towards collapsing Tokio sub-crates into a single `tokio` crate (#1318). The sync implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various feature flags.