diff options
author | Paul Masurel <paul.masurel@gmail.com> | 2017-05-31 00:05:52 +0900 |
---|---|---|
committer | Paul Masurel <paul.masurel@gmail.com> | 2017-05-31 08:31:20 +0900 |
commit | 8d4778f94d67e0d71bc93ded4048d2a4f644de3c (patch) | |
tree | f0de1e81ba744f864d6d9063975484ed364043df | |
parent | 1d5464351ddc6dcdd41e815c36d8779ed1ed60de (diff) |
issue/181 BinarySerializable does not return the len + Generics over Read+Write
-rw-r--r-- | src/common/bitpacker.rs | 17 | ||||
-rw-r--r-- | src/common/counting_writer.rs (renamed from src/termdict/streamdict/counting_writer.rs) | 0 | ||||
-rw-r--r-- | src/common/mod.rs | 2 | ||||
-rw-r--r-- | src/common/serialize.rs | 67 | ||||
-rw-r--r-- | src/common/vint.rs | 7 | ||||
-rw-r--r-- | src/datastruct/skip/skiplist_builder.rs | 14 | ||||
-rw-r--r-- | src/fastfield/reader.rs | 14 | ||||
-rw-r--r-- | src/fastfield/serializer.rs | 38 | ||||
-rw-r--r-- | src/postings/serializer.rs | 51 | ||||
-rw-r--r-- | src/postings/term_info.rs | 8 | ||||
-rw-r--r-- | src/schema/field.rs | 2 | ||||
-rw-r--r-- | src/schema/field_value.rs | 5 | ||||
-rw-r--r-- | src/schema/value.rs | 16 | ||||
-rw-r--r-- | src/store/writer.rs | 28 | ||||
-rw-r--r-- | src/termdict/mod.rs | 2 | ||||
-rw-r--r-- | src/termdict/streamdict/mod.rs | 2 | ||||
-rw-r--r-- | src/termdict/streamdict/streamer.rs | 1 | ||||
-rw-r--r-- | src/termdict/streamdict/termdict.rs | 2 |
18 files changed, 131 insertions, 145 deletions
diff --git a/src/common/bitpacker.rs b/src/common/bitpacker.rs index b259bd3..f625e07 100644 --- a/src/common/bitpacker.rs +++ b/src/common/bitpacker.rs @@ -37,7 +37,6 @@ pub struct BitPacker { mini_buffer: u64, mini_buffer_written: usize, num_bits: usize, - written_size: usize, } impl BitPacker { @@ -46,7 +45,6 @@ impl BitPacker { mini_buffer: 0u64, mini_buffer_written: 0, num_bits: num_bits, - written_size: 0, } } @@ -54,14 +52,14 @@ impl BitPacker { let val_u64 = val as u64; if self.mini_buffer_written + self.num_bits > 64 { self.mini_buffer |= val_u64.wrapping_shl(self.mini_buffer_written as u32); - self.written_size += self.mini_buffer.serialize(output)?; + self.mini_buffer.serialize(output)?; self.mini_buffer = val_u64.wrapping_shr((64 - self.mini_buffer_written) as u32); self.mini_buffer_written = self.mini_buffer_written + (self.num_bits as usize) - 64; } else { self.mini_buffer |= val_u64 << self.mini_buffer_written; self.mini_buffer_written += self.num_bits; if self.mini_buffer_written == 64 { - self.written_size += self.mini_buffer.serialize(output)?; + self.mini_buffer.serialize(output)?; self.mini_buffer_written = 0; self.mini_buffer = 0u64; } @@ -74,18 +72,16 @@ impl BitPacker { let num_bytes = (self.mini_buffer_written + 7) / 8; let arr: [u8; 8] = unsafe { mem::transmute::<u64, [u8; 8]>(self.mini_buffer) }; output.write_all(&arr[..num_bytes])?; - self.written_size += num_bytes; self.mini_buffer_written = 0; } Ok(()) } - pub fn close<TWrite: Write>(&mut self, output: &mut TWrite) -> io::Result<usize> { + pub fn close<TWrite: Write>(&mut self, output: &mut TWrite) -> io::Result<()> { self.flush(output)?; // Padding the write file to simplify reads. output.write_all(&[0u8; 7])?; - self.written_size += 7; - Ok(self.written_size) + Ok(()) } } @@ -163,9 +159,8 @@ mod test { for &val in &vals { bitpacker.write(val, &mut data).unwrap(); } - let num_bytes = bitpacker.close(&mut data).unwrap(); - assert_eq!(num_bytes, (num_bits * len + 7) / 8 + 7); - assert_eq!(data.len(), num_bytes); + bitpacker.close(&mut data).unwrap(); + assert_eq!(data.len(), (num_bits * len + 7) / 8 + 7); let bitunpacker = BitUnpacker::new(data, num_bits); for (i, val) in vals.iter().enumerate() { assert_eq!(bitunpacker.get(i), *val); diff --git a/src/termdict/streamdict/counting_writer.rs b/src/common/counting_writer.rs index db13e36..db13e36 100644 --- a/src/termdict/streamdict/counting_writer.rs +++ b/src/common/counting_writer.rs diff --git a/src/common/mod.rs b/src/common/mod.rs index e8e9fac..0af9d24 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,6 +1,7 @@ mod serialize; mod timer; mod vint; +mod counting_writer; pub mod bitpacker; pub use self::serialize::BinarySerializable; @@ -8,6 +9,7 @@ pub use self::timer::Timing; pub use self::timer::TimerTree; pub use self::timer::OpenTimer; pub use self::vint::VInt; +pub use self::counting_writer::CountingWriter; use std::io; diff --git a/src/common/serialize.rs b/src/common/serialize.rs index ffd6982..ee86247 100644 --- a/src/common/serialize.rs +++ b/src/common/serialize.rs @@ -6,14 +6,16 @@ use std::io::Read; use std::io; use common::VInt; + + pub trait BinarySerializable: fmt::Debug + Sized { - fn serialize(&self, writer: &mut Write) -> io::Result<usize>; + fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>; fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>; } impl BinarySerializable for () { - fn serialize(&self, _: &mut Write) -> io::Result<usize> { - Ok(0) + fn serialize<W: Write>(&self, _: &mut W) -> io::Result<()> { + Ok(()) } fn deserialize<R: Read>(_: &mut R) -> io::Result<Self> { Ok(()) @@ -21,18 +23,18 @@ impl BinarySerializable for () { } impl<T: BinarySerializable> BinarySerializable for Vec<T> { - fn serialize(&self, writer: &mut Write) -> io::Result<usize> { - let mut total_size = try!(VInt(self.len() as u64).serialize(writer)); + fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> { + VInt(self.len() as u64).serialize(writer)?; for it in self { - total_size += try!(it.serialize(writer)); + it.serialize(writer)?; } - Ok(total_size) + Ok(()) } fn deserialize<R: Read>(reader: &mut R) -> io::Result<Vec<T>> { - let num_items = try!(VInt::deserialize(reader)).val(); + let num_items = VInt::deserialize(reader)?.val(); let mut items: Vec<T> = Vec::with_capacity(num_items as usize); for _ in 0..num_items { - let item = try!(T::deserialize(reader)); + let item = T::deserialize(reader)?; items.push(item); } Ok(items) @@ -41,17 +43,18 @@ impl<T: BinarySerializable> BinarySerializable for Vec<T> { impl<Left: BinarySerializable, Right: BinarySerializable> BinarySerializable for (Left, Right) { - fn serialize(&self, write: &mut Write) -> io::Result<usize> { - Ok(try!(self.0.serialize(write)) + try!(self.1.serialize(write))) + fn serialize<W: Write>(&self, write: &mut W) -> io::Result<()> { + self.0.serialize(write)?; + self.1.serialize(write) } fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> { - Ok((try!(Left::deserialize(reader)), try!(Right::deserialize(reader)))) + Ok((Left::deserialize(reader)?, Right::deserialize(reader)?)) } } impl BinarySerializable for u32 { - fn serialize(&self, writer: &mut Write) -> io::Result<usize> { - writer.write_u32::<Endianness>(*self).map(|_| 4) + fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> { + writer.write_u32::<Endianness>(*self) } fn deserialize<R: Read>(reader: &mut R) -> io::Result<u32> { @@ -61,28 +64,27 @@ impl BinarySerializable for u32 { impl BinarySerializable for u64 { - fn serialize(&self, writer: &mut Write) -> io::Result<usize> { - writer.write_u64::<Endianness>(*self).map(|_| 8) + fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> { + writer.write_u64::<Endianness>(*self) } - fn deserialize<R: Read>(reader: &mut R) -> io::Result<u64> { + fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> { reader.read_u64::<Endianness>() } } impl BinarySerializable for i64 { - fn serialize(&self, writer: &mut Write) -> io::Result<usize> { - writer.write_i64::<Endianness>(*self).map(|_| 8) + fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> { + writer.write_i64::<Endianness>(*self) } - fn deserialize<R: Read>(reader: &mut R) -> io::Result<i64> { + fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> { reader.read_i64::<Endianness>() } } impl BinarySerializable for u8 { - fn serialize(&self, writer: &mut Write) -> io::Result<usize> { - try!(writer.write_u8(*self)); - Ok(1) + fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> { + writer.write_u8(*self) } fn deserialize<R: Read>(reader: &mut R) -> io::Result<u8> { reader.read_u8() @@ -90,20 +92,18 @@ impl BinarySerializable for u8 { } impl BinarySerializable for String { - fn serialize(&self, writer: &mut Write) -> io::Result<usize> { + fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> { let data: &[u8] = self.as_bytes(); - let mut size = try!(VInt(data.len() as u64).serialize(writer)); - size += data.len(); - try!(writer.write_all(data)); - Ok(size) + VInt(data.len() as u64).serialize(writer)?; + writer.write_all(data) } fn deserialize<R: Read>(reader: &mut R) -> io::Result<String> { - let string_length = try!(VInt::deserialize(reader)).val() as usize; + let string_length = VInt::deserialize(reader)?.val() as usize; let mut result = String::with_capacity(string_length); - try!(reader - .take(string_length as u64) - .read_to_string(&mut result)); + reader + .take(string_length as u64) + .read_to_string(&mut result)?; Ok(result) } } @@ -117,9 +117,8 @@ mod test { fn serialize_test<T: BinarySerializable + Eq>(v: T, num_bytes: usize) { let mut buffer: Vec<u8> = Vec::new(); - if num_bytes != 0 { - assert_eq!(v.serialize(&mut buffer).unwrap(), num_bytes); + v.serialize(&mut buffer).unwrap(); assert_eq!(buffer.len(), num_bytes); } else { v.serialize(&mut buffer).unwrap(); diff --git a/src/common/vint.rs b/src/common/vint.rs index 3201b85..c1a0145 100644 --- a/src/common/vint.rs +++ b/src/common/vint.rs @@ -16,10 +16,10 @@ impl VInt { } impl BinarySerializable for VInt { - fn serialize(&self, writer: &mut Write) -> io::Result<usize> { + fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> { let mut remaining = self.0; - let mut written: usize = 0; let mut buffer = [0u8; 10]; + let mut written = 0; loop { let next_byte: u8 = (remaining % 128u64) as u8; remaining /= 128u64; @@ -32,8 +32,7 @@ impl BinarySerializable for VInt { written += 1; } } - try!(writer.write_all(&buffer[0..written])); - Ok(written) + writer.write_all(&buffer[0..written]) } fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> { diff --git a/src/datastruct/skip/skiplist_builder.rs b/src/datastruct/skip/skiplist_builder.rs index 34c5d8a..eaa439d 100644 --- a/src/datastruct/skip/skiplist_builder.rs +++ b/src/datastruct/skip/skiplist_builder.rs @@ -18,7 +18,7 @@ impl<T: BinarySerializable> LayerBuilder<T> { } fn write(&self, output: &mut Write) -> Result<(), io::Error> { - try!(output.write_all(&self.buffer)); + output.write_all(&self.buffer)?; Ok(()) } @@ -36,8 +36,8 @@ impl<T: BinarySerializable> LayerBuilder<T> { self.remaining -= 1; self.len += 1; let offset = self.written_size() as u32; - try!(doc_id.serialize(&mut self.buffer)); - try!(value.serialize(&mut self.buffer)); + doc_id.serialize(&mut self.buffer)?; + value.serialize(&mut self.buffer)?; Ok(if self.remaining == 0 { self.remaining = self.period; Some((doc_id, offset)) @@ -89,7 +89,7 @@ impl<T: BinarySerializable> SkipListBuilder<T> { } } - pub fn write<W: Write>(self, output: &mut Write) -> io::Result<()> { + pub fn write<W: Write>(self, output: &mut W) -> io::Result<()> { let mut size: u32 = 0; let mut layer_sizes: Vec<u32> = Vec::new(); size += self.data_layer.buffer.len() as u32; @@ -98,10 +98,10 @@ impl<T: BinarySerializable> SkipListBuilder<T> { size += layer.buffer.len() as u32; layer_sizes.push(size); } - try!(layer_sizes.serialize(output)); - try!(self.data_layer.write(output)); + layer_sizes.serialize(output)?; + self.data_layer.write(output)?; for layer in self.skip_layers.iter().rev() { - try!(layer.write(output)); + layer.write(output)?; } Ok(()) } diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs index ee1fc1c..94f1873 100644 --- a/src/fastfield/reader.rs +++ b/src/fastfield/reader.rs @@ -12,6 +12,7 @@ use fastfield::FastFieldsWriter; use common::bitpacker::compute_num_bits; use common::bitpacker::BitUnpacker; use schema::FieldType; +use error::ResultExt; use common; use owning_ref::OwningRef; @@ -125,9 +126,16 @@ impl From<Vec<u64>> for U64FastFieldReader { fast_field_writers.serialize(&mut serializer).unwrap(); serializer.close().unwrap(); } - let source = directory.open_read(path).unwrap(); - let fast_field_readers = FastFieldsReader::from_source(source).unwrap(); - fast_field_readers.open_reader(field).unwrap() + directory + .open_read(path) + .chain_err(|| "Failed to open the file") + .and_then(|source| FastFieldsReader::from_source(source) + .chain_err(|| "Failed to read the file.")) + .and_then(|ff_readers| ff_readers + .open_reader(field) + .ok_or_else(|| {"Failed to find the requested field".into() })) + .expect("This should never happen, please report.") + } } diff --git a/src/fastfield/serializer.rs b/src/fastfield/serializer.rs index 7f97b3b..ef6ffed 100644 --- a/src/fastfield/serializer.rs +++ b/src/fastfield/serializer.rs @@ -2,9 +2,9 @@ use common::BinarySerializable; use directory::WritePtr; use schema::Field; use common::bitpacker::{compute_num_bits, BitPacker}; +use common::CountingWriter; use std::io::{self, Write, Seek, SeekFrom}; - /// `FastFieldSerializer` is in charge of serializing /// fastfields on disk. /// @@ -26,8 +26,7 @@ use std::io::{self, Write, Seek, SeekFrom}; /// * `close_field()` /// * `close()` pub struct FastFieldSerializer { - write: WritePtr, - written_size: usize, + write: CountingWriter<WritePtr>, fields: Vec<(Field, u32)>, min_value: u64, field_open: bool, @@ -37,12 +36,12 @@ pub struct FastFieldSerializer { impl FastFieldSerializer { /// Constructor - pub fn new(mut write: WritePtr) -> io::Result<FastFieldSerializer> { + pub fn new(write: WritePtr) -> io::Result<FastFieldSerializer> { // just making room for the pointer to header. - let written_size: usize = try!(0u32.serialize(&mut write)); + let mut counting_writer = CountingWriter::wrap(write); + 0u32.serialize(&mut counting_writer)?; Ok(FastFieldSerializer { - write: write, - written_size: written_size, + write: counting_writer, fields: Vec::new(), min_value: 0, field_open: false, @@ -61,11 +60,11 @@ impl FastFieldSerializer { } self.min_value = min_value; self.field_open = true; - self.fields.push((field, self.written_size as u32)); - let write: &mut Write = &mut self.write; - self.written_size += try!(min_value.serialize(write)); + self.fields.push((field, self.write.written_bytes() as u32)); + let write = &mut self.write; + min_value.serialize(write)?; let amplitude = max_value - min_value; - self.written_size += try!(amplitude.serialize(write)); + amplitude.serialize(write)?; let num_bits = compute_num_bits(amplitude); self.bit_packer = BitPacker::new(num_bits as usize); Ok(()) @@ -88,7 +87,7 @@ impl FastFieldSerializer { // adding some padding to make sure we // can read the last elements with our u64 // cursor - self.written_size += self.bit_packer.close(&mut self.write)?; + self.bit_packer.close(&mut self.write)?; Ok(()) } @@ -96,15 +95,16 @@ impl FastFieldSerializer { /// Closes the serializer /// /// After this call the data must be persistently save on disk. - pub fn close(mut self) -> io::Result<usize> { + pub fn close(self) -> io::Result<usize> { if self.field_open { return Err(io::Error::new(io::ErrorKind::Other, "Last field not closed")); } - let header_offset: usize = self.written_size; - self.written_size += try!(self.fields.serialize(&mut self.write)); - try!(self.write.seek(SeekFrom::Start(0))); - try!((header_offset as u32).serialize(&mut self.write)); - try!(self.write.flush()); - Ok(self.written_size) + let header_offset: usize = self.write.written_bytes() as usize; + let (mut write, written_size) = self.write.finish()?; + self.fields.serialize(&mut write)?; + write.seek(SeekFrom::Start(0))?; + (header_offset as u32).serialize(&mut write)?; + write.flush()?; + Ok(written_size) } } diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index d5e72fa..4a3078a 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -10,12 +10,11 @@ use directory::WritePtr; use compression::{NUM_DOCS_PER_BLOCK, BlockEncoder, CompositeEncoder}; use DocId; use core::Segment; -use std::io; -use core::SegmentComponent; -use std::io::Write; +use std::io::{self, Write}; use compression::VIntEncoder; use common::VInt; use common::BinarySerializable; +use common::CountingWriter; use termdict::TermDictionaryBuilder; @@ -52,10 +51,8 @@ use termdict::TermDictionaryBuilder; /// [available here](https://fulmicoton.gitbooks.io/tantivy-doc/content/inverted-index.html). pub struct PostingsSerializer { terms_fst_builder: TermDictionaryBuilderImpl<WritePtr, TermInfo>, - postings_write: WritePtr, - positions_write: WritePtr, - written_bytes_postings: usize, - written_bytes_positions: usize, + postings_write: CountingWriter<WritePtr>, + positions_write: CountingWriter<WritePtr>, last_doc_id_encoded: u32, positions_encoder: CompositeEncoder, block_encoder: BlockEncoder, @@ -78,10 +75,8 @@ impl PostingsSerializer { let terms_fst_builder = try!(TermDictionaryBuilderImpl::new(terms_write)); Ok(PostingsSerializer { terms_fst_builder: terms_fst_builder, - postings_write: postings_write, - positions_write: positions_write, - written_bytes_postings: 0, - written_bytes_positions: 0, + postings_write: CountingWriter::wrap(postings_write), + positions_write: CountingWriter::wrap(positions_write), last_doc_id_encoded: 0u32, positions_encoder: CompositeEncoder::new(), block_encoder: BlockEncoder::new(), @@ -98,12 +93,10 @@ impl PostingsSerializer { /// Open a new `PostingsSerializer` for the given segment pub fn open(segment: &mut Segment) -> Result<PostingsSerializer> { - let terms_write = try!(segment.open_write(SegmentComponent::TERMS)); - let postings_write = try!(segment.open_write(SegmentComponent::POSTINGS)); - let positions_write = try!(segment.open_write(SegmentComponent::POSITIONS)); - PostingsSerializer::new(terms_write, - postings_write, - positions_write, + use SegmentComponent::{TERMS, POSTINGS, POSITIONS}; + PostingsSerializer::new(segment.open_write(TERMS)?, + segment.open_write(POSTINGS)?, + segment.open_write(POSITIONS)?, segment.schema()) } @@ -141,8 +134,8 @@ impl PostingsSerializer { self.position_deltas.clear(); self.current_term_info = TermInfo { doc_freq: 0, - postings_offset: self.written_bytes_postings as u32, - positions_offset: self.written_bytes_positions as u32, + postings_offset: self.postings_write.written_bytes() as u32, + positions_offset: self.positions_write.written_bytes() as u32, }; self.terms_fst_builder.insert_key(term) } @@ -168,8 +161,7 @@ impl PostingsSerializer { let block_encoded = self.block_encoder .compress_vint_sorted(&self.doc_ids, self.last_doc_id_encoded); - self.written_bytes_postings += block_encoded.len(); - try!(self.postings_write.write_all(block_encoded)); + self.postings_write.write_all(block_encoded)?; self.doc_ids.clear(); } // ... Idem for term frequencies @@ -177,8 +169,7 @@ impl PostingsSerializer { let block_encoded = self.block_encoder .compress_vint_unsorted(&self.term_freqs[..]); for num in block_encoded { - self.written_bytes_postings += - try!(num.serialize(&mut self.postings_write)); + num.serialize(&mut self.postings_write)?; } self.term_freqs.clear(); } @@ -186,13 +177,11 @@ impl PostingsSerializer { // On the other hand, positions are entirely buffered until the // end of the term, at which point they are compressed and written. if self.text_indexing_options.is_position_enabled() { - self.written_bytes_positions += - try!(VInt(self.position_deltas.len() as u64) - .serialize(&mut self.positions_write)); + let posdelta_len = VInt(self.position_deltas.len() as u64); + posdelta_len.serialize(&mut self.positions_write)?; let positions_encoded: &[u8] = self.positions_encoder .compress_unsorted(&self.position_deltas[..]); - try!(self.positions_write.write_all(positions_encoded)); - self.written_bytes_positions += positions_encoded.len(); + self.positions_write.write_all(positions_encoded)?; self.position_deltas.clear(); } self.term_open = false; @@ -230,15 +219,13 @@ impl PostingsSerializer { self.block_encoder .compress_block_sorted(&self.doc_ids, self.last_doc_id_encoded); self.last_doc_id_encoded = self.doc_ids[self.doc_ids.len() - 1]; - try!(self.postings_write.write_all(block_encoded)); - self.written_bytes_postings += block_encoded.len(); + self.postings_write.write_all(block_encoded)?; } if self.text_indexing_options.is_termfreq_enabled() { // encode the term_freqs let block_encoded: &[u8] = self.block_encoder .compress_block_unsorted(&self.term_freqs); - try!(self.postings_write.write_all(block_encoded)); - self.written_bytes_postings += block_encoded.len(); + self.postings_write.write_all(block_encoded)?; self.term_freqs.clear(); } self.doc_ids.clear(); diff --git a/src/postings/term_info.rs b/src/postings/term_info.rs index 90d9d02..d639e9a 100644 --- a/src/postings/term_info.rs +++ b/src/postings/term_info.rs @@ -24,10 +24,12 @@ pub struct TermInfo { impl BinarySerializable for TermInfo { - fn serialize(&self, writer: &mut io::Write) -> io::Result<usize> { - Ok(try!(self.doc_freq.serialize(writer)) + try!(self.postings_offset.serialize(writer)) + - try!(self.positions_offset.serialize(writer))) + fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> { + self.doc_freq.serialize(writer)?; + self.postings_offset.serialize(writer)?; + self.positions_offset.serialize(writer) } + fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> { let doc_freq = try!(u32::deserialize(reader)); let postings_offset = try!(u32::deserialize(reader)); diff --git a/src/schema/field.rs b/src/schema/field.rs index e5489ad..9df8e14 100644 --- a/src/schema/field.rs +++ b/src/schema/field.rs @@ -14,7 +14,7 @@ use common::BinarySerializable; pub struct Field(pub u32); impl BinarySerializable for Field { - fn serialize(&self, writer: &mut Write) -> io::Result<usize> { + fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> { self.0.serialize(writer) } diff --git a/src/schema/field_value.rs b/src/schema/field_value.rs index b6cd546..5b7359f 100644 --- a/src/schema/field_value.rs +++ b/src/schema/field_value.rs @@ -34,8 +34,9 @@ impl FieldValue { } impl BinarySerializable for FieldValue { - fn serialize(&self, writer: &mut Write) -> io::Result<usize> { - Ok(self.field.serialize(writer)? + self.value.serialize(writer)?) + fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> { + self.field.serialize(writer)?; + self.value.serialize(writer) } fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> { diff --git a/src/schema/value.rs b/src/schema/value.rs index 139e23f..ad24688 100644 --- a/src/schema/value.rs +++ b/src/schema/value.rs @@ -130,23 +130,21 @@ mod binary_serialize { const I64_CODE: u8 = 2; impl BinarySerializable for Value { - fn serialize(&self, writer: &mut Write) -> io::Result<usize> { - let mut written_size = 0; + fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> { match *self { Value::Str(ref text) => { - written_size += try!(TEXT_CODE.serialize(writer)); - written_size += try!(text.serialize(writer)); + TEXT_CODE.serialize(writer)?; + text.serialize(writer) } Value::U64(ref val) => { - written_size += try!(U64_CODE.serialize(writer)); - written_size += try!(val.serialize(writer)); + U64_CODE.serialize(writer)?; + val.serialize(writer) } Value::I64(ref val) => { - written_size += try!(I64_CODE.serialize(writer)); - written_size += try!(val.serialize(writer)); + I64_CODE.serialize(writer)?; + val.serialize(writer) } } - Ok(written_size) } fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> { let type_code = try!(u8::deserialize(reader)); diff --git a/src/store/writer.rs b/src/store/writer.rs index c6f1e49..28befa7 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -5,6 +5,7 @@ use common::BinarySerializable; use std::io::{self, Write}; use lz4; use datastruct::SkipListBuilder; +use common::CountingWriter; const BLOCK_SIZE: usize = 16_384; @@ -19,9 +20,8 @@ const BLOCK_SIZE: usize = 16_384; /// pub struct StoreWriter { doc: DocId, - written: u64, offset_index_writer: SkipListBuilder<u64>, - writer: WritePtr, + writer: CountingWriter<WritePtr>, intermediary_buffer: Vec<u8>, current_block: Vec<u8>, } @@ -35,9 +35,8 @@ impl StoreWriter { pub fn new(writer: WritePtr) -> StoreWriter { StoreWriter { doc: 0, - written: 0, offset_index_writer: SkipListBuilder::new(3), - writer: writer, + writer: CountingWriter::wrap(writer), intermediary_buffer: Vec::new(), current_block: Vec::new(), } @@ -54,11 +53,12 @@ impl StoreWriter { for field_value in field_values { try!((*field_value).serialize(&mut self.intermediary_buffer)); } - try!((self.intermediary_buffer.len() as u32).serialize(&mut self.current_block)); - try!(self.current_block.write_all(&self.intermediary_buffer[..])); + (self.intermediary_buffer.len() as u32) + .serialize(&mut self.current_block)?; + self.current_block.write_all(&self.intermediary_buffer[..])?; self.doc += 1; if self.current_block.len() > BLOCK_SIZE { - try!(self.write_and_compress_block()); + self.write_and_compress_block()?; } Ok(()) } @@ -71,11 +71,11 @@ impl StoreWriter { let (_, encoder_result) = encoder.finish(); try!(encoder_result); } - let compressed_block_size = self.intermediary_buffer.len() as u64; - self.written += try!((compressed_block_size as u32).serialize(&mut self.writer)) as u64; - try!(self.writer.write_all(&self.intermediary_buffer)); - self.written += compressed_block_size; - try!(self.offset_index_writer.insert(self.doc, &self.written)); + (self.intermediary_buffer.len() as u32) + .serialize(&mut self.writer)?; + self.writer.write_all(&self.intermediary_buffer)?; + self.offset_index_writer + .insert(self.doc, &(self.writer.written_bytes() as u64))?; self.current_block.clear(); Ok(()) } @@ -89,9 +89,9 @@ impl StoreWriter { if !self.current_block.is_empty() { try!(self.write_and_compress_block()); } - let header_offset: u64 = self.written; + let header_offset: u64 = self.writer.written_bytes() as u64; try!(self.offset_index_writer - .write::<Box<Write>>(&mut self.writer)); + .write(&mut self.writer)); try!(header_offset.serialize(&mut self.writer)); try!(self.doc.serialize(&mut self.writer)); self.writer.flush() diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs index 9ec3247..7e27216 100644 --- a/src/termdict/mod.rs +++ b/src/termdict/mod.rs @@ -411,7 +411,6 @@ mod tests { { for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) { for j in 0..3 { - println!("i {} j {}", i, j); let &(ref fst_key, _) = &ids[i]; let &(ref last_key, _) = &ids[i + j]; let mut streamer = term_dictionary @@ -420,7 +419,6 @@ mod tests { .lt(last_key.as_bytes()) .into_stream(); for _ in 0..j { - println!("ij"); assert!(streamer.next().is_some()); } assert!(streamer.next().is_none()); diff --git a/src/termdict/streamdict/mod.rs b/src/termdict/streamdict/mod.rs index 66c3eb9..90b719d 100644 --- a/src/termdict/streamdict/mod.rs +++ b/src/termdict/streamdict/mod.rs @@ -1,9 +1,7 @@ mod termdict; mod streamer; -mod counting_writer; -use self::counting_writer::CountingWriter; pub use self::termdict::TermDictionaryImpl; pub use self::termdict::TermDictionaryBuilderImpl; pub use self::streamer::TermStreamerImpl; diff --git a/src/termdict/streamdict/streamer.rs b/src/termdict/streamdict/streamer.rs index dd27a2b..55167d4 100644 --- a/src/termdict/streamdict/streamer.rs +++ b/src/termdict/streamdict/streamer.rs @@ -4,7 +4,6 @@ use std::cmp::max; use common::BinarySerializable; use super::TermDictionaryImpl; use termdict::{TermStreamerBuilder, TermStreamer}; -use std::io::Read; pub(crate) fn stream_before<'a, V>(term_dictionary: &'a TermDictionaryImpl<V>, target_key: &[u8]) diff --git a/src/termdict/streamdict/termdict.rs b/src/termdict/streamdict/termdict.rs< |