diff options
Diffstat (limited to 'openpgp/src/wot/priority_queue.rs')
-rw-r--r-- | openpgp/src/wot/priority_queue.rs | 504 |
1 files changed, 504 insertions, 0 deletions
diff --git a/openpgp/src/wot/priority_queue.rs b/openpgp/src/wot/priority_queue.rs new file mode 100644 index 00000000..0cacc22b --- /dev/null +++ b/openpgp/src/wot/priority_queue.rs @@ -0,0 +1,504 @@ +use std::clone::Clone; +use std::cmp::Ordering; +use std::collections::BinaryHeap; +use std::collections::HashSet; +use std::fmt::Debug; +use std::hash::Hash; +use std::iter::FromIterator; + +const TRACE: bool = false; + +/// In order to use a BinaryHeap, the *values* need to be Ord. This +/// means when comparing two `Elements`, we only compare the values, +/// not the keys. +/// +/// We also want the keys to be ord so we can efficiently dedup it. +#[derive(Debug)] +struct Element<K, V> + where K: Ord + Hash + Clone + Debug, + V: Ord + Debug, +{ + key: K, + value: V, +} + +impl<K, V> Ord for Element<K, V> + where K: Ord + Hash + Clone + Debug, + V: Ord + Debug, +{ + fn cmp(&self, other: &Self) -> Ordering { + self.value.cmp(&other.value) + .then(self.key.cmp(&other.key).reverse()) + } +} + +impl<K, V> PartialOrd for Element<K, V> + where K: Ord + Hash + Clone + Debug, + V: Ord + Debug, +{ + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) + } +} + +impl<K, V> PartialEq for Element<K, V> + where K: Ord + Hash + Clone + Debug, + V: Ord + Debug, +{ + fn eq(&self, other: &Self) -> bool { + self.cmp(&other) == Ordering::Equal + } +} + +impl<K, V> Eq for Element<K, V> + where K: Ord + Hash + Clone + Debug, + V: Ord + Debug, +{ +} + +/// A dedupping max-priority queue. +/// +/// This data structure implements a priority queue for key-value +/// pairs. When an element is popped from the priority queue, the +/// element with the largest value is popped. (If there are multiple +/// such values, one is returned.) +/// +/// When inserting a key into this priority queue, if there is already +/// an element with the same key, then the larger value is kept. +pub(super) struct PriorityQueue<K, V> + where K: Ord + Hash + Clone + Debug, + V: Ord + Debug, +{ + // When pending is larger than this, then convert pending into a + // BH. + threshold: usize, + + // The elements in the queue, sorted by value. + bh: BinaryHeap<Element<K, V>>, + + // Elements that we haven't added to the binary heap yet. + pending: Vec<Element<K, V>>, + + // Keys that are present in bh or pending. If None, then we have + // a duplicate. + have_keys: Option<HashSet<K>>, + + // We're tidy if: + // + // - We only have elements in bh. + // - We only have elements in pending, and they are sorted and + // deduped. + is_tidy: bool, +} + +const THRESHOLD: usize = 16; + +impl<K, V> PriorityQueue<K, V> + where K: Ord + Hash + Clone + Debug, + V: Ord + Debug, +{ + pub fn new() -> Self { + Self::with_threshold(THRESHOLD) + } + + pub fn with_threshold(threshold: usize) -> Self { + Self { + threshold, + + bh: BinaryHeap::new(), + pending: Vec::with_capacity(threshold), + + have_keys: Some(HashSet::new()), + + is_tidy: true, + } + } + + fn tidy(&mut self) { + tracer!(TRACE, "PriorityQueue::tidy", 0); + + if self.is_tidy { + assert!(self.bh.is_empty() + || self.pending.len() == 0); + assert!(self.have_keys.is_some()); + &self.pending[..].windows(2).for_each(|v| { + assert!(v[0] <= v[1]); + }); + return; + } + + t!("pre: bh: {} elements; pending:\n{}", + self.bh.iter().count(), + self.pending.iter().enumerate().map(|(i, e)| { + format!(" {}. {:?}: {:?}", i, e.key, e.value) + }) + .collect::<Vec<_>>() + .join("\n")); + + // If there are no duplicates, it is safe to just merge + // pending into bh. + if self.have_keys.is_some() + && (! self.bh.is_empty() + || self.pending.len() > self.threshold) + { + t!(" No duplicates, merging pending into bh"); + self.bh.extend(self.pending.drain(..)); + self.is_tidy = true; + return; + } + + if self.have_keys.is_none() { + t!(" Have duplicates (moving bh to pending)."); + if ! self.bh.is_empty() { + let bh = std::mem::replace(&mut self.bh, BinaryHeap::new()); + self.pending.append(&mut bh.into_sorted_vec()); + } + } + + // We need to dedup by key, not value. But pending needs + // to be sorted by value. Since pending is probably + // nearly sorted, we sort the keys in a separate vector. + + // Assume that the values are nearly sorted. + let mut keys: Vec<(&K, usize)> + = self.pending.iter().enumerate().map(|(i, e)| { + (&e.key, i) + }) + .collect(); + // Sort by the keys. + keys.sort_by_key(|a| a.0); + + // Now dedup pending. For a given key, we want to keep + // the maximum value. + keys.dedup_by(|a, b| { + if a.0 == b.0 { + // a will be remove. Store the larger value in b. + if self.pending[a.1].value > self.pending[b.1].value { + b.1 = a.1 + } + true + } else { + false + } + }); + + if keys.len() != self.pending.len() { + // We deduped something. + let mut retain = vec![ false; self.pending.len() ]; + for (_, i) in keys.into_iter() { + retain[i] = true; + } + + let mut i = 0; + self.pending.retain(|_| (retain[i], i += 1).0); + } + + self.pending.sort_by(|a, b| { + a.value.cmp(&b.value) + // Make it deterministic by also considering keys. We + // want the minimal key to be returned first. Since + // we return the maximum, negate the comparison. + .then(a.key.cmp(&b.key).reverse()) + }); + + self.have_keys + = Some(HashSet::from_iter( + self.pending.iter().map(|e| e.key.clone()))); + self.is_tidy = true; + + t!("pending (post):\n{}", + self.pending.iter().enumerate().map(|(i, e)| { + format!(" {}. {:?}: {:?}", i, e.key, e.value) + }) + .collect::<Vec<_>>() + .join("\n")); + } + + pub fn push(&mut self, key: K, value: V) { + tracer!(TRACE, "PriorityQueue::push", 0); + t!("<{:?}, {:?}>", key, value); + + if ! self.bh.is_empty() || self.pending.len() > 0 { + // Have already have at least one element. We're adding + // another. It's probably out of order; it could be a + // duplicate. + self.is_tidy = false; + + // Note that this key is in the queue. + if let Some(ref mut have_keys) = self.have_keys { + if have_keys.replace(key.clone()).is_some() { + // DUP! + t!("DUP!"); + self.have_keys = None; + } + } + } else { + // bh and pending are empty. + assert!(self.have_keys.as_ref().expect("some").is_empty()); + self.is_tidy = true; + self.have_keys + = Some(HashSet::from_iter(std::iter::once(key.clone()))); + } + + self.pending.push(Element { key, value }); + } + + pub fn pop(&mut self) -> Option<(K, V)> { + tracer!(TRACE, "PriorityQueue::pop", 0); + + self.tidy(); + if let Some((key, value)) = self.pending.pop() + .or_else(|| self.bh.pop()) + .map(|e| (e.key, e.value)) + { + t!(" => <{:?}, {:?}>", key, value); + if let Some(ref mut have_keys) = self.have_keys { + let was_present = have_keys.remove(&key); + assert!(was_present); + } else if self.bh.is_empty() + && self.pending.len() == 1 + { + // We don't have any elements left; we clearly don't + // have any duplicates. + self.have_keys = Some(HashSet::new()); + } + + Some((key, value)) + } else { + t!(" => None"); + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const THRESHOLDS: &[usize] = &[ 1, 4, 16, 32 ]; + + #[test] + fn simple() { + for &t in THRESHOLDS.iter() { + let mut pq: PriorityQueue<isize, isize> + = PriorityQueue::with_threshold(t); + + pq.push(0, 0); + pq.push(1, 1); + pq.push(2, 2); + pq.push(3, 3); + pq.push(4, 4); + pq.push(5, 5); + + assert_eq!(pq.pop(), Some((5, 5))); + assert_eq!(pq.pop(), Some((4, 4))); + assert_eq!(pq.pop(), Some((3, 3))); + assert_eq!(pq.pop(), Some((2, 2))); + assert_eq!(pq.pop(), Some((1, 1))); + assert_eq!(pq.pop(), Some((0, 0))); + assert_eq!(pq.pop(), None); + assert_eq!(pq.pop(), None); + + let mut pq: PriorityQueue<isize, isize> + = PriorityQueue::with_threshold(t); + + pq.push(0, 0); + pq.push(1, -1); + pq.push(2, -2); + pq.push(3, -3); + pq.push(4, -4); + pq.push(5, -5); + + assert_eq!(pq.pop(), Some((0, 0))); + assert_eq!(pq.pop(), Some((1, -1))); + assert_eq!(pq.pop(), Some((2, -2))); + assert_eq!(pq.pop(), Some((3, -3))); + assert_eq!(pq.pop(), Some((4, -4))); + assert_eq!(pq.pop(), Some((5, -5))); + assert_eq!(pq.pop(), None); + assert_eq!(pq.pop(), None); + + let mut pq: PriorityQueue<isize, isize> + = PriorityQueue::with_threshold(t); + + pq.push(0, 0); + pq.push(1, 1); + pq.push(5, 5); + pq.push(2, 2); + pq.push(4, 4); + pq.push(3, 3); + + assert_eq!(pq.pop(), Some((5, 5))); + assert_eq!(pq.pop(), Some((4, 4))); + assert_eq!(pq.pop(), Some((3, 3))); + assert_eq!(pq.pop(), Some((2, 2))); + assert_eq!(pq.pop(), Some((1, 1))); + assert_eq!(pq.pop(), Some((0, 0))); + assert_eq!(pq.pop(), None); + assert_eq!(pq.pop(), None); + + let mut pq: PriorityQueue<isize, isize> + = PriorityQueue::with_threshold(t); + assert_eq!(pq.pop(), None); + + pq.push(0, 0); + pq.push(0, 0); + assert_eq!(pq.pop(), Some((0, 0))); + assert_eq!(pq.pop(), None); + + let mut pq: PriorityQueue<isize, isize> + = PriorityQueue::with_threshold(t); + assert_eq!(pq.pop(), None); + + pq.push(0, 0); + pq.push(0, 0); + assert_eq!(pq.pop(), Some((0, 0))); + pq.push(0, 0); + assert_eq!(pq.pop(), Some((0, 0))); + assert_eq!(pq.pop(), None); + } + } + + #[test] + fn duplicates() { + let mut pq: PriorityQueue<isize, isize> = PriorityQueue::new(); + + // Push different keys with the same value. + for i in 0..20 { + pq.push(i, 0); + } + // Push the same keys with their own value. This should + // overwrite the old keys. + for i in 0..20 { + pq.push(i, i); + } + + // Push different keys with the same value. + for i in 0..20 { + pq.push(i, 0); + } + + for i in (0..20).rev() { + assert_eq!(pq.pop(), Some((i, i))); + } + assert_eq!(pq.pop(), None); + assert_eq!(pq.pop(), None); + } + + #[test] + fn push_pop() { + let mut pq: PriorityQueue<isize, isize> = PriorityQueue::new(); + + // Push different keys with the same value. + for i in 0..10 { + pq.push(i, 0); + } + // Push the same keys with their own value. This should + // overwrite the old keys. + for i in (0..10).rev() { + pq.push(i, i); + assert_eq!(pq.pop(), Some((i, i))); + } + assert_eq!(pq.pop(), None); + assert_eq!(pq.pop(), None); + } + + // Use a u8 so we have a change of a few duplicates. + fn pq(e: Vec<(u8, u8)>, threshold: usize) -> bool { + tracer!(TRACE, "pq", 0); + t!("\n\nGot {} elements; threshold: {}", e.len(), threshold); + t!("elements: {:?}", e); + + let mut expected = e.clone(); + + // Sort by keys. + expected.sort_by(|a, b| { + a.0.cmp(&b.0) + }); + // Dedup keys. + expected.dedup_by(|a, b| { + if a.0 == b.0 { + if a.1 > b.1 { + b.1 = a.1; + } + + true + } else { + false + } + }); + // Sort by value (largest first) then by key. + expected.sort_by(|a, b| { + a.1.cmp(&b.1).reverse() + .then(a.0.cmp(&b.0)) + }); + + let mut pq: PriorityQueue<u8, u8> + = PriorityQueue::with_threshold(threshold); + + // Add everything to the priority queue. Every third + // push, do a pop. Then add those again in the next + // round. + let mut popped = e.clone(); + for _i in 0..5 { + let topush = popped; + popped = Vec::new(); + for (j, (k, v)) in topush.iter().enumerate() { + pq.push(*k, *v); + + if j % 3 == 1 || j % 7 == 1 { + // Pop one. + let (k, v) = pq.pop().unwrap(); + assert!(e.contains(&(k, v))); + popped.push((k, v)); + } + } + } + for (k, v) in popped.into_iter() { + pq.push(k, v); + } + + + let mut got = Vec::new(); + while let Some((k, v)) = pq.pop() { + got.push((k, v)); + } + + t!(" e: {:?}", e); + t!("expected: {:?}", expected); + t!(" got: {:?}", got); + + if got == expected { + true + } else { + t!("BAD"); + false + } + } + + quickcheck! { + fn pq1(e: Vec<(u8, u8)>) -> bool { + pq(e, 1) + } + } + quickcheck! { + fn pq4(e: Vec<(u8, u8)>) -> bool { + pq(e, 4) + } + } + quickcheck! { + fn pq16(e: Vec<(u8, u8)>) -> bool { + pq(e, 16) + } + } + quickcheck! { + fn pq64(e: Vec<(u8, u8)>) -> bool { + pq(e, 64) + } + } + + #[test] + fn extra() { + pq([(75, 0), (75, 0)].to_vec(), 1); + } +} |