summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
author petr-tik <petr-tik@users.noreply.github.com> 2019-04-16 00:05:53 +0100
committer petr-tik <petr-tik@users.noreply.github.com> 2019-04-24 20:46:28 +0100
commit 8e509213634ccbf82c3380e1b660ec1ffb735301 (patch)
tree ccabf983d387fd1fd7dc48b5f9932e7a968f0db7 /src
parent 96a4f503ecd8f138862bf0a22948a600faf2b120 (diff)
Tidied up the Stamper module and upgraded to a 1.34 dependency
Added a stamper.revert method to be used for rollback: rolling back to a previous commit (in case of deleting all documents or rolling operations back) should reset the stamper as well. Added a type alias for Opstamp, which helps code readability compared with seeing a bare u64 returned by functions. Moved to AtomicU64, available on stable Rust since 1.34: where possible, use standard library interfaces.
Diffstat (limited to 'src')
-rw-r--r-- src/core/index_meta.rs 3
-rw-r--r-- src/core/segment.rs 3
-rw-r--r-- src/core/segment_meta.rs 7
-rw-r--r-- src/indexer/delete_queue.rs 5
-rw-r--r-- src/indexer/doc_opstamp_mapping.rs 6
-rw-r--r-- src/indexer/index_writer.rs 29
-rw-r--r-- src/indexer/merge_operation.rs 7
-rw-r--r-- src/indexer/mod.rs 1
-rw-r--r-- src/indexer/operation.rs 5
-rw-r--r-- src/indexer/prepared_commit.rs 11
-rw-r--r-- src/indexer/segment_updater.rs 7
-rw-r--r-- src/indexer/segment_writer.rs 3
-rw-r--r-- src/indexer/stamper.rs 51
13 files changed, 92 insertions, 46 deletions
diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs
index c5d33e3..c73aab6 100644
--- a/src/core/index_meta.rs
+++ b/src/core/index_meta.rs
@@ -1,4 +1,5 @@
use core::SegmentMeta;
+use indexer::Opstamp;
use schema::Schema;
use serde_json;
use std::fmt;
@@ -15,7 +16,7 @@ use std::fmt;
pub struct IndexMeta {
pub segments: Vec<SegmentMeta>,
pub schema: Schema,
- pub opstamp: u64,
+ pub opstamp: Opstamp,
#[serde(skip_serializing_if = "Option::is_none")]
pub payload: Option<String>,
}
diff --git a/src/core/segment.rs b/src/core/segment.rs
index a747cfa..1f582d6 100644
--- a/src/core/segment.rs
+++ b/src/core/segment.rs
@@ -6,6 +6,7 @@ use directory::error::{OpenReadError, OpenWriteError};
use directory::Directory;
use directory::{ReadOnlySource, WritePtr};
use indexer::segment_serializer::SegmentSerializer;
+use indexer::Opstamp;
use schema::Schema;
use std::fmt;
use std::path::PathBuf;
@@ -50,7 +51,7 @@ impl Segment {
}
#[doc(hidden)]
- pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> Segment {
+ pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> Segment {
Segment {
index: self.index,
meta: self.meta.with_delete_meta(num_deleted_docs, opstamp),
diff --git a/src/core/segment_meta.rs b/src/core/segment_meta.rs
index 9478663..7ad7ecf 100644
--- a/src/core/segment_meta.rs
+++ b/src/core/segment_meta.rs
@@ -1,6 +1,7 @@
use super::SegmentComponent;
use census::{Inventory, TrackedObject};
use core::SegmentId;
+use indexer::Opstamp;
use serde;
use std::collections::HashSet;
use std::fmt;
@@ -13,7 +14,7 @@ lazy_static! {
#[derive(Clone, Debug, Serialize, Deserialize)]
struct DeleteMeta {
num_deleted_docs: u32,
- opstamp: u64,
+ opstamp: Opstamp,
}
/// `SegmentMeta` contains simple meta information about a segment.
@@ -138,7 +139,7 @@ impl SegmentMeta {
/// Returns the opstamp of the last delete operation
/// taken in account in this segment.
- pub fn delete_opstamp(&self) -> Option<u64> {
+ pub fn delete_opstamp(&self) -> Option<Opstamp> {
self.tracked
.deletes
.as_ref()
@@ -152,7 +153,7 @@ impl SegmentMeta {
}
#[doc(hidden)]
- pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> SegmentMeta {
+ pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> SegmentMeta {
let delete_meta = DeleteMeta {
num_deleted_docs,
opstamp,
diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs
index be36bef..b46640f 100644
--- a/src/indexer/delete_queue.rs
+++ b/src/indexer/delete_queue.rs
@@ -1,4 +1,5 @@
use super::operation::DeleteOperation;
+use indexer::Opstamp;
use std::mem;
use std::ops::DerefMut;
use std::sync::{Arc, RwLock};
@@ -184,7 +185,7 @@ impl DeleteCursor {
/// queue are consume and the next get will return None.
/// - the next get will return the first operation with an
/// `opstamp >= target_opstamp`.
- pub fn skip_to(&mut self, target_opstamp: u64) {
+ pub fn skip_to(&mut self, target_opstamp: Opstamp) {
// TODO Can be optimize as we work with block.
while self.is_behind_opstamp(target_opstamp) {
self.advance();
@@ -192,7 +193,7 @@ impl DeleteCursor {
}
#[cfg_attr(feature = "cargo-clippy", allow(clippy::wrong_self_convention))]
- fn is_behind_opstamp(&mut self, target_opstamp: u64) -> bool {
+ fn is_behind_opstamp(&mut self, target_opstamp: Opstamp) -> bool {
self.get()
.map(|operation| operation.opstamp < target_opstamp)
.unwrap_or(false)
diff --git a/src/indexer/doc_opstamp_mapping.rs b/src/indexer/doc_opstamp_mapping.rs
index 26bbe4c..c3f498d 100644
--- a/src/indexer/doc_opstamp_mapping.rs
+++ b/src/indexer/doc_opstamp_mapping.rs
@@ -22,8 +22,10 @@ pub enum DocToOpstampMapping {
None,
}
+use super::Opstamp;
+
impl From<Vec<u64>> for DocToOpstampMapping {
- fn from(opstamps: Vec<u64>) -> DocToOpstampMapping {
+ fn from(opstamps: Vec<Opstamp>) -> DocToOpstampMapping {
DocToOpstampMapping::WithMap(Arc::new(opstamps))
}
}
@@ -35,7 +37,7 @@ impl DocToOpstampMapping {
//
// The edge case opstamp = some doc opstamp is in practise
// never called.
- pub fn compute_doc_limit(&self, target_opstamp: u64) -> DocId {
+ pub fn compute_doc_limit(&self, target_opstamp: Opstamp) -> DocId {
match *self {
DocToOpstampMapping::WithMap(ref doc_opstamps) => {
match doc_opstamps.binary_search(&target_opstamp) {
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index 94961f5..3b4a01e 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -19,6 +19,7 @@ use indexer::doc_opstamp_mapping::DocToOpstampMapping;
use indexer::operation::DeleteOperation;
use indexer::stamper::Stamper;
use indexer::MergePolicy;
+use indexer::Opstamp;
use indexer::SegmentEntry;
use indexer::SegmentWriter;
use postings::compute_table_size;
@@ -99,7 +100,7 @@ pub struct IndexWriter {
delete_queue: DeleteQueue,
stamper: Stamper,
- committed_opstamp: u64,
+ committed_opstamp: Opstamp,
}
/// Open a new index writer. Attempts to acquire a lockfile.
@@ -177,7 +178,7 @@ pub fn compute_deleted_bitset(
segment_reader: &SegmentReader,
delete_cursor: &mut DeleteCursor,
doc_opstamps: &DocToOpstampMapping,
- target_opstamp: u64,
+ target_opstamp: Opstamp,
) -> Result<bool> {
let mut might_have_changed = false;
@@ -219,7 +220,7 @@ pub fn compute_deleted_bitset(
pub fn advance_deletes(
mut segment: Segment,
segment_entry: &mut SegmentEntry,
- target_opstamp: u64,
+ target_opstamp: Opstamp,
) -> Result<()> {
{
if segment_entry.meta().delete_opstamp() == Some(target_opstamp) {
@@ -299,11 +300,11 @@ fn index_documents(
// the worker thread.
assert!(num_docs > 0);
- let doc_opstamps: Vec<u64> = segment_writer.finalize()?;
+ let doc_opstamps: Vec<Opstamp> = segment_writer.finalize()?;
let segment_meta = SegmentMeta::new(segment_id, num_docs);
- let last_docstamp: u64 = *(doc_opstamps.last().unwrap());
+ let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());
let delete_bitset_opt = if delete_cursor.get().is_some() {
let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
@@ -494,7 +495,7 @@ impl IndexWriter {
/// state as it was after the last commit.
///
/// The opstamp at the last commit is returned.
- pub fn rollback(&mut self) -> Result<()> {
+ pub fn rollback(&mut self) -> Result<Opstamp> {
info!("Rolling back to opstamp {}", self.committed_opstamp);
// marks the segment updater as killed. From now on, all
@@ -529,7 +530,7 @@ impl IndexWriter {
// was dropped with the index_writer.
for _ in document_receiver.clone() {}
- Ok(())
+ Ok(self.committed_opstamp)
}
/// Prepares a commit.
@@ -567,7 +568,7 @@ impl IndexWriter {
info!("Preparing commit");
// this will drop the current document channel
- // and recreate a new one channels.
+ // and recreate a new one.
self.recreate_document_channel();
let former_workers_join_handle = mem::replace(&mut self.workers_join_handle, Vec::new());
@@ -601,7 +602,7 @@ impl IndexWriter {
/// Commit returns the `opstamp` of the last document
/// that made it in the commit.
///
- pub fn commit(&mut self) -> Result<u64> {
+ pub fn commit(&mut self) -> Result<Opstamp> {
self.prepare_commit()?.commit()
}
@@ -617,7 +618,7 @@ impl IndexWriter {
///
/// Like adds, the deletion itself will be visible
/// only after calling `commit()`.
- pub fn delete_term(&mut self, term: Term) -> u64 {
+ pub fn delete_term(&mut self, term: Term) -> Opstamp {
let opstamp = self.stamper.stamp();
let delete_operation = DeleteOperation { opstamp, term };
self.delete_queue.push(delete_operation);
@@ -631,7 +632,7 @@ impl IndexWriter {
///
/// This is also the opstamp of the commit that is currently
/// available for searchers.
- pub fn commit_opstamp(&self) -> u64 {
+ pub fn commit_opstamp(&self) -> Opstamp {
self.committed_opstamp
}
@@ -645,7 +646,7 @@ impl IndexWriter {
///
/// Currently it represents the number of documents that
/// have been added since the creation of the index.
- pub fn add_document(&mut self, document: Document) -> u64 {
+ pub fn add_document(&mut self, document: Document) -> Opstamp {
let opstamp = self.stamper.stamp();
let add_operation = AddOperation { opstamp, document };
let send_result = self.operation_sender.send(vec![add_operation]);
@@ -662,7 +663,7 @@ impl IndexWriter {
/// The total number of stamps generated by this method is `count + 1`;
/// each operation gets a stamp from the `stamps` iterator and `last_opstamp`
/// is for the batch itself.
- fn get_batch_opstamps(&mut self, count: u64) -> (u64, Range<u64>) {
+ fn get_batch_opstamps(&mut self, count: Opstamp) -> (Opstamp, Range<Opstamp>) {
let Range { start, end } = self.stamper.stamps(count + 1u64);
let last_opstamp = end - 1;
let stamps = Range {
@@ -688,7 +689,7 @@ impl IndexWriter {
/// Like adds and deletes (see `IndexWriter.add_document` and
/// `IndexWriter.delete_term`), the changes made by calling `run` will be
/// visible to readers only after calling `commit()`.
- pub fn run(&mut self, user_operations: Vec<UserOperation>) -> u64 {
+ pub fn run(&mut self, user_operations: Vec<UserOperation>) -> Opstamp {
let count = user_operations.len() as u64;
if count == 0 {
return self.stamper.stamp();
diff --git a/src/indexer/merge_operation.rs b/src/indexer/merge_operation.rs
index 9d7bcbc..5bf84f1 100644
--- a/src/indexer/merge_operation.rs
+++ b/src/indexer/merge_operation.rs
@@ -1,4 +1,5 @@
use census::{Inventory, TrackedObject};
+use indexer::Opstamp;
use std::collections::HashSet;
use SegmentId;
@@ -35,14 +36,14 @@ pub struct MergeOperation {
}
struct InnerMergeOperation {
- target_opstamp: u64,
+ target_opstamp: Opstamp,
segment_ids: Vec<SegmentId>,
}
impl MergeOperation {
pub fn new(
inventory: &MergeOperationInventory,
- target_opstamp: u64,
+ target_opstamp: Opstamp,
segment_ids: Vec<SegmentId>,
) -> MergeOperation {
let inner_merge_operation = InnerMergeOperation {
@@ -54,7 +55,7 @@ impl MergeOperation {
}
}
- pub fn target_opstamp(&self) -> u64 {
+ pub fn target_opstamp(&self) -> Opstamp {
self.inner.target_opstamp
}
diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs
index 0620230..9c154c6 100644
--- a/src/indexer/mod.rs
+++ b/src/indexer/mod.rs
@@ -25,6 +25,7 @@ pub use self::segment_entry::SegmentEntry;
pub use self::segment_manager::SegmentManager;
pub use self::segment_serializer::SegmentSerializer;
pub use self::segment_writer::SegmentWriter;
+pub use self::stamper::Opstamp;
/// Alias for the default merge policy, which is the `LogMergePolicy`.
pub type DefaultMergePolicy = LogMergePolicy;
diff --git a/src/indexer/operation.rs b/src/indexer/operation.rs
index fe57a4a..e5beca2 100644
--- a/src/indexer/operation.rs
+++ b/src/indexer/operation.rs
@@ -1,17 +1,18 @@
+use indexer::Opstamp;
use schema::Document;
use schema::Term;
/// Timestamped Delete operation.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct DeleteOperation {
- pub opstamp: u64,
+ pub opstamp: Opstamp,
pub term: Term,
}
/// Timestamped Add operation.
#[derive(Eq, PartialEq, Debug)]
pub struct AddOperation {
- pub opstamp: u64,
+ pub opstamp: Opstamp,
pub document: Document,
}
diff --git a/src/indexer/prepared_commit.rs b/src/indexer/prepared_commit.rs
index 4728af0..77ea073 100644
--- a/src/indexer/prepared_commit.rs
+++ b/src/indexer/prepared_commit.rs
@@ -1,15 +1,16 @@
use super::IndexWriter;
+use indexer::Opstamp;
use Result;
/// A prepared commit
pub struct PreparedCommit<'a> {
index_writer: &'a mut IndexWriter,
payload: Option<String>,
- opstamp: u64,
+ opstamp: Opstamp,
}
impl<'a> PreparedCommit<'a> {
- pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: u64) -> PreparedCommit {
+ pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: Opstamp) -> PreparedCommit {
PreparedCommit {
index_writer,
payload: None,
@@ -17,7 +18,7 @@ impl<'a> PreparedCommit<'a> {
}
}
- pub fn opstamp(&self) -> u64 {
+ pub fn opstamp(&self) -> Opstamp {
self.opstamp
}
@@ -25,11 +26,11 @@ impl<'a> PreparedCommit<'a> {
self.payload = Some(payload.to_string())
}
- pub fn abort(self) -> Result<()> {
+ pub fn abort(self) -> Result<Opstamp> {
self.index_writer.rollback()
}
- pub fn commit(self) -> Result<u64> {
+ pub fn commit(self) -> Result<Opstamp> {
info!("committing {}", self.opstamp);
self.index_writer
.segment_updater()
diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs
index 316dbd6..e0168a7 100644
--- a/src/indexer/segment_updater.rs
+++ b/src/indexer/segment_updater.rs
@@ -20,6 +20,7 @@ use indexer::merge_operation::MergeOperationInventory;
use indexer::merger::IndexMerger;
use indexer::stamper::Stamper;
use indexer::MergeOperation;
+use indexer::Opstamp;
use indexer::SegmentEntry;
use indexer::SegmentSerializer;
use indexer::{DefaultMergePolicy, MergePolicy};
@@ -224,7 +225,7 @@ impl SegmentUpdater {
///
/// Tne method returns copies of the segment entries,
/// updated with the delete information.
- fn purge_deletes(&self, target_opstamp: u64) -> Result<Vec<SegmentEntry>> {
+ fn purge_deletes(&self, target_opstamp: Opstamp) -> Result<Vec<SegmentEntry>> {
let mut segment_entries = self.0.segment_manager.segment_entries();
for segment_entry in &mut segment_entries {
let segment = self.0.index.segment(segment_entry.meta().clone());
@@ -233,7 +234,7 @@ impl SegmentUpdater {
Ok(segment_entries)
}
- pub fn save_metas(&self, opstamp: u64, commit_message: Option<String>) {
+ pub fn save_metas(&self, opstamp: Opstamp, commit_message: Option<String>) {
if self.is_alive() {
let index = &self.0.index;
let directory = index.directory();
@@ -280,7 +281,7 @@ impl SegmentUpdater {
.garbage_collect(|| self.0.segment_manager.list_files());
}
- pub fn commit(&self, opstamp: u64, payload: Option<String>) -> Result<()> {
+ pub fn commit(&self, opstamp: Opstamp, payload: Option<String>) -> Result<()> {
self.run_async(move |segment_updater| {
if segment_updater.is_alive() {
let segment_entries = segment_updater
diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs
index 4eb42c8..e533bdc 100644
--- a/src/indexer/segment_writer.rs
+++ b/src/indexer/segment_writer.rs
@@ -4,6 +4,7 @@ use core::SerializableSegment;
use fastfield::FastFieldsWriter;
use fieldnorm::FieldNormsWriter;
use indexer::segment_serializer::SegmentSerializer;
+use indexer::Opstamp;
use postings::MultiFieldPostingsWriter;
use schema::FieldType;
use schema::Schema;
@@ -28,7 +29,7 @@ pub struct SegmentWriter {
segment_serializer: SegmentSerializer,
fast_field_writers: FastFieldsWriter,
fieldnorms_writer: FieldNormsWriter,
- doc_opstamps: Vec<u64>,
+ doc_opstamps: Vec<Opstamp>,
tokenizers: Vec<Option<Box<BoxedTokenizer>>>,
}
diff --git a/src/indexer/stamper.rs b/src/indexer/stamper.rs
index 6cd1c49..65dba3a 100644
--- a/src/indexer/stamper.rs
+++ b/src/indexer/stamper.rs
@@ -2,24 +2,32 @@ use std::ops::Range;
use std::sync::atomic::Ordering;
use std::sync::Arc;
+pub type Opstamp = u64;
+
// AtomicU64 have not landed in stable.
// For the moment let's just use AtomicUsize on
// x86/64 bit platform, and a mutex on other platform.
#[cfg(target_arch = "x86_64")]
mod archicture_impl {
- use std::sync::atomic::{AtomicUsize, Ordering};
+ use indexer::stamper::Opstamp;
+ use std::sync::atomic::{AtomicU64, Ordering};
#[derive(Default)]
- pub struct AtomicU64Ersatz(AtomicUsize);
+ pub struct AtomicU64Ersatz(AtomicU64);
impl AtomicU64Ersatz {
- pub fn new(first_opstamp: u64) -> AtomicU64Ersatz {
- AtomicU64Ersatz(AtomicUsize::new(first_opstamp as usize))
+ pub fn new(first_opstamp: Opstamp) -> AtomicU64Ersatz {
+ AtomicU64Ersatz(AtomicU64::new(first_opstamp as u64))
}
pub fn fetch_add(&self, val: u64, order: Ordering) -> u64 {
- self.0.fetch_add(val as usize, order) as u64
+ self.0.fetch_add(val as u64, order) as u64
+ }
+
+ pub fn revert(&self, val: u64, order: Ordering) -> u64 {
+ self.0.store(val, order);
+ val
}
}
}
@@ -27,6 +35,7 @@ mod archicture_impl {
#[cfg(not(target_arch = "x86_64"))]
mod archicture_impl {
+ use indexer::stamper::Opstamp;
use std::sync::atomic::Ordering;
/// Under other architecture, we rely on a mutex.
use std::sync::RwLock;
@@ -35,7 +44,7 @@ mod archicture_impl {
pub struct AtomicU64Ersatz(RwLock<u64>);
impl AtomicU64Ersatz {
- pub fn new(first_opstamp: u64) -> AtomicU64Ersatz {
+ pub fn new(first_opstamp: Opstamp) -> AtomicU64Ersatz {
AtomicU64Ersatz(RwLock::new(first_opstamp))
}
@@ -45,6 +54,12 @@ mod archicture_impl {
*lock = previous_val + incr;
previous_val
}
+
+ pub fn revert(&self, val: u64, _order: Ordering) -> u64 {
+ let mut lock = self.0.write().unwrap();
+ *lock = val;
+ val
+ }
}
}
@@ -54,23 +69,27 @@ use self::archicture_impl::AtomicU64Ersatz;
pub struct Stamper(Arc<AtomicU64Ersatz>);
impl Stamper {
- pub fn new(first_opstamp: u64) -> Stamper {
+ pub fn new(first_opstamp: Opstamp) -> Stamper {
Stamper(Arc::new(AtomicU64Ersatz::new(first_opstamp)))
}
- pub fn stamp(&self) -> u64 {
+ pub fn stamp(&self) -> Opstamp {
self.0.fetch_add(1u64, Ordering::SeqCst) as u64
}
/// Given a desired count `n`, `stamps` returns an iterator that
/// will supply `n` number of u64 stamps.
- pub fn stamps(&self, n: u64) -> Range<u64> {
+ pub fn stamps(&self, n: u64) -> Range<Opstamp> {
let start = self.0.fetch_add(n, Ordering::SeqCst);
Range {
start,
end: start + n,
}
}
+
+ pub fn revert(&mut self, to_opstamp: Opstamp) -> Opstamp {
+ self.0.revert(to_opstamp, Ordering::SeqCst)
+ }
}
#[cfg(test)]
@@ -92,4 +111,18 @@ mod test {
assert_eq!(stamper.stamps(3u64), (12..15));
assert_eq!(stamper.stamp(), 15u64);
}
+
+ #[test]
+ fn test_stamper_revert() {
+ let mut stamper = Stamper::new(7u64);
+ assert_eq!(stamper.stamp(), 7u64);
+ assert_eq!(stamper.stamp(), 8u64);
+
+ let stamper_clone = stamper.clone();
+ assert_eq!(stamper_clone.stamp(), 9u64);
+
+ stamper.revert(6);
+ assert_eq!(stamper.stamp(), 6);
+ assert_eq!(stamper_clone.stamp(), 7);
+ }
}