summaryrefslogtreecommitdiffstats
path: root/src/borg/testsuite/__init__.py
blob: 38f5d4ab1d5d88a7a0daff2d4bffa5ed42013c91 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
from contextlib import contextmanager
import filecmp
import functools
import os
import posix
import stat
import sys
import sysconfig
import tempfile
import time
import uuid
import unittest

from ..xattr import get_all
from ..platform import get_flags, umount
from .. import platform

# Note: this is used by borg.selftest, do not use or import py.test functionality here.

# Optional llfuse probe: record whether the installed llfuse exposes a
# nanosecond mtime attribute. Stays False when llfuse cannot be imported.
try:
    import llfuse
    # Does this version of llfuse support ns precision?
    have_fuse_mtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns')
except ImportError:
    have_fuse_mtime_ns = False

# pytest is optional for this module; `raises` is None when it is not installed.
try:
    from pytest import raises
except ImportError:
    raises = None

# File flag support: start with a guess based on the platform, then verify it
# by actually setting UF_NODUMP on a temporary file. If that probe raises
# OSError, flags are treated as unsupported and the reason text is recorded.
has_lchflags = hasattr(os, 'lchflags') or sys.platform.startswith('linux')
no_lchlfags_because = '' if has_lchflags else '(not supported on this platform)'
try:
    with tempfile.NamedTemporaryFile() as file:
        platform.set_flags(file.name, stat.UF_NODUMP)
except OSError:
    has_lchflags = False
    no_lchlfags_because = '(the file system at %s does not support flags)' % tempfile.gettempdir()

# has_llfuse is True exactly when llfuse can be imported.
try:
    import llfuse
    has_llfuse = True or llfuse  # avoids "unused import"
except ImportError:
    has_llfuse = False

# The mtime get/set precision varies on different OS and Python versions.
# st_mtime_ns_round is used as the `ndigits` argument of round() on
# st_mtime_ns values: 0 keeps every nanosecond digit, a negative value
# rounds away that many trailing decimal digits (-6 -> multiples of 10**6 ns,
# -9 -> whole seconds).
if 'HAVE_FUTIMENS' in getattr(posix, '_have_functions', []):
    st_mtime_ns_round = 0
elif 'HAVE_UTIMES' in sysconfig.get_config_vars():
    st_mtime_ns_round = -6
else:
    st_mtime_ns_round = -9

# NetBSD overrides whatever was selected above.
if sys.platform.startswith('netbsd'):
    st_mtime_ns_round = -4  # only >1 microsecond resolution here?


@contextmanager
def unopened_tempfile():
    """Yield a path named "file" inside a fresh temporary directory.

    Nothing is created at the yielded path; only its parent directory exists.
    The directory (and anything the caller put into it) is removed when the
    with-block is left.
    """
    with tempfile.TemporaryDirectory() as scratch_dir:
        candidate = os.path.join(scratch_dir, "file")
        yield candidate


@functools.lru_cache()
def are_symlinks_supported():
    """Return True if a symlink can be created and read back in the temp dir.

    The probe creates a symlink at a fresh temporary path, then checks that it
    can be lstat()ed and that readlink() returns the target it was given.
    Any OSError during the probe counts as "not supported". The result is
    cached, so the probe runs once per process.
    """
    target = 'somewhere'
    with unopened_tempfile() as link_path:
        try:
            os.symlink(target, link_path)
            link_ok = bool(os.lstat(link_path)) and os.readlink(link_path) == target
        except OSError:
            link_ok = False
    return link_ok


@functools.lru_cache()
def are_hardlinks_supported():
    """Return True if hard links work in the temp dir.

    The probe creates an empty file, hard-links it to a second fresh path and
    checks that both names report a link count of 2 and the same inode.
    Any OSError from linking or stat()ing counts as "not supported". The
    result is cached, so the probe runs once per process.
    """
    with unopened_tempfile() as source_path, unopened_tempfile() as link_path:
        # Create the (empty) file that will be linked.
        with open(source_path, 'w'):
            pass
        try:
            os.link(source_path, link_path)
            source_stat, link_stat = os.stat(source_path), os.stat(link_path)
        except OSError:
            return False
        return (source_stat.st_nlink == 2
                and link_stat.st_nlink == 2
                and source_stat.st_ino == link_stat.st_ino)


@functools.lru_cache()
def are_fifos_supported():
    """Return True if a FIFO can be created in the temp dir.

    An OSError from os.mkfifo() counts as "not supported". The result is
    cached, so the probe runs once per process.
    """
    with unopened_tempfile() as fifo_path:
        try:
            os.mkfifo(fifo_path)
        except OSError:
            return False
        else:
            return True


