//! A single-producer, multi-consumer channel that only retains the *last* sent //! value. //! //! This channel is useful for watching for changes to a value from multiple //! points in the code base, for example, changes to configuration values. //! //! # Usage //! //! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer //! and sender halves of the channel. The channel is created with an initial //! value. `Receiver::poll` will always be ready upon creation and will yield //! either this initial value or the latest value that has been sent by //! `Sender`. //! //! Calls to [`Receiver::poll`] and [`Receiver::poll_ref`] will always yield //! the latest value. //! //! # Examples //! //! ``` //! # extern crate futures; //! extern crate tokio; //! //! use tokio::prelude::*; //! use tokio::sync::watch; //! //! # tokio::run(futures::future::lazy(|| { //! let (mut tx, rx) = watch::channel("hello"); //! //! tokio::spawn(rx.for_each(|value| { //! println!("received = {:?}", value); //! Ok(()) //! }).map_err(|_| ())); //! //! tx.broadcast("world").unwrap(); //! # Ok(()) //! # })); //! ``` //! //! # Closing //! //! [`Sender::poll_close`] allows the producer to detect when all [`Sender`] //! handles have been dropped. This indicates that there is no further interest //! in the values being produced and work can be stopped. //! //! # Thread safety //! //! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other //! threads and can be used in a concurrent environment. Clones of [`Receiver`] //! handles may be moved to separate threads and also used concurrently. //! //! [`Sender`]: struct.Sender.html //! [`Receiver`]: struct.Receiver.html //! [`channel`]: fn.channel.html //! [`Sender::poll_close`]: struct.Sender.html#method.poll_close //! [`Receiver::poll`]: struct.Receiver.html#method.poll //! [`Receiver::poll_ref`]: struct.Receiver.html#method.poll_ref use fnv::FnvHashMap; use futures::task::AtomicTask; use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream}; use std::ops; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak}; /// Receives values from the associated `Sender`. /// /// Instances are created by the [`channel`](fn.channel.html) function. #[derive(Debug)] pub struct Receiver { /// Pointer to the shared state shared: Arc>, /// Pointer to the watcher's internal state inner: Arc, /// Watcher ID. id: u64, /// Last observed version ver: usize, } /// Sends values to the associated `Receiver`. /// /// Instances are created by the [`channel`](fn.channel.html) function. #[derive(Debug)] pub struct Sender { shared: Weak>, } /// Returns a reference to the inner value /// /// Outstanding borrows hold a read lock on the inner value. This means that /// long lived borrows could cause the produce half to block. It is recommended /// to keep the borrow as short lived as possible. #[derive(Debug)] pub struct Ref<'a, T: 'a> { inner: RwLockReadGuard<'a, T>, } pub mod error { //! Watch error types use std::fmt; /// Error produced when receiving a value fails. #[derive(Debug)] pub struct RecvError { pub(crate) _p: (), } /// Error produced when sending a value fails. #[derive(Debug)] pub struct SendError { pub(crate) inner: T, } // ===== impl RecvError ===== impl fmt::Display for RecvError { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { use std::error::Error; write!(fmt, "{}", self.description()) } } impl ::std::error::Error for RecvError { fn description(&self) -> &str { "channel closed" } } // ===== impl SendError ===== impl fmt::Display for SendError { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { use std::error::Error; write!(fmt, "{}", self.description()) } } impl ::std::error::Error for SendError { fn description(&self) -> &str { "channel closed" } } } #[derive(Debug)] struct Shared { /// The most recent value value: RwLock, /// The current version /// /// The lowest bit represents a "closed" state. The rest of the bits /// represent the current version. version: AtomicUsize, /// All watchers watchers: Mutex, /// Task to notify when all watchers drop cancel: AtomicTask, } #[derive(Debug)] struct Watchers { next_id: u64, watchers: FnvHashMap>, } #[derive(Debug)] struct WatchInner { task: AtomicTask, } const CLOSED: usize = 1; /// Create a new watch channel, returning the "send" and "receive" handles. /// /// All values sent by `Sender` will become visible to the `Receiver` handles. /// Only the last value sent is made available to the `Receiver` half. All /// intermediate values are dropped. /// /// # Examples /// /// ``` /// # extern crate futures; /// extern crate tokio; /// /// use tokio::prelude::*; /// use tokio::sync::watch; /// /// # tokio::run(futures::future::lazy(|| { /// let (mut tx, rx) = watch::channel("hello"); /// /// tokio::spawn(rx.for_each(|value| { /// println!("received = {:?}", value); /// Ok(()) /// }).map_err(|_| ())); /// /// tx.broadcast("world").unwrap(); /// # Ok(()) /// # })); /// ``` pub fn channel(init: T) -> (Sender, Receiver) { const INIT_ID: u64 = 0; let inner = Arc::new(WatchInner::new()); // Insert the watcher let mut watchers = FnvHashMap::with_capacity_and_hasher(0, Default::default()); watchers.insert(INIT_ID, inner.clone()); let shared = Arc::new(Shared { value: RwLock::new(init), version: AtomicUsize::new(2), watchers: Mutex::new(Watchers { next_id: INIT_ID + 1, watchers, }), cancel: AtomicTask::new(), }); let tx = Sender { shared: Arc::downgrade(&shared), }; let rx = Receiver { shared, inner, id: INIT_ID, ver: 0, }; (tx, rx) } impl Receiver { /// Returns a reference to the most recently sent value /// /// Outstanding borrows hold a read lock. This means that long lived borrows /// could cause the send half to block. It is recommended to keep the borrow /// as short lived as possible. /// /// # Examples /// /// ``` /// # extern crate tokio; /// # use tokio::sync::watch; /// let (_, rx) = watch::channel("hello"); /// assert_eq!(*rx.get_ref(), "hello"); /// ``` pub fn get_ref(&self) -> Ref { let inner = self.shared.value.read().unwrap(); Ref { inner } } /// Attempts to receive the latest value sent via the channel. /// /// If a new, unobserved, value has been sent, a reference to it is /// returned. If no new value has been sent, then `NotReady` is returned and /// the current task is notified once a new value is sent. /// /// Only the **most recent** value is returned. If the receiver is falling /// behind the sender, intermediate values are dropped. pub fn poll_ref(&mut self) -> Poll>, error::RecvError> { // Make sure the task is up to date self.inner.task.register(); let state = self.shared.version.load(SeqCst); let version = state & !CLOSED; if version != self.ver { // Track the latest version self.ver = version; let inner = self.shared.value.read().unwrap(); return Ok(Some(Ref { inner }).into()); } if CLOSED == state & CLOSED { // The `Store` handle has been dropped. return Ok(None.into()); } Ok(Async::NotReady) } } impl Stream for Receiver { type Item = T; type Error = error::RecvError; fn poll(&mut self) -> Poll, error::RecvError> { let item = try_ready!(self.poll_ref()); Ok(Async::Ready(item.map(|v_ref| v_ref.clone()))) } } impl Clone for Receiver { fn clone(&self) -> Self { let inner = Arc::new(WatchInner::new()); let shared = self.shared.clone(); let id = { let mut watchers = shared.watchers.lock().unwrap(); let id = watchers.next_id; watchers.next_id += 1; watchers.watchers.insert(id, inner.clone()); id }; let ver = self.ver; Receiver { shared: shared, inner, id, ver, } } } impl Drop for Receiver { fn drop(&mut self) { let mut watchers = self.shared.watchers.lock().unwrap(); watchers.watchers.remove(&self.id); } } impl WatchInner { fn new() -> Self { WatchInner { task: AtomicTask::new(), } } } impl Sender { /// Broadcast a new value via the channel, notifying all receivers. pub fn broadcast(&mut self, value: T) -> Result<(), error::SendError> { let shared = match self.shared.upgrade() { Some(shared) => shared, // All `Watch` handles have been canceled None => return Err(error::SendError { inner: value }), }; // Replace the value { let mut lock = shared.value.write().unwrap(); *lock = value; } // Update the version. 2 is used so that the CLOSED bit is not set. shared.version.fetch_add(2, SeqCst); // Notify all watchers notify_all(&*shared); // Return the old value Ok(()) } /// Returns `Ready` when all receivers have dropped. /// /// This allows the producer to get notified when interest in the produced /// values is canceled and immediately stop doing work. pub fn poll_close(&mut self) -> Poll<(), ()> { match self.shared.upgrade() { Some(shared) => { shared.cancel.register(); Ok(Async::NotReady) } None => Ok(Async::Ready(())), } } } impl Sink for Sender { type SinkItem = T; type SinkError = error::SendError; fn start_send(&mut self, item: T) -> StartSend> { let _ = self.broadcast(item)?; Ok(AsyncSink::Ready) } fn poll_complete(&mut self) -> Poll<(), error::SendError> { Ok(().into()) } } /// Notify all watchers of a change fn notify_all(shared: &Shared) { let watchers = shared.watchers.lock().unwrap(); for watcher in watchers.watchers.values() { // Notify the task watcher.task.notify(); } } impl Drop for Sender { fn drop(&mut self) { if let Some(shared) = self.shared.upgrade() { shared.version.fetch_or(CLOSED, SeqCst); notify_all(&*shared); } } } // ===== impl Ref ===== impl<'a, T: 'a> ops::Deref for Ref<'a, T> { type Target = T; fn deref(&self) -> &T { self.inner.deref() } } // ===== impl Shared ===== impl Drop for Shared { fn drop(&mut self) { self.cancel.notify(); } }