diff options
author | Sean McArthur <sean@seanmonstar.com> | 2020-01-29 12:22:21 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-29 12:22:21 -0800 |
commit | 116a18b849e3dd9e5fcddc92a299e87ff855936a (patch) | |
tree | 7626f58e4d59aba9f4d96d093ff86ad7fcb17e68 | |
parent | 9d6b99494b72e79b4afba5073a9ebef5bbbeca8a (diff) |
sync: reduce memory size of watch::Receiver (#2191)
This reduces the `mem::size_of::<watch::Receiver>()` from 4 words to 2.
- The `id` is now the pointer of the `Arc<WatchInner>`.
- The `ver` is moved into the `WatchInner`.
-rw-r--r-- | tokio/src/sync/watch.rs | 119 |
1 files changed, 59 insertions, 60 deletions
diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 59e3eec0..402670f4 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -54,10 +54,10 @@ use crate::future::poll_fn; use crate::sync::task::AtomicWaker; -use fnv::FnvHashMap; +use fnv::FnvHashSet; use std::ops; use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::SeqCst; +use std::sync::atomic::Ordering::{Relaxed, SeqCst}; use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak}; use std::task::Poll::{Pending, Ready}; use std::task::{Context, Poll}; @@ -71,13 +71,7 @@ pub struct Receiver<T> { shared: Arc<Shared<T>>, /// Pointer to the watcher's internal state - inner: Arc<WatchInner>, - - /// Watcher ID. - id: u64, - - /// Last observed version - ver: usize, + inner: Watcher, } /// Sends values to the associated [`Receiver`](struct.Receiver.html). @@ -138,14 +132,16 @@ struct Shared<T> { cancel: AtomicWaker, } -#[derive(Debug)] -struct Watchers { - next_id: u64, - watchers: FnvHashMap<u64, Arc<WatchInner>>, -} +type Watchers = FnvHashSet<Watcher>; + +/// The watcher's ID is based on the Arc's pointer. +#[derive(Clone, Debug)] +struct Watcher(Arc<WatchInner>); #[derive(Debug)] struct WatchInner { + /// Last observed version + version: AtomicUsize, waker: AtomicWaker, } @@ -179,21 +175,20 @@ const CLOSED: usize = 1; /// [`Sender`]: struct.Sender.html /// [`Receiver`]: struct.Receiver.html pub fn channel<T: Clone>(init: T) -> (Sender<T>, Receiver<T>) { - const INIT_ID: u64 = 0; + const VERSION_0: usize = 0; + const VERSION_1: usize = 2; - let inner = Arc::new(WatchInner::new()); + // We don't start knowing VERSION_1 + let inner = Watcher::new_version(VERSION_0); // Insert the watcher - let mut watchers = FnvHashMap::with_capacity_and_hasher(0, Default::default()); - watchers.insert(INIT_ID, inner.clone()); + let mut watchers = FnvHashSet::with_capacity_and_hasher(0, Default::default()); + watchers.insert(inner.clone()); let shared = Arc::new(Shared { value: RwLock::new(init), - version: AtomicUsize::new(2), - watchers: Mutex::new(Watchers { - next_id: INIT_ID + 1, - watchers, - }), + version: AtomicUsize::new(VERSION_1), + watchers: Mutex::new(watchers), cancel: AtomicWaker::new(), }); @@ -201,12 +196,7 @@ pub fn channel<T: Clone>(init: T) -> (Sender<T>, Receiver<T>) { shared: Arc::downgrade(&shared), }; - let rx = Receiver { - shared, - inner, - id: INIT_ID, - ver: 0, - }; + let rx = Receiver { shared, inner }; (tx, rx) } @@ -240,9 +230,8 @@ impl<T> Receiver<T> { let state = self.shared.version.load(SeqCst); let version = state & !CLOSED; - if version != self.ver { + if self.inner.version.swap(version, Relaxed) != version { let inner = self.shared.value.read().unwrap(); - self.ver = version; return Ready(Some(Ref { inner })); } @@ -312,42 +301,19 @@ impl<T: Clone> crate::stream::Stream for Receiver<T> { impl<T> Clone for Receiver<T> { fn clone(&self) -> Self { - let inner = Arc::new(WatchInner::new()); + let ver = self.inner.version.load(Relaxed); + let inner = Watcher::new_version(ver); let shared = self.shared.clone(); - let id = { - let mut watchers = shared.watchers.lock().unwrap(); - let id = watchers.next_id; + shared.watchers.lock().unwrap().insert(inner.clone()); - watchers.next_id += 1; - watchers.watchers.insert(id, inner.clone()); - - id - }; - - let ver = self.ver; - - Receiver { - shared, - inner, - id, - ver, - } + Receiver { shared, inner } } } impl<T> Drop for Receiver<T> { fn drop(&mut self) { - let mut watchers = self.shared.watchers.lock().unwrap(); - watchers.watchers.remove(&self.id); - } -} - -impl WatchInner { - fn new() -> Self { - WatchInner { - waker: AtomicWaker::new(), - } + self.shared.watchers.lock().unwrap().remove(&self.inner); } } @@ -399,7 +365,7 @@ impl<T> Sender<T> { fn notify_all<T>(shared: &Shared<T>) { let watchers = shared.watchers.lock().unwrap(); - for watcher in watchers.watchers.values() { + for watcher in watchers.iter() { // Notify the task watcher.waker.wake(); } @@ -431,3 +397,36 @@ impl<T> Drop for Shared<T> { self.cancel.wake(); } } + +// ===== impl Watcher ===== + +impl Watcher { + fn new_version(version: usize) -> Self { + Watcher(Arc::new(WatchInner { + version: AtomicUsize::new(version), + waker: AtomicWaker::new(), + })) + } +} + +impl std::cmp::PartialEq for Watcher { + fn eq(&self, other: &Watcher) -> bool { + Arc::ptr_eq(&self.0, &other.0) + } +} + +impl std::cmp::Eq for Watcher {} + +impl std::hash::Hash for Watcher { + fn hash<H: std::hash::Hasher>(&self, state: &mut H) { + (&*self.0 as *const WatchInner).hash(state) + } +} + +impl std::ops::Deref for Watcher { + type Target = WatchInner; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} |