@functools.lru_cache()
def is_utime_fully_supported():
    """Return True if atime/mtime can be set without following symlinks.

    The probe sets atime=1000 and mtime=2000 on a fresh temporary path with
    ``follow_symlinks=False`` and checks via lstat() that both values stuck.
    When symlinks are available the probe target is a symlink, otherwise an
    empty regular file. The result is cached, so the probe runs once per
    process.

    Returns False when the timestamps did not stick, when the call fails with
    OSError, or when the platform cannot honor ``follow_symlinks=False`` at
    all (os.utime raises NotImplementedError in that case).
    """
    with unopened_tempfile() as filepath:
        # Some filesystems (such as SSHFS) don't support utime on symlinks
        if are_symlinks_supported():
            os.symlink('something', filepath)
        else:
            open(filepath, 'w').close()
        try:
            os.utime(filepath, (1000, 2000), follow_symlinks=False)
            new_stats = os.lstat(filepath)
        except (OSError, NotImplementedError):
            # OSError: the filesystem refused the operation.
            # NotImplementedError: this platform has no way to set times on
            # the link itself; that is "not fully supported", not a crash.
            return False
        return new_stats.st_atime == 1000 and new_stats.st_mtime == 2000


def no_selinux(x):
    """Return a copy of `x` without the 'security.selinux' entry.

    selinux fails our FUSE tests, thus selinux xattrs are ignored.

    A dict is filtered by key, a list by element; in both cases a new object
    is returned and `x` is left unmodified. For any other type the result is
    None.
    """
    selinux_key = 'security.selinux'

    def is_wanted(name):
        return name != selinux_key

    if isinstance(x, list):
        return [name for name in x if is_wanted(name)]
    if isinstance(x, dict):
        return {name: value for name, value in x.items() if is_wanted(name)}
    return None


