summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSean McArthur <sean@seanmonstar.com>2020-01-29 12:22:21 -0800
committerGitHub <noreply@github.com>2020-01-29 12:22:21 -0800
commit116a18b849e3dd9e5fcddc92a299e87ff855936a (patch)
tree7626f58e4d59aba9f4d96d093ff86ad7fcb17e68
parent9d6b99494b72e79b4afba5073a9ebef5bbbeca8a (diff)
sync: reduce memory size of watch::Receiver (#2191)
This reduces the `mem::size_of::<watch::Receiver>()` from 4 words to 2. - The `id` is now the pointer of the `Arc<WatchInner>`. - The `ver` is moved into the `WatchInner`.
-rw-r--r--tokio/src/sync/watch.rs119
1 files changed, 59 insertions, 60 deletions
diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs
index 59e3eec0..402670f4 100644
--- a/tokio/src/sync/watch.rs
+++ b/tokio/src/sync/watch.rs
@@ -54,10 +54,10 @@
use crate::future::poll_fn;
use crate::sync::task::AtomicWaker;
-use fnv::FnvHashMap;
+use fnv::FnvHashSet;
use std::ops;
use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering::SeqCst;
+use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};
@@ -71,13 +71,7 @@ pub struct Receiver<T> {
shared: Arc<Shared<T>>,
/// Pointer to the watcher's internal state
- inner: Arc<WatchInner>,
-
- /// Watcher ID.
- id: u64,
-
- /// Last observed version
- ver: usize,
+ inner: Watcher,
}
/// Sends values to the associated [`Receiver`](struct.Receiver.html).
@@ -138,14 +132,16 @@ struct Shared<T> {
cancel: AtomicWaker,
}
-#[derive(Debug)]
-struct Watchers {
- next_id: u64,
- watchers: FnvHashMap<u64, Arc<WatchInner>>,
-}
+type Watchers = FnvHashSet<Watcher>;
+
+/// The watcher's ID is based on the Arc's pointer.
+#[derive(Clone, Debug)]
+struct Watcher(Arc<WatchInner>);
#[derive(Debug)]
struct WatchInner {
+ /// Last observed version
+ version: AtomicUsize,
waker: AtomicWaker,
}
@@ -179,21 +175,20 @@ const CLOSED: usize = 1;
/// [`Sender`]: struct.Sender.html
/// [`Receiver`]: struct.Receiver.html
pub fn channel<T: Clone>(init: T) -> (Sender<T>, Receiver<T>) {
- const INIT_ID: u64 = 0;
+ const VERSION_0: usize = 0;
+ const VERSION_1: usize = 2;
- let inner = Arc::new(WatchInner::new());
+ // We don't start knowing VERSION_1
+ let inner = Watcher::new_version(VERSION_0);
// Insert the watcher
- let mut watchers = FnvHashMap::with_capacity_and_hasher(0, Default::default());
- watchers.insert(INIT_ID, inner.clone());
+ let mut watchers = FnvHashSet::with_capacity_and_hasher(0, Default::default());
+ watchers.insert(inner.clone());
let shared = Arc::new(Shared {
value: RwLock::new(init),
- version: AtomicUsize::new(2),
- watchers: Mutex::new(Watchers {
- next_id: INIT_ID + 1,
- watchers,
- }),
+ version: AtomicUsize::new(VERSION_1),
+ watchers: Mutex::new(watchers),
cancel: AtomicWaker::new(),
});
@@ -201,12 +196,7 @@ pub fn channel<T: Clone>(init: T) -> (Sender<T>, Receiver<T>) {
shared: Arc::downgrade(&shared),
};
- let rx = Receiver {
- shared,
- inner,
- id: INIT_ID,
- ver: 0,
- };
+ let rx = Receiver { shared, inner };
(tx, rx)
}
@@ -240,9 +230,8 @@ impl<T> Receiver<T> {
let state = self.shared.version.load(SeqCst);
let version = state & !CLOSED;
- if version != self.ver {
+ if self.inner.version.swap(version, Relaxed) != version {
let inner = self.shared.value.read().unwrap();
- self.ver = version;
return Ready(Some(Ref { inner }));
}
@@ -312,42 +301,19 @@ impl<T: Clone> crate::stream::Stream for Receiver<T> {
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
- let inner = Arc::new(WatchInner::new());
+ let ver = self.inner.version.load(Relaxed);
+ let inner = Watcher::new_version(ver);
let shared = self.shared.clone();
- let id = {
- let mut watchers = shared.watchers.lock().unwrap();
- let id = watchers.next_id;
+ shared.watchers.lock().unwrap().insert(inner.clone());
- watchers.next_id += 1;
- watchers.watchers.insert(id, inner.clone());
-
- id
- };
-
- let ver = self.ver;
-
- Receiver {
- shared,
- inner,
- id,
- ver,
- }
+ Receiver { shared, inner }
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
- let mut watchers = self.shared.watchers.lock().unwrap();
- watchers.watchers.remove(&self.id);
- }
-}
-
-impl WatchInner {
- fn new() -> Self {
- WatchInner {
- waker: AtomicWaker::new(),
- }
+ self.shared.watchers.lock().unwrap().remove(&self.inner);
}
}
@@ -399,7 +365,7 @@ impl<T> Sender<T> {
fn notify_all<T>(shared: &Shared<T>) {
let watchers = shared.watchers.lock().unwrap();
- for watcher in watchers.watchers.values() {
+ for watcher in watchers.iter() {
// Notify the task
watcher.waker.wake();
}
@@ -431,3 +397,36 @@ impl<T> Drop for Shared<T> {
self.cancel.wake();
}
}
+
+// ===== impl Watcher =====
+
+impl Watcher {
+ fn new_version(version: usize) -> Self {
+ Watcher(Arc::new(WatchInner {
+ version: AtomicUsize::new(version),
+ waker: AtomicWaker::new(),
+ }))
+ }
+}
+
+impl std::cmp::PartialEq for Watcher {
+ fn eq(&self, other: &Watcher) -> bool {
+ Arc::ptr_eq(&self.0, &other.0)
+ }
+}
+
+impl std::cmp::Eq for Watcher {}
+
+impl std::hash::Hash for Watcher {
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ (&*self.0 as *const WatchInner).hash(state)
+ }
+}
+
+impl std::ops::Deref for Watcher {
+ type Target = WatchInner;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}