From 8e509213634ccbf82c3380e1b660ec1ffb735301 Mon Sep 17 00:00:00 2001 From: petr-tik Date: Tue, 16 Apr 2019 00:05:53 +0100 Subject: Tidied up the Stamper module and upgraded to a 1.34 dependency Added stamper.revert method to be used for rollback - rolling back to a previous commit in case of deleting all documents or rolling operations back should reset the stamper as well Added type alias for Opstamp - helps code readibility instead of seeing u64 returned by functions. Moved to AtomicU64 on stable rust (since 1.34) - where possible use standard library interfaces. --- src/core/index_meta.rs | 3 ++- src/core/segment.rs | 3 ++- src/core/segment_meta.rs | 7 +++--- src/indexer/delete_queue.rs | 5 ++-- src/indexer/doc_opstamp_mapping.rs | 6 +++-- src/indexer/index_writer.rs | 29 +++++++++++----------- src/indexer/merge_operation.rs | 7 +++--- src/indexer/mod.rs | 1 + src/indexer/operation.rs | 5 ++-- src/indexer/prepared_commit.rs | 11 ++++---- src/indexer/segment_updater.rs | 7 +++--- src/indexer/segment_writer.rs | 3 ++- src/indexer/stamper.rs | 51 +++++++++++++++++++++++++++++++------- 13 files changed, 92 insertions(+), 46 deletions(-) diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs index c5d33e3..c73aab6 100644 --- a/src/core/index_meta.rs +++ b/src/core/index_meta.rs @@ -1,4 +1,5 @@ use core::SegmentMeta; +use indexer::Opstamp; use schema::Schema; use serde_json; use std::fmt; @@ -15,7 +16,7 @@ use std::fmt; pub struct IndexMeta { pub segments: Vec, pub schema: Schema, - pub opstamp: u64, + pub opstamp: Opstamp, #[serde(skip_serializing_if = "Option::is_none")] pub payload: Option, } diff --git a/src/core/segment.rs b/src/core/segment.rs index a747cfa..1f582d6 100644 --- a/src/core/segment.rs +++ b/src/core/segment.rs @@ -6,6 +6,7 @@ use directory::error::{OpenReadError, OpenWriteError}; use directory::Directory; use directory::{ReadOnlySource, WritePtr}; use indexer::segment_serializer::SegmentSerializer; +use indexer::Opstamp; use schema::Schema; use std::fmt; use std::path::PathBuf; @@ -50,7 +51,7 @@ impl Segment { } #[doc(hidden)] - pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> Segment { + pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> Segment { Segment { index: self.index, meta: self.meta.with_delete_meta(num_deleted_docs, opstamp), diff --git a/src/core/segment_meta.rs b/src/core/segment_meta.rs index 9478663..7ad7ecf 100644 --- a/src/core/segment_meta.rs +++ b/src/core/segment_meta.rs @@ -1,6 +1,7 @@ use super::SegmentComponent; use census::{Inventory, TrackedObject}; use core::SegmentId; +use indexer::Opstamp; use serde; use std::collections::HashSet; use std::fmt; @@ -13,7 +14,7 @@ lazy_static! { #[derive(Clone, Debug, Serialize, Deserialize)] struct DeleteMeta { num_deleted_docs: u32, - opstamp: u64, + opstamp: Opstamp, } /// `SegmentMeta` contains simple meta information about a segment. @@ -138,7 +139,7 @@ impl SegmentMeta { /// Returns the opstamp of the last delete operation /// taken in account in this segment. - pub fn delete_opstamp(&self) -> Option { + pub fn delete_opstamp(&self) -> Option { self.tracked .deletes .as_ref() @@ -152,7 +153,7 @@ impl SegmentMeta { } #[doc(hidden)] - pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> SegmentMeta { + pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> SegmentMeta { let delete_meta = DeleteMeta { num_deleted_docs, opstamp, diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs index be36bef..b46640f 100644 --- a/src/indexer/delete_queue.rs +++ b/src/indexer/delete_queue.rs @@ -1,4 +1,5 @@ use super::operation::DeleteOperation; +use indexer::Opstamp; use std::mem; use std::ops::DerefMut; use std::sync::{Arc, RwLock}; @@ -184,7 +185,7 @@ impl DeleteCursor { /// queue are consume and the next get will return None. /// - the next get will return the first operation with an /// `opstamp >= target_opstamp`. - pub fn skip_to(&mut self, target_opstamp: u64) { + pub fn skip_to(&mut self, target_opstamp: Opstamp) { // TODO Can be optimize as we work with block. while self.is_behind_opstamp(target_opstamp) { self.advance(); @@ -192,7 +193,7 @@ impl DeleteCursor { } #[cfg_attr(feature = "cargo-clippy", allow(clippy::wrong_self_convention))] - fn is_behind_opstamp(&mut self, target_opstamp: u64) -> bool { + fn is_behind_opstamp(&mut self, target_opstamp: Opstamp) -> bool { self.get() .map(|operation| operation.opstamp < target_opstamp) .unwrap_or(false) diff --git a/src/indexer/doc_opstamp_mapping.rs b/src/indexer/doc_opstamp_mapping.rs index 26bbe4c..c3f498d 100644 --- a/src/indexer/doc_opstamp_mapping.rs +++ b/src/indexer/doc_opstamp_mapping.rs @@ -22,8 +22,10 @@ pub enum DocToOpstampMapping { None, } +use super::Opstamp; + impl From> for DocToOpstampMapping { - fn from(opstamps: Vec) -> DocToOpstampMapping { + fn from(opstamps: Vec) -> DocToOpstampMapping { DocToOpstampMapping::WithMap(Arc::new(opstamps)) } } @@ -35,7 +37,7 @@ impl DocToOpstampMapping { // // The edge case opstamp = some doc opstamp is in practise // never called. - pub fn compute_doc_limit(&self, target_opstamp: u64) -> DocId { + pub fn compute_doc_limit(&self, target_opstamp: Opstamp) -> DocId { match *self { DocToOpstampMapping::WithMap(ref doc_opstamps) => { match doc_opstamps.binary_search(&target_opstamp) { diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 94961f5..3b4a01e 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -19,6 +19,7 @@ use indexer::doc_opstamp_mapping::DocToOpstampMapping; use indexer::operation::DeleteOperation; use indexer::stamper::Stamper; use indexer::MergePolicy; +use indexer::Opstamp; use indexer::SegmentEntry; use indexer::SegmentWriter; use postings::compute_table_size; @@ -99,7 +100,7 @@ pub struct IndexWriter { delete_queue: DeleteQueue, stamper: Stamper, - committed_opstamp: u64, + committed_opstamp: Opstamp, } /// Open a new index writer. Attempts to acquire a lockfile. @@ -177,7 +178,7 @@ pub fn compute_deleted_bitset( segment_reader: &SegmentReader, delete_cursor: &mut DeleteCursor, doc_opstamps: &DocToOpstampMapping, - target_opstamp: u64, + target_opstamp: Opstamp, ) -> Result { let mut might_have_changed = false; @@ -219,7 +220,7 @@ pub fn compute_deleted_bitset( pub fn advance_deletes( mut segment: Segment, segment_entry: &mut SegmentEntry, - target_opstamp: u64, + target_opstamp: Opstamp, ) -> Result<()> { { if segment_entry.meta().delete_opstamp() == Some(target_opstamp) { @@ -299,11 +300,11 @@ fn index_documents( // the worker thread. assert!(num_docs > 0); - let doc_opstamps: Vec = segment_writer.finalize()?; + let doc_opstamps: Vec = segment_writer.finalize()?; let segment_meta = SegmentMeta::new(segment_id, num_docs); - let last_docstamp: u64 = *(doc_opstamps.last().unwrap()); + let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap()); let delete_bitset_opt = if delete_cursor.get().is_some() { let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps); @@ -494,7 +495,7 @@ impl IndexWriter { /// state as it was after the last commit. /// /// The opstamp at the last commit is returned. - pub fn rollback(&mut self) -> Result<()> { + pub fn rollback(&mut self) -> Result { info!("Rolling back to opstamp {}", self.committed_opstamp); // marks the segment updater as killed. From now on, all @@ -529,7 +530,7 @@ impl IndexWriter { // was dropped with the index_writer. for _ in document_receiver.clone() {} - Ok(()) + Ok(self.committed_opstamp) } /// Prepares a commit. @@ -567,7 +568,7 @@ impl IndexWriter { info!("Preparing commit"); // this will drop the current document channel - // and recreate a new one channels. + // and recreate a new one. self.recreate_document_channel(); let former_workers_join_handle = mem::replace(&mut self.workers_join_handle, Vec::new()); @@ -601,7 +602,7 @@ impl IndexWriter { /// Commit returns the `opstamp` of the last document /// that made it in the commit. /// - pub fn commit(&mut self) -> Result { + pub fn commit(&mut self) -> Result { self.prepare_commit()?.commit() } @@ -617,7 +618,7 @@ impl IndexWriter { /// /// Like adds, the deletion itself will be visible /// only after calling `commit()`. - pub fn delete_term(&mut self, term: Term) -> u64 { + pub fn delete_term(&mut self, term: Term) -> Opstamp { let opstamp = self.stamper.stamp(); let delete_operation = DeleteOperation { opstamp, term }; self.delete_queue.push(delete_operation); @@ -631,7 +632,7 @@ impl IndexWriter { /// /// This is also the opstamp of the commit that is currently /// available for searchers. - pub fn commit_opstamp(&self) -> u64 { + pub fn commit_opstamp(&self) -> Opstamp { self.committed_opstamp } @@ -645,7 +646,7 @@ impl IndexWriter { /// /// Currently it represents the number of documents that /// have been added since the creation of the index. - pub fn add_document(&mut self, document: Document) -> u64 { + pub fn add_document(&mut self, document: Document) -> Opstamp { let opstamp = self.stamper.stamp(); let add_operation = AddOperation { opstamp, document }; let send_result = self.operation_sender.send(vec![add_operation]); @@ -662,7 +663,7 @@ impl IndexWriter { /// The total number of stamps generated by this method is `count + 1`; /// each operation gets a stamp from the `stamps` iterator and `last_opstamp` /// is for the batch itself. - fn get_batch_opstamps(&mut self, count: u64) -> (u64, Range) { + fn get_batch_opstamps(&mut self, count: Opstamp) -> (Opstamp, Range) { let Range { start, end } = self.stamper.stamps(count + 1u64); let last_opstamp = end - 1; let stamps = Range { @@ -688,7 +689,7 @@ impl IndexWriter { /// Like adds and deletes (see `IndexWriter.add_document` and /// `IndexWriter.delete_term`), the changes made by calling `run` will be /// visible to readers only after calling `commit()`. - pub fn run(&mut self, user_operations: Vec) -> u64 { + pub fn run(&mut self, user_operations: Vec) -> Opstamp { let count = user_operations.len() as u64; if count == 0 { return self.stamper.stamp(); diff --git a/src/indexer/merge_operation.rs b/src/indexer/merge_operation.rs index 9d7bcbc..5bf84f1 100644 --- a/src/indexer/merge_operation.rs +++ b/src/indexer/merge_operation.rs @@ -1,4 +1,5 @@ use census::{Inventory, TrackedObject}; +use indexer::Opstamp; use std::collections::HashSet; use SegmentId; @@ -35,14 +36,14 @@ pub struct MergeOperation { } struct InnerMergeOperation { - target_opstamp: u64, + target_opstamp: Opstamp, segment_ids: Vec, } impl MergeOperation { pub fn new( inventory: &MergeOperationInventory, - target_opstamp: u64, + target_opstamp: Opstamp, segment_ids: Vec, ) -> MergeOperation { let inner_merge_operation = InnerMergeOperation { @@ -54,7 +55,7 @@ impl MergeOperation { } } - pub fn target_opstamp(&self) -> u64 { + pub fn target_opstamp(&self) -> Opstamp { self.inner.target_opstamp } diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 0620230..9c154c6 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -25,6 +25,7 @@ pub use self::segment_entry::SegmentEntry; pub use self::segment_manager::SegmentManager; pub use self::segment_serializer::SegmentSerializer; pub use self::segment_writer::SegmentWriter; +pub use self::stamper::Opstamp; /// Alias for the default merge policy, which is the `LogMergePolicy`. pub type DefaultMergePolicy = LogMergePolicy; diff --git a/src/indexer/operation.rs b/src/indexer/operation.rs index fe57a4a..e5beca2 100644 --- a/src/indexer/operation.rs +++ b/src/indexer/operation.rs @@ -1,17 +1,18 @@ +use indexer::Opstamp; use schema::Document; use schema::Term; /// Timestamped Delete operation. #[derive(Clone, Eq, PartialEq, Debug)] pub struct DeleteOperation { - pub opstamp: u64, + pub opstamp: Opstamp, pub term: Term, } /// Timestamped Add operation. #[derive(Eq, PartialEq, Debug)] pub struct AddOperation { - pub opstamp: u64, + pub opstamp: Opstamp, pub document: Document, } diff --git a/src/indexer/prepared_commit.rs b/src/indexer/prepared_commit.rs index 4728af0..77ea073 100644 --- a/src/indexer/prepared_commit.rs +++ b/src/indexer/prepared_commit.rs @@ -1,15 +1,16 @@ use super::IndexWriter; +use indexer::Opstamp; use Result; /// A prepared commit pub struct PreparedCommit<'a> { index_writer: &'a mut IndexWriter, payload: Option, - opstamp: u64, + opstamp: Opstamp, } impl<'a> PreparedCommit<'a> { - pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: u64) -> PreparedCommit { + pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: Opstamp) -> PreparedCommit { PreparedCommit { index_writer, payload: None, @@ -17,7 +18,7 @@ impl<'a> PreparedCommit<'a> { } } - pub fn opstamp(&self) -> u64 { + pub fn opstamp(&self) -> Opstamp { self.opstamp } @@ -25,11 +26,11 @@ impl<'a> PreparedCommit<'a> { self.payload = Some(payload.to_string()) } - pub fn abort(self) -> Result<()> { + pub fn abort(self) -> Result { self.index_writer.rollback() } - pub fn commit(self) -> Result { + pub fn commit(self) -> Result { info!("committing {}", self.opstamp); self.index_writer .segment_updater() diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 316dbd6..e0168a7 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -20,6 +20,7 @@ use indexer::merge_operation::MergeOperationInventory; use indexer::merger::IndexMerger; use indexer::stamper::Stamper; use indexer::MergeOperation; +use indexer::Opstamp; use indexer::SegmentEntry; use indexer::SegmentSerializer; use indexer::{DefaultMergePolicy, MergePolicy}; @@ -224,7 +225,7 @@ impl SegmentUpdater { /// /// Tne method returns copies of the segment entries, /// updated with the delete information. - fn purge_deletes(&self, target_opstamp: u64) -> Result> { + fn purge_deletes(&self, target_opstamp: Opstamp) -> Result> { let mut segment_entries = self.0.segment_manager.segment_entries(); for segment_entry in &mut segment_entries { let segment = self.0.index.segment(segment_entry.meta().clone()); @@ -233,7 +234,7 @@ impl SegmentUpdater { Ok(segment_entries) } - pub fn save_metas(&self, opstamp: u64, commit_message: Option) { + pub fn save_metas(&self, opstamp: Opstamp, commit_message: Option) { if self.is_alive() { let index = &self.0.index; let directory = index.directory(); @@ -280,7 +281,7 @@ impl SegmentUpdater { .garbage_collect(|| self.0.segment_manager.list_files()); } - pub fn commit(&self, opstamp: u64, payload: Option) -> Result<()> { + pub fn commit(&self, opstamp: Opstamp, payload: Option) -> Result<()> { self.run_async(move |segment_updater| { if segment_updater.is_alive() { let segment_entries = segment_updater diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 4eb42c8..e533bdc 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -4,6 +4,7 @@ use core::SerializableSegment; use fastfield::FastFieldsWriter; use fieldnorm::FieldNormsWriter; use indexer::segment_serializer::SegmentSerializer; +use indexer::Opstamp; use postings::MultiFieldPostingsWriter; use schema::FieldType; use schema::Schema; @@ -28,7 +29,7 @@ pub struct SegmentWriter { segment_serializer: SegmentSerializer, fast_field_writers: FastFieldsWriter, fieldnorms_writer: FieldNormsWriter, - doc_opstamps: Vec, + doc_opstamps: Vec, tokenizers: Vec>>, } diff --git a/src/indexer/stamper.rs b/src/indexer/stamper.rs index 6cd1c49..65dba3a 100644 --- a/src/indexer/stamper.rs +++ b/src/indexer/stamper.rs @@ -2,24 +2,32 @@ use std::ops::Range; use std::sync::atomic::Ordering; use std::sync::Arc; +pub type Opstamp = u64; + // AtomicU64 have not landed in stable. // For the moment let's just use AtomicUsize on // x86/64 bit platform, and a mutex on other platform. #[cfg(target_arch = "x86_64")] mod archicture_impl { - use std::sync::atomic::{AtomicUsize, Ordering}; + use indexer::stamper::Opstamp; + use std::sync::atomic::{AtomicU64, Ordering}; #[derive(Default)] - pub struct AtomicU64Ersatz(AtomicUsize); + pub struct AtomicU64Ersatz(AtomicU64); impl AtomicU64Ersatz { - pub fn new(first_opstamp: u64) -> AtomicU64Ersatz { - AtomicU64Ersatz(AtomicUsize::new(first_opstamp as usize)) + pub fn new(first_opstamp: Opstamp) -> AtomicU64Ersatz { + AtomicU64Ersatz(AtomicU64::new(first_opstamp as u64)) } pub fn fetch_add(&self, val: u64, order: Ordering) -> u64 { - self.0.fetch_add(val as usize, order) as u64 + self.0.fetch_add(val as u64, order) as u64 + } + + pub fn revert(&self, val: u64, order: Ordering) -> u64 { + self.0.store(val, order); + val } } } @@ -27,6 +35,7 @@ mod archicture_impl { #[cfg(not(target_arch = "x86_64"))] mod archicture_impl { + use indexer::stamper::Opstamp; use std::sync::atomic::Ordering; /// Under other architecture, we rely on a mutex. use std::sync::RwLock; @@ -35,7 +44,7 @@ mod archicture_impl { pub struct AtomicU64Ersatz(RwLock); impl AtomicU64Ersatz { - pub fn new(first_opstamp: u64) -> AtomicU64Ersatz { + pub fn new(first_opstamp: Opstamp) -> AtomicU64Ersatz { AtomicU64Ersatz(RwLock::new(first_opstamp)) } @@ -45,6 +54,12 @@ mod archicture_impl { *lock = previous_val + incr; previous_val } + + pub fn revert(&self, val: u64, _order: Ordering) -> u64 { + let mut lock = self.0.write().unwrap(); + *lock = val; + val + } } } @@ -54,23 +69,27 @@ use self::archicture_impl::AtomicU64Ersatz; pub struct Stamper(Arc); impl Stamper { - pub fn new(first_opstamp: u64) -> Stamper { + pub fn new(first_opstamp: Opstamp) -> Stamper { Stamper(Arc::new(AtomicU64Ersatz::new(first_opstamp))) } - pub fn stamp(&self) -> u64 { + pub fn stamp(&self) -> Opstamp { self.0.fetch_add(1u64, Ordering::SeqCst) as u64 } /// Given a desired count `n`, `stamps` returns an iterator that /// will supply `n` number of u64 stamps. - pub fn stamps(&self, n: u64) -> Range { + pub fn stamps(&self, n: u64) -> Range { let start = self.0.fetch_add(n, Ordering::SeqCst); Range { start, end: start + n, } } + + pub fn revert(&mut self, to_opstamp: Opstamp) -> Opstamp { + self.0.revert(to_opstamp, Ordering::SeqCst) + } } #[cfg(test)] @@ -92,4 +111,18 @@ mod test { assert_eq!(stamper.stamps(3u64), (12..15)); assert_eq!(stamper.stamp(), 15u64); } + + #[test] + fn test_stamper_revert() { + let mut stamper = Stamper::new(7u64); + assert_eq!(stamper.stamp(), 7u64); + assert_eq!(stamper.stamp(), 8u64); + + let stamper_clone = stamper.clone(); + assert_eq!(stamper_clone.stamp(), 9u64); + + stamper.revert(6); + assert_eq!(stamper.stamp(), 6); + assert_eq!(stamper_clone.stamp(), 7); + } } -- cgit v1.2.3