class BaseTestCase(unittest.TestCase):
    """Shared base class for test cases.

    Provides snake_case aliases for common ``unittest`` assertions, a
    recursive directory comparison that also checks file metadata, and
    helpers for mounting a FUSE filesystem during a test.

    ``fuse_mount`` calls ``self.cmd``, which is not defined in this class and
    has to be supplied by the concrete test case.
    """
    assert_in = unittest.TestCase.assertIn
    assert_not_in = unittest.TestCase.assertNotIn
    assert_equal = unittest.TestCase.assertEqual
    assert_not_equal = unittest.TestCase.assertNotEqual
    assert_true = unittest.TestCase.assertTrue

    # Use pytest's ``raises`` when pytest could be imported, otherwise fall
    # back to the unittest implementation.
    if raises:
        assert_raises = staticmethod(raises)
    else:
        assert_raises = unittest.TestCase.assertRaises

    @contextmanager
    def assert_creates_file(self, path):
        """Assert `path` is absent on entry and present when the with-block ends."""
        self.assert_true(not os.path.exists(path), '{} should not exist'.format(path))
        yield
        self.assert_true(os.path.exists(path), '{} should exist'.format(path))

    def assert_dirs_equal(self, dir1, dir2, **kwargs):
        """Assert that the trees below `dir1` and `dir2` are equal.

        Keyword arguments (``ignore_bsdflags``, ``ignore_xattrs``,
        ``ignore_ns``) are forwarded to ``_assert_dirs_equal_cmp``.
        """
        diff = filecmp.dircmp(dir1, dir2)
        self._assert_dirs_equal_cmp(diff, **kwargs)

    def _assert_dirs_equal_cmp(self, diff, ignore_bsdflags=False, ignore_xattrs=False, ignore_ns=False):
        """Recursively check a ``filecmp.dircmp`` result for equality.

        Besides names and contents (as reported by dircmp), the following is
        compared for every common entry: mode, uid, gid, rdev (device nodes
        only), nlink (not for directories on a different device), flags,
        mtime and xattrs.

        :param diff: ``filecmp.dircmp`` instance to check
        :param ignore_bsdflags: do not compare file flags
        :param ignore_xattrs: do not compare extended attributes
        :param ignore_ns: compare mtime at whole-second granularity only
        """
        self.assert_equal(diff.left_only, [])
        self.assert_equal(diff.right_only, [])
        self.assert_equal(diff.diff_files, [])
        self.assert_equal(diff.funny_files, [])
        for filename in diff.common:
            path1 = os.path.join(diff.left, filename)
            path2 = os.path.join(diff.right, filename)
            s1 = os.lstat(path1)
            s2 = os.lstat(path2)
            # Assume path2 is on FUSE if st_dev is different
            fuse = s1.st_dev != s2.st_dev
            attrs = ['st_uid', 'st_gid', 'st_rdev']
            if not fuse or not os.path.isdir(path1):
                # dir nlink is always 1 on our fuse filesystem
                attrs.append('st_nlink')
            # Layout: [filename, mode, st_uid, st_gid, st_rdev, (st_nlink), ...]
            d1 = [filename] + [getattr(s1, a) for a in attrs]
            d2 = [filename] + [getattr(s2, a) for a in attrs]
            d1.insert(1, oct(s1.st_mode))
            d2.insert(1, oct(s2.st_mode))
            if not ignore_bsdflags:
                d1.append(get_flags(path1, s1))
                d2.append(get_flags(path2, s2))
            # ignore st_rdev (index 4) if file is not a block/char device, fixes #203
            if not stat.S_ISCHR(s1.st_mode) and not stat.S_ISBLK(s1.st_mode):
                d1[4] = None
            if not stat.S_ISCHR(s2.st_mode) and not stat.S_ISBLK(s2.st_mode):
                d2[4] = None
            # If utime isn't fully supported, borg can't set mtime.
            # Therefore, we shouldn't test it in that case.
            if is_utime_fully_supported():
                # Older versions of llfuse do not support ns precision properly
                if ignore_ns:
                    # Exact integer division: going through a float would
                    # round present-day ns timestamps to the nearest
                    # representable value first, which can bump a time just
                    # below a second boundary into the next second.
                    d1.append(s1.st_mtime_ns // 10**9)
                    d2.append(s2.st_mtime_ns // 10**9)
                elif fuse and not have_fuse_mtime_ns:
                    d1.append(round(s1.st_mtime_ns, -4))
                    d2.append(round(s2.st_mtime_ns, -4))
                else:
                    d1.append(round(s1.st_mtime_ns, st_mtime_ns_round))
                    d2.append(round(s2.st_mtime_ns, st_mtime_ns_round))
            if not ignore_xattrs:
                d1.append(no_selinux(get_all(path1, follow_symlinks=False)))
                d2.append(no_selinux(get_all(path2, follow_symlinks=False)))
            self.assert_equal(d1, d2)
        for sub_diff in diff.subdirs.values():
            self._assert_dirs_equal_cmp(
                sub_diff,
                ignore_bsdflags=ignore_bsdflags,
                ignore_xattrs=ignore_xattrs,
                ignore_ns=ignore_ns,
            )

    @contextmanager
    def fuse_mount(self, location, mountpoint, *options):
        """Mount `location` on `mountpoint` for the duration of the with-block.

        The mountpoint directory is created, ``self.cmd('mount', ...)`` is
        invoked with ``fork=True`` and the method waits until the mount is
        visible. On exit the filesystem is unmounted and the directory is
        removed again, also when the with-block raised, so a failing test
        does not leave a stale mount behind.

        :param location: what to mount, passed through to the mount command
        :param mountpoint: directory to create and mount on
        :param options: extra arguments appended to the mount command
        """
        os.mkdir(mountpoint)
        args = ['mount', location, mountpoint] + list(options)
        self.cmd(*args, fork=True)
        self.wait_for_mount(mountpoint)
        try:
            yield
        finally:
            umount(mountpoint)
            os.rmdir(mountpoint)
            # Give the daemon some time to exit
            time.sleep(.2)

    def wait_for_mount(self, path, timeout=5):
        """Wait until a filesystem is mounted on `path`

        Polls every 0.1 seconds and raises Exception if nothing is mounted
        after `timeout` seconds.
        """
        # Monotonic clock: the deadline must not move when the wall clock is adjusted.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if os.path.ismount(path):
                return
            time.sleep(.1)
        raise Exception('wait_for_mount(%s) timeout' % path)


class changedir:
    """Context manager that switches the working directory temporarily.

    On entry the current working directory is remembered and the process
    changes into the directory given to the constructor; on exit it changes
    back to the remembered one. Nothing is bound by ``with ... as``.
    """

    def __init__(self, dir):
        self.dir = dir

    def __enter__(self):
        previous = os.getcwd()
        self.old = previous
        os.chdir(self.dir)

    def __exit__(self, *exc_info, **kwargs):
        # Always go back, whether or not the with-block raised.
        os.chdir(self.old)


class environment_variable:
    """Context manager that overrides environment variables temporarily.

    Each keyword argument names a variable; a string value sets it, None
    removes it. The previous state of every touched variable is recorded on
    entry and put back on exit (variables that did not exist before are
    removed again).
    """

    def __init__(self, **values):
        self.values = values
        self.old_values = {}

    @staticmethod
    def _assign(name, value):
        # None means "variable must not be set".
        if value is None:
            os.environ.pop(name, None)
        else:
            os.environ[name] = value

    def __enter__(self):
        for name, wanted in self.values.items():
            self.old_values[name] = os.environ.get(name)
            self._assign(name, wanted)

    def __exit__(self, *args, **kw):
        for name, previous in self.old_values.items():
            self._assign(name, previous)


class FakeInputs:
    """Simulate multiple user inputs, can be used as input() replacement

    Every call hands out (and removes) the first remaining element of the
    list given to the constructor; that list object itself is consumed.
    Once it is empty, EOFError is raised.
    """

    def __init__(self, inputs):
        self.inputs = inputs

    def __call__(self, prompt=None):
        # Echo the prompt without a trailing newline.
        if prompt is not None:
            print(prompt, end='')
        try:
            answer = self.inputs.pop(0)
        except IndexError:
            raise EOFError from None
        return answer