summary refs log tree commit diff stats
path: root/src/termdict
diff options
context:
space:
mode:
author Paul Masurel <paul.masurel@gmail.com> 2017-08-27 18:44:37 +0900
committer Paul Masurel <paul.masurel@gmail.com> 2017-08-27 18:44:37 +0900
commit 69351fb4a59681ee309c344a6afa5dd7e14d1cc8 (patch)
tree 6ac7ac795c0c4ec1708dd4339a6d903246e40d11 /src/termdict
parent 3d0082d0202c8a0c0b6b82c503605c427b6d72bd (diff)
Toward a new codec
Diffstat (limited to 'src/termdict')
-rw-r--r-- src/termdict/streamdict/delta_encoder.rs 97
-rw-r--r-- src/termdict/streamdict/mod.rs 3
-rw-r--r-- src/termdict/streamdict/streamer.rs 27
-rw-r--r-- src/termdict/streamdict/termdict.rs 86
4 files changed, 153 insertions, 60 deletions
diff --git a/src/termdict/streamdict/delta_encoder.rs b/src/termdict/streamdict/delta_encoder.rs
index 7418e4f..3415296 100644
--- a/src/termdict/streamdict/delta_encoder.rs
+++ b/src/termdict/streamdict/delta_encoder.rs
@@ -1,7 +1,4 @@
use postings::TermInfo;
-use common::VInt;
-use common::BinarySerializable;
-use std::io::{self, Write};
use std::mem;
/// Returns the len of the longest
@@ -20,23 +17,23 @@ fn common_prefix_len(s1: &[u8], s2: &[u8]) -> usize {
#[derive(Default)]
pub struct TermDeltaEncoder {
last_term: Vec<u8>,
+ prefix_len: usize,
}
impl TermDeltaEncoder {
- pub fn encode<'a, W: Write>(&mut self, term: &'a [u8], write: &mut W) -> io::Result<()> {
- let prefix_len = common_prefix_len(term, &self.last_term);
- self.last_term.truncate(prefix_len);
- self.last_term.extend_from_slice(&term[prefix_len..]);
- let suffix = &term[prefix_len..];
- VInt(prefix_len as u64).serialize(write)?;
- VInt(suffix.len() as u64).serialize(write)?;
- write.write_all(suffix)?;
- Ok(())
+ pub fn encode<'a>(&mut self, term: &'a [u8]) {
+ self.prefix_len = common_prefix_len(term, &self.last_term);
+ self.last_term.truncate(self.prefix_len);
+ self.last_term.extend_from_slice(&term[self.prefix_len..]);
}
pub fn term(&self) -> &[u8] {
&self.last_term[..]
}
+
+ pub fn prefix_suffix(&mut self) -> (usize, &[u8]) {
+ (self.prefix_len, &self.last_term[self.prefix_len..])
+ }
}
#[derive(Default)]
@@ -51,11 +48,7 @@ impl TermDeltaDecoder {
}
}
- pub fn decode(&mut self, cursor: &mut &[u8]) {
- let prefix_len: usize = deserialize_vint(cursor) as usize;
- let suffix_length: usize = deserialize_vint(cursor) as usize;
- let suffix = &cursor[..suffix_length];
- *cursor = &cursor[suffix_length..];
+ pub fn decode(&mut self, prefix_len: usize, suffix: &[u8]) {
self.term.truncate(prefix_len);
self.term.extend_from_slice(suffix);
}
@@ -65,11 +58,17 @@ impl TermDeltaDecoder {
}
}
-
+#[derive(Default)]
+pub struct DeltaTermInfo {
+ pub doc_freq: u32,
+ pub delta_postings_offset: u32,
+ pub delta_positions_offset: u32,
+ pub positions_inner_offset: u8,
+}
pub struct TermInfoDeltaEncoder {
term_info: TermInfo,
- has_positions: bool,
+ pub has_positions: bool,
}
impl TermInfoDeltaEncoder {
@@ -81,34 +80,22 @@ impl TermInfoDeltaEncoder {
}
}
- pub fn encode<W: Write>(&mut self, term_info: TermInfo, write: &mut W) -> io::Result<()> {
- VInt(term_info.doc_freq as u64).serialize(write)?;
- let delta_postings_offset = term_info.postings_offset - self.term_info.postings_offset;
- VInt(delta_postings_offset as u64).serialize(write)?;
+ pub fn encode(&mut self, term_info: TermInfo) -> DeltaTermInfo {
+ let mut delta_term_info = DeltaTermInfo {
+ doc_freq: term_info.doc_freq,
+ delta_postings_offset: term_info.postings_offset - self.term_info.postings_offset,
+ delta_positions_offset: 0,
+ positions_inner_offset: 0,
+ };
if self.has_positions {
- let delta_positions_offset = term_info.positions_offset - self.term_info.positions_offset;
- VInt(delta_positions_offset as u64).serialize(write)?;
- write.write(&[term_info.positions_inner_offset])?;
+ delta_term_info.delta_positions_offset = term_info.positions_offset - self.term_info.positions_offset;
+ delta_term_info.positions_inner_offset = term_info.positions_inner_offset;
}
mem::replace(&mut self.term_info, term_info);
- Ok(())
+ delta_term_info
}
}
-fn deserialize_vint(data: &mut &[u8]) -> u64 {
- let mut res = 0;
- let mut shift = 0;
- for i in 0.. {
- let b = data[i];
- res |= ((b % 128u8) as u64) << shift;
- if b & 128u8 != 0u8 {
- *data = &data[(i + 1)..];
- break;
- }
- shift += 7;
- }
- res
-}
pub struct TermInfoDeltaDecoder {
term_info: TermInfo,
@@ -123,17 +110,27 @@ impl TermInfoDeltaDecoder {
}
}
- pub fn decode(&mut self, cursor: &mut &[u8]) {
- let doc_freq = deserialize_vint(cursor) as u32;
+ pub fn decode(&mut self, code: u8, cursor: &mut &[u8]) {
+ let num_bytes_docfreq: usize = ((code >> 1) & 3) as usize;
+ let num_bytes_postings_offset: usize = ((code >> 3) & 3) as usize;
+ const MASK: [u32; 4] = [
+ 0xffu32,
+ 0xffffu32,
+ 0xffffffu32,
+ 0xffffffffu32,
+ ];
+ let doc_freq: u32 = unsafe { *(cursor.as_ptr() as *const u32) } & MASK[num_bytes_docfreq];
+ *cursor = &cursor[num_bytes_docfreq + 1 ..];
+ let delta_postings_offset: u32 = unsafe { *(cursor.as_ptr() as *const u32) } & MASK[num_bytes_postings_offset];
+ *cursor = &cursor[num_bytes_postings_offset + 1..];
self.term_info.doc_freq = doc_freq;
- let delta_postings = deserialize_vint(cursor) as u32;
- self.term_info.postings_offset += delta_postings;
+ self.term_info.postings_offset += delta_postings_offset;
if self.has_positions {
- let delta_positions = deserialize_vint(cursor) as u32;
- self.term_info.positions_offset += delta_positions;
- let position_inner_offset = cursor[0];
- *cursor = &cursor[1..];
- self.term_info.positions_inner_offset = position_inner_offset;
+ let num_bytes_positions_offset = ((code >> 5) & 3) as usize;
+ let delta_positions_offset: u32 = unsafe { *(cursor.as_ptr() as *const u32) } & MASK[num_bytes_positions_offset];
+ self.term_info.positions_offset += delta_positions_offset;
+ self.term_info.positions_inner_offset = cursor[num_bytes_positions_offset + 1];
+ *cursor = &cursor[num_bytes_positions_offset + 2..];
}
}
diff --git a/src/termdict/streamdict/mod.rs b/src/termdict/streamdict/mod.rs
index 1c9a148..faf9c13 100644
--- a/src/termdict/streamdict/mod.rs
+++ b/src/termdict/streamdict/mod.rs
@@ -3,9 +3,10 @@ mod streamer;
mod delta_encoder;
pub use self::delta_encoder::{TermDeltaEncoder, TermDeltaDecoder};
-pub use self::delta_encoder::{TermInfoDeltaEncoder, TermInfoDeltaDecoder};
+pub use self::delta_encoder::{TermInfoDeltaEncoder, TermInfoDeltaDecoder, DeltaTermInfo};
pub use self::termdict::TermDictionaryImpl;
pub use self::termdict::TermDictionaryBuilderImpl;
pub use self::streamer::TermStreamerImpl;
pub use self::streamer::TermStreamerBuilderImpl;
+
diff --git a/src/termdict/streamdict/streamer.rs b/src/termdict/streamdict/streamer.rs
index 1363bf5..4779a65 100644
--- a/src/termdict/streamdict/streamer.rs
+++ b/src/termdict/streamdict/streamer.rs
@@ -4,9 +4,9 @@ use std::cmp::max;
use super::TermDictionaryImpl;
use termdict::{TermStreamerBuilder, TermStreamer};
use postings::TermInfo;
+use common::BinarySerializable;
use super::delta_encoder::{TermInfoDeltaDecoder, TermDeltaDecoder};
-
fn stream_before<'a>(term_dictionary: &'a TermDictionaryImpl,
target_key: &[u8],
has_positions: bool)
@@ -155,8 +155,29 @@ impl<'a> TermStreamer for TermStreamerImpl<'a>
if self.cursor.is_empty() {
return false;
}
- self.term_delta_decoder.decode(&mut self.cursor);
- self.term_info_decoder.decode(&mut self.cursor);
+ let code: u8 = self.cursor[0];
+ let mut cursor: &[u8] = &self.cursor[1..];
+
+ let prefix_suffix_packed = (code & 1u8) == 1u8;
+ let (prefix_len, suffix_len): (usize, usize) =
+ if prefix_suffix_packed {
+ let b = cursor[0];
+ cursor = &cursor[1..];
+ let prefix_len = (b & 15u8) as usize;
+ let suffix_len = (b >> 4u8) as usize;
+ (prefix_len, suffix_len)
+ }
+ else {
+ let prefix_len = u32::deserialize(&mut cursor).unwrap();
+ let suffix_len = u32::deserialize(&mut cursor).unwrap();
+ (prefix_len as usize, suffix_len as usize)
+ };
+
+ let suffix = &cursor[..suffix_len];
+ self.term_delta_decoder.decode(prefix_len, suffix);
+ cursor = &cursor[suffix_len..];
+ self.term_info_decoder.decode(code, &mut cursor);
+ self.cursor = cursor;
true
}
diff --git a/src/termdict/streamdict/termdict.rs b/src/termdict/streamdict/termdict.rs
index ab5f9df..e5487b5 100644
--- a/src/termdict/streamdict/termdict.rs
+++ b/src/termdict/streamdict/termdict.rs
@@ -2,18 +2,21 @@
use std::io::{self, Write};
use fst;
+
use fst::raw::Fst;
use directory::ReadOnlySource;
use common::BinarySerializable;
use common::CountingWriter;
use postings::TermInfo;
use schema::FieldType;
-use super::{TermDeltaEncoder, TermInfoDeltaEncoder};
+use super::{TermDeltaEncoder, TermInfoDeltaEncoder, DeltaTermInfo};
use fst::raw::Node;
use termdict::{TermDictionary, TermDictionaryBuilder, TermStreamer};
use super::{TermStreamerImpl, TermStreamerBuilderImpl};
use termdict::TermStreamerBuilder;
+use std::mem::transmute;
+const PADDING_SIZE: usize = 16;
const INDEX_INTERVAL: usize = 1024;
fn convert_fst_error(e: fst::Error) -> io::Error {
@@ -75,17 +78,71 @@ impl<W> TermDictionaryBuilderImpl<W>
if self.len % INDEX_INTERVAL == 0 {
self.add_index_entry();
}
- self.term_delta_encoder.encode(key, &mut self.write)?;
- self.len += 1;
+ self.term_delta_encoder.encode(key);
Ok(())
}
pub(crate) fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> {
- self.term_info_encoder.encode(term_info.clone(), &mut self.write)?;
+ let delta_term_info = self.term_info_encoder.encode(term_info.clone());
+ let (prefix_len, suffix) = self.term_delta_encoder.prefix_suffix();
+ write_term_kv(prefix_len, suffix, &delta_term_info, self.term_info_encoder.has_positions, &mut self.write)?;
+ self.len += 1;
Ok(())
}
}
+fn num_bytes_required(mut n: u32) -> u8 {
+ for i in 1u8..5u8 {
+ if n < 256u32 {
+ return i;
+ }
+ else {
+ n /= 256;
+ }
+ }
+ 0u8
+}
+
+fn write_term_kv<W: Write>(prefix_len: usize,
+ suffix: &[u8],
+ delta_term_info: &DeltaTermInfo,
+ has_positions: bool,
+ write: &mut W) -> io::Result<()> {
+ let suffix_len = suffix.len();
+ let mut code = 0u8;
+ let num_bytes_docfreq = num_bytes_required(delta_term_info.doc_freq);
+ let num_bytes_postings_offset = num_bytes_required(delta_term_info.delta_postings_offset);
+ let num_bytes_positions_offset = num_bytes_required(delta_term_info.delta_positions_offset);
+ code |= (num_bytes_docfreq - 1) << 1u8;
+ code |= (num_bytes_postings_offset - 1) << 3u8;
+ code |= (num_bytes_positions_offset - 1) << 5u8;
+ if (prefix_len < 16) && (suffix_len < 16) {
+ code |= 1u8;
+ write.write_all(&[code, (prefix_len as u8) | ((suffix_len as u8) << 4u8)])?;
+ }
+ else {
+ write.write_all(&[code])?;
+ (prefix_len as u32).serialize(write)?;
+ (suffix_len as u32).serialize(write)?;
+ }
+ write.write_all(suffix)?;
+ {
+ let bytes: [u8; 4] = unsafe { transmute(delta_term_info.doc_freq) };
+ write.write_all(&bytes[0..num_bytes_docfreq as usize])?;
+ }
+ {
+ let bytes: [u8; 4] = unsafe { transmute(delta_term_info.delta_postings_offset) };
+ write.write_all(&bytes[0..num_bytes_postings_offset as usize])?;
+ }
+ if has_positions {
+ let bytes: [u8; 4] = unsafe { transmute(delta_term_info.delta_positions_offset) };
+ write.write_all(&bytes[0..num_bytes_positions_offset as usize])?;
+ write.write_all(&[delta_term_info.positions_inner_offset])?;
+ }
+ Ok(())
+
+}
+
impl<W> TermDictionaryBuilder<W> for TermDictionaryBuilderImpl<W>
where W: Write
{
@@ -116,7 +173,8 @@ impl<W> TermDictionaryBuilder<W> for TermDictionaryBuilderImpl<W>
/// Finalize writing the builder, and returns the underlying
/// `Write` object.
fn finish(mut self) -> io::Result<W> {
- self.add_index_entry();
+ self.write.write_all(&[0u8; PADDING_SIZE])?;
+ // self.add_index_entry();
let (mut w, split_len) = self.write.finish()?;
let fst_write = self.block_index.into_inner().map_err(convert_fst_error)?;
w.write_all(&fst_write)?;
@@ -224,9 +282,10 @@ impl<'a> TermDictionary<'a> for TermDictionaryImpl
let fst_data = source.slice(split_len, length_offset);
let fst_index = open_fst_index(fst_data)?;
+ let len_without_padding = stream_data.len() - PADDING_SIZE;
Ok(TermDictionaryImpl {
has_positions: has_positions,
- stream_data: stream_data,
+ stream_data: stream_data.slice(0, len_without_padding),
fst_index: fst_index,
})
}
@@ -250,3 +309,18 @@ impl<'a> TermDictionary<'a> for TermDictionaryImpl
Self::StreamBuilder::new(self, self.has_positions)
}
}
+
+
+#[cfg(test)]
+mod tests {
+ use super::num_bytes_required;
+
+ #[test]
+ fn test_num_bytes_required() {
+ assert_eq!(num_bytes_required(0), 1);
+ assert_eq!(num_bytes_required(1), 1);
+ assert_eq!(num_bytes_required(255), 1);
+ assert_eq!(num_bytes_required(256), 2);
+ assert_eq!(num_bytes_required(u32::max_value()), 4);
+ }
+}
\ No newline at end of file