summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPaul Masurel <paul.masurel@gmail.com>2019-07-17 13:20:02 +0900
committerGitHub <noreply@github.com>2019-07-17 13:20:02 +0900
commitaf7ea1422a887177b2febf37e5fbdb61e9d499c1 (patch)
treeef611c6ee6f87e13222126355964552d6129d2fa
parent498057c5b7ad87e5e2d81b81dbd81bdd9479cb70 (diff)
using smallvec for operation batches (#599)
-rw-r--r--Cargo.toml1
-rw-r--r--src/indexer/index_writer.rs19
2 files changed, 15 insertions, 5 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 746cc83..fbe329b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -53,6 +53,7 @@ fail = "0.2"
scoped-pool = "1.0"
murmurhash32 = "0.2"
chrono = "0.4"
+smallvec = "0.6"
[target.'cfg(windows)'.dependencies]
winapi = "0.3"
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index 99eccc5..fee0731 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -26,6 +26,8 @@ use crate::Result;
use bit_set::BitSet;
use crossbeam::channel;
use futures::{Canceled, Future};
+use smallvec::smallvec;
+use smallvec::SmallVec;
use std::mem;
use std::ops::Range;
use std::sync::Arc;
@@ -44,8 +46,15 @@ pub const HEAP_SIZE_MAX: usize = u32::max_value() as usize - MARGIN_IN_BYTES;
// reaches `PIPELINE_MAX_SIZE_IN_DOCS`
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
-type OperationSender = channel::Sender<Vec<AddOperation>>;
-type OperationReceiver = channel::Receiver<Vec<AddOperation>>;
+// Group of operations.
+// Most of the time, users will send operations one-by-one, but it can be useful to
+// send them as a small block to ensure that
+// - all docs in the group are added to the same segment, with contiguous doc ids.
+// - all operations in the group are committed at the same time, making the group
+// atomic.
+type OperationGroup = SmallVec<[AddOperation; 4]>;
+type OperationSender = channel::Sender<OperationGroup>;
+type OperationReceiver = channel::Receiver<OperationGroup>;
/// `IndexWriter` is the user entry-point to add document to an index.
///
@@ -236,7 +245,7 @@ pub fn advance_deletes(
fn index_documents(
memory_budget: usize,
segment: &Segment,
- grouped_document_iterator: &mut dyn Iterator<Item = Vec<AddOperation>>,
+ grouped_document_iterator: &mut dyn Iterator<Item = OperationGroup>,
segment_updater: &mut SegmentUpdater,
mut delete_cursor: DeleteCursor,
) -> Result<bool> {
@@ -667,7 +676,7 @@ impl IndexWriter {
pub fn add_document(&self, document: Document) -> Opstamp {
let opstamp = self.stamper.stamp();
let add_operation = AddOperation { opstamp, document };
- let send_result = self.operation_sender.send(vec![add_operation]);
+ let send_result = self.operation_sender.send(smallvec![add_operation]);
if let Err(e) = send_result {
panic!("Failed to index document. Sending to indexing channel failed. This probably means all of the indexing threads have panicked. {:?}", e);
}
@@ -714,7 +723,7 @@ impl IndexWriter {
}
let (batch_opstamp, stamps) = self.get_batch_opstamps(count);
- let mut adds: Vec<AddOperation> = Vec::new();
+ let mut adds = OperationGroup::default();
for (user_op, opstamp) in user_operations.into_iter().zip(stamps) {
match user_op {