diff options
author | Paul Masurel <paul.masurel@gmail.com> | 2017-08-27 18:44:37 +0900 |
---|---|---|
committer | Paul Masurel <paul.masurel@gmail.com> | 2017-08-27 18:44:37 +0900 |
commit | 69351fb4a59681ee309c344a6afa5dd7e14d1cc8 (patch) | |
tree | 6ac7ac795c0c4ec1708dd4339a6d903246e40d11 /src/termdict | |
parent | 3d0082d0202c8a0c0b6b82c503605c427b6d72bd (diff) |
Toward a new codec
Diffstat (limited to 'src/termdict')
-rw-r--r-- | src/termdict/streamdict/delta_encoder.rs | 97 | ||||
-rw-r--r-- | src/termdict/streamdict/mod.rs | 3 | ||||
-rw-r--r-- | src/termdict/streamdict/streamer.rs | 27 | ||||
-rw-r--r-- | src/termdict/streamdict/termdict.rs | 86 |
4 files changed, 153 insertions, 60 deletions
diff --git a/src/termdict/streamdict/delta_encoder.rs b/src/termdict/streamdict/delta_encoder.rs index 7418e4f..3415296 100644 --- a/src/termdict/streamdict/delta_encoder.rs +++ b/src/termdict/streamdict/delta_encoder.rs @@ -1,7 +1,4 @@ use postings::TermInfo; -use common::VInt; -use common::BinarySerializable; -use std::io::{self, Write}; use std::mem; /// Returns the len of the longest @@ -20,23 +17,23 @@ fn common_prefix_len(s1: &[u8], s2: &[u8]) -> usize { #[derive(Default)] pub struct TermDeltaEncoder { last_term: Vec<u8>, + prefix_len: usize, } impl TermDeltaEncoder { - pub fn encode<'a, W: Write>(&mut self, term: &'a [u8], write: &mut W) -> io::Result<()> { - let prefix_len = common_prefix_len(term, &self.last_term); - self.last_term.truncate(prefix_len); - self.last_term.extend_from_slice(&term[prefix_len..]); - let suffix = &term[prefix_len..]; - VInt(prefix_len as u64).serialize(write)?; - VInt(suffix.len() as u64).serialize(write)?; - write.write_all(suffix)?; - Ok(()) + pub fn encode<'a>(&mut self, term: &'a [u8]) { + self.prefix_len = common_prefix_len(term, &self.last_term); + self.last_term.truncate(self.prefix_len); + self.last_term.extend_from_slice(&term[self.prefix_len..]); } pub fn term(&self) -> &[u8] { &self.last_term[..] } + + pub fn prefix_suffix(&mut self) -> (usize, &[u8]) { + (self.prefix_len, &self.last_term[self.prefix_len..]) + } } #[derive(Default)] @@ -51,11 +48,7 @@ impl TermDeltaDecoder { } } - pub fn decode(&mut self, cursor: &mut &[u8]) { - let prefix_len: usize = deserialize_vint(cursor) as usize; - let suffix_length: usize = deserialize_vint(cursor) as usize; - let suffix = &cursor[..suffix_length]; - *cursor = &cursor[suffix_length..]; + pub fn decode(&mut self, prefix_len: usize, suffix: &[u8]) { self.term.truncate(prefix_len); self.term.extend_from_slice(suffix); } @@ -65,11 +58,17 @@ impl TermDeltaDecoder { } } - +#[derive(Default)] +pub struct DeltaTermInfo { + pub doc_freq: u32, + pub delta_postings_offset: u32, + pub delta_positions_offset: u32, + pub positions_inner_offset: u8, +} pub struct TermInfoDeltaEncoder { term_info: TermInfo, - has_positions: bool, + pub has_positions: bool, } impl TermInfoDeltaEncoder { @@ -81,34 +80,22 @@ impl TermInfoDeltaEncoder { } } - pub fn encode<W: Write>(&mut self, term_info: TermInfo, write: &mut W) -> io::Result<()> { - VInt(term_info.doc_freq as u64).serialize(write)?; - let delta_postings_offset = term_info.postings_offset - self.term_info.postings_offset; - VInt(delta_postings_offset as u64).serialize(write)?; + pub fn encode(&mut self, term_info: TermInfo) -> DeltaTermInfo { + let mut delta_term_info = DeltaTermInfo { + doc_freq: term_info.doc_freq, + delta_postings_offset: term_info.postings_offset - self.term_info.postings_offset, + delta_positions_offset: 0, + positions_inner_offset: 0, + }; if self.has_positions { - let delta_positions_offset = term_info.positions_offset - self.term_info.positions_offset; - VInt(delta_positions_offset as u64).serialize(write)?; - write.write(&[term_info.positions_inner_offset])?; + delta_term_info.delta_positions_offset = term_info.positions_offset - self.term_info.positions_offset; + delta_term_info.positions_inner_offset = term_info.positions_inner_offset; } mem::replace(&mut self.term_info, term_info); - Ok(()) + delta_term_info } } -fn deserialize_vint(data: &mut &[u8]) -> u64 { - let mut res = 0; - let mut shift = 0; - for i in 0.. { - let b = data[i]; - res |= ((b % 128u8) as u64) << shift; - if b & 128u8 != 0u8 { - *data = &data[(i + 1)..]; - break; - } - shift += 7; - } - res -} pub struct TermInfoDeltaDecoder { term_info: TermInfo, @@ -123,17 +110,27 @@ impl TermInfoDeltaDecoder { } } - pub fn decode(&mut self, cursor: &mut &[u8]) { - let doc_freq = deserialize_vint(cursor) as u32; + pub fn decode(&mut self, code: u8, cursor: &mut &[u8]) { + let num_bytes_docfreq: usize = ((code >> 1) & 3) as usize; + let num_bytes_postings_offset: usize = ((code >> 3) & 3) as usize; + const MASK: [u32; 4] = [ + 0xffu32, + 0xffffu32, + 0xffffffu32, + 0xffffffffu32, + ]; + let doc_freq: u32 = unsafe { *(cursor.as_ptr() as *const u32) } & MASK[num_bytes_docfreq]; + *cursor = &cursor[num_bytes_docfreq + 1 ..]; + let delta_postings_offset: u32 = unsafe { *(cursor.as_ptr() as *const u32) } & MASK[num_bytes_postings_offset]; + *cursor = &cursor[num_bytes_postings_offset + 1..]; self.term_info.doc_freq = doc_freq; - let delta_postings = deserialize_vint(cursor) as u32; - self.term_info.postings_offset += delta_postings; + self.term_info.postings_offset += delta_postings_offset; if self.has_positions { - let delta_positions = deserialize_vint(cursor) as u32; - self.term_info.positions_offset += delta_positions; - let position_inner_offset = cursor[0]; - *cursor = &cursor[1..]; - self.term_info.positions_inner_offset = position_inner_offset; + let num_bytes_positions_offset = ((code >> 5) & 3) as usize; + let delta_positions_offset: u32 = unsafe { *(cursor.as_ptr() as *const u32) } & MASK[num_bytes_positions_offset]; + self.term_info.positions_offset += delta_positions_offset; + self.term_info.positions_inner_offset = cursor[num_bytes_positions_offset + 1]; + *cursor = &cursor[num_bytes_positions_offset + 2..]; } } diff --git a/src/termdict/streamdict/mod.rs b/src/termdict/streamdict/mod.rs index 1c9a148..faf9c13 100644 --- a/src/termdict/streamdict/mod.rs +++ b/src/termdict/streamdict/mod.rs @@ -3,9 +3,10 @@ mod streamer; mod delta_encoder; pub use self::delta_encoder::{TermDeltaEncoder, TermDeltaDecoder}; -pub use self::delta_encoder::{TermInfoDeltaEncoder, TermInfoDeltaDecoder}; +pub use self::delta_encoder::{TermInfoDeltaEncoder, TermInfoDeltaDecoder, DeltaTermInfo}; pub use self::termdict::TermDictionaryImpl; pub use self::termdict::TermDictionaryBuilderImpl; pub use self::streamer::TermStreamerImpl; pub use self::streamer::TermStreamerBuilderImpl; + diff --git a/src/termdict/streamdict/streamer.rs b/src/termdict/streamdict/streamer.rs index 1363bf5..4779a65 100644 --- a/src/termdict/streamdict/streamer.rs +++ b/src/termdict/streamdict/streamer.rs @@ -4,9 +4,9 @@ use std::cmp::max; use super::TermDictionaryImpl; use termdict::{TermStreamerBuilder, TermStreamer}; use postings::TermInfo; +use common::BinarySerializable; use super::delta_encoder::{TermInfoDeltaDecoder, TermDeltaDecoder}; - fn stream_before<'a>(term_dictionary: &'a TermDictionaryImpl, target_key: &[u8], has_positions: bool) @@ -155,8 +155,29 @@ impl<'a> TermStreamer for TermStreamerImpl<'a> if self.cursor.is_empty() { return false; } - self.term_delta_decoder.decode(&mut self.cursor); - self.term_info_decoder.decode(&mut self.cursor); + let code: u8 = self.cursor[0]; + let mut cursor: &[u8] = &self.cursor[1..]; + + let prefix_suffix_packed = (code & 1u8) == 1u8; + let (prefix_len, suffix_len): (usize, usize) = + if prefix_suffix_packed { + let b = cursor[0]; + cursor = &cursor[1..]; + let prefix_len = (b & 15u8) as usize; + let suffix_len = (b >> 4u8) as usize; + (prefix_len, suffix_len) + } + else { + let prefix_len = u32::deserialize(&mut cursor).unwrap(); + let suffix_len = u32::deserialize(&mut cursor).unwrap(); + (prefix_len as usize, suffix_len as usize) + }; + + let suffix = &cursor[..suffix_len]; + self.term_delta_decoder.decode(prefix_len, suffix); + cursor = &cursor[suffix_len..]; + self.term_info_decoder.decode(code, &mut cursor); + self.cursor = cursor; true } diff --git a/src/termdict/streamdict/termdict.rs b/src/termdict/streamdict/termdict.rs index ab5f9df..e5487b5 100644 --- a/src/termdict/streamdict/termdict.rs +++ b/src/termdict/streamdict/termdict.rs @@ -2,18 +2,21 @@ use std::io::{self, Write}; use fst; + use fst::raw::Fst; use directory::ReadOnlySource; use common::BinarySerializable; use common::CountingWriter; use postings::TermInfo; use schema::FieldType; -use super::{TermDeltaEncoder, TermInfoDeltaEncoder}; +use super::{TermDeltaEncoder, TermInfoDeltaEncoder, DeltaTermInfo}; use fst::raw::Node; use termdict::{TermDictionary, TermDictionaryBuilder, TermStreamer}; use super::{TermStreamerImpl, TermStreamerBuilderImpl}; use termdict::TermStreamerBuilder; +use std::mem::transmute; +const PADDING_SIZE: usize = 16; const INDEX_INTERVAL: usize = 1024; fn convert_fst_error(e: fst::Error) -> io::Error { @@ -75,17 +78,71 @@ impl<W> TermDictionaryBuilderImpl<W> if self.len % INDEX_INTERVAL == 0 { self.add_index_entry(); } - self.term_delta_encoder.encode(key, &mut self.write)?; - self.len += 1; + self.term_delta_encoder.encode(key); Ok(()) } pub(crate) fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> { - self.term_info_encoder.encode(term_info.clone(), &mut self.write)?; + let delta_term_info = self.term_info_encoder.encode(term_info.clone()); + let (prefix_len, suffix) = self.term_delta_encoder.prefix_suffix(); + write_term_kv(prefix_len, suffix, &delta_term_info, self.term_info_encoder.has_positions, &mut self.write)?; + self.len += 1; Ok(()) } } +fn num_bytes_required(mut n: u32) -> u8 { + for i in 1u8..5u8 { + if n < 256u32 { + return i; + } + else { + n /= 256; + } + } + 0u8 +} + +fn write_term_kv<W: Write>(prefix_len: usize, + suffix: &[u8], + delta_term_info: &DeltaTermInfo, + has_positions: bool, + write: &mut W) -> io::Result<()> { + let suffix_len = suffix.len(); + let mut code = 0u8; + let num_bytes_docfreq = num_bytes_required(delta_term_info.doc_freq); + let num_bytes_postings_offset = num_bytes_required(delta_term_info.delta_postings_offset); + let num_bytes_positions_offset = num_bytes_required(delta_term_info.delta_positions_offset); + code |= (num_bytes_docfreq - 1) << 1u8; + code |= (num_bytes_postings_offset - 1) << 3u8; + code |= (num_bytes_positions_offset - 1) << 5u8; + if (prefix_len < 16) && (suffix_len < 16) { + code |= 1u8; + write.write_all(&[code, (prefix_len as u8) | ((suffix_len as u8) << 4u8)])?; + } + else { + write.write_all(&[code])?; + (prefix_len as u32).serialize(write)?; + (suffix_len as u32).serialize(write)?; + } + write.write_all(suffix)?; + { + let bytes: [u8; 4] = unsafe { transmute(delta_term_info.doc_freq) }; + write.write_all(&bytes[0..num_bytes_docfreq as usize])?; + } + { + let bytes: [u8; 4] = unsafe { transmute(delta_term_info.delta_postings_offset) }; + write.write_all(&bytes[0..num_bytes_postings_offset as usize])?; + } + if has_positions { + let bytes: [u8; 4] = unsafe { transmute(delta_term_info.delta_positions_offset) }; + write.write_all(&bytes[0..num_bytes_positions_offset as usize])?; + write.write_all(&[delta_term_info.positions_inner_offset])?; + } + Ok(()) + +} + impl<W> TermDictionaryBuilder<W> for TermDictionaryBuilderImpl<W> where W: Write { @@ -116,7 +173,8 @@ impl<W> TermDictionaryBuilder<W> for TermDictionaryBuilderImpl<W> /// Finalize writing the builder, and returns the underlying /// `Write` object. fn finish(mut self) -> io::Result<W> { - self.add_index_entry(); + self.write.write_all(&[0u8; PADDING_SIZE])?; + // self.add_index_entry(); let (mut w, split_len) = self.write.finish()?; let fst_write = self.block_index.into_inner().map_err(convert_fst_error)?; w.write_all(&fst_write)?; @@ -224,9 +282,10 @@ impl<'a> TermDictionary<'a> for TermDictionaryImpl let fst_data = source.slice(split_len, length_offset); let fst_index = open_fst_index(fst_data)?; + let len_without_padding = stream_data.len() - PADDING_SIZE; Ok(TermDictionaryImpl { has_positions: has_positions, - stream_data: stream_data, + stream_data: stream_data.slice(0, len_without_padding), fst_index: fst_index, }) } @@ -250,3 +309,18 @@ impl<'a> TermDictionary<'a> for TermDictionaryImpl Self::StreamBuilder::new(self, self.has_positions) } } + + +#[cfg(test)] +mod tests { + use super::num_bytes_required; + + #[test] + fn test_num_bytes_required() { + assert_eq!(num_bytes_required(0), 1); + assert_eq!(num_bytes_required(1), 1); + assert_eq!(num_bytes_required(255), 1); + assert_eq!(num_bytes_required(256), 2); + assert_eq!(num_bytes_required(u32::max_value()), 4); + } +}
\ No newline at end of file |