diff options
author | Paul Masurel <paul.masurel@gmail.com> | 2017-05-23 09:25:45 +0900 |
---|---|---|
committer | Paul Masurel <paul.masurel@gmail.com> | 2017-05-25 18:17:37 +0900 |
commit | 7a6e62976bd2e571e303bb3880f230b16d7e38cd (patch) | |
tree | 0bab064815546b69adb0f71aa13cb8473c629b35 /src/termdict/merger.rs | |
parent | 2712930bd6876b2631da4795610099b156e483ec (diff) |
Added stream dictionary code, merge unit test
Diffstat (limited to 'src/termdict/merger.rs')
-rw-r--r-- | src/termdict/merger.rs | 157 |
1 files changed, 157 insertions, 0 deletions
diff --git a/src/termdict/merger.rs b/src/termdict/merger.rs new file mode 100644 index 0000000..617233b --- /dev/null +++ b/src/termdict/merger.rs @@ -0,0 +1,157 @@ +use std::collections::BinaryHeap; +use core::SegmentReader; +use super::TermStreamer; +use common::BinarySerializable; +use postings::TermInfo; +use std::cmp::Ordering; +use fst::Streamer; + +pub struct HeapItem<'a, V> + where V: 'a + BinarySerializable +{ + pub streamer: TermStreamer<'a, V>, + pub segment_ord: usize, +} + +impl<'a, V> PartialEq for HeapItem<'a, V> + where V: 'a + BinarySerializable +{ + fn eq(&self, other: &Self) -> bool { + self.segment_ord == other.segment_ord + } +} + +impl<'a, V> Eq for HeapItem<'a, V> where V: 'a + BinarySerializable {} + +impl<'a, V> PartialOrd for HeapItem<'a, V> + where V: 'a + BinarySerializable +{ + fn partial_cmp(&self, other: &HeapItem<'a, V>) -> Option<Ordering> { + Some(self.cmp(other)) + } +} + +impl<'a, V> Ord for HeapItem<'a, V> + where V: 'a + BinarySerializable +{ + fn cmp(&self, other: &HeapItem<'a, V>) -> Ordering { + (&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord)) + } +} + +/// Given a list of sorted term streams, +/// returns an iterator over sorted unique terms. +/// +/// The item yield is actually a pair with +/// - the term +/// - a slice with the ordinal of the segments containing +/// the terms. +pub struct TermMerger<'a, V> + where V: 'a + BinarySerializable +{ + heap: BinaryHeap<HeapItem<'a, V>>, + current_streamers: Vec<HeapItem<'a, V>>, +} + +impl<'a, V> TermMerger<'a, V> + where V: 'a + BinarySerializable +{ + fn new(streams: Vec<TermStreamer<'a, V>>) -> TermMerger<'a, V> { + TermMerger { + heap: BinaryHeap::new(), + current_streamers: streams + .into_iter() + .enumerate() + .map(|(ord, streamer)| { + HeapItem { + streamer: streamer, + segment_ord: ord, + } + }) + .collect(), + } + } + + fn advance_segments(&mut self) { + let streamers = &mut self.current_streamers; + let heap = &mut self.heap; + for mut heap_item in streamers.drain(..) { + if heap_item.streamer.advance() { + heap.push(heap_item); + } + } + } + + + /// Advance the term iterator to the next term. + /// Returns true if there is indeed another term + /// False if there is none. + #[allow(while_let_loop)] + pub fn advance(&mut self) -> bool { + self.advance_segments(); + if let Some(head) = self.heap.pop() { + self.current_streamers.push(head); + loop { + if let Some(next_streamer) = self.heap.peek() { + if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() { + break; + } + } else { + break; + } // no more streamer. + let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand + self.current_streamers.push(next_heap_it); + } + true + } else { + false + } + } + + /// Returns the current term. + /// + /// This method may be called + /// iff advance() has been called before + /// and "true" was returned. + pub fn key(&self) -> &[u8] { + self.current_streamers[0].streamer.key() + } + + /// Returns the sorted list of segment ordinals + /// that include the current term. + /// + /// This method may be called + /// iff advance() has been called before + /// and "true" was returned. + pub fn current_kvs(&self) -> &[HeapItem<'a, V>] { + &self.current_streamers[..] + } +} + + + +impl<'a> From<&'a [SegmentReader]> for TermMerger<'a, TermInfo> + where TermInfo: BinarySerializable +{ + fn from(segment_readers: &'a [SegmentReader]) -> TermMerger<'a, TermInfo> { + TermMerger::new(segment_readers + .iter() + .map(|reader| reader.terms().stream()) + .collect()) + } +} + +impl<'a, V> Streamer<'a> for TermMerger<'a, V> + where V: BinarySerializable +{ + type Item = &'a [u8]; + + fn next(&'a mut self) -> Option<Self::Item> { + if self.advance() { + Some(self.current_streamers[0].streamer.key()) + } else { + None + } + + } +} |