diff options
author | bigtedde <big.tedde@gmail.com> | 2023-07-26 12:18:25 -0700 |
---|---|---|
committer | bigtedde <big.tedde@gmail.com> | 2023-07-26 12:18:25 -0700 |
commit | d2f32986f30f28f1edc7c74f38c0f3d979535c62 (patch) | |
tree | 93410f85f1be3d4800096e30733c9993f2d4c6d0 /src | |
parent | f33efc0a10b6671e2cfc8ccaf496df8ac7e8f5f0 (diff) |
removed TestCaseBase from testsuite/archive.py
Diffstat (limited to 'src')
-rw-r--r-- | src/borg/testsuite/archive.py | 184 |
1 files changed, 92 insertions, 92 deletions
diff --git a/src/borg/testsuite/archive.py b/src/borg/testsuite/archive.py index 670a3a71a..cefd6293b 100644 --- a/src/borg/testsuite/archive.py +++ b/src/borg/testsuite/archive.py @@ -7,7 +7,6 @@ from unittest.mock import Mock import pytest -from . import BaseTestCase from . import rejected_dotdot_paths from ..crypto.key import PlaintextKey from ..archive import Archive, CacheChunkBuffer, RobustUnpacker, valid_msgpacked_dict, ITEM_KEYS, Statistics @@ -133,97 +132,98 @@ class MockCache: return id, len(data) -class ChunkBufferTestCase(BaseTestCase): - def test_cache_chunk_buffer(self): - data = [Item(path="p1"), Item(path="p2")] - cache = MockCache() - key = PlaintextKey(None) - chunks = CacheChunkBuffer(cache, key, None) - for d in data: - chunks.add(d) - chunks.flush() - chunks.flush(flush=True) - self.assert_equal(len(chunks.chunks), 2) - unpacker = msgpack.Unpacker() - for id in chunks.chunks: - unpacker.feed(cache.objects[id]) - self.assert_equal(data, [Item(internal_dict=d) for d in unpacker]) - - def test_partial_cache_chunk_buffer(self): - big = "0123456789abcdefghijklmnopqrstuvwxyz" * 25000 - data = [Item(path="full", target=big), Item(path="partial", target=big)] - cache = MockCache() - key = PlaintextKey(None) - chunks = CacheChunkBuffer(cache, key, None) - for d in data: - chunks.add(d) - chunks.flush(flush=False) - # the code is expected to leave the last partial chunk in the buffer - self.assert_equal(len(chunks.chunks), 3) - assert chunks.buffer.tell() > 0 - # now really flush - chunks.flush(flush=True) - self.assert_equal(len(chunks.chunks), 4) - assert chunks.buffer.tell() == 0 - unpacker = msgpack.Unpacker() - for id in chunks.chunks: - unpacker.feed(cache.objects[id]) - self.assert_equal(data, [Item(internal_dict=d) for d in unpacker]) - - -class RobustUnpackerTestCase(BaseTestCase): - def make_chunks(self, items): - return b"".join(msgpack.packb({"path": item}) for item in items) - - def _validator(self, value): - return isinstance(value, dict) and value.get("path") in ("foo", "bar", "boo", "baz") - - def process(self, input): - unpacker = RobustUnpacker(validator=self._validator, item_keys=ITEM_KEYS) - result = [] - for should_sync, chunks in input: - if should_sync: - unpacker.resync() - for data in chunks: - unpacker.feed(data) - for item in unpacker: - result.append(item) - return result - - def test_extra_garbage_no_sync(self): - chunks = [ - (False, [self.make_chunks(["foo", "bar"])]), - (False, [b"garbage"] + [self.make_chunks(["boo", "baz"])]), - ] - result = self.process(chunks) - self.assert_equal( - result, [{"path": "foo"}, {"path": "bar"}, 103, 97, 114, 98, 97, 103, 101, {"path": "boo"}, {"path": "baz"}] - ) - - def split(self, left, length): - parts = [] - while left: - parts.append(left[:length]) - left = left[length:] - return parts - - def test_correct_stream(self): - chunks = self.split(self.make_chunks(["foo", "bar", "boo", "baz"]), 2) - input = [(False, chunks)] - result = self.process(input) - self.assert_equal(result, [{"path": "foo"}, {"path": "bar"}, {"path": "boo"}, {"path": "baz"}]) - - def test_missing_chunk(self): - chunks = self.split(self.make_chunks(["foo", "bar", "boo", "baz"]), 4) - input = [(False, chunks[:3]), (True, chunks[4:])] - result = self.process(input) - self.assert_equal(result, [{"path": "foo"}, {"path": "boo"}, {"path": "baz"}]) - - def test_corrupt_chunk(self): - chunks = self.split(self.make_chunks(["foo", "bar", "boo", "baz"]), 4) - input = [(False, chunks[:3]), (True, [b"gar", b"bage"] + chunks[3:])] - result = self.process(input) - self.assert_equal(result, [{"path": "foo"}, {"path": "boo"}, {"path": "baz"}]) +def test_cache_chunk_buffer(): + data = [Item(path="p1"), Item(path="p2")] + cache = MockCache() + key = PlaintextKey(None) + chunks = CacheChunkBuffer(cache, key, None) + for d in data: + chunks.add(d) + chunks.flush() + chunks.flush(flush=True) + assert len(chunks.chunks) == 2 + unpacker = msgpack.Unpacker() + for id in chunks.chunks: + unpacker.feed(cache.objects[id]) + assert data == [Item(internal_dict=d) for d in unpacker] + + +def test_partial_cache_chunk_buffer(): + big = "0123456789abcdefghijklmnopqrstuvwxyz" * 25000 + data = [Item(path="full", target=big), Item(path="partial", target=big)] + cache = MockCache() + key = PlaintextKey(None) + chunks = CacheChunkBuffer(cache, key, None) + for d in data: + chunks.add(d) + chunks.flush(flush=False) + # the code is expected to leave the last partial chunk in the buffer + assert len(chunks.chunks) == 3 + assert chunks.buffer.tell() > 0 + # now really flush + chunks.flush(flush=True) + assert len(chunks.chunks) == 4 + assert chunks.buffer.tell() == 0 + unpacker = msgpack.Unpacker() + for id in chunks.chunks: + unpacker.feed(cache.objects[id]) + assert data == [Item(internal_dict=d) for d in unpacker] + + +def make_chunks(items): + return b"".join(msgpack.packb({"path": item}) for item in items) + + +def _validator(value): + return isinstance(value, dict) and value.get("path") in ("foo", "bar", "boo", "baz") + + +def process(input): + unpacker = RobustUnpacker(validator=_validator, item_keys=ITEM_KEYS) + result = [] + for should_sync, chunks in input: + if should_sync: + unpacker.resync() + for data in chunks: + unpacker.feed(data) + for item in unpacker: + result.append(item) + return result + + +def test_extra_garbage_no_sync(): + chunks = [(False, [make_chunks(["foo", "bar"])]), (False, [b"garbage"] + [make_chunks(["boo", "baz"])])] + res = process(chunks) + assert res == [{"path": "foo"}, {"path": "bar"}, 103, 97, 114, 98, 97, 103, 101, {"path": "boo"}, {"path": "baz"}] + + +def split(left, length): + parts = [] + while left: + parts.append(left[:length]) + left = left[length:] + return parts + + +def test_correct_stream(): + chunks = split(make_chunks(["foo", "bar", "boo", "baz"]), 2) + input = [(False, chunks)] + result = process(input) + assert result == [{"path": "foo"}, {"path": "bar"}, {"path": "boo"}, {"path": "baz"}] + + +def test_missing_chunk(): + chunks = split(make_chunks(["foo", "bar", "boo", "baz"]), 4) + input = [(False, chunks[:3]), (True, chunks[4:])] + result = process(input) + assert result == [{"path": "foo"}, {"path": "boo"}, {"path": "baz"}] + + +def test_corrupt_chunk(): + chunks = split(make_chunks(["foo", "bar", "boo", "baz"]), 4) + input = [(False, chunks[:3]), (True, [b"gar", b"bage"] + chunks[3:])] + result = process(input) + assert result == [{"path": "foo"}, {"path": "boo"}, {"path": "baz"}] @pytest.fixture |