author    TW <tw@waldmann-edv.de>    2023-02-14 00:01:50 +0100
committer GitHub <noreply@github.com>    2023-02-14 00:01:50 +0100
commit    3849ebe312e441732382542579f5ed29ac93ed04 (patch)
tree      69da330721c652d318fb5d8e4536c5278a388c86
parent    6cfe77ebafffe578dad93eb571d420c833ee54e4 (diff)
parent    71f8dd3a17f88e8dbb82294fd79ab13bf432bfd0 (diff)
Merge pull request #7349 from ThomasWaldmann/avoid-orphan-content-chunks3
avoid orphan content chunks (master)
-rw-r--r--    src/borg/archive.py    238
1 file changed, 127 insertions(+), 111 deletions(-)
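
The hunks below move the orphan-chunk cleanup out of ChunksProcessor.process_file_chunks and into its callers (FilesystemObjectProcessors and TarfileObjectProcessors), so that chunks referenced for hardlinks and cache hits are rolled back as well when a BackupOSError interrupts a file. The core pattern: take chunk references one-by-one into item.chunks, and on failure decref exactly that list before re-raising. The following is a minimal, self-contained sketch of that pattern; Cache, the chunk IDs and the simulated failure are illustrative stand-ins, not borg's real classes or signatures (only the names chunk_incref/chunk_decref, item.chunks and BackupOSError correspond to identifiers in the diff).

# Minimal sketch of the rollback pattern used in this diff (stand-in classes,
# not borg's real API): reference chunks one-by-one, so that on failure the
# accumulated list records exactly which refcounts must be undone.

class BackupOSError(OSError):
    """Stand-in for borg's BackupOSError (raised on source I/O problems)."""


class Cache:
    """Toy refcount store standing in for borg's chunk cache."""

    def __init__(self):
        self.refcounts = {}

    def chunk_incref(self, chunk_id):
        self.refcounts[chunk_id] = self.refcounts.get(chunk_id, 0) + 1
        return chunk_id

    def chunk_decref(self, chunk_id):
        self.refcounts[chunk_id] -= 1
        if self.refcounts[chunk_id] == 0:
            del self.refcounts[chunk_id]


def store_file_chunks(item_chunks, cache, chunk_ids, fail_after=None):
    """incref chunks one-by-one; on error, decref exactly what was added."""
    try:
        for n, chunk_id in enumerate(chunk_ids):
            if fail_after is not None and n == fail_after:
                raise BackupOSError("simulated read error")
            # process one-by-one, so item_chunks records how far we got
            item_chunks.append(cache.chunk_incref(chunk_id))
    except BackupOSError:
        # roll back the references already taken, so no orphan chunks remain
        # if the transaction is committed without a file item for them
        for chunk_id in item_chunks:
            cache.chunk_decref(chunk_id)
        raise


if __name__ == "__main__":
    cache = Cache()
    try:
        store_file_chunks([], cache, ["c1", "c2", "c3"], fail_after=2)
    except BackupOSError:
        pass
    print("no orphan refcounts left behind:", cache.refcounts)  # -> {}
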
diff --git a/src/borg/archive.py b/src/borg/archive.py
index af8698712..0cd81e784 100644
--- a/src/borg/archive.py
+++ b/src/borg/archive.py
@@ -1300,31 +1300,13 @@ class ChunksProcessor:
# to get rid of .chunks_healthy, as it might not correspond to .chunks any more.
if self.rechunkify and "chunks_healthy" in item:
del item.chunks_healthy
- try:
- for chunk in chunk_iter:
- cle = chunk_processor(chunk)
- item.chunks.append(cle)
- self.current_volume += cle[1]
- if show_progress:
- stats.show_progress(item=item, dt=0.2)
- self.maybe_checkpoint(item)
- except BackupOSError:
- # something went wrong (e.g. an I/O error while reading a source file), try to avoid orphan content chunks:
- # case A: "no checkpoint archive has been created yet":
- # we have incref'd (written) some chunks, no commit yet, no file item for these chunks yet.
- # -> item.chunks has a list of orphaned content chunks, we need to decref them.
- # case B: "some checkpoint archives have been created already":
- # at the time we commit them, everything is fine and consistent:
- # we have incref'd (written) some chunks, created a part file item referencing them, committed.
- # directly after commit, we have removed the part file item, but kept chunks in the repo, kept refcounts.
- # maybe we have incref'd (written) some more chunks after the commit, no file item for these chunks yet.
- # -> item.chunks has a list of orphaned content chunks, we need to decref them.
- # So, cases A and B need same treatment.
- for chunk in item.chunks:
- cache.chunk_decref(chunk.id, stats, wait=False)
- # now that we have cleaned up the chunk references, we can re-raise the exception
- # this will skip THIS processing of this file, but continue with the next one.
- raise
+ for chunk in chunk_iter:
+ chunk_entry = chunk_processor(chunk)
+ item.chunks.append(chunk_entry)
+ self.current_volume += chunk_entry[1]
+ if show_progress:
+ stats.show_progress(item=item, dt=0.2)
+ self.maybe_checkpoint(item)
class FilesystemObjectProcessors:
@@ -1365,6 +1347,7 @@ class FilesystemObjectProcessors:
safe_path = make_path_safe(path)
item = Item(path=safe_path)
hardlinked = hardlinkable and st.st_nlink > 1
+ hl_chunks = None
update_map = False
if hardlinked:
status = "h" # hardlink
@@ -1373,9 +1356,9 @@ class FilesystemObjectProcessors:
if chunks is nothing:
update_map = True
elif chunks is not None:
- item.chunks = chunks
+ hl_chunks = chunks
item.hlid = self.hlm.hardlink_id_from_inode(ino=st.st_ino, dev=st.st_dev)
- yield item, status, hardlinked
+ yield item, status, hardlinked, hl_chunks
self.add_item(item, stats=self.stats)
if update_map:
# remember the hlid of this fs object and if the item has chunks,
@@ -1384,12 +1367,12 @@ class FilesystemObjectProcessors:
self.hlm.remember(id=(st.st_ino, st.st_dev), info=chunks)
def process_dir_with_fd(self, *, path, fd, st):
- with self.create_helper(path, st, "d", hardlinkable=False) as (item, status, hardlinked):
+ with self.create_helper(path, st, "d", hardlinkable=False) as (item, status, hardlinked, hl_chunks):
item.update(self.metadata_collector.stat_attrs(st, path, fd=fd))
return status
def process_dir(self, *, path, parent_fd, name, st):
- with self.create_helper(path, st, "d", hardlinkable=False) as (item, status, hardlinked):
+ with self.create_helper(path, st, "d", hardlinkable=False) as (item, status, hardlinked, hl_chunks):
with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags_dir, noatime=True, op="dir_open") as fd:
# fd is None for directories on windows, in that case a race condition check is not possible.
if fd is not None:
@@ -1399,7 +1382,7 @@ class FilesystemObjectProcessors:
return status
def process_fifo(self, *, path, parent_fd, name, st):
- with self.create_helper(path, st, "f") as (item, status, hardlinked): # fifo
+ with self.create_helper(path, st, "f") as (item, status, hardlinked, hl_chunks): # fifo
with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags_normal, noatime=True) as fd:
with backup_io("fstat"):
st = stat_update_check(st, os.fstat(fd))
@@ -1407,7 +1390,7 @@ class FilesystemObjectProcessors:
return status
def process_dev(self, *, path, parent_fd, name, st, dev_type):
- with self.create_helper(path, st, dev_type) as (item, status, hardlinked): # char/block device
+ with self.create_helper(path, st, dev_type) as (item, status, hardlinked, hl_chunks): # char/block device
# looks like we can not work fd-based here without causing issues when trying to open/close the device
with backup_io("stat"):
st = stat_update_check(st, os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=False))
@@ -1416,7 +1399,7 @@ class FilesystemObjectProcessors:
return status
def process_symlink(self, *, path, parent_fd, name, st):
- with self.create_helper(path, st, "s", hardlinkable=True) as (item, status, hardlinked):
+ with self.create_helper(path, st, "s", hardlinkable=True) as (item, status, hardlinked, hl_chunks):
fname = name if name is not None and parent_fd is not None else path
with backup_io("readlink"):
target = os.readlink(fname, dir_fd=parent_fd)
@@ -1450,14 +1433,23 @@ class FilesystemObjectProcessors:
item.uid = uid
if gid is not None:
item.gid = gid
- self.process_file_chunks(item, cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd)))
- item.get_size(memorize=True)
- self.stats.nfiles += 1
- self.add_item(item, stats=self.stats)
- return status
+ try:
+ self.process_file_chunks(
+ item, cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd))
+ )
+ except BackupOSError:
+ # see comments in process_file's exception handler, same issue here.
+ for chunk in item.get("chunks", []):
+ cache.chunk_decref(chunk.id, self.stats, wait=False)
+ raise
+ else:
+ item.get_size(memorize=True)
+ self.stats.nfiles += 1
+ self.add_item(item, stats=self.stats)
+ return status
def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal):
- with self.create_helper(path, st, None) as (item, status, hardlinked): # no status yet
+ with self.create_helper(path, st, None) as (item, status, hardlinked, hl_chunks): # no status yet
with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags, noatime=True) as fd:
with backup_io("fstat"):
st = stat_update_check(st, os.fstat(fd))
@@ -1468,72 +1460,90 @@ class FilesystemObjectProcessors:
# so it can be extracted / accessed in FUSE mount like a regular file.
# this needs to be done early, so that part files also get the patched mode.
item.mode = stat.S_IFREG | stat.S_IMODE(item.mode)
- if "chunks" in item: # create_helper might have put chunks from a previous hardlink there
- [cache.chunk_incref(id_, self.stats) for id_, _ in item.chunks]
- else: # normal case, no "2nd+" hardlink
- if not is_special_file:
- hashed_path = safe_encode(os.path.join(self.cwd, path))
- started_hashing = time.monotonic()
- path_hash = self.key.id_hash(hashed_path)
- self.stats.hashing_time += time.monotonic() - started_hashing
- known, ids = cache.file_known_and_unchanged(hashed_path, path_hash, st)
- else:
- # in --read-special mode, we may be called for special files.
- # there should be no information in the cache about special files processed in
- # read-special mode, but we better play safe as this was wrong in the past:
- hashed_path = path_hash = None
- known, ids = False, None
- chunks = None
- if ids is not None:
- # Make sure all ids are available
- for id_ in ids:
- if not cache.seen_chunk(id_):
- status = (
- "M" # cache said it is unmodified, but we lost a chunk: process file like modified
- )
- break
+ # we begin processing chunks now (writing or incref'ing them to the repository),
+ # which might require cleanup (see except-branch):
+ try:
+ if hl_chunks is not None: # create_helper gave us chunks from a previous hardlink
+ item.chunks = []
+ for chunk_id, chunk_size in hl_chunks:
+ # process one-by-one, so we will know in item.chunks how far we got
+ chunk_entry = cache.chunk_incref(chunk_id, self.stats)
+ item.chunks.append(chunk_entry)
+ else: # normal case, no "2nd+" hardlink
+ if not is_special_file:
+ hashed_path = safe_encode(os.path.join(self.cwd, path))
+ started_hashing = time.monotonic()
+ path_hash = self.key.id_hash(hashed_path)
+ self.stats.hashing_time += time.monotonic() - started_hashing
+ known, ids = cache.file_known_and_unchanged(hashed_path, path_hash, st)
else:
- chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
- status = "U" # regular file, unchanged
- else:
- status = "M" if known else "A" # regular file, modified or added
- self.print_file_status(status, path)
- self.stats.files_stats[status] += 1
- status = None # we already printed the status
- # Only chunkify the file if needed
- if chunks is not None:
- item.chunks = chunks
- else:
- with backup_io("read"):
- self.process_file_chunks(
- item,
- cache,
- self.stats,
- self.show_progress,
- backup_io_iter(self.chunker.chunkify(None, fd)),
- )
- self.stats.chunking_time = self.chunker.chunking_time
- if is_win32:
- changed_while_backup = False # TODO
+ # in --read-special mode, we may be called for special files.
+ # there should be no information in the cache about special files processed in
+ # read-special mode, but we better play safe as this was wrong in the past:
+ hashed_path = path_hash = None
+ known, ids = False, None
+ if ids is not None:
+ # Make sure all ids are available
+ for id_ in ids:
+ if not cache.seen_chunk(id_):
+ # cache said it is unmodified, but we lost a chunk: process file like modified
+ status = "M"
+ break
+ else:
+ item.chunks = []
+ for chunk_id in ids:
+ # process one-by-one, so we will know in item.chunks how far we got
+ chunk_entry = cache.chunk_incref(chunk_id, self.stats)
+ item.chunks.append(chunk_entry)
+ status = "U" # regular file, unchanged
else:
- with backup_io("fstat2"):
- st2 = os.fstat(fd)
- # special files:
- # - fifos change naturally, because they are fed from the other side. no problem.
- # - blk/chr devices don't change ctime anyway.
- changed_while_backup = not is_special_file and st.st_ctime_ns != st2.st_ctime_ns
- if changed_while_backup:
- status = "C" # regular file changed while we backed it up, might be inconsistent/corrupt!
- if not is_special_file and not changed_while_backup:
- # we must not memorize special files, because the contents of e.g. a
- # block or char device will change without its mtime/size/inode changing.
- # also, we must not memorize a potentially inconsistent/corrupt file that
- # changed while we backed it up.
- cache.memorize_file(hashed_path, path_hash, st, [c.id for c in item.chunks])
- self.stats.nfiles += 1
- item.update(self.metadata_collector.stat_ext_attrs(st, path, fd=fd))
- item.get_size(memorize=True)
- return status
+ status = "M" if known else "A" # regular file, modified or added
+ self.print_file_status(status, path)
+ self.stats.files_stats[status] += 1
+ status = None # we already printed the status
+ # Only chunkify the file if needed
+ if "chunks" not in item:
+ with backup_io("read"):
+ self.process_file_chunks(
+ item,
+ cache,
+ self.stats,
+ self.show_progress,
+ backup_io_iter(self.chunker.chunkify(None, fd)),
+ )
+ self.stats.chunking_time = self.chunker.chunking_time
+ if is_win32:
+ changed_while_backup = False # TODO
+ else:
+ with backup_io("fstat2"):
+ st2 = os.fstat(fd)
+ # special files:
+ # - fifos change naturally, because they are fed from the other side. no problem.
+ # - blk/chr devices don't change ctime anyway.
+ changed_while_backup = not is_special_file and st.st_ctime_ns != st2.st_ctime_ns
+ if changed_while_backup:
+ # regular file changed while we backed it up, might be inconsistent/corrupt!
+ status = "C"
+ if not is_special_file and not changed_while_backup:
+ # we must not memorize special files, because the contents of e.g. a
+ # block or char device will change without its mtime/size/inode changing.
+ # also, we must not memorize a potentially inconsistent/corrupt file that
+ # changed while we backed it up.
+ cache.memorize_file(hashed_path, path_hash, st, [c.id for c in item.chunks])
+ self.stats.nfiles += 1
+ item.update(self.metadata_collector.stat_ext_attrs(st, path, fd=fd))
+ item.get_size(memorize=True)
+ return status
+ except BackupOSError:
+ # Something went wrong and we might need to clean up a bit.
+ # Maybe we have already incref'ed some file content chunks in the repo -
+ # but we will not add an item (see add_item in create_helper) and thus
+ # they would be orphaned chunks in case that we commit the transaction.
+ for chunk in item.get("chunks", []):
+ cache.chunk_decref(chunk.id, self.stats, wait=False)
+ # Now that we have cleaned up the chunk references, we can re-raise the exception.
+ # This will skip processing of this file, but might retry or continue with the next one.
+ raise
class TarfileObjectProcessors:
@@ -1628,15 +1638,21 @@ class TarfileObjectProcessors:
with self.create_helper(tarinfo, status, type) as (item, status):
self.print_file_status(status, tarinfo.name)
status = None # we already printed the status
- fd = tar.extractfile(tarinfo)
- self.process_file_chunks(
- item, self.cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd))
- )
- item.get_size(memorize=True, from_chunks=True)
- self.stats.nfiles += 1
- # we need to remember ALL files, see HardLinkManager.__doc__
- self.hlm.remember(id=tarinfo.name, info=item.chunks)
- return status
+ try:
+ fd = tar.extractfile(tarinfo)
+ self.process_file_chunks(
+ item, self.cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd))
+ )
+ item.get_size(memorize=True, from_chunks=True)
+ self.stats.nfiles += 1
+ # we need to remember ALL files, see HardLinkManager.__doc__
+ self.hlm.remember(id=tarinfo.name, info=item.chunks)
+ return status
+ except BackupOSError:
+ # see comment in FilesystemObjectProcessors.process_file, same issue here.
+ for chunk in item.get("chunks", []):
+ self.cache.chunk_decref(chunk.id, self.stats, wait=False)
+ raise
def valid_msgpacked_dict(d, keys_serialized):
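
A follow-up sketch for the create_helper hunks above, again with simplified, hypothetical stand-ins rather than borg's real signatures: the helper now yields a previous hardlink's chunks as a separate hl_chunks value instead of pre-filling item.chunks, so the caller can take the chunk references itself, one-by-one, inside a try block that knows how to undo them.

from contextlib import contextmanager


class BackupOSError(OSError):
    """Stand-in for borg's BackupOSError."""


class Cache:
    """Toy refcount store, as in the sketch before the diff."""

    def __init__(self):
        self.refcounts = {}

    def chunk_incref(self, chunk_id):
        self.refcounts[chunk_id] = self.refcounts.get(chunk_id, 0) + 1
        return chunk_id

    def chunk_decref(self, chunk_id):
        self.refcounts[chunk_id] -= 1


@contextmanager
def create_helper(hardlink_chunks):
    item = {}  # the file item being built
    # old behaviour (removed above): item["chunks"] = hardlink_chunks
    # new behaviour: hand the chunks to the caller without referencing them yet
    yield item, "h", True, hardlink_chunks


def process_file(cache, hardlink_chunks):
    with create_helper(hardlink_chunks) as (item, status, hardlinked, hl_chunks):
        try:
            if hl_chunks is not None:
                item["chunks"] = []
                for chunk_id in hl_chunks:
                    # one-by-one, so item["chunks"] records how far we got
                    item["chunks"].append(cache.chunk_incref(chunk_id))
        except BackupOSError:
            # undo only the references that were actually taken
            for chunk_id in item.get("chunks", []):
                cache.chunk_decref(chunk_id)
            raise
        return status


if __name__ == "__main__":
    cache = Cache()
    print(process_file(cache, ["c1", "c2"]), cache.refcounts)
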