summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Paul Masurel <paul.masurel@gmail.com> 2019-01-13 14:41:56 +0900
committer GitHub <noreply@github.com> 2019-01-13 14:41:56 +0900
commit 98ca703daacede2350e08bb5c9ad71cbadbe3753 (patch)
tree 70c2d7b13cf4fc7cd2ccb68c061c1229658790a6
parent b9d25cda5d3f52c63ac6aa84e91a61c190fb6246 (diff)
More efficient indexing (#462)
* Using unrolled u32 VInt and caching Vec s
* cargo fmt
* Exposing a io::Write in the Expull thing
* expull as a writer. clippy + format
* inline the first block
* simplified -if let Some-
* vint reader iterator
-rw-r--r-- src/common/mod.rs 2
-rw-r--r-- src/common/vint.rs 108
-rw-r--r-- src/indexer/index_writer.rs 35
-rw-r--r-- src/postings/postings_writer.rs 14
-rw-r--r-- src/postings/recorder.rs 205
-rw-r--r-- src/postings/stacker/expull.rs 316
-rw-r--r-- src/postings/stacker/memory_arena.rs 13
-rw-r--r-- src/postings/stacker/term_hashmap.rs 51
-rw-r--r-- src/schema/facet.rs 2
-rw-r--r-- src/termdict/term_info_store.rs 17
10 files changed, 558 insertions, 205 deletions
diff --git a/src/common/mod.rs b/src/common/mod.rs
index 2942438..82d4cbb 100644
--- a/src/common/mod.rs
+++ b/src/common/mod.rs
@@ -10,7 +10,7 @@ pub(crate) use self::bitset::TinySet;
pub(crate) use self::composite_file::{CompositeFile, CompositeWrite};
pub use self::counting_writer::CountingWriter;
pub use self::serialize::{BinarySerializable, FixedSize};
-pub use self::vint::VInt;
+pub use self::vint::{read_vint_u32, serialize_vint_u32, write_u32_vint, VInt};
pub use byteorder::LittleEndian as Endianness;
use std::io;
diff --git a/src/common/vint.rs b/src/common/vint.rs
index 7b782a9..59356c6 100644
--- a/src/common/vint.rs
+++ b/src/common/vint.rs
@@ -1,4 +1,5 @@
use super::BinarySerializable;
+use byteorder::{ByteOrder, LittleEndian};
use std::io;
use std::io::Read;
use std::io::Write;
@@ -9,6 +10,83 @@ pub struct VInt(pub u64);
const STOP_BIT: u8 = 128;
+pub fn serialize_vint_u32(val: u32) -> (u64, usize) {
+ const START_2: u64 = 1 << 7;
+ const START_3: u64 = 1 << 14;
+ const START_4: u64 = 1 << 21;
+ const START_5: u64 = 1 << 28;
+
+ const STOP_1: u64 = START_2 - 1;
+ const STOP_2: u64 = START_3 - 1;
+ const STOP_3: u64 = START_4 - 1;
+ const STOP_4: u64 = START_5 - 1;
+
+ const MASK_1: u64 = 127;
+ const MASK_2: u64 = MASK_1 << 7;
+ const MASK_3: u64 = MASK_2 << 7;
+ const MASK_4: u64 = MASK_3 << 7;
+ const MASK_5: u64 = MASK_4 << 7;
+
+ let val = u64::from(val);
+ const STOP_BIT: u64 = 128u64;
+ match val {
+ 0...STOP_1 => (val | STOP_BIT, 1),
+ START_2...STOP_2 => (
+ (val & MASK_1) | ((val & MASK_2) << 1) | (STOP_BIT << (8)),
+ 2,
+ ),
+ START_3...STOP_3 => (
+ (val & MASK_1) | ((val & MASK_2) << 1) | ((val & MASK_3) << 2) | (STOP_BIT << (8 * 2)),
+ 3,
+ ),
+ START_4...STOP_4 => (
+ (val & MASK_1)
+ | ((val & MASK_2) << 1)
+ | ((val & MASK_3) << 2)
+ | ((val & MASK_4) << 3)
+ | (STOP_BIT << (8 * 3)),
+ 4,
+ ),
+ _ => (
+ (val & MASK_1)
+ | ((val & MASK_2) << 1)
+ | ((val & MASK_3) << 2)
+ | ((val & MASK_4) << 3)
+ | ((val & MASK_5) << 4)
+ | (STOP_BIT << (8 * 4)),
+ 5,
+ ),
+ }
+}
+
+fn vint_len(data: &[u8]) -> usize {
+ for i in 0..5.min(data.len()) {
+ if data[i] >= STOP_BIT {
+ return i + 1;
+ }
+ }
+ panic!("Corrupted data. Invalid VInt 32");
+}
+
+pub fn read_vint_u32(data: &mut &[u8]) -> u32 {
+ let vlen = vint_len(*data);
+ let mut result = 0u32;
+ let mut shift = 0u64;
+ for b in data[..vlen].iter().cloned().map(|b| b & 127u8) {
+ result |= (b as u32) << shift;
+ shift += 7;
+ }
+ *data = &data[vlen..];
+ result
+}
+
+pub fn write_u32_vint<W: io::Write>(val: u32, writer: &mut W) -> io::Result<()> {
+ let (val, num_bytes) = serialize_vint_u32(val);
+ let mut buffer = [0u8; 8];
+ LittleEndian::write_u64(&mut buffer, val);
+ writer.write_all(&buffer[..num_bytes])
+}
+
impl VInt {
pub fn val(&self) -> u64 {
self.0
@@ -24,7 +102,7 @@ impl VInt {
output.extend(&buffer[0..num_bytes]);
}
- fn serialize_into(&self, buffer: &mut [u8; 10]) -> usize {
+ pub fn serialize_into(&self, buffer: &mut [u8; 10]) -> usize {
let mut remaining = self.0;
for (i, b) in buffer.iter_mut().enumerate() {
let next_byte: u8 = (remaining % 128u64) as u8;
@@ -64,7 +142,7 @@ impl BinarySerializable for VInt {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Reach end of buffer while reading VInt",
- ))
+ ));
}
}
}
@@ -74,7 +152,9 @@ impl BinarySerializable for VInt {
#[cfg(test)]
mod tests {
+ use super::serialize_vint_u32;
use super::VInt;
+ use byteorder::{ByteOrder, LittleEndian};
use common::BinarySerializable;
fn aux_test_vint(val: u64) {
@@ -108,4 +188,28 @@ mod tests {
}
aux_test_vint(10);
}
+
+ fn aux_test_serialize_vint_u32(val: u32) {
+ let mut buffer = [0u8; 10];
+ let mut buffer2 = [0u8; 10];
+ let len_vint = VInt(val as u64).serialize_into(&mut buffer);
+ let (vint, len) = serialize_vint_u32(val);
+ assert_eq!(len, len_vint, "len wrong for val {}", val);
+ LittleEndian::write_u64(&mut buffer2, vint);
+ assert_eq!(&buffer[..len], &buffer2[..len], "array wrong for {}", val);
+ }
+
+ #[test]
+ fn test_vint_u32() {
+ aux_test_serialize_vint_u32(0);
+ aux_test_serialize_vint_u32(1);
+ aux_test_serialize_vint_u32(5);
+ for i in 1..3 {
+ let power_of_128 = 1u32 << (7 * i);
+ aux_test_serialize_vint_u32(power_of_128 - 1u32);
+ aux_test_serialize_vint_u32(power_of_128);
+ aux_test_serialize_vint_u32(power_of_128 + 1u32);
+ }
+ aux_test_serialize_vint_u32(u32::max_value());
+ }
}
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index e453dc1..5f5d450 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -52,17 +52,19 @@ type DocumentReceiver = channel::Receiver<AddOperation>;
///
/// Returns (the heap size in bytes, the hash table size in number of bits)
fn initial_table_size(per_thread_memory_budget: usize) -> usize {
+ assert!(per_thread_memory_budget > 1_000);
let table_size_limit: usize = per_thread_memory_budget / 3;
- (1..)
+ if let Some(limit) = (1..)
.take_while(|num_bits: &usize| compute_table_size(*num_bits) < table_size_limit)
.last()
- .unwrap_or_else(|| {
- panic!(
- "Per thread memory is too small: {}",
- per_thread_memory_budget
- )
- })
- .min(19) // we cap it at 512K
+ {
+ limit.min(19) // we cap it at 2^19 = 512K.
+ } else {
+ unreachable!(
+ "Per thread memory is too small: {}",
+ per_thread_memory_budget
+ );
+ }
}
/// `IndexWriter` is the user entry-point to add document to an index.
@@ -302,7 +304,7 @@ fn index_documents(
let last_docstamp: u64 = *(doc_opstamps.last().unwrap());
- let segment_entry: SegmentEntry = if delete_cursor.get().is_some() {
+ let delete_bitset_opt = if delete_cursor.get().is_some() {
let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
let segment_reader = SegmentReader::open(segment)?;
let mut deleted_bitset = BitSet::with_capacity(num_docs as usize);
@@ -313,18 +315,17 @@ fn index_documents(
&doc_to_opstamps,
last_docstamp,
)?;
- SegmentEntry::new(segment_meta, delete_cursor, {
- if may_have_deletes {
- Some(deleted_bitset)
- } else {
- None
- }
- })
+ if may_have_deletes {
+ Some(deleted_bitset)
+ } else {
+ None
+ }
} else {
// if there are no delete operation in the queue, no need
// to even open the segment.
- SegmentEntry::new(segment_meta, delete_cursor, None)
+ None
};
+ let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, delete_bitset_opt);
Ok(segment_updater.add_segment(generation, segment_entry))
}
diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs
index 694ca94..c408c15 100644
--- a/src/postings/postings_writer.rs
+++ b/src/postings/postings_writer.rs
@@ -1,6 +1,8 @@
use super::stacker::{Addr, MemoryArena, TermHashMap};
-use postings::recorder::{NothingRecorder, Recorder, TFAndPositionRecorder, TermFrequencyRecorder};
+use postings::recorder::{
+ BufferLender, NothingRecorder, Recorder, TFAndPositionRecorder, TermFrequencyRecorder,
+};
use postings::UnorderedTermId;
use postings::{FieldSerializer, InvertedIndexSerializer};
use schema::IndexRecordOption;
@@ -213,7 +215,7 @@ pub trait PostingsWriter {
/// The `SpecializedPostingsWriter` is just here to remove dynamic
/// dispatch to the recorder information.
-pub struct SpecializedPostingsWriter<Rec: Recorder + 'static> {
+pub(crate) struct SpecializedPostingsWriter<Rec: Recorder + 'static> {
total_num_tokens: u64,
_recorder_type: PhantomData<Rec>,
}
@@ -245,8 +247,7 @@ impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec>
debug_assert!(term.as_slice().len() >= 4);
self.total_num_tokens += 1;
term_index.mutate_or_create(term, |opt_recorder: Option<Rec>| {
- if opt_recorder.is_some() {
- let mut recorder = opt_recorder.unwrap();
+ if let Some(mut recorder) = opt_recorder {
let current_doc = recorder.current_doc();
if current_doc != doc {
recorder.close_doc(heap);
@@ -255,7 +256,7 @@ impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec>
recorder.record_position(position, heap);
recorder
} else {
- let mut recorder = Rec::new(heap);
+ let mut recorder = Rec::new();
recorder.new_doc(doc, heap);
recorder.record_position(position, heap);
recorder
@@ -270,10 +271,11 @@ impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec>
termdict_heap: &MemoryArena,
heap: &MemoryArena,
) -> io::Result<()> {
+ let mut buffer_lender = BufferLender::default();
for &(term_bytes, addr, _) in term_addrs {
let recorder: Rec = termdict_heap.read(addr);
serializer.new_term(&term_bytes[4..])?;
- recorder.serialize(serializer, heap)?;
+ recorder.serialize(&mut buffer_lender, serializer, heap)?;
serializer.close_term()?;
}
Ok(())
diff --git a/src/postings/recorder.rs b/src/postings/recorder.rs
index 37a186e..915539a 100644
--- a/src/postings/recorder.rs
+++ b/src/postings/recorder.rs
@@ -1,10 +1,51 @@
use super::stacker::{ExpUnrolledLinkedList, MemoryArena};
+use common::{read_vint_u32, write_u32_vint};
use postings::FieldSerializer;
-use std::{self, io};
+use std::io;
use DocId;
const EMPTY_ARRAY: [u32; 0] = [0u32; 0];
-const POSITION_END: u32 = std::u32::MAX;
+const POSITION_END: u32 = 0;
+
+#[derive(Default)]
+pub(crate) struct BufferLender {
+ buffer_u8: Vec<u8>,
+ buffer_u32: Vec<u32>,
+}
+
+impl BufferLender {
+ pub fn lend_u8(&mut self) -> &mut Vec<u8> {
+ self.buffer_u8.clear();
+ &mut self.buffer_u8
+ }
+ pub fn lend_all(&mut self) -> (&mut Vec<u8>, &mut Vec<u32>) {
+ self.buffer_u8.clear();
+ self.buffer_u32.clear();
+ (&mut self.buffer_u8, &mut self.buffer_u32)
+ }
+}
+
+pub struct VInt32Reader<'a> {
+ data: &'a [u8],
+}
+
+impl<'a> VInt32Reader<'a> {
+ fn new(data: &'a [u8]) -> VInt32Reader<'a> {
+ VInt32Reader { data }
+ }
+}
+
+impl<'a> Iterator for VInt32Reader<'a> {
+ type Item = u32;
+
+ fn next(&mut self) -> Option<u32> {
+ if self.data.is_empty() {
+ None
+ } else {
+ Some(read_vint_u32(&mut self.data))
+ }
+ }
+}
/// Recorder is in charge of recording relevant information about
/// the presence of a term in a document.
@@ -15,9 +56,9 @@ const POSITION_END: u32 = std::u32::MAX;
/// * the document id
/// * the term frequency
/// * the term positions
-pub trait Recorder: Copy + 'static {
+pub(crate) trait Recorder: Copy + 'static {
///
- fn new(heap: &mut MemoryArena) -> Self;
+ fn new() -> Self;
/// Returns the current document
fn current_doc(&self) -> u32;
/// Starts recording information about a new document
@@ -29,7 +70,12 @@ pub trait Recorder: Copy + 'static {
/// Close the document. It will help record the term frequency.
fn close_doc(&mut self, heap: &mut MemoryArena);
/// Pushes the postings information to the serializer.
- fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()>;
+ fn serialize(
+ &self,
+ buffer_lender: &mut BufferLender,
+ serializer: &mut FieldSerializer,
+ heap: &MemoryArena,
+ ) -> io::Result<()>;
}
/// Only records the doc ids
@@ -40,9 +86,9 @@ pub struct NothingRecorder {
}
impl Recorder for NothingRecorder {
- fn new(heap: &mut MemoryArena) -> Self {
+ fn new() -> Self {
NothingRecorder {
- stack: ExpUnrolledLinkedList::new(heap),
+ stack: ExpUnrolledLinkedList::new(),
current_doc: u32::max_value(),
}
}
@@ -53,16 +99,23 @@ impl Recorder for NothingRecorder {
fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena) {
self.current_doc = doc;
- self.stack.push(doc, heap);
+ let _ = write_u32_vint(doc, &mut self.stack.writer(heap));
}
fn record_position(&mut self, _position: u32, _heap: &mut MemoryArena) {}
fn close_doc(&mut self, _heap: &mut MemoryArena) {}
- fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> {
- for doc in self.stack.iter(heap) {
- serializer.write_doc(doc, 0u32, &EMPTY_ARRAY)?;
+ fn serialize(
+ &self,
+ buffer_lender: &mut BufferLender,
+ serializer: &mut FieldSerializer,
+ heap: &MemoryArena,
+ ) -> io::Result<()> {
+ let buffer = buffer_lender.lend_u8();
+ self.stack.read_to_end(heap, buffer);
+ for doc in VInt32Reader::new(&buffer[..]) {
+ serializer.write_doc(doc as u32, 0u32, &EMPTY_ARRAY)?;
}
Ok(())
}
@@ -77,9 +130,9 @@ pub struct TermFrequencyRecorder {
}
impl Recorder for TermFrequencyRecorder {
- fn new(heap: &mut MemoryArena) -> Self {
+ fn new() -> Self {
TermFrequencyRecorder {
- stack: ExpUnrolledLinkedList::new(heap),
+ stack: ExpUnrolledLinkedList::new(),
current_doc: u32::max_value(),
current_tf: 0u32,
}
@@ -91,7 +144,7 @@ impl Recorder for TermFrequencyRecorder {
fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena) {
self.current_doc = doc;
- self.stack.push(doc, heap);
+ let _ = write_u32_vint(doc, &mut self.stack.writer(heap));
}
fn record_position(&mut self, _position: u32, _heap: &mut MemoryArena) {
@@ -100,24 +153,24 @@ impl Recorder for TermFrequencyRecorder {
fn close_doc(&mut self, heap: &mut MemoryArena) {
debug_assert!(self.current_tf > 0);
- self.stack.push(self.current_tf, heap);
+ let _ = write_u32_vint(self.current_tf, &mut self.stack.writer(heap));
self.current_tf = 0;
}
- fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> {
- // the last document has not been closed...
- // its term freq is self.current_tf.
- let mut doc_iter = self
- .stack
- .iter(heap)
- .chain(Some(self.current_tf).into_iter());
-
- while let Some(doc) = doc_iter.next() {
- let term_freq = doc_iter
- .next()
- .expect("The IndexWriter recorded a doc without a term freq.");
- serializer.write_doc(doc, term_freq, &EMPTY_ARRAY)?;
+ fn serialize(
+ &self,
+ buffer_lender: &mut BufferLender,
+ serializer: &mut FieldSerializer,
+ heap: &MemoryArena,
+ ) -> io::Result<()> {
+ let buffer = buffer_lender.lend_u8();
+ self.stack.read_to_end(heap, buffer);
+ let mut u32_it = VInt32Reader::new(&buffer[..]);
+ while let Some(doc) = u32_it.next() {
+ let term_freq = u32_it.next().unwrap_or(self.current_tf);
+ serializer.write_doc(doc as u32, term_freq, &EMPTY_ARRAY)?;
}
+
Ok(())
}
}
@@ -128,11 +181,10 @@ pub struct TFAndPositionRecorder {
stack: ExpUnrolledLinkedList,
current_doc: DocId,
}
-
impl Recorder for TFAndPositionRecorder {
- fn new(heap: &mut MemoryArena) -> Self {
+ fn new() -> Self {
TFAndPositionRecorder {
- stack: ExpUnrolledLinkedList::new(heap),
+ stack: ExpUnrolledLinkedList::new(),
current_doc: u32::max_value(),
}
}
@@ -143,33 +195,88 @@ impl Recorder for TFAndPositionRecorder {
fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena) {
self.current_doc = doc;
- self.stack.push(doc, heap);
+ let _ = write_u32_vint(doc, &mut self.stack.writer(heap));
}
fn record_position(&mut self, position: u32, heap: &mut MemoryArena) {
- self.stack.push(position, heap);
+ let _ = write_u32_vint(position + 1u32, &mut self.stack.writer(heap));
}
fn close_doc(&mut self, heap: &mut MemoryArena) {
- self.stack.push(POSITION_END, heap);
- }
-
- fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> {
- let mut doc_positions = Vec::with_capacity(100);
- let mut positions_iter = self.stack.iter(heap);
- while let Some(doc) = positions_iter.next() {
- let mut prev_position = 0;
- doc_positions.clear();
- for position in &mut positions_iter {
- if position == POSITION_END {
- break;
- } else {
- doc_positions.push(position - prev_position);
- prev_position = position;
+ let _ = write_u32_vint(POSITION_END, &mut self.stack.writer(heap));
+ }
+
+ fn serialize(
+ &self,
+ buffer_lender: &mut BufferLender,
+ serializer: &mut FieldSerializer,
+ heap: &MemoryArena,
+ ) -> io::Result<()> {
+ let (buffer_u8, buffer_positions) = buffer_lender.lend_all();
+ self.stack.read_to_end(heap, buffer_u8);
+ let mut u32_it = VInt32Reader::new(&buffer_u8[..]);
+ while let Some(doc) = u32_it.next() {
+ let mut prev_position_plus_one = 1u32;
+ buffer_positions.clear();
+ loop {
+ match u32_it.next() {
+ Some(POSITION_END) | None => {
+ break;
+ }
+ Some(position_plus_one) => {
+ let delta_position = position_plus_one - prev_position_plus_one;
+ buffer_positions.push(delta_position);
+ prev_position_plus_one = position_plus_one;
+ }
}
}
- serializer.write_doc(doc, doc_positions.len() as u32, &doc_positions)?;
+ serializer.write_doc(doc, buffer_positions.len() as u32, &buffer_positions)?;
}
Ok(())
}
}
+
+#[cfg(test)]
+mod tests {
+
+ use super::write_u32_vint;
+ use super::BufferLender;
+ use super::VInt32Reader;
+
+ #[test]
+ fn test_buffer_lender() {
+ let mut buffer_lender = BufferLender::default();
+ {
+ let buf = buffer_lender.lend_u8();
+ assert!(buf.is_empty());
+ buf.push(1u8);
+ }
+ {
+ let buf = buffer_lender.lend_u8();
+ assert!(buf.is_empty());
+ buf.push(1u8);
+ }
+ {
+ let (_, buf) = buffer_lender.lend_all();
+ assert!(buf.is_empty());
+ buf.push(1u32);
+ }
+ {
+ let (_, buf) = buffer_lender.lend_all();
+ assert!(buf.is_empty());
+ buf.push(1u32);
+ }
+ }
+
+ #[test]
+ fn test_vint_u32() {
+ let mut buffer = vec![];
+ let vals = [0, 1, 324_234_234, u32::max_value()];
+ for &i in &vals {
+ assert!(write_u32_vint(i, &mut buffer).is_ok());
+ }
+ assert_eq!(buffer.len(), 1 + 1 + 5 + 5);
+ let res: Vec<u32> = VInt32Reader::new(&buffer[..]).collect();
+ assert_eq!(&res[..], &vals[..]);
+ }
+}
diff --git a/src/postings/stacker/expull.rs b/src/postings/stacker/expull.rs
index 759325a..58fa6e1 100644
--- a/src/postings/stacker/expull.rs
+++ b/src/postings/stacker/expull.rs
@@ -1,28 +1,37 @@
use super::{Addr, MemoryArena};
-use common::is_power_of_2;
+use postings::stacker::memory_arena::load;
+use postings::stacker::memory_arena::store;
+use std::io;
use std::mem;
const MAX_BLOCK_LEN: u32 = 1u32 << 15;
+const FIRST_BLOCK: usize = 16;
+const INLINED_BLOCK_LEN: usize = FIRST_BLOCK + mem::size_of::<Addr>();
-const FIRST_BLOCK: u32 = 4u32;
+enum CapacityResult {
+ Available(u32),
+ NeedAlloc(u32),
+}
-#[inline]
-pub fn jump_needed(len: u32) -> Option<usize> {
+fn len_to_capacity(len: u32) -> CapacityResult {
match len {
- 0...3 => None,
- 4...MAX_BLOCK_LEN => {
- if is_power_of_2(len as usize) {
- Some(len as usize)
+ 0...15 => CapacityResult::Available(FIRST_BLOCK as u32 - len),
+ 16...MAX_BLOCK_LEN => {
+ let cap = 1 << (32u32 - (len - 1u32).leading_zeros());
+ let available = cap - len;
+ if available == 0 {
+ CapacityResult::NeedAlloc(len)
} else {
- None
+ CapacityResult::Available(available)
}
}
n => {
- if n % MAX_BLOCK_LEN == 0 {
- Some(MAX_BLOCK_LEN as usize)
+ let available = n % MAX_BLOCK_LEN;
+ if available == 0 {
+ CapacityResult::NeedAlloc(MAX_BLOCK_LEN)
} else {
- None
+ CapacityResult::Available(MAX_BLOCK_LEN - available)
}
}
}
@@ -52,70 +61,119 @@ pub fn jump_needed(len: u32) -> Option<usize> {
#[derive(Debug, Clone, Copy)]
pub struct ExpUnrolledLinkedList {
len: u32,
- head: Addr,
tail: Addr,
+ inlined_data: [u8; INLINED_BLOCK_LEN as usize],
}
-impl ExpUnrolledLinkedList {
- pub fn new(heap: &mut MemoryArena) -> ExpUnrolledLinkedList {
- let addr = heap.allocate_space((FIRST_BLOCK as usize) * mem::size_of::<u32>());
- ExpUnrolledLinkedList {
- len: 0u32,
- head: addr,
- tail: addr,
- }
- }
+pub struct ExpUnrolledLinkedListWriter<'a> {
+ eull: &'a mut ExpUnrolledLinkedList,
+ heap: &'a mut MemoryArena,
+}
- pub fn iter<'a>(&self, heap: &'a MemoryArena) -> ExpUnrolledLinkedListIterator<'a> {
- ExpUnrolledLinkedListIterator {
- heap,
- addr: self.head,
- len: self.len,
- consumed: 0,
+fn ensure_capacity<'a>(
+ eull: &'a mut ExpUnrolledLinkedList,
+ heap: &'a mut MemoryArena,
+) -> &'a mut [u8] {
+ if eull.len <= FIRST_BLOCK as u32 {
+ // We are still hitting the inline block.
+ if eull.len < FIRST_BLOCK as u32 {
+ return &mut eull.inlined_data[eull.len as usize..FIRST_BLOCK];
}
+ // We need to allocate a new block!
+ let new_block_addr: Addr = heap.allocate_space(FIRST_BLOCK + mem::size_of::<Addr>());
+ store(&mut eull.inlined_data[FIRST_BLOCK..], new_block_addr);
+ eull.tail = new_block_addr;
+ return heap.slice_mut(eull.tail, FIRST_BLOCK);
}
+ let len = match len_to_capacity(eull.len) {
+ CapacityResult::NeedAlloc(new_block_len) => {
+ let new_block_addr: Addr =
+ heap.allocate_space(new_block_len as usize + mem::size_of::<Addr>());
+ heap.write_at(eull.tail, new_block_addr);
+ eull.tail = new_block_addr;
+ new_block_len
+ }
+ CapacityResult::Available(available) => available,
+ };
+ heap.slice_mut(eull.tail, len as usize)
+}
- /// Appends a new element to the current stack.
- ///
- /// If the current block end is reached, a new block is allocated.
- pub fn push(&mut self, val: u32, heap: &mut MemoryArena) {
- self.len += 1;
- if let Some(new_block_len) = jump_needed(self.len) {
- // We need to allocate another block.
- // We also allocate an extra `u32` to store the pointer
- // to the future next block.
- let new_block_size: usize = (new_block_len + 1) * mem::size_of::<u32>();
- let new_block_addr: Addr = heap.allocate_space(new_block_size);
- heap.write_at(self.tail, new_block_addr);
- self.tail = new_block_addr;
+impl<'a> ExpUnrolledLinkedListWriter<'a> {
+ pub fn extend_from_slice(&mut self, mut buf: &[u8]) {
+ if buf.is_empty() {
+ // we need to cut early, because `ensure_capacity`
+ // allocates if there is no capacity at all right now.
+ return;
+ }
+ while !buf.is_empty() {
+ let add_len: usize;
+ {
+ let output_buf = ensure_capacity(self.eull, self.heap);
+ add_len = buf.len().min(output_buf.len());
+ output_buf[..add_len].copy_from_slice(&buf[..add_len]);
+ }
+ self.eull.len += add_len as u32;
+ self.eull.tail = self.eull.tail.offset(add_len as u32);
+ buf = &buf[add_len..];
}
- heap.write_at(self.tail, val);
- self.tail = self.tail.offset(mem::size_of::<u32>() as u32);
}
}
-pub struct ExpUnrolledLinkedListIterator<'a> {
- heap: &'a MemoryArena,
- addr: Addr,
- len: u32,
- consumed: u32,
+impl<'a> io::Write for ExpUnrolledLinkedListWriter<'a> {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ // There is no use case to only write the capacity.
+ // This is not IO after all, so we write the whole
+ // buffer even if the contract of `.write` is looser.
+ self.extend_from_slice(buf);
+ Ok(buf.len())
+ }
+
+ fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
+ self.extend_from_slice(buf);
+ Ok(())
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
}
-impl<'a> Iterator for ExpUnrolledLinkedListIterator<'a> {
- type Item = u32;
+impl ExpUnrolledLinkedList {
+ pub fn new() -> ExpUnrolledLinkedList {
+ ExpUnrolledLinkedList {
+ len: 0u32,
+ tail: Addr::null_pointer(),
+ inlined_data: [0u8; INLINED_BLOCK_LEN as usize],
+ }
+ }
- fn next(&mut self) -> Option<u32> {
- if self.consumed == self.len {
- None
- } else {
- self.consumed += 1;
- let addr: Addr = if jump_needed(self.consumed).is_some() {
- self.heap.read(self.addr)
- } else {
- self.addr
- };
- self.addr = addr.offset(mem::size_of::<u32>() as u32);
- Some(self.heap.read(addr))
+ #[inline(always)]
+ pub fn writer<'a>(&'a mut self, heap: &'a mut MemoryArena) -> ExpUnrolledLinkedListWriter<'a> {
+ ExpUnrolledLinkedListWriter { eull: self, heap }
+ }
+
+ pub fn read_to_end(&self, heap: &MemoryArena, output: &mut Vec<u8>) {
+ let len = self.len as usize;
+ if len <= FIRST_BLOCK {
+ output.extend_from_slice(&self.inlined_data[..len]);
+ return;
+ }
+ output.extend_from_slice(&self.inlined_data[..FIRST_BLOCK]);
+ let mut cur = FIRST_BLOCK;
+ let mut addr = load(&self.inlined_data[FIRST_BLOCK..]);
+ loop {
+ let cap = match len_to_capacity(cur as u32) {
+ CapacityResult::Available(capacity) => capacity,
+ CapacityResult::NeedAlloc(capacity) => capacity,
+ } as usize;
+ let data = heap.slice(addr, cap);
+ if cur + cap >= len {
+ output.extend_from_slice(&data[..(len - cur)]);
+ return;
+ }
+ output.extend_from_slice(data);
+ cur += cap;
+ addr = heap.read(addr.offset(cap as u32));
}
}
}
@@ -124,39 +182,126 @@ impl<'a> Iterator for ExpUnrolledLinkedListIterator<'a> {
mod tests {
use super::super::MemoryArena;
- use super::jump_needed;
+ use super::len_to_capacity;
use super::*;
+ use byteorder::{ByteOrder, LittleEndian, WriteBytesExt};
#[test]
+ #[test]
fn test_stack() {
let mut heap = MemoryArena::new();
- let mut stack = ExpUnrolledLinkedList::new(&mut heap);
- stack.push(1u32, &mut heap);
- stack.push(2u32, &mut heap);
- stack.push(4u32, &mut heap);
- stack.push(8u32, &mut heap);
+ let mut stack = ExpUnrolledLinkedList::new();
+ stack.writer(&mut heap).extend_from_slice(&[1u8]);
+ stack.writer(&mut heap).extend_from_slice(&[2u8]);
+ stack.writer(&mut heap).extend_from_slice(&[3u8, 4u8]);
+ stack.writer(&mut heap).extend_from_slice(&[5u8]);
{
- let mut it = stack.iter(&heap);
- assert_eq!(it.next().unwrap(), 1u32);
- assert_eq!(it.next().unwrap(), 2u32);
- assert_eq!(it.next().unwrap(), 4u32);
- assert_eq!(it.next().unwrap(), 8u32);
- assert!(it.next().is_none());
+ let mut buffer = Vec::new();
+ stack.read_to_end(&heap, &mut buffer);
+ assert_eq!(&buffer[..], &[1u8, 2u8, 3u8, 4u8, 5u8]);
+ }
+ }
+
+ #[test]
+ fn test_stack_long() {
+ let mut heap = MemoryArena::new();
+ let mut stack = ExpUnrolledLinkedList::new();
+ let source: Vec<u32> = (0..100).collect();
+ for &el in &source {
+ assert!(stack
+ .writer(&mut heap)
+ .write_u32::<LittleEndian>(el)
+ .is_ok());
+ }
+ let mut buffer = Vec::new();
+ stack.read_to_end(&heap, &mut buffer);
+ let mut result = vec![];
+ let mut remaining = &buffer[..];
+ while !remaining.is_empty() {
+ result.push(LittleEndian::read_u32(&remaining[..4]));
+ remaining = &remaining[4..];
+ }
+ assert_eq!(&result[..], &source[..]);
+ }
+
+ #[test]
+ fn test_stack_interlaced() {
+ let mut heap = MemoryArena::new();
+ let mut stack = ExpUnrolledLinkedList::new();
+ let mut stack2 = ExpUnrolledLinkedList::new();
+
+ let mut vec1: Vec<u8> = vec![];
+ let mut vec2: Vec<u8> = vec![];
+
+ for i in 0..9 {
+ assert!(stack.writer(&mut heap).write_u32::<LittleEndian>(i).is_ok());
+ assert!(vec1.write_u32::<LittleEndian>(i).is_ok());
+ if i % 2 == 0 {
+ assert!(stack2
+ .writer(&mut heap)
+ .write_u32::<LittleEndian>(i)
+ .is_ok());
+ assert!(vec2.write_u32::<LittleEndian>(i).is_ok());
+ }
}
+ let mut res1 = vec![];
+ let mut res2 = vec![];
+ stack.read_to_end(&heap, &mut res1);
+ stack2.read_to_end(&heap, &mut res2);
+ assert_eq!(&vec1[..], &res1[..]);
+ assert_eq!(&vec2[..], &res2[..]);
}
#[test]
fn test_jump_if_needed() {
- let mut block_len = 4u32;
- let mut i = 0;
- while i < 10_000_000 {
- assert!(jump_needed(i + block_len - 1).is_none());
- assert!(jump_needed(i + block_len + 1).is_none());
- assert!(jump_needed(i + block_len).is_some());
- let new_block_len = jump_needed(i + block_len).unwrap();
- i += block_len;
- block_len = new_block_len as u32;
+ let mut available = 16u32;
+ for i in 0..10_000_000 {
+ match len_to_capacity(i) {
+ CapacityResult::NeedAlloc(cap) => {
+ assert_eq!(available, 0, "Failed len={}: Expected 0 got {}", i, cap);
+ available = cap;
+ }
+ CapacityResult::Available(cap) => {
+ assert_eq!(
+ available, cap,
+ "Failed len={}: Expected {} Got {}",
+ i, available, cap
+ );
+ }
+ }
+ available -= 1;
+ }
+ }
+
+ #[test]
+ fn test_jump_if_needed_progression() {
+ let mut v = vec![];
+ for i in 0.. {
+ if v.len() >= 10 {
+