author     TW <tw@waldmann-edv.de>        2023-02-14 00:01:50 +0100
committer  GitHub <noreply@github.com>    2023-02-14 00:01:50 +0100
commit     3849ebe312e441732382542579f5ed29ac93ed04 (patch)
tree       69da330721c652d318fb5d8e4536c5278a388c86
parent     6cfe77ebafffe578dad93eb571d420c833ee54e4 (diff)
parent     71f8dd3a17f88e8dbb82294fd79ab13bf432bfd0 (diff)
Merge pull request #7349 from ThomasWaldmann/avoid-orphan-content-chunks3
avoid orphan content chunks (master)
-rw-r--r--  src/borg/archive.py | 238
1 file changed, 127 insertions(+), 111 deletions(-)
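The change inverts the old cleanup strategy: instead of one central try/except in
ChunksProcessor.process_file_chunks, each caller that writes or incref's content
chunks now records them in item.chunks one by one and, on BackupOSError, decref's
everything recorded so far before re-raising, so a checkpoint or commit done later
cannot end up referencing orphaned content chunks. A minimal sketch of that pattern,
using hypothetical `cache`/`stats` stand-ins and plain OSError rather than borg's
real objects and exceptions:

    def store_chunks(item, chunk_ids, cache, stats):
        # `cache.chunk_incref` / `cache.chunk_decref` here are hypothetical
        # stand-ins mirroring the calls visible in the diff below.
        item.chunks = []
        try:
            for chunk_id in chunk_ids:
                # process one-by-one, so item.chunks records exactly how far we got
                item.chunks.append(cache.chunk_incref(chunk_id, stats))
        except OSError:
            # undo the refcount increments done so far, so that a later commit
            # cannot leave orphaned content chunks in the repository
            for chunk in item.chunks:
                cache.chunk_decref(chunk.id, stats, wait=False)
            raise  # skip this file; the caller continues with the next one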
diff --git a/src/borg/archive.py b/src/borg/archive.py
index af8698712..0cd81e784 100644
--- a/src/borg/archive.py
+++ b/src/borg/archive.py
@@ -1300,31 +1300,13 @@ class ChunksProcessor:
         # to get rid of .chunks_healthy, as it might not correspond to .chunks any more.
         if self.rechunkify and "chunks_healthy" in item:
             del item.chunks_healthy
-        try:
-            for chunk in chunk_iter:
-                cle = chunk_processor(chunk)
-                item.chunks.append(cle)
-                self.current_volume += cle[1]
-                if show_progress:
-                    stats.show_progress(item=item, dt=0.2)
-                self.maybe_checkpoint(item)
-        except BackupOSError:
-            # something went wrong (e.g. an I/O error while reading a source file), try to avoid orphan content chunks:
-            # case A: "no checkpoint archive has been created yet":
-            #         we have incref'd (written) some chunks, no commit yet, no file item for these chunks yet.
-            #         -> item.chunks has a list of orphaned content chunks, we need to decref them.
-            # case B: "some checkpoint archives have been created already":
-            #         at the time we commit them, everything is fine and consistent:
-            #         we have incref'd (written) some chunks, created a part file item referencing them, committed.
-            #         directly after commit, we have removed the part file item, but kept chunks in the repo, kept refcounts.
-            #         maybe we have incref'd (written) some more chunks after the commit, no file item for these chunks yet.
-            #         -> item.chunks has a list of orphaned content chunks, we need to decref them.
-            # So, cases A and B need same treatment.
-            for chunk in item.chunks:
-                cache.chunk_decref(chunk.id, stats, wait=False)
-            # now that we have cleaned up the chunk references, we can re-raise the exception
-            # this will skip THIS processing of this file, but continue with the next one.
-            raise
+        for chunk in chunk_iter:
+            chunk_entry = chunk_processor(chunk)
+            item.chunks.append(chunk_entry)
+            self.current_volume += chunk_entry[1]
+            if show_progress:
+                stats.show_progress(item=item, dt=0.2)
+            self.maybe_checkpoint(item)
 
 
 class FilesystemObjectProcessors:
@@ -1365,6 +1347,7 @@ class FilesystemObjectProcessors:
         safe_path = make_path_safe(path)
         item = Item(path=safe_path)
         hardlinked = hardlinkable and st.st_nlink > 1
+        hl_chunks = None
         update_map = False
         if hardlinked:
             status = "h"  # hardlink
@@ -1373,9 +1356,9 @@ class FilesystemObjectProcessors:
             if chunks is nothing:
                 update_map = True
             elif chunks is not None:
-                item.chunks = chunks
+                hl_chunks = chunks
             item.hlid = self.hlm.hardlink_id_from_inode(ino=st.st_ino, dev=st.st_dev)
-        yield item, status, hardlinked
+        yield item, status, hardlinked, hl_chunks
         self.add_item(item, stats=self.stats)
         if update_map:
             # remember the hlid of this fs object and if the item has chunks,
@@ -1384,12 +1367,12 @@ class FilesystemObjectProcessors:
             self.hlm.remember(id=(st.st_ino, st.st_dev), info=chunks)
 
     def process_dir_with_fd(self, *, path, fd, st):
-        with self.create_helper(path, st, "d", hardlinkable=False) as (item, status, hardlinked):
+        with self.create_helper(path, st, "d", hardlinkable=False) as (item, status, hardlinked, hl_chunks):
             item.update(self.metadata_collector.stat_attrs(st, path, fd=fd))
             return status
 
     def process_dir(self, *, path, parent_fd, name, st):
-        with self.create_helper(path, st, "d", hardlinkable=False) as (item, status, hardlinked):
+        with self.create_helper(path, st, "d", hardlinkable=False) as (item, status, hardlinked, hl_chunks):
             with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags_dir, noatime=True, op="dir_open") as fd:
                 # fd is None for directories on windows, in that case a race condition check is not possible.
                 if fd is not None:
@@ -1399,7 +1382,7 @@ class FilesystemObjectProcessors:
             return status
 
     def process_fifo(self, *, path, parent_fd, name, st):
-        with self.create_helper(path, st, "f") as (item, status, hardlinked):  # fifo
+        with self.create_helper(path, st, "f") as (item, status, hardlinked, hl_chunks):  # fifo
             with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags_normal, noatime=True) as fd:
                 with backup_io("fstat"):
                     st = stat_update_check(st, os.fstat(fd))
@@ -1407,7 +1390,7 @@ class FilesystemObjectProcessors:
             return status
 
     def process_dev(self, *, path, parent_fd, name, st, dev_type):
-        with self.create_helper(path, st, dev_type) as (item, status, hardlinked):  # char/block device
+        with self.create_helper(path, st, dev_type) as (item, status, hardlinked, hl_chunks):  # char/block device
            # looks like we can not work fd-based here without causing issues when trying to open/close the device
             with backup_io("stat"):
                 st = stat_update_check(st, os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=False))
@@ -1416,7 +1399,7 @@ class FilesystemObjectProcessors:
             return status
 
     def process_symlink(self, *, path, parent_fd, name, st):
-        with self.create_helper(path, st, "s", hardlinkable=True) as (item, status, hardlinked):
+        with self.create_helper(path, st, "s", hardlinkable=True) as (item, status, hardlinked, hl_chunks):
             fname = name if name is not None and parent_fd is not None else path
             with backup_io("readlink"):
                 target = os.readlink(fname, dir_fd=parent_fd)
@@ -1450,14 +1433,23 @@ class FilesystemObjectProcessors:
             item.uid = uid
         if gid is not None:
             item.gid = gid
-        self.process_file_chunks(item, cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd)))
-        item.get_size(memorize=True)
-        self.stats.nfiles += 1
-        self.add_item(item, stats=self.stats)
-        return status
+        try:
+            self.process_file_chunks(
+                item, cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd))
+            )
+        except BackupOSError:
+            # see comments in process_file's exception handler, same issue here.
+            for chunk in item.get("chunks", []):
+                cache.chunk_decref(chunk.id, self.stats, wait=False)
+            raise
+        else:
+            item.get_size(memorize=True)
+            self.stats.nfiles += 1
+            self.add_item(item, stats=self.stats)
+            return status
 
     def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal):
-        with self.create_helper(path, st, None) as (item, status, hardlinked):  # no status yet
+        with self.create_helper(path, st, None) as (item, status, hardlinked, hl_chunks):  # no status yet
             with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags, noatime=True) as fd:
                 with backup_io("fstat"):
                     st = stat_update_check(st, os.fstat(fd))
@@ -1468,72 +1460,90 @@ class FilesystemObjectProcessors:
                     # so it can be extracted / accessed in FUSE mount like a regular file.
                     # this needs to be done early, so that part files also get the patched mode.
                     item.mode = stat.S_IFREG | stat.S_IMODE(item.mode)
-                if "chunks" in item:  # create_helper might have put chunks from a previous hardlink there
-                    [cache.chunk_incref(id_, self.stats) for id_, _ in item.chunks]
-                else:  # normal case, no "2nd+" hardlink
-                    if not is_special_file:
-                        hashed_path = safe_encode(os.path.join(self.cwd, path))
-                        started_hashing = time.monotonic()
-                        path_hash = self.key.id_hash(hashed_path)
-                        self.stats.hashing_time += time.monotonic() - started_hashing
-                        known, ids = cache.file_known_and_unchanged(hashed_path, path_hash, st)
-                    else:
-                        # in --read-special mode, we may be called for special files.
-                        # there should be no information in the cache about special files processed in
-                        # read-special mode, but we better play safe as this was wrong in the past:
-                        hashed_path = path_hash = None
-                        known, ids = False, None
-                    chunks = None
-                    if ids is not None:
-                        # Make sure all ids are available
-                        for id_ in ids:
-                            if not cache.seen_chunk(id_):
-                                status = (
-                                    "M"  # cache said it is unmodified, but we lost a chunk: process file like modified
-                                )
-                                break
+                # we begin processing chunks now (writing or incref'ing them to the repository),
+                # which might require cleanup (see except-branch):
+                try:
+                    if hl_chunks is not None:  # create_helper gave us chunks from a previous hardlink
+                        item.chunks = []
+                        for chunk_id, chunk_size in hl_chunks:
+                            # process one-by-one, so we will know in item.chunks how far we got
+                            chunk_entry = cache.chunk_incref(chunk_id, self.stats)
+                            item.chunks.append(chunk_entry)
+                    else:  # normal case, no "2nd+" hardlink
+                        if not is_special_file:
+                            hashed_path = safe_encode(os.path.join(self.cwd, path))
+                            started_hashing = time.monotonic()
+                            path_hash = self.key.id_hash(hashed_path)
+                            self.stats.hashing_time += time.monotonic() - started_hashing
+                            known, ids = cache.file_known_and_unchanged(hashed_path, path_hash, st)
                         else:
-                            chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
-                            status = "U"  # regular file, unchanged
-                    else:
-                        status = "M" if known else "A"  # regular file, modified or added
-                    self.print_file_status(status, path)
-                    self.stats.files_stats[status] += 1
-                    status = None  # we already printed the status
-                    # Only chunkify the file if needed
-                    if chunks is not None:
-                        item.chunks = chunks
-                    else:
-                        with backup_io("read"):
-                            self.process_file_chunks(
-                                item,
-                                cache,
-                                self.stats,
-                                self.show_progress,
-                                backup_io_iter(self.chunker.chunkify(None, fd)),
-                            )
-                        self.stats.chunking_time = self.chunker.chunking_time
-                        if is_win32:
-                            changed_while_backup = False  # TODO
+                            # in --read-special mode, we may be called for special files.
+                            # there should be no information in the cache about special files processed in
+                            # read-special mode, but we better play safe as this was wrong in the past:
+                            hashed_path = path_hash = None
+                            known, ids = False, None
+                        if ids is not None:
+                            # Make sure all ids are available
+                            for id_ in ids:
+                                if not cache.seen_chunk(id_):
+                                    # cache said it is unmodified, but we lost a chunk: process file like modified
+                                    status = "M"
+                                    break
+                            else:
+                                item.chunks = []
+                                for chunk_id in ids:
+                                    # process one-by-one, so we will know in item.chunks how far we got
+                                    chunk_entry = cache.chunk_incref(chunk_id, self.stats)
+                                    item.chunks.append(chunk_entry)
+                                status = "U"  # regular file, unchanged
                         else:
-                            with backup_io("fstat2"):
-                                st2 = os.fstat(fd)
-                            # special files:
-                            # - fifos change naturally, because they are fed from the other side. no problem.
-                            # - blk/chr devices don't change ctime anyway.
-                            changed_while_backup = not is_special_file and st.st_ctime_ns != st2.st_ctime_ns
-                        if changed_while_backup:
-                            status = "C"  # regular file changed while we backed it up, might be inconsistent/corrupt!
-                        if not is_special_file and not changed_while_backup:
-                            # we must not memorize special files, because the contents of e.g. a
-                            # block or char device will change without its mtime/size/inode changing.
-                            # also, we must not memorize a potentially inconsistent/corrupt file that
-                            # changed while we backed it up.
-                            cache.memorize_file(hashed_path, path_hash, st, [c.id for c in item.chunks])
-                self.stats.nfiles += 1
-                item.update(self.metadata_collector.stat_ext_attrs(st, path, fd=fd))
-                item.get_size(memorize=True)
-                return status
+                            status = "M" if known else "A"  # regular file, modified or added
+                        self.print_file_status(status, path)
+                        self.stats.files_stats[status] += 1
+                        status = None  # we already printed the status
+                        # Only chunkify the file if needed
+                        if "chunks" not in item:
+                            with backup_io("read"):
+                                self.process_file_chunks(
+                                    item,
+                                    cache,
+                                    self.stats,
+                                    self.show_progress,
+                                    backup_io_iter(self.chunker.chunkify(None, fd)),
+                                )
+                            self.stats.chunking_time = self.chunker.chunking_time
+                            if is_win32:
+                                changed_while_backup = False  # TODO
+                            else:
+                                with backup_io("fstat2"):
+                                    st2 = os.fstat(fd)
+                                # special files:
+                                # - fifos change naturally, because they are fed from the other side. no problem.
+                                # - blk/chr devices don't change ctime anyway.
+                                changed_while_backup = not is_special_file and st.st_ctime_ns != st2.st_ctime_ns
+                            if changed_while_backup:
+                                # regular file changed while we backed it up, might be inconsistent/corrupt!
+                                status = "C"
+                            if not is_special_file and not changed_while_backup:
+                                # we must not memorize special files, because the contents of e.g. a
+                                # block or char device will change without its mtime/size/inode changing.
+                                # also, we must not memorize a potentially inconsistent/corrupt file that
+                                # changed while we backed it up.
+                                cache.memorize_file(hashed_path, path_hash, st, [c.id for c in item.chunks])
+                    self.stats.nfiles += 1
+                    item.update(self.metadata_collector.stat_ext_attrs(st, path, fd=fd))
+                    item.get_size(memorize=True)
+                    return status
+                except BackupOSError:
+                    # Something went wrong and we might need to clean up a bit.
+                    # Maybe we have already incref'ed some file content chunks in the repo -
+                    # but we will not add an item (see add_item in create_helper) and thus
+                    # they would be orphaned chunks in case that we commit the transaction.
+                    for chunk in item.get("chunks", []):
+                        cache.chunk_decref(chunk.id, self.stats, wait=False)
+                    # Now that we have cleaned up the chunk references, we can re-raise the exception.
+                    # This will skip processing of this file, but might retry or continue with the next one.
+                    raise
 
 
 class TarfileObjectProcessors:
@@ -1628,15 +1638,21 @@ class TarfileObjectProcessors:
         with self.create_helper(tarinfo, status, type) as (item, status):
             self.print_file_status(status, tarinfo.name)
             status = None  # we already printed the status
-            fd = tar.extractfile(tarinfo)
-            self.process_file_chunks(
-                item, self.cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd))
-            )
-            item.get_size(memorize=True, from_chunks=True)
-            self.stats.nfiles += 1
-            # we need to remember ALL files, see HardLinkManager.__doc__
-            self.hlm.remember(id=tarinfo.name, info=item.chunks)
-            return status
+            try:
+                fd = tar.extractfile(tarinfo)
+                self.process_file_chunks(
+                    item, self.cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd))
+                )
+                item.get_size(memorize=True, from_chunks=True)
+                self.stats.nfiles += 1
+                # we need to remember ALL files, see HardLinkManager.__doc__
+                self.hlm.remember(id=tarinfo.name, info=item.chunks)
+                return status
+            except BackupOSError:
+                # see comment in FilesystemObjectProcessors.process_file, same issue here.
+                for chunk in item.get("chunks", []):
+                    self.cache.chunk_decref(chunk.id, self.stats, wait=False)
+                raise
 
 
 def valid_msgpacked_dict(d, keys_serialized):
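The rewritten cache-hit check in process_file above relies on Python's for/else:
the else block runs only when the loop over ids completes without break, i.e. when
cache.seen_chunk confirmed that every cached chunk id is still present. A standalone
illustration of that construct, where `seen` is a hypothetical predicate standing in
for cache.seen_chunk:

    def classify(ids, seen):
        # `seen` is a hypothetical stand-in for cache.seen_chunk
        for id_ in ids:
            if not seen(id_):
                status = "M"  # we lost a chunk: process the file like modified
                break
        else:
            # reached only when the loop finished without `break`
            status = "U"  # all cached chunk ids still present: unchanged
        return status

    assert classify([1, 2], {1, 2}.__contains__) == "U"
    assert classify([1, 2], {1}.__contains__) == "M"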