diff options
author | Paul Masurel <paul.masurel@gmail.com> | 2019-07-17 13:20:02 +0900 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-07-17 13:20:02 +0900 |
commit | af7ea1422a887177b2febf37e5fbdb61e9d499c1 (patch) | |
tree | ef611c6ee6f87e13222126355964552d6129d2fa | |
parent | 498057c5b7ad87e5e2d81b81dbd81bdd9479cb70 (diff) |
using smallvec for operation batches (#599)
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/indexer/index_writer.rs | 19 |
2 files changed, 15 insertions, 5 deletions
@@ -53,6 +53,7 @@ fail = "0.2"
 scoped-pool = "1.0"
 murmurhash32 = "0.2"
 chrono = "0.4"
+smallvec = "0.6"
 
 [target.'cfg(windows)'.dependencies]
 winapi = "0.3"
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index 99eccc5..fee0731 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -26,6 +26,8 @@ use crate::Result;
 use bit_set::BitSet;
 use crossbeam::channel;
 use futures::{Canceled, Future};
+use smallvec::smallvec;
+use smallvec::SmallVec;
 use std::mem;
 use std::ops::Range;
 use std::sync::Arc;
@@ -44,8 +46,15 @@ pub const HEAP_SIZE_MAX: usize = u32::max_value() as usize - MARGIN_IN_BYTES;
 // reaches `PIPELINE_MAX_SIZE_IN_DOCS`
 const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
 
-type OperationSender = channel::Sender<Vec<AddOperation>>;
-type OperationReceiver = channel::Receiver<Vec<AddOperation>>;
+// Group of operations.
+// Most of the time, users will send operation one-by-one, but it can be useful to
+// send them as a small block to ensure that
+// - all docs in the operation will happen on the same segment and continuous docids.
+// - all operations in the group are committed at the same time, making the group
+// atomic.
+type OperationGroup = SmallVec<[AddOperation; 4]>;
+type OperationSender = channel::Sender<OperationGroup>;
+type OperationReceiver = channel::Receiver<OperationGroup>;
 
 /// `IndexWriter` is the user entry-point to add document to an index.
 ///
@@ -236,7 +245,7 @@ pub fn advance_deletes(
 fn index_documents(
     memory_budget: usize,
     segment: &Segment,
-    grouped_document_iterator: &mut dyn Iterator<Item = Vec<AddOperation>>,
+    grouped_document_iterator: &mut dyn Iterator<Item = OperationGroup>,
     segment_updater: &mut SegmentUpdater,
     mut delete_cursor: DeleteCursor,
 ) -> Result<bool> {
@@ -667,7 +676,7 @@ impl IndexWriter {
     pub fn add_document(&self, document: Document) -> Opstamp {
         let opstamp = self.stamper.stamp();
         let add_operation = AddOperation { opstamp, document };
-        let send_result = self.operation_sender.send(vec![add_operation]);
+        let send_result = self.operation_sender.send(smallvec![add_operation]);
         if let Err(e) = send_result {
             panic!("Failed to index document. Sending to indexing channel failed. This probably means all of the indexing threads have panicked. {:?}", e);
         }
@@ -714,7 +723,7 @@ impl IndexWriter {
         }
         let (batch_opstamp, stamps) = self.get_batch_opstamps(count);
 
-        let mut adds: Vec<AddOperation> = Vec::new();
+        let mut adds = OperationGroup::default();
 
         for (user_op, opstamp) in user_operations.into_iter().zip(stamps) {
             match user_op {