summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/common/mod.rs1
-rw-r--r--src/compression/stream.rs2
-rw-r--r--src/core/segment_reader.rs28
-rw-r--r--src/error.rs1
-rw-r--r--src/fastfield/reader.rs9
-rw-r--r--src/indexer/segment_manager.rs2
-rw-r--r--src/lib.rs2
-rw-r--r--src/termdict/mod.rs3
-rw-r--r--src/termdict/streamdict/delta_encoder.rs48
-rw-r--r--src/termdict/streamdict/mod.rs44
-rw-r--r--src/termdict/streamdict/streamer.rs145
-rw-r--r--src/termdict/streamdict/term_block_encoder.rs164
-rw-r--r--src/termdict/streamdict/termdict.rs115
-rw-r--r--src/termdict/streamdict/terminfo_block_encoder.rs117
14 files changed, 200 insertions, 481 deletions
diff --git a/src/common/mod.rs b/src/common/mod.rs
index e8c8763..803fe8b 100644
--- a/src/common/mod.rs
+++ b/src/common/mod.rs
@@ -1,3 +1,4 @@
+
mod serialize;
mod timer;
mod vint;
diff --git a/src/compression/stream.rs b/src/compression/stream.rs
index 0af50ca..29d1803 100644
--- a/src/compression/stream.rs
+++ b/src/compression/stream.rs
@@ -18,7 +18,7 @@ impl CompressedIntStream {
}
}
- pub fn read(&mut self, mut output: &mut [u32]) {
+ pub fn read(&mut self, output: &mut [u32]) {
let mut num_els: usize = output.len();
let mut start: usize = 0;
loop {
diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs
index e3d2034..80bc252 100644
--- a/src/core/segment_reader.rs
+++ b/src/core/segment_reader.rs
@@ -4,10 +4,12 @@ use core::SegmentId;
use core::SegmentComponent;
use std::sync::RwLock;
use common::HasLen;
+use error::ErrorKind;
use core::SegmentMeta;
use fastfield::{self, FastFieldNotAvailableError};
use fastfield::DeleteBitSet;
use store::StoreReader;
+use directory::ReadOnlySource;
use schema::Document;
use DocId;
use std::str;
@@ -171,25 +173,39 @@ impl SegmentReader {
})
}
+
+ /// Returns a field reader associated to the field given in argument.
+ ///
+ /// The field reader is in charge of iterating through the
+ /// term dictionary associated to a specific field,
+ /// and opening the posting list associated to any term.
pub fn field_reader(&self, field: Field) -> Result<Arc<FieldReader>> {
if let Some(field_reader) = self.field_reader_cache.read()
- .unwrap() // TODO
+ .expect("Lock poisoned. This should never happen")
.get(&field) {
return Ok(field_reader.clone());
}
// TODO better error
- let termdict_source = self.termdict_composite
+ let termdict_source: ReadOnlySource = self.termdict_composite
.open_read(field)
- .ok_or("Field not found")?;
+ .ok_or_else(|| {
+ ErrorKind::SchemaError(
+ format!("Could not find {:?} term dictionary", field)
+ )
+ })?;
let postings_source = self.postings_composite
.open_read(field)
- .ok_or("field not found")?;
+ .ok_or_else(|| {
+ ErrorKind::SchemaError(format!("Could not find {:?} postings", field))
+ })?;
let positions_source = self.positions_composite
.open_read(field)
- .ok_or("field not found")?;
+ .ok_or_else(|| {
+ ErrorKind::SchemaError(format!("Could not find {:?} positions", field))
+ })?;
let field_reader = Arc::new(FieldReader::new(
termdict_source,
@@ -201,7 +217,7 @@ impl SegmentReader {
self.field_reader_cache
.write()
- .unwrap() // TODO
+ .expect("Field reader cache lock poisoned. This should never happen.")
.insert(field, field_reader.clone());
Ok(field_reader)
}
diff --git a/src/error.rs b/src/error.rs
index d6ce4a3..8b34571 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -10,6 +10,7 @@ use schema;
use fastfield::FastFieldNotAvailableError;
use serde_json;
+
error_chain!(
errors {
/// Path does not exist.
diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs
index a2992c3..0c59cba 100644
--- a/src/fastfield/reader.rs
+++ b/src/fastfield/reader.rs
@@ -136,9 +136,12 @@ impl From<Vec<u64>> for U64FastFieldReader {
let write: WritePtr = directory.open_write(Path::new("test")).unwrap();
let mut serializer = FastFieldSerializer::from_write(write).unwrap();
let mut fast_field_writers = FastFieldsWriter::from_schema(&schema);
- for val in vals {
- let mut fast_field_writer = fast_field_writers.get_field_writer(field).unwrap();
- fast_field_writer.add_val(val);
+ // TODO Error not unwrap
+ {
+ let fast_field_writer = fast_field_writers.get_field_writer(field).unwrap();
+ for val in vals {
+ fast_field_writer.add_val(val);
+ }
}
fast_field_writers.serialize(&mut serializer).unwrap();
serializer.close().unwrap();
diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs
index 35c264c..7a37f35 100644
--- a/src/indexer/segment_manager.rs
+++ b/src/indexer/segment_manager.rs
@@ -194,7 +194,7 @@ impl SegmentManager {
.writing
.remove(&after_merge_segment_entry.segment_id());
- let mut target_register: &mut SegmentRegister = {
+ let target_register: &mut SegmentRegister = {
if registers_lock
.uncommitted
.contains_all(before_merge_segment_ids) {
diff --git a/src/lib.rs b/src/lib.rs
index 6a1e6be..d8f2acc 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -98,6 +98,8 @@ mod core;
mod compression;
mod indexer;
mod common;
+
+#[allow(unused_doc_comment)]
mod error;
mod analyzer;
mod datastruct;
diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs
index 1d1912f..da8b659 100644
--- a/src/termdict/mod.rs
+++ b/src/termdict/mod.rs
@@ -220,7 +220,7 @@ mod tests {
}
#[test]
- fn test_term_dictionary() {
+ fn test_term_dictionary_simple() {
let mut directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
@@ -347,6 +347,7 @@ mod tests {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(vec![], field_type).unwrap();
for &(ref id, ref i) in &ids {
+ println!("doc {}", id);
term_dictionary_builder.insert(id.as_bytes(), &make_term_info(*i)).unwrap();
}
term_dictionary_builder.finish().unwrap()
diff --git a/src/termdict/streamdict/delta_encoder.rs b/src/termdict/streamdict/delta_encoder.rs
new file mode 100644
index 0000000..21e5aac
--- /dev/null
+++ b/src/termdict/streamdict/delta_encoder.rs
@@ -0,0 +1,48 @@
+pub fn common_prefix_len(s1: &[u8], s2: &[u8]) -> usize {
+ s1.iter()
+ .zip(s2.iter())
+ .take_while(|&(a, b)| a==b)
+ .count()
+}
+
+
+#[derive(Default)]
+pub struct DeltaEncoder {
+ last_term: Vec<u8>,
+}
+
+impl DeltaEncoder {
+ pub fn encode<'a>(&mut self, term: &'a [u8]) -> (usize, &'a [u8]) {
+ let prefix_len = common_prefix_len(term, &self.last_term);
+ self.last_term.truncate(prefix_len);
+ self.last_term.extend_from_slice(&term[prefix_len..]);
+ (prefix_len, &term[prefix_len..])
+ }
+
+ pub fn term(&self) -> &[u8] {
+ &self.last_term[..]
+ }
+}
+
+#[derive(Default)]
+pub struct DeltaDecoder {
+ term: Vec<u8>,
+}
+
+impl DeltaDecoder {
+ pub fn with_previous_term(term: Vec<u8>) -> DeltaDecoder {
+ DeltaDecoder {
+ term: Vec::from(term)
+ }
+ }
+
+ pub fn decode(&mut self, prefix_len: usize, suffix: &[u8]) -> &[u8] {
+ self.term.truncate(prefix_len);
+ self.term.extend_from_slice(suffix);
+ &self.term[..]
+ }
+
+ pub fn term(&self) -> &[u8] {
+ &self.term[..]
+ }
+}
diff --git a/src/termdict/streamdict/mod.rs b/src/termdict/streamdict/mod.rs
index 4a4db76..96a2c41 100644
--- a/src/termdict/streamdict/mod.rs
+++ b/src/termdict/streamdict/mod.rs
@@ -1,49 +1,9 @@
-
mod termdict;
mod streamer;
-mod term_block_encoder;
-mod terminfo_block_encoder;
+mod delta_encoder;
+pub use self::delta_encoder::{DeltaEncoder, DeltaDecoder};
pub use self::termdict::TermDictionaryImpl;
pub use self::termdict::TermDictionaryBuilderImpl;
pub use self::streamer::TermStreamerImpl;
pub use self::streamer::TermStreamerBuilderImpl;
-use self::term_block_encoder::{TermBlockEncoder, TermBlockDecoder};
-use self::terminfo_block_encoder::{TermInfoBlockEncoder, TermInfoBlockDecoder};
-
-use schema::FieldType;
-
-#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Copy)]
-pub(crate) enum TermDeserializerOption {
- StrNoPositions,
- StrWithPositions,
- U64,
-}
-
-impl TermDeserializerOption {
-
- pub fn has_positions(&self) -> bool {
- match *self {
- TermDeserializerOption::StrWithPositions => true,
- _ => false
- }
- }
-
-}
-
-fn make_deserializer_options(field_type: &FieldType) -> TermDeserializerOption {
- match *field_type {
- FieldType::Str(ref text_options) => {
- let indexing_options = text_options.get_indexing_options();
- if indexing_options.is_position_enabled() {
- TermDeserializerOption::StrWithPositions
- }
- else {
- TermDeserializerOption::StrNoPositions
- }
- }
- _ => {
- TermDeserializerOption::U64
- }
- }
-} \ No newline at end of file
diff --git a/src/termdict/streamdict/streamer.rs b/src/termdict/streamdict/streamer.rs
index b326334..8ed95fd 100644
--- a/src/termdict/streamdict/streamer.rs
+++ b/src/termdict/streamdict/streamer.rs
@@ -3,23 +3,22 @@
use std::cmp::max;
use super::TermDictionaryImpl;
use termdict::{TermStreamerBuilder, TermStreamer};
-use super::{TermBlockDecoder, TermInfoBlockDecoder};
use postings::TermInfo;
-use super::TermDeserializerOption;
+use super::delta_encoder::DeltaDecoder;
-pub(crate) fn stream_before<'a>(term_dictionary: &'a TermDictionaryImpl,
+fn stream_before<'a>(term_dictionary: &'a TermDictionaryImpl,
target_key: &[u8],
- deserializer_option: TermDeserializerOption)
+ has_positions: bool)
-> TermStreamerImpl<'a>
{
let (prev_key, offset) = term_dictionary.strictly_previous_key(target_key.as_ref());
let offset: usize = offset as usize;
TermStreamerImpl {
- remaining_in_block: 0,
- term_block_decoder: TermBlockDecoder::given_previous_term(&prev_key[..]),
- terminfo_block_decoder: TermInfoBlockDecoder::new(deserializer_option.has_positions()),
cursor: &term_dictionary.stream_data()[offset..],
+ delta_decoder: DeltaDecoder::with_previous_term(prev_key),
+ term_info: TermInfo::default(),
+ has_positions: has_positions,
}
}
@@ -28,13 +27,11 @@ pub(crate) fn stream_before<'a>(term_dictionary: &'a TermDictionaryImpl,
pub struct TermStreamerBuilderImpl<'a>
{
term_dictionary: &'a TermDictionaryImpl,
- block_start: &'a [u8],
origin: usize,
- cursor: usize,
offset_from: usize,
offset_to: usize,
current_key: Vec<u8>,
- deserializer_option: TermDeserializerOption,
+ has_positions: bool,
}
impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a>
@@ -43,60 +40,44 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a>
/// Limit the range to terms greater or equal to the bound
fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
- unimplemented!();
- /*
let target_key = bound.as_ref();
- let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.deserializer_option);
+ let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.has_positions);
let smaller_than = |k: &[u8]| k.lt(target_key);
- let (block_start, cursor, current_key) = get_offset(smaller_than, streamer);
- self.block_start = block_start;
+ let (offset_before, current_key) = get_offset(smaller_than, streamer);
self.current_key = current_key;
- self.cursor = cursor;
- //self.offset_from = ;
- */
+ self.offset_from = offset_before - self.origin;
self
}
/// Limit the range to terms strictly greater than the bound
fn gt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
- unimplemented!();
- /*
let target_key = bound.as_ref();
- let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.deserializer_option);
+ let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.has_positions);
let smaller_than = |k: &[u8]| k.le(target_key);
- let (block_start, cursor, current_key) = get_offset(smaller_than, streamer);
- self.block_start = block_start;
+ let (offset_before, current_key) = get_offset(smaller_than, streamer);
self.current_key = current_key;
- self.cursor = cursor;
- //self.offset_from = offset_before - self.origin;
- */
+ self.offset_from = offset_before - self.origin;
self
}
/// Limit the range to terms lesser or equal to the bound
fn lt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
- unimplemented!();
- /*
let target_key = bound.as_ref();
- let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.deserializer_option);
+ let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.has_positions);
let smaller_than = |k: &[u8]| k.lt(target_key);
let (offset_before, _) = get_offset(smaller_than, streamer);
self.offset_to = offset_before - self.origin;
self
- */
}
/// Limit the range to terms lesser or equal to the bound
fn le<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
- unimplemented!();
- /*
let target_key = bound.as_ref();
- let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.deserializer_option);
+ let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.has_positions);
let smaller_than = |k: &[u8]| k.le(target_key);
let (offset_before, _) = get_offset(smaller_than, streamer);
self.offset_to = offset_before - self.origin;
self
- */
}
/// Build the streamer.
@@ -105,10 +86,10 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a>
let start = self.offset_from;
let stop = max(self.offset_to, start);
TermStreamerImpl {
- remaining_in_block: 0,
cursor: &data[start..stop],
- term_block_decoder: TermBlockDecoder::given_previous_term(&self.current_key),
- terminfo_block_decoder: TermInfoBlockDecoder::new(self.deserializer_option.has_positions()),
+ delta_decoder: DeltaDecoder::with_previous_term(self.current_key),
+ term_info: TermInfo::default(),
+ has_positions: self.has_positions,
}
}
}
@@ -122,42 +103,37 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a>
/// - the term_buffer state to initialize the block)
fn get_offset<'a, P: Fn(&[u8]) -> bool>(predicate: P,
mut streamer: TermStreamerImpl<'a>)
- -> (&'a [u8], usize, Vec<u8>)
-{//&'a [u8]
- let mut block_start: &[u8] = streamer.cursor;
- let mut cursor = 0;
- let mut term_buffer: Vec<u8> = vec!();
-
- while streamer.advance() {
- let iter_key = streamer.key();
+ -> (usize, Vec<u8>)
+{
+ let mut prev: &[u8] = streamer.cursor;
+
+ let mut prev_data: Vec<u8> = Vec::from(streamer.delta_decoder.term());
+
+ while let Some((iter_key, _)) = streamer.next() {
if !predicate(iter_key.as_ref()) {
- return (block_start, streamer.term_block_decoder.cursor() - 1, term_buffer);
- }
- if streamer.remaining_in_block == 0 {
- block_start = streamer.cursor;
- term_buffer.clear();
- term_buffer.extend_from_slice(iter_key.as_ref());
+ return (prev.as_ptr() as usize, prev_data);
}
+ prev = streamer.cursor;
+ prev_data.clear();
+ prev_data.extend_from_slice(iter_key.as_ref());
}
- (block_start, streamer.term_block_decoder.cursor() - 1, term_buffer)
+ (prev.as_ptr() as usize, prev_data)
}
impl<'a> TermStreamerBuilderImpl<'a>
{
pub(crate) fn new(
term_dictionary: &'a TermDictionaryImpl,
- deserializer_option: TermDeserializerOption) -> Self {
+ has_positions: bool) -> Self {
let data = term_dictionary.stream_data();
let origin = data.as_ptr() as usize;
TermStreamerBuilderImpl {
term_dictionary: term_dictionary,
- block_start: term_dictionary.stream_data().as_ref(),
- cursor: 0,
origin: origin,
offset_from: 0,
offset_to: data.len(),
current_key: Vec::with_capacity(300),
- deserializer_option: deserializer_option,
+ has_positions: has_positions,
}
}
}
@@ -167,49 +143,56 @@ impl<'a> TermStreamerBuilderImpl<'a>
/// See [`TermStreamer`](./trait.TermStreamer.html)
pub struct TermStreamerImpl<'a>
{
- remaining_in_block: usize,
- term_block_decoder: TermBlockDecoder<'a>,
- terminfo_block_decoder: TermInfoBlockDecoder<'a>,
cursor: &'a [u8],
+ delta_decoder: DeltaDecoder,
+ term_info: TermInfo,
+ has_positions: bool
}
-impl<'a> TermStreamerImpl<'a>
-{
- fn load_block(&mut self) -> bool {
- self.remaining_in_block = self.cursor[0] as usize;
- if self.remaining_in_block == 0 {
- false
- }
- else {
- self.cursor = &self.cursor[1..];
- self.cursor = self.term_block_decoder.decode_block(self.cursor);
- self.cursor = self.terminfo_block_decoder.decode_block(self.cursor, self.remaining_in_block);
- true
+
+fn deserialize_vint(data: &mut &[u8]) -> u64 {
+ let mut res = 0;
+ let mut shift = 0;
+ for i in 0.. {
+ let b = data[i];
+ res |= ((b % 128u8) as u64) << shift;
+ if b & 128u8 != 0u8 {
+ *data = &data[(i + 1)..];
+ break;
}
+ shift += 7;
}
+ res
}
-
impl<'a> TermStreamer for TermStreamerImpl<'a>
{
fn advance(&mut self) -> bool {
- if self.remaining_in_block == 0 {
- if !self.load_block() {
- return false;
- }
+ if self.cursor.is_empty() {
+ return false;
+ }
+ let common_length: usize = deserialize_vint(&mut self.cursor) as usize;
+ let suffix_length: usize = deserialize_vint(&mut self.cursor) as usize;
+ self.delta_decoder.decode(common_length, &self.cursor[..suffix_length]);
+ self.cursor = &self.cursor[suffix_length..];
+
+ self.term_info.doc_freq = deserialize_vint(&mut self.cursor) as u32;
+ self.term_info.postings_offset = deserialize_vint(&mut self.cursor) as u32;
+
+ if self.has_positions {
+ self.term_info.positions_offset = deserialize_vint(&mut self.cursor) as u32;
+ self.term_info.positions_inner_offset = self.cursor[0];
+ self.cursor = &self.cursor[1..];
}
- self.remaining_in_block -= 1;
- self.term_block_decoder.advance();
- self.terminfo_block_decoder.advance();
true
}
fn key(&self) -> &[u8] {
- self.term_block_decoder.term()
+ self.delta_decoder.term()
}
fn value(&self) -> &TermInfo {
- self.terminfo_block_decoder.term_info()
+ &self.term_info
}
}
diff --git a/src/termdict/streamdict/term_block_encoder.rs b/src/termdict/streamdict/term_block_encoder.rs
deleted file mode 100644
index 157a3cf..0000000
--- a/src/termdict/streamdict/term_block_encoder.rs
+++ /dev/null
@@ -1,164 +0,0 @@
-use compression::{BlockEncoder, BlockDecoder, NUM_DOCS_PER_BLOCK};
-use std::io::{self, Write};
-
-fn compute_common_prefix_length(left: &[u8], right: &[u8]) -> usize {
- left.iter()
- .cloned()
- .zip(right.iter().cloned())
- .take_while(|&(b1, b2)| b1 == b2)
- .count()
-}
-
-
-pub struct TermBlockEncoder {
- block_encoder: BlockEncoder,
-
- pop_lens: [u32; NUM_DOCS_PER_BLOCK],
- push_lens: [u32; NUM_DOCS_PER_BLOCK],
- suffixes: Vec<u8>,
-
- previous_key: Vec<u8>,
- count: usize,
-}
-
-impl TermBlockEncoder {
- pub fn new() -> TermBlockEncoder {
- TermBlockEncoder {
- block_encoder: BlockEncoder::new(),
- pop_lens: [0u32; NUM_DOCS_PER_BLOCK],
- push_lens: [0u32; NUM_DOCS_PER_BLOCK],
- suffixes: Vec::with_capacity(NUM_DOCS_PER_BLOCK*5),
-
- previous_key: Vec::with_capacity(30),
-
- count: 0,
- }
- }
-
- pub fn encode(&mut self, key: &[u8]) {
- let common_prefix_len = compute_common_prefix_length(&self.previous_key, key);
- self.pop_lens[self.count] = (self.previous_key.len() - common_prefix_len) as u32;
- self.push_lens[self.count] = (key.len() - common_prefix_len) as u32;
- self.previous_key.clear();
- let suffix = &key[common_prefix_len..];
- self.suffixes.extend_from_slice(suffix);
- self.previous_key.extend_from_slice(key);
- self.count += 1;
- }
-
- pub fn len(&self) -> usize {
- self.count
- }
-
- pub fn flush<W: Write>(&mut self, output: &mut W) -> io::Result<()> {
- for i in self.count..NUM_DOCS_PER_BLOCK {
- self.pop_lens[i] = 0u32;
- self.push_lens[i] = 0u32;
- }
- output.write_all(self.block_encoder.compress_block_unsorted(&self.pop_lens))?;
- output.write_all(self.block_encoder.compress_block_unsorted(&self.push_lens))?;
- output.write_all(&self.suffixes[..])?;
- self.suffixes.clear();
- self.count = 0;
- Ok(())
- }
-}
-
-
-
-pub struct TermBlockDecoder<'a> {
- pop_lens_decoder: BlockDecoder,
- push_lens_decoder: BlockDecoder,
- suffixes: &'a [u8],
- current_key: Vec<u8>,
- cursor: usize,
-}
-
-
-impl<'a> TermBlockDecoder<'a> {
- pub fn new() -> TermBlockDecoder<'a> {
- TermBlockDecoder::given_previous_term(&[])
- }
-
- pub fn cursor(&self) -> usize {
- self.cursor
- }
-
- pub fn given_previous_term(previous_term: &[u8]) -> TermBlockDecoder<'a> {
- let mut current_key = Vec::with_capacity(30);
- current_key.extend_from_slice(previous_term);
- TermBlockDecoder {
- pop_lens_decoder: BlockDecoder::new(),
- push_lens_decoder: BlockDecoder::new(),
- current_key: current_key,
- suffixes: &[],
- cursor: 0,
- }
- }
-
- pub fn term(&self) -> &[u8] {
- &self.current_key
- }
-
- pub fn decode_block(&mut self, mut compressed_data: &'a [u8]) -> &'a [u8] {
- {
- let consumed_data_len = self.pop_lens_decoder.uncompress_block_unsorted(compressed_data);
- compressed_data = &compressed_data[consumed_data_len..];
- }
- {
- let consumed_data_len = self.push_lens_decoder.uncompress_block_unsorted(compressed_data);
- compressed_data = &compressed_data[consumed_data_len..];
- }
- let suffix_len: u32 = self.push_lens_decoder.output_array()[0..].iter().cloned().sum();
- let suffix_len: usize = suffix_len as usize;
- self.suffixes = &compressed_data[..suffix_len];
- self.cursor = 0;
- &compressed_data[suffix_len..]
- }
-
- pub fn advance(&mut self) {
- assert!(self.cursor < NUM_DOCS_PER_BLOCK);
- let pop_len = self.pop_lens_decoder.output(self.cursor) as usize;
- let push_len = self.push_lens_decoder.output(self.cursor) as usize;
- let previous_len = self.current_key.len();
- self.current_key.truncate(previous_len - pop_len);
- self.current_key.extend_from_slice(&self.suffixes[..push_len]);
- self.suffixes = &self.suffixes[push_len..];
- self.cursor += 1;
- }
-}
-
-
-
-#[cfg(test)]
-mod tests {
- use super::{TermBlockEncoder, TermBlockDecoder};
-
- #[test]
- fn test_encoding_terms() {
- let mut buffer: Vec<u8> = vec!();
- let mut terms = vec!();
- {
- let mut term_block_encoder = TermBlockEncoder::new();
- for i in 0..128 {
- terms.push(format!("term{}", i * 7231));
- }
- for term in &terms {
- term_block_encoder.encode(term.as_bytes());
- }
- term_block_encoder.flush(&mut buffer).unwrap();
- }
- assert_eq!(buffer.len(), 711);
-
- let mut block_decoder = TermBlockDecoder::new();
- assert_eq!(block_decoder.decode_block(&buffer[..]).len(), 0);
- for i in 0..128 {
- block_decoder.advance();
- assert_eq!(block_decoder.term(), terms[i].as_bytes());
- }
-
- }
-}
-
-
-
diff --git a/src/termdict/streamdict/termdict.rs b/src/termdict/streamdict/termdict.rs
index 5c15652..9c0dfb8 100644
--- a/src/termdict/streamdict/termdict.rs
+++ b/src/termdict/streamdict/termdict.rs
@@ -6,18 +6,14 @@ use fst::raw::Fst;
use directory::ReadOnlySource;
use common::BinarySerializable;
use common::CountingWriter;
-use bincode;
-use std::cmp::Ordering;
use postings::TermInfo;
use schema::FieldType;
+use super::DeltaEncoder;
use fst::raw::Node;
-use compression::NUM_DOCS_PER_BLOCK;
-use super::make_deserializer_options;
-use super::TermDeserializerOption;
-use super::streamer::stream_before;
+use common::VInt;
use termdict::{TermDictionary, TermDictionaryBuilder, TermStreamer};
-use super::{TermBlockEncoder, TermInfoBlockEncoder};
use super::{TermStreamerImpl, TermStreamerBuilderImpl};
+use termdict::TermStreamerBuilder;
const INDEX_INTERVAL: usize = 1024;
@@ -25,17 +21,30 @@ fn convert_fst_error(e: fst::Error) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
+fn has_positions(field_type: &FieldType) -> bool {
+ match *field_type {
+ FieldType::Str(ref text_options) => {
+ let indexing_options = text_options.get_indexing_options();
+ if indexing_options.is_position_enabled() {
+ true
+ }
+ else {
+ false
+ }
+ }
+ _ => {
+ false
+ }
+ }
+}
+
/// See [`TermDictionaryBuilder`](./trait.TermDictionaryBuilder.html)
pub struct TermDictionaryBuilderImpl<W>
{
+ has_positions: bool,
write: CountingWriter<W>,
-
- term_block_encoder: TermBlockEncoder,
- terminfo_block_encoder: TermInfoBlockEncoder,
-
+ delta_encoder: DeltaEncoder,
block_index: fst::MapBuilder<Vec<u8>>,
- last_key: Vec<u8>,
-
len: usize,
}
@@ -52,7 +61,7 @@ impl<W> TermDictionaryBuilderImpl<W>
{
fn add_index_entry(&mut self) {
self.block_index
- .insert(&self.last_key, self.write.written_bytes() as u64)
+ .insert(&self.delta_encoder.term(), self.write.written_bytes() as u64)
.unwrap();
}
@@ -67,27 +76,20 @@ impl<W> TermDictionaryBuilderImpl<W>
if self.len % INDEX_INTERVAL == 0 {
self.add_index_entry();
}
- self.last_key.clear();
- self.last_key.extend_from_slice(key);
- self.term_block_encoder.encode(key);
+ let (common_prefix_len, suffix) = self.delta_encoder.encode(key);
+ VInt(common_prefix_len as u64).serialize(&mut self.write)?;
+ VInt(suffix.len() as u64).serialize(&mut self.write)?;
+ self.write.write_all(suffix)?;
self.len += 1;
Ok(())
}
- fn flush_block(&mut self) -> io::Result<()> {
- let block_size = self.term_block_encoder.len();
- if block_size > 0 {
- self.write.write(&[block_size as u8])?;
- self.term_block_encoder.flush(&mut self.write)?;
- self.terminfo_block_encoder.flush(&mut self.write)?;
- }
- Ok(())
- }
-
pub(crate) fn insert_value(&mut self, value: &TermInfo) -> io::Result<()> {
- self.terminfo_block_encoder.encode(value);
- if self.len % NUM_DOCS_PER_BLOCK == 0 {
- self.flush_block()?;
+ VInt(value.doc_freq as u64).serialize(&mut self.write)?;
+ VInt(value.postings_offset as u64).serialize(&mut self.write)?;
+ if self.has_positions {
+ VInt(value.positions_offset as u64).serialize(&mut self.write)?;
+ self.write.write(&[value.positions_inner_offset])?;
}
Ok(())
}
@@ -98,21 +100,14 @@ impl<W> TermDictionaryBuilder<W> for TermDictionaryBuilderImpl<W>
{
/// Creates a new `TermDictionaryBuilder`
fn new(mut write: W, field_type: FieldType) -> io::Result<Self> {
- let deserializer_options = make_deserializer_options(&field_type);
- {
- // serialize the field type.
- let data: Vec<u8> = bincode::serialize(&deserializer_options, bincode::Bounded(256u64))
- .expect("Failed to serialize field type within 256 bytes. This should never be a problem.");
- write.write_all(&[data.len() as u8])?;
- write.write_all(&data[..])?;
- }
- let has_positions = deserializer_options.has_positions();
+ let has_positions = has_positions(&field_type);
+ let has_positions_code = if has_positions { 255u8 } else { 0u8 };
+ write.write_all(&[has_positions_code])?;
Ok(TermDictionaryBuilderImpl {
- term_block_encoder: TermBlockEncoder::new(),
- terminfo_block_encoder: TermInfoBlockEncoder::new(has_positions),
+ has_positions: has_positions,
write: CountingWriter::wrap(write),
+ delta_encoder: DeltaEncoder::default(),
block_index: fst::MapBuilder::new(vec![]).expect("This cannot fail"),
- last_key: Vec::with_capacity(128),
len: 0,
})
}
@@ -129,9 +124,7 @@ impl<W> TermDictionaryBuilder<W> for TermDictionaryBuilderImpl<W>
/// Finalize writing the builder, and returns the underlying
/// `Write` object.
fn finish(mut self) -> io::Result<W> {
- self.flush_block()?;
self.add_index_entry();
- self.write.write_all(&[0u8])?;
let (mut w, split_len) = self.write.finish()?;
let fst_write = self.block_index.into_inner().map_err(convert_fst_error)?;
w.write_all(&fst_write)?;
@@ -159,7 +152,7 @@ pub struct TermDictionaryImpl
{
stream_data: ReadOnlySource,
fst_index: fst::Map,
- deserializer_option: TermDeserializerOption,
+ has_positions: bool,
}
impl TermDictionaryImpl
@@ -224,13 +217,8 @@ impl<'a> TermDictionary<'a> for TermDictionaryImpl
/// Opens a `TermDictionary` given a data source.
fn from_source(mut source: ReadOnlySource) -> io::Result<Self> {
- // it won't take more than 100 bytes
- let deserialize_option_len = source.slice(0, 1).as_slice()[0] as usize;
- let deserialize_option_source = source.slice(1, 1 + deserialize_option_len);
- let deserialize_option_buffer: &[u8] = deserialize_option_source.as_slice();
- let deserializer_option: TermDeserializerOption = bincode::deserialize(deserialize_option_buffer)
- .expect("Field dictionary data is corrupted. Failed to deserialize field type.");
- source = source.slice_from(1 + deserialize_option_len);
+ let has_positions = source.slice(0, 1).as_ref()[0] == 255u8;
+ source = source.slice_from(1);
let total_len = source.len();
let length_offset = total_len - 8;
@@ -243,31 +231,28 @@ impl<'a> TermDictionary<'a> for TermDictionaryImpl
let fst_index = open_fst_index(fst_data)?;
Ok(TermDictionaryImpl {
+ has_positions: has_positions,
stream_data: stream_data,
fst_index: fst_index,
- deserializer_option: deserializer_option,
})
}
/// Lookups the value corresponding to the key.
fn get<K: AsRef<[u8]>>(&self, target_key: K) -> Option<TermInfo> {
- let mut streamer = stream_before(self, target_key.as_ref(), self.deserializer_option);
- while streamer.advance() {
- let position = streamer.key().cmp(target_key.as_ref());
- match position {
- Ordering::Less => {}
- Ordering::Equal => return Some(streamer.value().clone()),
- Order