Age | Commit message | Author |
|
The `Receiver` handle maintains a position in the broadcast channel for
itself. Cloning implies copying the state of the value. Intuitively,
cloning a `broadcast::Receiver` would return a new receiver with an
identical position. However, the current implementation returns a new
`Receiver` positioned at the tail of the channel.
This subtle behavior is why `new_subscriber()` is used to create
`Receiver` handles instead. An alternate API should take the position
issue into account.
Refs: #2933
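To make the position issue concrete, here is a minimal std-only model. It is not tokio's implementation: `Shared`, `Receiver`, `subscribe_at_tail`, and `clone_position` are hypothetical names used only to contrast the two behaviors described above.

```rust
use std::sync::{Arc, Mutex};

/// Toy stand-in for the channel: it only tracks how many values were sent.
struct Shared {
    tail: Mutex<u64>,
}

/// Toy receiver: `next` is the position of the next value it will read.
struct Receiver {
    shared: Arc<Shared>,
    next: u64,
}

impl Receiver {
    /// The behavior the message describes: the new handle starts at the
    /// tail and only sees values sent from now on.
    fn subscribe_at_tail(&self) -> Receiver {
        let tail = *self.shared.tail.lock().unwrap();
        Receiver { shared: Arc::clone(&self.shared), next: tail }
    }

    /// What `Clone` would intuitively do: copy the position as well.
    fn clone_position(&self) -> Receiver {
        Receiver { shared: Arc::clone(&self.shared), next: self.next }
    }
}

fn main() {
    let shared = Arc::new(Shared { tail: Mutex::new(0) });
    let rx = Receiver { shared: Arc::clone(&shared), next: 0 };

    // Three values are sent; `rx` has not read any of them yet.
    *shared.tail.lock().unwrap() += 3;

    assert_eq!(rx.clone_position().next, 0); // would still see all three
    assert_eq!(rx.subscribe_at_tail().next, 3); // skips all three
}
```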
|
|
|
|
Refs: #2928
|
|
Removes deprecated APIs and makes some small breaking changes.
|
|
As tokio does not rely on poisoning, we can
avoid always unwrapping when locking by handling
the `PoisonError` in the Mutex shim.
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
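A minimal sketch of such a shim, assuming it is a thin wrapper around `std::sync::Mutex`. The wrapper type here is illustrative and is not copied from the tokio source; only `PoisonError::into_inner` and the std locking API are real.

```rust
use std::sync::{self, MutexGuard};

/// Hypothetical shim around `std::sync::Mutex` whose `lock` never
/// returns a `Result`.
struct Mutex<T>(sync::Mutex<T>);

impl<T> Mutex<T> {
    fn new(value: T) -> Self {
        Mutex(sync::Mutex::new(value))
    }

    /// Locks the mutex, ignoring poisoning: if another thread panicked
    /// while holding the lock, recover the guard from the `PoisonError`.
    fn lock(&self) -> MutexGuard<'_, T> {
        match self.0.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        }
    }
}

fn main() {
    use std::sync::Arc;
    use std::thread;

    let m = Arc::new(Mutex::new(1));

    // Poison the inner mutex by panicking while the guard is held.
    let m2 = Arc::clone(&m);
    let _ = thread::spawn(move || {
        let _guard = m2.lock();
        panic!("poison the mutex");
    })
    .join();

    // Callers still get a guard, with no `unwrap()` at the call site.
    *m.lock() += 1;
    assert_eq!(*m.lock(), 2);
}
```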
|
|
Fixes #2781.
|
|
|
|
|
|
The documentation build failed with errors such as:
    error: `[read]` public documentation for `take` links to a private item
        --> tokio/src/io/util/async_read_ext.rs:1078:9
         |
    1078 | / /// Creates an adaptor which reads at most `limit` bytes from it.
    1079 | | ///
    1080 | | /// This function returns a new instance of `AsyncRead` which will read
    1081 | | /// at most `limit` bytes, after which it will always return EOF
    ...  |
    1103 | | /// }
    1104 | | /// ```
         | |_______________^
         |
    note: the lint level is defined here
        --> tokio/src/lib.rs:13:9
         |
    13   | #![deny(intra_doc_link_resolution_failure)]
         |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
         = note: the link appears in this line:
           bytes read and future calls to [`read()`][read] may succeed.
|
|
Previously, in the broadcast channel, receiver wakers were passed to the
sender via an atomic stack with allocated nodes. When a message was
sent, the stack was drained. This caused a problem when many receivers
pushed a waiter node then dropped. The waiter node remained indefinitely
in cases where no values were sent.
This patch switches broadcast to use the intrusive linked-list waiter
strategy used by `Notify` and `Semaphore`.
|
|
Replace an ad hoc read/write lock with RwLock. Use
the parking_lot RwLock when possible.
|
|
Broadcast uses a ring buffer to store values sent to the channel. In order to
deal with slow receivers, the oldest values are overwritten with new values
once the buffer wraps. A receiver should be able to calculate how many values
it has missed.
Additionally, when the broadcast closes, a final value of `None` is sent to
the channel. If the buffer has wrapped, this value overwrites the oldest
value.
This is mainly an issue in a single-capacity broadcast when a value is sent
and then the sender is dropped. The original value is immediately overwritten
with `None`, so receivers assume they have lagged behind.
**Solution**
A value of `None` is no longer sent to the channel when the final sender has
been dropped. This solves the single capacity broadcast case by completely
removing the behavior of overwriting values when the channel is closed.
Now, when the final sender is dropped a closed bit is set on the next slot
that the channel is supposed to send to.
In the case of a fast receiver, if it finds a slot where the closed bit is
set, it knows the channel is closed without locking the tail.
In the case of a slow receiver, it must first find out if it has missed any
values. This is similar to before, but must be able to account for channel
closure.
If the channel is not closed, the oldest value may be located at index `n`. If
the channel is closed, the oldest value is located at index `n - 1`.
Knowing the index where the oldest value is located, a receiver can calculate
how many values it may have missed and start to catch up.
Closes #2425
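The single-capacity case can be illustrated with a toy, single-threaded model. This is only a sketch under simplifying assumptions: `OneSlot`, `Recv`, and both `close_*` methods are hypothetical names, the channel holds exactly one value, and it does not reproduce the index arithmetic of the actual patch.

```rust
#[derive(Debug, PartialEq)]
enum Recv<T> {
    Value(T),
    Lagged(u64),
    Closed,
    Empty,
}

/// Toy channel with capacity 1. A `None` in `val` plays the role of the
/// final `None` that the old scheme sent on close.
struct OneSlot<T> {
    pos: u64,       // position the next send will use
    val: Option<T>, // value stored at position `pos - 1`
    closed: bool,   // the closed bit from the new scheme
}

impl<T: Clone> OneSlot<T> {
    fn new() -> Self {
        OneSlot { pos: 0, val: None, closed: false }
    }

    fn send(&mut self, v: T) {
        self.val = Some(v);
        self.pos += 1;
    }

    /// Old scheme: closing is one more send, which overwrites the slot.
    fn close_by_sending_none(&mut self) {
        self.val = None;
        self.pos += 1;
    }

    /// New scheme: closing only sets a flag and leaves the value alone.
    fn close_with_bit(&mut self) {
        self.closed = true;
    }

    /// `next` is the receiver's own position.
    fn recv(&self, next: &mut u64) -> Recv<T> {
        if *next == self.pos {
            // Nothing newer than what this receiver has already seen.
            return if self.closed { Recv::Closed } else { Recv::Empty };
        }
        let oldest = self.pos - 1;
        if *next < oldest {
            // The value at `next` was overwritten: skip ahead and report it.
            let missed = oldest - *next;
            *next = oldest;
            return Recv::Lagged(missed);
        }
        *next += 1;
        match &self.val {
            Some(v) => Recv::Value(v.clone()),
            None => Recv::Closed,
        }
    }
}

fn main() {
    // Old scheme: the value is overwritten before the receiver reads it.
    let mut old = OneSlot::new();
    let mut next = 0;
    old.send("hello");
    old.close_by_sending_none();
    assert_eq!(old.recv(&mut next), Recv::Lagged(1));
    assert_eq!(old.recv(&mut next), Recv::Closed);

    // New scheme: the value survives, then the closed bit is observed.
    let mut fixed = OneSlot::new();
    let mut next = 0;
    fixed.send("hello");
    fixed.close_with_bit();
    assert_eq!(fixed.recv(&mut next), Recv::Value("hello"));
    assert_eq!(fixed.recv(&mut next), Recv::Closed);
}
```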
|
|
This reverts commit 826fc21abfd04779cde92e4cc33de2adb42cf327.
The code was intentional. Holding the lock while notifying is
unnecessary. Also change the code to use `drop` so clippy doesn't
confuse people against their will.
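The message does not show the code in question, so the following is only a std-only sketch of one pattern consistent with it: the state lives in an atomic, the lock is acquired and released straight away with an explicit `drop`, and the notification is sent without the lock held. `Shared`, `finish_one`, and `wait_until_done` are hypothetical names.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

struct Shared {
    pending: AtomicUsize, // updated without taking the lock
    lock: Mutex<()>,
    condvar: Condvar,
}

fn finish_one(shared: &Shared) {
    if shared.pending.fetch_sub(1, Ordering::SeqCst) == 1 {
        // Acquire and immediately release the lock. A waiter is then
        // either parked in `wait` or will see `pending == 0` once it gets
        // the lock. `drop` makes the early release explicit.
        drop(shared.lock.lock().unwrap());
        // The lock is not held while notifying.
        shared.condvar.notify_all();
    }
}

fn wait_until_done(shared: &Shared) {
    let mut guard = shared.lock.lock().unwrap();
    while shared.pending.load(Ordering::SeqCst) != 0 {
        guard = shared.condvar.wait(guard).unwrap();
    }
}

fn main() {
    let shared = Arc::new(Shared {
        pending: AtomicUsize::new(2),
        lock: Mutex::new(()),
        condvar: Condvar::new(),
    });

    let waiter = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || wait_until_done(&shared))
    };

    finish_one(&shared);
    finish_one(&shared);
    waiter.join().unwrap();
    assert_eq!(shared.pending.load(Ordering::SeqCst), 0);
}
```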
|
|
Loom is having a big refresh to improve performance and tighten up the
concurrency model. This diff tracks those changes.
Included in the changes is the removal of `CausalCell` deferred checks.
This is due to it technically being undefined behavior in the C++11
memory model. To address this, the work-stealing queue is updated to
avoid needing this behavior. This is done by limiting the queue to have
one concurrent stealer.
|
|
|
|
|
|
Make sure the tail mutex is acquired when `condvar` is notified,
otherwise the wakeup may be lost and the sender could be left waiting.
Use `notify_all()` instead of `notify_one()` to ensure that the correct
sender is woken. Finally, only do any of this when there are no more
readers left.
Additionally, calling `send()` is buggy and may cause a panic when
the slot has another pending send.
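The lost-wakeup point can be sketched with plain `std::sync` primitives. This is an illustration rather than the tokio code: `Shared`, `release`, and `wait_for_readers` are hypothetical, and a reader count guarded by a mutex stands in for the channel's tail state.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical shared state: how many readers are still using a slot.
struct Shared {
    readers: Mutex<usize>,
    condvar: Condvar,
}

/// Called by a reader when it is done. Returns the remaining reader count.
fn release(shared: &Shared) -> usize {
    // The count changes only under the mutex, so a waiter is either not
    // yet checking it or is already parked in `wait`. It cannot check,
    // miss the notification, and then park forever.
    let mut readers = shared.readers.lock().unwrap();
    *readers -= 1;
    let remaining = *readers;
    if remaining == 0 {
        // Only notify once no readers are left. Wake every waiter; each
        // one rechecks the condition for itself.
        shared.condvar.notify_all();
    }
    remaining
}

/// Called by a sender that needs all readers to be gone.
fn wait_for_readers(shared: &Shared) {
    let mut readers = shared.readers.lock().unwrap();
    while *readers != 0 {
        readers = shared.condvar.wait(readers).unwrap();
    }
}

fn main() {
    let shared = Arc::new(Shared {
        readers: Mutex::new(2),
        condvar: Condvar::new(),
    });

    // Two waiting senders: `notify_one` would wake only one of them.
    let waiters: Vec<_> = (0..2)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || wait_for_readers(&shared))
        })
        .collect();

    assert_eq!(release(&shared), 1);
    assert_eq!(release(&shared), 0);

    for waiter in waiters {
        waiter.join().unwrap();
    }
}
```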
|
|
|
|
* tokio: remove documentation stating `Receiver` is clone-able.
The documentation for `broadcast` stated that both `Sender` and
`Receiver` are cloneable. This isn't the case: `Receiver`s cannot be
cloned (and shouldn't be cloned).
In addition, mention that `Receiver` is `Sync`, and mention that both
`Receiver` and `Sender` are `Send`.
Fixes: #2032
* Clarify that Sender and Receiver are only Send and Sync if T is Send or Sync.
|
|
|
|
`cargo fmt` has a bug where it does not format modules scoped with
feature flags.
|
|
Adds a broadcast channel implementation. A broadcast channel is a
multi-producer, multi-consumer channel where each consumer receives a
clone of every value sent. This is useful for implementing
pub/sub-style patterns.
Implemented as a ring buffer, a Vec of the specified capacity is
allocated on initialization of the channel. Values are pushed into
slots.
When the channel is full, a send overwrites the oldest value. Receivers
detect this and return an error on the next call to receive. This
prevents unbounded buffering and does not make the channel vulnerable to
the slowest consumer.
Closes: #1585
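The overwrite-and-report behavior can be sketched with a single-threaded toy ring buffer. `Ring`, `RecvError`, and the explicit `next` cursor are hypothetical simplifications; the real channel is concurrent and async, and its internals are not shown in this message.

```rust
#[derive(Debug, PartialEq)]
enum RecvError {
    Empty,
    Lagged(u64),
}

/// Toy single-threaded ring buffer.
struct Ring<T> {
    slots: Vec<Option<T>>, // allocated once, at construction
    tail: u64,             // total number of values sent so far
}

impl<T: Clone> Ring<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0);
        Ring { slots: vec![None; capacity], tail: 0 }
    }

    /// Never blocks and never grows: a full buffer overwrites its oldest slot.
    fn send(&mut self, value: T) {
        let idx = (self.tail % self.slots.len() as u64) as usize;
        self.slots[idx] = Some(value);
        self.tail += 1;
    }

    /// `next` is the receiver's own position in the stream of sent values.
    fn recv(&self, next: &mut u64) -> Result<T, RecvError> {
        if *next == self.tail {
            return Err(RecvError::Empty);
        }
        // Position of the oldest value still held in the buffer.
        let oldest = self.tail.saturating_sub(self.slots.len() as u64);
        if *next < oldest {
            let missed = oldest - *next;
            *next = oldest; // resume from the oldest retained value
            return Err(RecvError::Lagged(missed));
        }
        let idx = (*next % self.slots.len() as u64) as usize;
        *next += 1;
        // Each receiver gets its own clone of the stored value.
        Ok(self.slots[idx].clone().expect("slot was written by `send`"))
    }
}

fn main() {
    let mut ring = Ring::new(2);
    let (mut fast, mut slow) = (0u64, 0u64);

    ring.send(1);
    ring.send(2);
    assert_eq!(ring.recv(&mut fast), Ok(1));
    assert_eq!(ring.recv(&mut fast), Ok(2));

    ring.send(3); // overwrites 1, which `slow` never read
    assert_eq!(ring.recv(&mut fast), Ok(3));

    // The slow receiver is told how much it missed, then catches up.
    assert_eq!(ring.recv(&mut slow), Err(RecvError::Lagged(1)));
    assert_eq!(ring.recv(&mut slow), Ok(2));
    assert_eq!(ring.recv(&mut slow), Ok(3));
    assert_eq!(ring.recv(&mut slow), Err(RecvError::Empty));
}
```

The sender never waits for the slow receiver here, which is the trade-off the message describes: memory stays bounded and the slowest consumer pays with a lag error.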
|