summary | refs | log | tree | commit | diff | stats
path: root/src
diff options
context:
space:
mode:
author: Thomas Waldmann <tw@waldmann-edv.de> 2016-06-04 18:26:55 +0200
committer: Thomas Waldmann <tw@waldmann-edv.de> 2016-08-27 23:17:57 +0200
commit: 1f04820d9d109d77ebb9997c770612662ac28dd6 (patch)
tree: dec452f62fed406c03a18adc03dc9f964b078d0f /src
parent: 1219ba5f5088f1c63ef972e5d8ddcacab537a471 (diff)
fuse: implement versions view
All items of all archives are read to build a unified view. Each file is represented by a directory of the same name, which contains the versions of that file. A filename suffix computed with adler32 over the file's chunk ids is used to disambiguate the versions. Also: refactor the code a little and create separate methods for leaves and inner nodes.
Diffstat (limited to 'src')
-rw-r--r-- src/borg/archiver.py 2
-rw-r--r-- src/borg/fuse.py 102
-rw-r--r-- src/borg/testsuite/archiver.py 15
3 files changed, 89 insertions, 30 deletions
diff --git a/src/borg/archiver.py b/src/borg/archiver.py
index 2b9888650..5936cdf8b 100644
--- a/src/borg/archiver.py
+++ b/src/borg/archiver.py
@@ -1835,6 +1835,8 @@ class Archiver:
For mount options, see the fuse(8) manual page. Additional mount options
supported by borg:
+ - versions: when used with a repository mount, this gives a merged, versioned
+ view of the files in the archives. EXPERIMENTAL, layout may change in future.
- allow_damaged_files: by default damaged files (where missing chunks were
replaced with runs of zeros by borg check --repair) are not readable and
return EIO (I/O error). Set this option to read such files.
diff --git a/src/borg/fuse.py b/src/borg/fuse.py
index 4e7cf10c5..c81292b6d 100644
--- a/src/borg/fuse.py
+++ b/src/borg/fuse.py
@@ -6,12 +6,12 @@ import tempfile
import time
from collections import defaultdict
from distutils.version import LooseVersion
+from zlib import adler32
import llfuse
import msgpack
from .logger import create_logger
-from .lrucache import LRUCache
logger = create_logger()
from .archive import Archive
@@ -51,14 +51,18 @@ class ItemCache:
class FuseOperations(llfuse.Operations):
"""Export archive as a fuse filesystem
"""
-
+ # mount options
allow_damaged_files = False
+ versions = False
def __init__(self, key, repository, manifest, archive, cached_repo):
super().__init__()
- self._inode_count = 0
- self.key = key
+ self.repository_uncached = repository
self.repository = cached_repo
+ self.archive = archive
+ self.manifest = manifest
+ self.key = key
+ self._inode_count = 0
self.items = {}
self.parent = {}
self.contents = defaultdict(dict)
@@ -69,15 +73,22 @@ class FuseOperations(llfuse.Operations):
data_cache_capacity = int(os.environ.get('BORG_MOUNT_DATA_CACHE_ENTRIES', os.cpu_count() or 1))
logger.debug('mount data cache capacity: %d chunks', data_cache_capacity)
self.data_cache = LRUCache(capacity=data_cache_capacity, dispose=lambda _: None)
+
+ def _create_filesystem(self):
self._create_dir(parent=1) # first call, create root dir (inode == 1)
- if archive:
- self.process_archive(archive)
+ if self.archive:
+ self.process_archive(self.archive)
else:
- for name in manifest.archives:
- # Create archive placeholder inode
- archive_inode = self._create_dir(parent=1)
- self.contents[1][os.fsencode(name)] = archive_inode
- self.pending_archives[archive_inode] = Archive(repository, key, manifest, name)
+ for name in self.manifest.archives:
+ archive = Archive(self.repository_uncached, self.key, self.manifest, name)
+ if self.versions:
+ # process archives immediately
+ self.process_archive(archive)
+ else:
+ # lazy load archives, create archive placeholder inode
+ archive_inode = self._create_dir(parent=1)
+ self.contents[1][os.fsencode(name)] = archive_inode
+ self.pending_archives[archive_inode] = archive
def mount(self, mountpoint, mount_options, foreground=False):
"""Mount filesystem on *mountpoint* with *mount_options*."""
@@ -89,6 +100,12 @@ class FuseOperations(llfuse.Operations):
self.allow_damaged_files = True
except ValueError:
pass
+ try:
+ options.remove('versions')
+ self.versions = True
+ except ValueError:
+ pass
+ self._create_filesystem()
llfuse.init(self, mountpoint, options)
if not foreground:
daemonize()
@@ -122,11 +139,16 @@ class FuseOperations(llfuse.Operations):
unpacker.feed(data)
for item in unpacker:
item = Item(internal_dict=item)
+ is_dir = stat.S_ISDIR(item.mode)
try:
# This can happen if an archive was created with a command line like
# $ borg create ... dir1/file dir1
# In this case the code below will have created a default_dir inode for dir1 already.
- inode = self._find_inode(safe_encode(item.path), prefix)
+ path = safe_encode(item.path)
+ if not is_dir:
+ # not a directory -> no lookup needed
+ raise KeyError
+ inode = self._find_inode(path, prefix)
except KeyError:
pass
else:
@@ -137,25 +159,46 @@ class FuseOperations(llfuse.Operations):
num_segments = len(segments)
parent = 1
for i, segment in enumerate(segments, 1):
- # Leaf segment?
if i == num_segments:
- if 'source' in item and stat.S_ISREG(item.mode):
- inode = self._find_inode(item.source, prefix)
- item = self.cache.get(inode)
- item.nlink = item.get('nlink', 1) + 1
- self.items[inode] = item
- else:
- inode = self.cache.add(item)
- self.parent[inode] = parent
- if segment:
- self.contents[parent][segment] = inode
- elif segment in self.contents[parent]:
- parent = self.contents[parent][segment]
+ self.process_leaf(segment, item, parent, prefix, is_dir)
else:
- inode = self._create_dir(parent)
- if segment:
- self.contents[parent][segment] = inode
- parent = inode
+ parent = self.process_inner(segment, parent)
+
+ def process_leaf(self, name, item, parent, prefix, is_dir):
+ def version_name(name, item):
+ if 'chunks' in item:
+ ident = 0
+ for chunkid, _, _ in item.chunks:
+ ident = adler32(chunkid, ident)
+ name = name + safe_encode('.%08x' % ident)
+ return name
+
+ if self.versions and not is_dir:
+ parent = self.process_inner(name, parent)
+ name = version_name(name, item)
+ self.process_real_leaf(name, item, parent, prefix)
+
+ def process_real_leaf(self, name, item, parent, prefix):
+ if 'source' in item and stat.S_ISREG(item.mode):
+ inode = self._find_inode(item.source, prefix)
+ item = self.cache.get(inode)
+ item.nlink = item.get('nlink', 1) + 1
+ self.items[inode] = item
+ else:
+ inode = self.cache.add(item)
+ self.parent[inode] = parent
+ if name:
+ self.contents[parent][name] = inode
+
+ def process_inner(self, name, parent):
+ if name in self.contents[parent]:
+ parent = self.contents[parent][name]
+ else:
+ inode = self._create_dir(parent)
+ if name:
+ self.contents[parent][name] = inode
+ parent = inode
+ return parent
def allocate_inode(self):
self._inode_count += 1
@@ -280,7 +323,6 @@ class FuseOperations(llfuse.Operations):
# evict fully read chunk from cache
del self.data_cache[id]
else:
- # XXX
_, data = self.key.decrypt(id, self.repository.get(id))
if offset + n < len(data):
# chunk was only partially read, cache it
diff --git a/src/borg/testsuite/archiver.py b/src/borg/testsuite/archiver.py
index d5d6f5a59..181ccb17a 100644
--- a/src/borg/testsuite/archiver.py
+++ b/src/borg/testsuite/archiver.py
@@ -1442,6 +1442,21 @@ class ArchiverTestCase(ArchiverTestCaseBase):
assert stat.S_ISFIFO(sto.st_mode)
@unittest.skipUnless(has_llfuse, 'llfuse not installed')
+ def test_fuse_versions_view(self):
+ self.cmd('init', self.repository_location)
+ self.create_regular_file('test', contents=b'first')
+ self.cmd('create', self.repository_location + '::archive1', 'input')
+ self.create_regular_file('test', contents=b'second')
+ self.cmd('create', self.repository_location + '::archive2', 'input')
+ mountpoint = os.path.join(self.tmpdir, 'mountpoint')
+ # mount the whole repository, archive contents shall show up in versioned view:
+ with self.fuse_mount(self.repository_location, mountpoint, 'versions'):
+ path = os.path.join(mountpoint, 'input', 'test') # filename shows up as directory ...
+ files = os.listdir(path)
+ assert all(f.startswith('test.') for f in files) # ... with files test.xxxxxxxx in there
+ assert {b'first', b'second'} == {open(os.path.join(path, f), 'rb').read() for f in files}
+
+ @unittest.skipUnless(has_llfuse, 'llfuse not installed')
def test_fuse_allow_damaged_files(self):
self.cmd('init', self.repository_location)
self.create_src_archive('archive')