summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--tokio/src/sync/mod.rs417
-rw-r--r--tokio/src/sync/watch.rs32
2 files changed, 438 insertions, 11 deletions
diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs
index beb96920..029d626a 100644
--- a/tokio/src/sync/mod.rs
+++ b/tokio/src/sync/mod.rs
@@ -1,19 +1,414 @@
#![cfg_attr(loom, allow(dead_code, unreachable_pub, unused_imports))]
-//! Future-aware synchronization
+//! Synchronization primitives for use in asynchronous contexts.
//!
-//! This module is enabled with the **`sync`** feature flag.
+//! Tokio programs tend to be organized as a set of [tasks] where each task
+//! operates independently and may be executed on separate physical threads. The
+//! synchronization primitives provided in this module permit these independent
+//! tasks to communicate together.
//!
-//! Tasks sometimes need to communicate with each other. This module contains
-//! basic abstractions for doing so:
+//! [tasks]: crate::task
//!
-//! - [oneshot](oneshot/index.html), a way of sending a single value
-//! from one task to another.
-//! - [mpsc](mpsc/index.html), a multi-producer, single-consumer channel for
-//! sending values between tasks.
-//! - [`Mutex`](struct.Mutex.html), an asynchronous `Mutex`-like type.
-//! - [watch](watch/index.html), a single-producer, multi-consumer channel that
-//! only stores the **most recently** sent value.
+//! # Message passing
+//!
+//! The most common form of synchronization in a Tokio program is message
+//! passing. Two tasks operate independently and send messages to each other to
+//! synchronize. Doing so has the advantage of avoiding shared state.
+//!
+//! Message passing is implemented using channels. A channel supports sending a
+//! message from one producer task to one or more consumer tasks. There are a
+//! few flavors of channels provided by Tokio. Each channel flavor supports
+//! different message passing patterns. When a channel supports multiple
+//! producers, many separate tasks may **send** messages. When a channel
+//! supports muliple consumers, many different separate tasks may **receive**
+//! messages.
+//!
+//! Tokio provides many different channel flavors as different message passing
+//! patterns are best handled with different implementations.
+//!
+//! ## `oneshot` channel
+//!
+//! The [`oneshot` channel][oneshot] supports sending a **single** value from a
+//! single producer to a single consumer. This channel is usually used to send
+//! the result of a computation to a waiter.
+//!
+//! **Example:** using a `oneshot` channel to receive the result of a
+//! computation.
+//!
+//! ```
+//! use tokio::sync::oneshot;
+//!
+//! async fn some_computation() -> String {
+//! "represents the result of the computation".to_string()
+//! }
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let (tx, rx) = oneshot::channel();
+//!
+//! tokio::spawn(async move {
+//! let res = some_computation().await;
+//! tx.send(res).unwrap();
+//! });
+//!
+//! // Do other work while the computation is happening in the background
+//!
+//! // Wait for the computation result
+//! let res = rx.await.unwrap();
+//! }
+//! ```
+//!
+//! Note, if the task produces the the computation result as its final action
+//! before terminating, the [`JoinHandle`] can be used to receive the
+//! computation result instead of allocating resources for the `oneshot`
+//! channel. Awaiting on [`JoinHandle`] returns `Result`. If the task panics,
+//! the `Joinhandle` yields `Err` with the panic cause.
+//!
+//! **Example:**
+//!
+//! ```
+//! async fn some_computation() -> String {
+//! "the result of the computation".to_string()
+//! }
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let join_handle = tokio::spawn(async move {
+//! some_computation().await
+//! });
+//!
+//! // Do other work while the computation is happening in the background
+//!
+//! // Wait for the computation result
+//! let res = join_handle.await.unwrap();
+//! }
+//! ```
+//!
+//! [`JoinHandle`]: crate::task::JoinHandle
+//!
+//! ## `mpsc` channel
+//!
+//! The [`mpsc` channel][mpsc] supports sending **many** values from **many**
+//! producers to a single consumer. This channel is often used to send work to a
+//! task or to receive the result of many computations.
+//!
+//! **Example:** using an mpsc to incrementally stream the results of a series
+//! of computations.
+//!
+//! ```
+//! use tokio::sync::mpsc;
+//!
+//! async fn some_computation(input: u32) -> String {
+//! format!("the result of computation {}", input)
+//! }
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let (mut tx, mut rx) = mpsc::channel(100);
+//!
+//! tokio::spawn(async move {
+//! for i in 0..10 {
+//! let res = some_computation(i).await;
+//! tx.send(res).await.unwrap();
+//! }
+//! });
+//!
+//! while let Some(res) = rx.recv().await {
+//! println!("got = {}", res);
+//! }
+//! }
+//! ```
+//!
+//! The argument to `mpsc::channel` is the channel capacity. This is the maximum
+//! number of values that can be stored in the channel pending receipt at any
+//! given time. Properly setting this value is key in implementing robust
+//! programs as the channel capacity plays a critical part in handling back
+//! pressure.
+//!
+//! A common concurrency pattern for resource management is to spawn a task
+//! dedicated to managing that resource and using message passing betwen other
+//! tasks to interact with the resource. The resource may be anything that may
+//! not be concurrently used. Some examples include a socket and program state.
+//! For example, if multiple tasks need to send data over a single socket, spawn
+//! a task to manage the socket and use a channel to synchronize.
+//!
+//! **Example:** sending data from many tasks over a single socket using message
+//! passing.
+//!
+//! ```no_run
+//! use tokio::io::{self, AsyncWriteExt};
+//! use tokio::net::TcpStream;
+//! use tokio::sync::mpsc;
+//!
+//! #[tokio::main]
+//! async fn main() -> io::Result<()> {
+//! let mut socket = TcpStream::connect("www.example.com:1234").await?;
+//! let (tx, mut rx) = mpsc::channel(100);
+//!
+//! for _ in 0..10 {
+//! // Each task needs its own `tx` handle. This is done by cloning the
+//! // original handle.
+//! let mut tx = tx.clone();
+//!
+//! tokio::spawn(async move {
+//! tx.send(&b"data to write"[..]).await.unwrap();
+//! });
+//! }
+//!
+//! // The `rx` half of the channel returns `None` once **all** `tx` clones
+//! // drop. To ensure `None` is returned, drop the handle owned by the
+//! // current task. If this `tx` handle is not dropped, there will always
+//! // be a single outstanding `tx` handle.
+//! drop(tx);
+//!
+//! while let Some(res) = rx.recv().await {
+//! socket.write_all(res).await?;
+//! }
+//!
+//! Ok(())
+//! }
+//! ```
+//!
+//! The [`mpsc`][mpsc] and [`oneshot`][oneshot] channels can be combined to
+//! provide a request / response type synchronization pattern with a shared
+//! resource. A task is spawned to synchronize a resource and waits on commands
+//! received on a [`mpsc`][mpsc] channel. Each command includes a
+//! [`oneshot`][oneshot] `Sender` on which the result of the command is sent.
+//!
+//! **Example:** use a task to synchronize a `u64` counter. Each task sends an
+//! "fetch and increment" command. The counter value **before** the increment is
+//! sent over the provided `oneshot` channel.
+//!
+//! ```
+//! use tokio::sync::{oneshot, mpsc};
+//! use Command::Increment;
+//!
+//! enum Command {
+//! Increment,
+//! // Other commands can be added here
+//! }
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let (cmd_tx, mut cmd_rx) = mpsc::channel::<(Command, oneshot::Sender<u64>)>(100);
+//!
+//! // Spawn a task to manage the counter
+//! tokio::spawn(async move {
+//! let mut counter: u64 = 0;
+//!
+//! while let Some((cmd, response)) = cmd_rx.recv().await {
+//! match cmd {
+//! Increment => {
+//! let prev = counter;
+//! counter += 1;
+//! response.send(prev).unwrap();
+//! }
+//! }
+//! }
+//! });
+//!
+//! let mut join_handles = vec![];
+//!
+//! // Spawn tasks that will send the increment command.
+//! for _ in 0..10 {
+//! let mut cmd_tx = cmd_tx.clone();
+//!
+//! join_handles.push(tokio::spawn(async move {
+//! let (resp_tx, resp_rx) = oneshot::channel();
+//!
+//! cmd_tx.send((Increment, resp_tx)).await.ok().unwrap();
+//! let res = resp_rx.await.unwrap();
+//!
+//! println!("previous value = {}", res);
+//! }));
+//! }
+//!
+//! // Wait for all tasks to complete
+//! for join_handle in join_handles.drain(..) {
+//! join_handle.await.unwrap();
+//! }
+//! }
+//! ```
+//!
+//! ## `broadcast` channel
+//!
+//! The [`broadcast` channel[broadcast] supports sending **many** values from
+//! **many** producers to **many** consumers. Each consumer will receive
+//! **each** value. This channel can be used to implement "fan out" style
+//! patterns common with pub / sub or "chat" systems.
+//!
+//! This channel tends to be used less often than `oneshot` and `mpsc` but still
+//! has its use cases.
+//!
+//! Basic usage
+//!
+//! ```
+//! use tokio::sync::broadcast;
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let (tx, mut rx1) = broadcast::channel(16);
+//! let mut rx2 = tx.subscribe();
+//!
+//! tokio::spawn(async move {
+//! assert_eq!(rx1.recv().await.unwrap(), 10);
+//! assert_eq!(rx1.recv().await.unwrap(), 20);
+//! });
+//!
+//! tokio::spawn(async move {
+//! assert_eq!(rx2.recv().await.unwrap(), 10);
+//! assert_eq!(rx2.recv().await.unwrap(), 20);
+//! });
+//!
+//! tx.send(10).unwrap();
+//! tx.send(20).unwrap();
+//! }
+//! ```
+//!
+//! ## `watch` channel
+//!
+//! The [`watch` channel][watch] supports sending **many** values from a
+//! **single** producer to **many** consumers. However, only the **most recent**
+//! value is stored in the channel. Consumers are notified when a new value is
+//! sent, but there is no guarantee that consumers will see **all** values.
+//!
+//! The [`watch` channel] is similar to a [`broadcast` channel] with capacity 1.
+//!
+//! Use cases for the [`watch` channel] include broadcasting configuration
+//! changes or signalling program state changes, such as transitioning to
+//! shutdown.
+//!
+//! **Example:** use a `watch` channel to notify tasks of configuration changes.
+//! In this example, a configuration file is checked periodically. When the file
+//! changes, the configuration changes are signalled to consumers.
+//!
+//! ```
+//! use tokio::sync::watch;
+//! use tokio::time::{self, Duration, Instant};
+//!
+//! use std::io;
+//!
+//! #[derive(Debug, Clone, Eq, PartialEq)]
+//! struct Config {
+//! timeout: Duration,
+//! }
+//!
+//! impl Config {
+//! async fn load_from_file() -> io::Result<Config> {
+//! // file loading and deserialization logic here
+//! # Ok(Config { timeout: Duration::from_secs(1) })
+//! }
+//! }
+//!
+//! async fn my_async_operation() {
+//! // Do something here
+//! }
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! // Load initial configuration value
+//! let mut config = Config::load_from_file().await.unwrap();
+//!
+//! // Create the watch channel, initialized with the loaded configuration
+//! let (tx, rx) = watch::channel(config.clone());
+//!
+//! // Spawn a task to monitor the file.
+//! tokio::spawn(async move {
+//! loop {
+//! // Wait 10 seconds between checks
+//! time::delay_for(Duration::from_secs(10)).await;
+//!
+//! // Load the configuration file
+//! let new_config = Config::load_from_file().await.unwrap();
+//!
+//! // If the configuration changed, send the new config value
+//! // on the watch channel.
+//! if new_config != config {
+//! tx.broadcast(new_config.clone()).unwrap();
+//! config = new_config;
+//! }
+//! }
+//! });
+//!
+//! let mut handles = vec![];
+//!
+//! // Spawn tasks that runs the async operation for at most `timeout`. If
+//! // the timeout elapses, restart the operation.
+//! //
+//! // The task simultaneously watches the `Config` for changes. When the
+//! // timeout duration changes, the timeout is updated without restarting
+//! // the in-flight operation.
+//! for _ in 0..5 {
+//! // Clone a config watch handle for use in this task
+//! let mut rx = rx.clone();
+//!
+//! let handle = tokio::spawn(async move {
+//! // Start the initial operation and pin the future to the stack.
+//! // Pinning to the stack is required to resume the operation
+//! // across multiple calls to `select!`
+//! let op = my_async_operation();
+//! tokio::pin!(op);
+//!
+//! // Receive the **initial** configuration value. As this is the
+//! // first time the config is received from the watch, it will
+//! // always complete immediatedly.
+//! let mut conf = rx.recv().await.unwrap();
+//!
+//! let mut op_start = Instant::now();
+//! let mut delay = time::delay_until(op_start + conf.timeout);
+//!
+//! loop {
+//! tokio::select! {
+//! _ = &mut delay => {
+//! // The operation elapsed. Restart it
+//! op.set(my_async_operation());
+//!
+//! // Track the new start time
+//! op_start = Instant::now();
+//!
+//! // Restart the timeout
+//! delay = time::delay_until(op_start + conf.timeout);
+//! }
+//! new_conf = rx.recv() => {
+//! conf = new_conf.unwrap();
+//!
+//! // The configuration has been updated. Update the
+//! // `delay` using the new `timeout` value.
+//! delay.reset(op_start + conf.timeout);
+//! }
+//! _ = &mut op => {
+//! // The operation completed!
+//! return
+//! }
+//! }
+//! }
+//! });
+//!
+//! handles.push(handle);
+//! }
+//!
+//! for handle in handles.drain(..) {
+//! handle.await.unwrap();
+//! }
+//! }
+//! ```
+//!
+//! # State synchronization
+//!
+//! The remainding synchronization primitives focus on synchronizing state.
+//! These are asynchronous equivalents to versions provided by `std`. They
+//! operate in a similar way as their `std` counterparts parts but will wait
+//! asynchronously instead of blocking the thread.
+//!
+//! * [`Barrier`][Barrier] Ensures multiple tasks will wait for each other to
+//! reach a point in the program, before continuing execution all together.
+//!
+//! * [`Mutex`][Mutex] Mutual Exclusion mechanism, which ensures that at most
+//! one thread at a time is able to access some data.
+//!
+//! * [`RwLock`][RwLock] Provides a mutual exclusion mechanism which allows
+//! multiple readers at the same time, while allowing only one writer at a
+//! time. In some cases, this can be more efficient than a mutex.
cfg_sync! {
mod barrier;
diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs
index 3e945563..59e3eec0 100644
--- a/tokio/src/sync/watch.rs
+++ b/tokio/src/sync/watch.rs
@@ -258,6 +258,38 @@ impl<T> Receiver<T> {
impl<T: Clone> Receiver<T> {
/// Attempts to clone the latest value sent via the channel.
+ ///
+ /// If this is the first time the function is called on a `Receiver`
+ /// instance, then the function completes immediately with the **current**
+ /// value held by the channel. On the next call, the function waits until
+ /// a new value is sent in the channel.
+ ///
+ /// `None` is returned if the `Sender` half is dropped.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::watch;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = watch::channel("hello");
+ ///
+ /// let v = rx.recv().await.unwrap();
+ /// assert_eq!(v, "hello");
+ ///
+ /// tokio::spawn(async move {
+ /// tx.broadcast("goodbye").unwrap();
+ /// });
+ ///
+ /// // Waits for the new task to spawn and send the value.
+ /// let v = rx.recv().await.unwrap();
+ /// assert_eq!(v, "goodbye");
+ ///
+ /// let v = rx.recv().await;
+ /// assert!(v.is_none());
+ /// }
+ /// ```
pub async fn recv(&mut self) -> Option<T> {
poll_fn(|cx| {
let v_ref = ready!(self.poll_recv_ref(cx));