summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorfdb-hiroshima <35889323+fdb-hiroshima@users.noreply.github.com>2019-09-18 11:26:25 +0200
committerPaul Masurel <paul.masurel@gmail.com>2019-09-18 18:26:25 +0900
commitd8894f0bd2e4a40bc48eaa27822136d2d29361f2 (patch)
tree929481b7f18b7366a5f103313ec1a16617d31804
parent7e08e0047bf842d2ce0f9cd7791451bffb85e2f6 (diff)
add checksum check in ManagedDirectory (#605)
* add checksum check in ManagedDirectory fix #400 * flush after writing checksum * don't checksum atomic file access and clone managed_paths * implement a footer storing metadata about a file this is more of a poc, it require some refactoring into multiple files `terminate(self)` is implemented, but not used anywhere yet * address comments and simplify things with new contract use BitOrder for integer to raw byte conversion consider atomic write imply atomic read, which might not actually be true use some indirection to have a boxable terminating writer * implement TerminatingWrite and make terminate() be called where it should add dependancy to drop_bomb to help find where terminate() should be called implement TerminatingWrite for wrapper writers make tests pass /!\ some tests seems to pass where they shouldn't * remove usage of drop_bomb * fmt * add test for checksum * address some review comments * update changelog * fmt
-rw-r--r--CHANGELOG.md1
-rw-r--r--Cargo.toml1
-rw-r--r--src/common/composite_file.rs7
-rw-r--r--src/common/counting_writer.rs9
-rw-r--r--src/core/index.rs8
-rw-r--r--src/directory/directory.rs4
-rw-r--r--src/directory/footer.rs213
-rw-r--r--src/directory/managed_directory.rs95
-rw-r--r--src/directory/mmap_directory.rs9
-rw-r--r--src/directory/mod.rs36
-rw-r--r--src/directory/ram_directory.rs9
-rw-r--r--src/indexer/index_writer.rs2
-rw-r--r--src/store/writer.rs3
-rw-r--r--tests/failpoints/mod.rs4
14 files changed, 381 insertions, 20 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fc94b20..cf07ff4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@ Tantivy 0.11.0
- Closes #498 - add support for Elastic-style unbounded range queries for alphanumeric types eg. "title:>hello", "weight:>=70.5", "height:<200" (@petr-tik)
- API change around `Box<BoxableTokenizer>`. See detail in #629
- Avoid rebuilding Regex automaton whenever a regex query is reused. #630 (@brainlock)
+- Add footer with some metadata to index files. #605 (@fdb-hiroshima)
## How to update?
diff --git a/Cargo.toml b/Cargo.toml
index e05fafa..6528cf6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,6 +15,7 @@ edition = "2018"
[dependencies]
base64 = "0.10.0"
byteorder = "1.0"
+crc32fast = "1.2.0"
once_cell = "1.0"
regex ={version = "1.3.0", default-features = false, features = ["std"]}
tantivy-fst = "0.1"
diff --git a/src/common/composite_file.rs b/src/common/composite_file.rs
index f2c2d22..f2d2926 100644
--- a/src/common/composite_file.rs
+++ b/src/common/composite_file.rs
@@ -2,7 +2,7 @@ use crate::common::BinarySerializable;
use crate::common::CountingWriter;
use crate::common::VInt;
use crate::directory::ReadOnlySource;
-use crate::directory::WritePtr;
+use crate::directory::{TerminatingWrite, WritePtr};
use crate::schema::Field;
use crate::space_usage::FieldUsage;
use crate::space_usage::PerFieldSpaceUsage;
@@ -42,7 +42,7 @@ pub struct CompositeWrite<W = WritePtr> {
offsets: HashMap<FileAddr, u64>,
}
-impl<W: Write> CompositeWrite<W> {
+impl<W: TerminatingWrite + Write> CompositeWrite<W> {
/// Crate a new API writer that writes a composite file
/// in a given write.
pub fn wrap(w: W) -> CompositeWrite<W> {
@@ -91,8 +91,7 @@ impl<W: Write> CompositeWrite<W> {
let footer_len = (self.write.written_bytes() - footer_offset) as u32;
footer_len.serialize(&mut self.write)?;
- self.write.flush()?;
- Ok(())
+ self.write.terminate()
}
}
diff --git a/src/common/counting_writer.rs b/src/common/counting_writer.rs
index 339c60b..8293ba8 100644
--- a/src/common/counting_writer.rs
+++ b/src/common/counting_writer.rs
@@ -1,3 +1,5 @@
+use crate::directory::AntiCallToken;
+use crate::directory::TerminatingWrite;
use std::io;
use std::io::Write;
@@ -42,6 +44,13 @@ impl<W: Write> Write for CountingWriter<W> {
}
}
+impl<W: TerminatingWrite> TerminatingWrite for CountingWriter<W> {
+ fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> {
+ self.flush()?;
+ self.underlying.terminate_ref(token)
+ }
+}
+
#[cfg(test)]
mod test {
diff --git a/src/core/index.rs b/src/core/index.rs
index 50d184d..ac0bea3 100644
--- a/src/core/index.rs
+++ b/src/core/index.rs
@@ -26,9 +26,10 @@ use crate::IndexWriter;
use crate::Result;
use num_cpus;
use std::borrow::BorrowMut;
+use std::collections::HashSet;
use std::fmt;
#[cfg(feature = "mmap")]
-use std::path::Path;
+use std::path::{Path, PathBuf};
use std::sync::Arc;
fn load_metas(directory: &dyn Directory, inventory: &SegmentMetaInventory) -> Result<IndexMeta> {
@@ -368,6 +369,11 @@ impl Index {
.map(SegmentMeta::id)
.collect())
}
+
+ /// Returns the set of corrupted files
+ pub fn validate_checksum(&self) -> Result<HashSet<PathBuf>> {
+ self.directory.list_damaged().map_err(Into::into)
+ }
}
impl fmt::Debug for Index {
diff --git a/src/directory/directory.rs b/src/directory/directory.rs
index 9da1cb2..6b50925 100644
--- a/src/directory/directory.rs
+++ b/src/directory/directory.rs
@@ -118,6 +118,8 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
///
/// Specifically, subsequent writes or flushes should
/// have no effect on the returned `ReadOnlySource` object.
+ ///
+ /// You should only use this to read files create with [`open_write`]
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError>;
/// Removes a file
@@ -157,6 +159,8 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// atomic_write.
///
/// This should only be used for small files.
+ ///
+ /// You should only use this to read files create with [`atomic_write`]
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError>;
/// Atomically replace the content of a file with data.
diff --git a/src/directory/footer.rs b/src/directory/footer.rs
new file mode 100644
index 0000000..bc4601e
--- /dev/null
+++ b/src/directory/footer.rs
@@ -0,0 +1,213 @@
+use crate::directory::read_only_source::ReadOnlySource;
+use crate::directory::{AntiCallToken, TerminatingWrite};
+use byteorder::{ByteOrder, LittleEndian};
+use crc32fast::Hasher;
+use std::io;
+use std::io::Write;
+
+const COMMON_FOOTER_SIZE: usize = 4 * 5;
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct Footer {
+ pub tantivy_version: (u32, u32, u32),
+ pub meta: String,
+ pub versioned_footer: VersionedFooter,
+}
+
+impl Footer {
+ pub fn new(versioned_footer: VersionedFooter) -> Self {
+ let tantivy_version = (
+ env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
+ env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
+ env!("CARGO_PKG_VERSION_PATCH").parse().unwrap(),
+ );
+ Footer {
+ tantivy_version,
+ meta: format!(
+ "tantivy {}.{}.{}, index v{}",
+ tantivy_version.0,
+ tantivy_version.1,
+ tantivy_version.2,
+ versioned_footer.version()
+ ),
+ versioned_footer,
+ }
+ }
+
+ pub fn to_bytes(&self) -> Vec<u8> {
+ let mut res = self.versioned_footer.to_bytes();
+ res.extend_from_slice(self.meta.as_bytes());
+ let len = res.len();
+ res.resize(len + COMMON_FOOTER_SIZE, 0);
+ let mut common_footer = &mut res[len..];
+ LittleEndian::write_u32(&mut common_footer, self.meta.len() as u32);
+ LittleEndian::write_u32(&mut common_footer[4..], self.tantivy_version.0);
+ LittleEndian::write_u32(&mut common_footer[8..], self.tantivy_version.1);
+ LittleEndian::write_u32(&mut common_footer[12..], self.tantivy_version.2);
+ LittleEndian::write_u32(&mut common_footer[16..], (len + COMMON_FOOTER_SIZE) as u32);
+ res
+ }
+
+ pub fn from_bytes(data: &[u8]) -> Result<Self, io::Error> {
+ let len = data.len();
+ if len < COMMON_FOOTER_SIZE + 4 {
+ // 4 bytes for index version, stored in versioned footer
+ return Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ format!("File corrupted. The footer len must be over 24, while the entire file len is {}", len)
+ )
+ );
+ }
+
+ let size = LittleEndian::read_u32(&data[len - 4..]) as usize;
+ if len < size as usize {
+ return Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ format!(
+ "File corrupted. The footer len is {}, while the entire file len is {}",
+ size, len
+ ),
+ ));
+ }
+ let footer = &data[len - size as usize..];
+ let meta_len = LittleEndian::read_u32(&footer[size - 20..]) as usize;
+ let tantivy_major = LittleEndian::read_u32(&footer[size - 16..]);
+ let tantivy_minor = LittleEndian::read_u32(&footer[size - 12..]);
+ let tantivy_patch = LittleEndian::read_u32(&footer[size - 8..]);
+ Ok(Footer {
+ tantivy_version: (tantivy_major, tantivy_minor, tantivy_patch),
+ meta: String::from_utf8_lossy(&footer[size - meta_len - 20..size - 20]).into_owned(),
+ versioned_footer: VersionedFooter::from_bytes(&footer[..size - meta_len - 20])?,
+ })
+ }
+
+ pub fn extract_footer(source: ReadOnlySource) -> Result<(Footer, ReadOnlySource), io::Error> {
+ let footer = Footer::from_bytes(source.as_slice())?;
+ let reader = source.slice_to(source.as_slice().len() - footer.size());
+ Ok((footer, reader))
+ }
+
+ pub fn size(&self) -> usize {
+ self.versioned_footer.size() as usize + self.meta.len() + 20
+ }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum VersionedFooter {
+ UnknownVersion { version: u32, size: u32 },
+ V0(u32), // crc
+}
+
+impl VersionedFooter {
+ pub fn to_bytes(&self) -> Vec<u8> {
+ match self {
+ Self::V0(crc) => {
+ let mut res = vec![0; 8];
+ LittleEndian::write_u32(&mut res, 0);
+ LittleEndian::write_u32(&mut res[4..], *crc);
+ res
+ }
+ Self::UnknownVersion { .. } => {
+ panic!("Unsupported index should never get serialized");
+ }
+ }
+ }
+
+ pub fn from_bytes(footer: &[u8]) -> Result<Self, io::Error> {
+ assert!(footer.len() >= 4);
+ let version = LittleEndian::read_u32(footer);
+ match version {
+ 0 => {
+ if footer.len() == 8 {
+ Ok(Self::V0(LittleEndian::read_u32(&footer[4..])))
+ } else {
+ Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ format!(
+ "File corrupted. The versioned footer len is {}, while it should be 8",
+ footer.len()
+ ),
+ ))
+ }
+ }
+ version => Ok(Self::UnknownVersion {
+ version,
+ size: footer.len() as u32,
+ }),
+ }
+ }
+
+ pub fn size(&self) -> u32 {
+ match self {
+ Self::V0(_) => 8,
+ Self::UnknownVersion { size, .. } => *size,
+ }
+ }
+
+ pub fn version(&self) -> u32 {
+ match self {
+ Self::V0(_) => 0,
+ Self::UnknownVersion { version, .. } => *version,
+ }
+ }
+
+ pub fn crc(&self) -> Option<u32> {
+ match self {
+ Self::V0(crc) => Some(*crc),
+ Self::UnknownVersion { .. } => None,
+ }
+ }
+}
+
+pub(crate) struct FooterProxy<W: TerminatingWrite> {
+ /// always Some except after terminate call
+ hasher: Option<Hasher>,
+ /// always Some except after terminate call
+ writer: Option<W>,
+}
+
+impl<W: TerminatingWrite> FooterProxy<W> {
+ pub fn new(writer: W) -> Self {
+ FooterProxy {
+ hasher: Some(Hasher::new()),
+ writer: Some(writer),
+ }
+ }
+}
+
+impl<W: TerminatingWrite> Write for FooterProxy<W> {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ let count = self.writer.as_mut().unwrap().write(buf)?;
+ self.hasher.as_mut().unwrap().update(&buf[..count]);
+ Ok(count)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.writer.as_mut().unwrap().flush()
+ }
+}
+
+impl<W: TerminatingWrite> TerminatingWrite for FooterProxy<W> {
+ fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> {
+ let crc = self.hasher.take().unwrap().finalize();
+
+ let footer = Footer::new(VersionedFooter::V0(crc)).to_bytes();
+ let mut writer = self.writer.take().unwrap();
+ writer.write_all(&footer)?;
+ writer.terminate()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::directory::footer::{Footer, VersionedFooter};
+
+ #[test]
+ fn test_serialize_deserialize_footer() {
+ let crc = 123456;
+ let footer = Footer::new(VersionedFooter::V0(crc));
+ let footer_bytes = footer.to_bytes();
+
+ assert_eq!(Footer::from_bytes(&footer_bytes).unwrap(), footer);
+ }
+}
diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs
index 859e66d..f72668f 100644
--- a/src/directory/managed_directory.rs
+++ b/src/directory/managed_directory.rs
@@ -1,5 +1,6 @@
use crate::core::MANAGED_FILEPATH;
use crate::directory::error::{DeleteError, IOError, LockError, OpenReadError, OpenWriteError};
+use crate::directory::footer::{Footer, FooterProxy};
use crate::directory::DirectoryLock;
use crate::directory::Lock;
use crate::directory::META_LOCK;
@@ -8,6 +9,7 @@ use crate::directory::{WatchCallback, WatchHandle};
use crate::error::DataCorruption;
use crate::Directory;
use crate::Result;
+use crc32fast::Hasher;
use serde_json;
use std::collections::HashSet;
use std::io;
@@ -207,17 +209,59 @@ impl ManagedDirectory {
}
Ok(())
}
+
+ /// Verify checksum of a managed file
+ pub fn validate_checksum(&self, path: &Path) -> result::Result<bool, OpenReadError> {
+ let reader = self.directory.open_read(path)?;
+ let (footer, data) = Footer::extract_footer(reader)
+ .map_err(|err| IOError::with_path(path.to_path_buf(), err))?;
+ let mut hasher = Hasher::new();
+ hasher.update(data.as_slice());
+ let crc = hasher.finalize();
+ Ok(footer
+ .versioned_footer
+ .crc()
+ .map(|v| v == crc)
+ .unwrap_or(false))
+ }
+
+ /// List files for which checksum does not match content
+ pub fn list_damaged(&self) -> result::Result<HashSet<PathBuf>, OpenReadError> {
+ let mut hashset = HashSet::new();
+ let managed_paths = self
+ .meta_informations
+ .read()
+ .expect("Managed directory rlock poisoned in list damaged.")
+ .managed_paths
+ .clone();
+
+ for path in managed_paths.into_iter() {
+ if !self.validate_checksum(&path)? {
+ hashset.insert(path);
+ }
+ }
+ Ok(hashset)
+ }
}
impl Directory for ManagedDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
- self.directory.open_read(path)
+ let read_only_source = self.directory.open_read(path)?;
+ let (_footer, reader) = Footer::extract_footer(read_only_source)
+ .map_err(|err| IOError::with_path(path.to_path_buf(), err))?;
+ Ok(reader)
}
fn open_write(&mut self, path: &Path) -> result::Result<WritePtr, OpenWriteError> {
self.register_file_as_managed(path)
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
- self.directory.open_write(path)
+ Ok(io::BufWriter::new(Box::new(FooterProxy::new(
+ self.directory
+ .open_write(path)?
+ .into_inner()
+ .map_err(|_| ())
+ .expect("buffer should be empty"),
+ ))))
}
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
@@ -259,8 +303,9 @@ impl Clone for ManagedDirectory {
#[cfg(test)]
mod tests_mmap_specific {
- use crate::directory::{Directory, ManagedDirectory, MmapDirectory};
+ use crate::directory::{Directory, ManagedDirectory, MmapDirectory, TerminatingWrite};
use std::collections::HashSet;
+ use std::fs::OpenOptions;
use std::io::Write;
use std::path::{Path, PathBuf};
use tempfile::TempDir;
@@ -275,8 +320,8 @@ mod tests_mmap_specific {
{
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
- let mut write_file = managed_directory.open_write(test_path1).unwrap();
- write_file.flush().unwrap();
+ let write_file = managed_directory.open_write(test_path1).unwrap();
+ write_file.terminate().unwrap();
managed_directory
.atomic_write(test_path2, &[0u8, 1u8])
.unwrap();
@@ -310,9 +355,9 @@ mod tests_mmap_specific {
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
- managed_directory
- .atomic_write(test_path1, &vec![0u8, 1u8])
- .unwrap();
+ let mut write = managed_directory.open_write(test_path1).unwrap();
+ write.write_all(&[0u8, 1u8]).unwrap();
+ write.terminate().unwrap();
assert!(managed_directory.exists(test_path1));
let _mmap_read = managed_directory.open_read(test_path1).unwrap();
@@ -331,4 +376,38 @@ mod tests_mmap_specific {
}
}
+ #[test]
+ fn test_checksum() {
+ let test_path1: &'static Path = Path::new("some_path_for_test");
+ let test_path2: &'static Path = Path::new("other_test_path");
+
+ let tempdir = TempDir::new().unwrap();
+ let tempdir_path = PathBuf::from(tempdir.path());
+
+ let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
+ let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
+ let mut write = managed_directory.open_write(test_path1).unwrap();
+ write.write_all(&[0u8, 1u8]).unwrap();
+ write.terminate().unwrap();
+
+ let mut write = managed_directory.open_write(test_path2).unwrap();
+ write.write_all(&[3u8, 4u8, 5u8]).unwrap();
+ write.terminate().unwrap();
+
+ assert!(managed_directory.list_damaged().unwrap().is_empty());
+
+ let mut corrupted_path = tempdir_path.clone();
+ corrupted_path.push(test_path2);
+ let mut file = OpenOptions::new()
+ .write(true)
+ .open(&corrupted_path)
+ .unwrap();
+ file.write_all(&[255u8]).unwrap();
+ file.flush().unwrap();
+ drop(file);
+
+ let damaged = managed_directory.list_damaged().unwrap();
+ assert_eq!(damaged.len(), 1);
+ assert!(damaged.contains(test_path2));
+ }
}
diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs
index cfb1e87..fc3898c 100644
--- a/src/directory/mmap_directory.rs
+++ b/src/directory/mmap_directory.rs
@@ -11,6 +11,7 @@ use crate::directory::error::{
DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError,
};
use crate::directory::read_only_source::BoxedData;
+use crate::directory::AntiCallToken;
use crate::directory::Directory;
use crate::directory::DirectoryLock;
use crate::directory::Lock;
@@ -18,7 +19,7 @@ use crate::directory::ReadOnlySource;
use crate::directory::WatchCallback;
use crate::directory::WatchCallbackList;
use crate::directory::WatchHandle;
-use crate::directory::WritePtr;
+use crate::directory::{TerminatingWrite, WritePtr};
use atomicwrites;
use memmap::Mmap;
use std::collections::HashMap;
@@ -412,6 +413,12 @@ impl Seek for SafeFileWriter {
}
}
+impl TerminatingWrite for SafeFileWriter {
+ fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> {
+ self.flush()
+ }
+}
+
impl Directory for MmapDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
debug!("Open Read {:?}", path);
diff --git a/src/directory/mod.rs b/src/directory/mod.rs
index 70fa013..294beb9 100644
--- a/src/directory/mod.rs
+++ b/src/directory/mod.rs
@@ -9,6 +9,7 @@ mod mmap_directory;
mod directory;
mod directory_lock;
+mod footer;
mod managed_directory;
mod ram_directory;
mod read_only_source;
@@ -24,18 +25,49 @@ pub use self::ram_directory::RAMDirectory;
pub use self::read_only_source::ReadOnlySource;
pub(crate) use self::watch_event_router::WatchCallbackList;
pub use self::watch_event_router::{WatchCallback, WatchHandle};
-use std::io::{BufWriter, Write};
+use std::io::{self, BufWriter, Write};
#[cfg(feature = "mmap")]
pub use self::mmap_directory::MmapDirectory;
pub use self::managed_directory::ManagedDirectory;
+/// Struct used to prevent from calling [`terminate_ref`](trait.TerminatingWrite#method.terminate_ref) directly
+pub struct AntiCallToken(());
+
+/// Trait used to indicate when no more write need to be done on a writer
+pub trait TerminatingWrite: Write {
+ /// Indicate that the writer will no longer be used. Internally call terminate_ref.
+ fn terminate(mut self) -> io::Result<()>
+ where
+ Self: Sized,
+ {
+ self.terminate_ref(AntiCallToken(()))
+ }
+
+ /// You should implement this function to define custom behavior.
+ /// This function should flush any buffer it may hold.
+ fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()>;
+}
+
+impl<W: TerminatingWrite + ?Sized> TerminatingWrite for Box<W> {
+ fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> {
+ self.as_mut().terminate_ref(token)
+ }
+}
+
+impl<W: TerminatingWrite> TerminatingWrite for BufWriter<W> {
+ fn terminate_ref(&mut self, a: AntiCallToken) -> io::Result<()> {
+ self.flush()?;
+ self.get_mut().terminate_ref(a)
+ }
+}
+
/// Write object for Directory.
///
/// `WritePtr` are required to implement both Write
/// and Seek.
-pub type WritePtr = BufWriter<Box<dyn Write>>;
+pub type WritePtr = BufWriter<Box<dyn TerminatingWrite>>;
#[cfg(test)]
mod tests;
diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs
index 8c6d237..db19f98 100644
--- a/src/directory/ram_directory.rs
+++ b/src/directory/ram_directory.rs
@@ -1,8 +1,9 @@
use crate::core::META_FILEPATH;
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
+use crate::directory::AntiCallToken;
use crate::directory::WatchCallbackList;
-use crate::directory::WritePtr;
use crate::directory::{Directory, ReadOnlySource, WatchCallback, WatchHandle};
+use crate::directory::{TerminatingWrite, WritePtr};
use fail::fail_point;
use std::collections::HashMap;
use std::fmt;
@@ -71,6 +72,12 @@ impl Write for VecWriter {
}
}
+impl TerminatingWrite for VecWriter {
+ fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> {
+ self.flush()
+ }
+}
+
#[derive(Default)]
struct InnerDirectory {
fs: HashMap<PathBuf, ReadOnlySource>,
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index 917b90f..bf5c436 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -8,6 +8,7 @@ use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::core::SegmentReader;
use crate::directory::DirectoryLock;
+use crate::directory::TerminatingWrite;
use crate::docset::DocSet;
use crate::error::TantivyError;
use crate::fastfield::write_delete_bitset;
@@ -168,6 +169,7 @@ pub(crate) fn advance_deletes(
segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp);
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
write_delete_bitset(&delete_bitset, &mut delete_file)?;
+ delete_file.terminate()?;
}
}
segment_entry.set_meta(segment.meta().clone());
diff --git a/src/store/writer.rs b/src/store/writer.rs
index bcb74f9..5ddda2c 100644
--- a/src/store/writer.rs
+++ b/src/store/writer.rs
@@ -3,6 +3,7 @@ use super::skiplist::SkipListBuilder;
use super::StoreReader;
use crate::common::CountingWriter;
use crate::common::{BinarySerializable, VInt};
+use crate::directory::TerminatingWrite;
use crate::directory::WritePtr;
use crate::schema::Document;
use crate::DocId;
@@ -109,6 +110,6 @@ impl StoreWriter {
self.offset_index_writer.write(&mut self.writer)?;
header_offset.serialize(&mut self.writer)?;
self.doc.serialize(&mut self.writer)?;
- self.writer.flush()
+ self.writer.terminate()
}
}
diff --git a/tests/failpoints/mod.rs b/tests/failpoints/mod.rs
index 807ca7a..509e375 100644
--- a/tests/failpoints/mod.rs
+++ b/tests/failpoints/mod.rs
@@ -1,7 +1,7 @@
use fail;
use std::io::Write;
use std::path::Path;
-use tantivy::directory::{Directory, ManagedDirectory, RAMDirectory};
+use tantivy::directory::{Directory, ManagedDirectory, RAMDirectory, TerminatingWrite};
use tantivy::doc;
use tantivy::schema::{Schema, TEXT};
use tantivy::{Index, Term};
@@ -17,7 +17,7 @@ fn test_failpoints_managed_directory_gc_if_delete_fails() {
managed_directory
.open_write(test_path)
.unwrap()
- .flush()
+ .terminate()
.unwrap();
assert!(managed_directory.exists(test_path));
// triggering gc and setting the delete operation to fail.