summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorbigtedde <big.tedde@gmail.com>2023-07-26 12:18:25 -0700
committerbigtedde <big.tedde@gmail.com>2023-07-26 12:18:25 -0700
commitd2f32986f30f28f1edc7c74f38c0f3d979535c62 (patch)
tree93410f85f1be3d4800096e30733c9993f2d4c6d0 /src
parentf33efc0a10b6671e2cfc8ccaf496df8ac7e8f5f0 (diff)
removed TestCaseBase from testsuite/archive.py
Diffstat (limited to 'src')
-rw-r--r--src/borg/testsuite/archive.py184
1 files changed, 92 insertions, 92 deletions
diff --git a/src/borg/testsuite/archive.py b/src/borg/testsuite/archive.py
index 670a3a71a..cefd6293b 100644
--- a/src/borg/testsuite/archive.py
+++ b/src/borg/testsuite/archive.py
@@ -7,7 +7,6 @@ from unittest.mock import Mock
import pytest
-from . import BaseTestCase
from . import rejected_dotdot_paths
from ..crypto.key import PlaintextKey
from ..archive import Archive, CacheChunkBuffer, RobustUnpacker, valid_msgpacked_dict, ITEM_KEYS, Statistics
@@ -133,97 +132,98 @@ class MockCache:
return id, len(data)
-class ChunkBufferTestCase(BaseTestCase):
- def test_cache_chunk_buffer(self):
- data = [Item(path="p1"), Item(path="p2")]
- cache = MockCache()
- key = PlaintextKey(None)
- chunks = CacheChunkBuffer(cache, key, None)
- for d in data:
- chunks.add(d)
- chunks.flush()
- chunks.flush(flush=True)
- self.assert_equal(len(chunks.chunks), 2)
- unpacker = msgpack.Unpacker()
- for id in chunks.chunks:
- unpacker.feed(cache.objects[id])
- self.assert_equal(data, [Item(internal_dict=d) for d in unpacker])
-
- def test_partial_cache_chunk_buffer(self):
- big = "0123456789abcdefghijklmnopqrstuvwxyz" * 25000
- data = [Item(path="full", target=big), Item(path="partial", target=big)]
- cache = MockCache()
- key = PlaintextKey(None)
- chunks = CacheChunkBuffer(cache, key, None)
- for d in data:
- chunks.add(d)
- chunks.flush(flush=False)
- # the code is expected to leave the last partial chunk in the buffer
- self.assert_equal(len(chunks.chunks), 3)
- assert chunks.buffer.tell() > 0
- # now really flush
- chunks.flush(flush=True)
- self.assert_equal(len(chunks.chunks), 4)
- assert chunks.buffer.tell() == 0
- unpacker = msgpack.Unpacker()
- for id in chunks.chunks:
- unpacker.feed(cache.objects[id])
- self.assert_equal(data, [Item(internal_dict=d) for d in unpacker])
-
-
-class RobustUnpackerTestCase(BaseTestCase):
- def make_chunks(self, items):
- return b"".join(msgpack.packb({"path": item}) for item in items)
-
- def _validator(self, value):
- return isinstance(value, dict) and value.get("path") in ("foo", "bar", "boo", "baz")
-
- def process(self, input):
- unpacker = RobustUnpacker(validator=self._validator, item_keys=ITEM_KEYS)
- result = []
- for should_sync, chunks in input:
- if should_sync:
- unpacker.resync()
- for data in chunks:
- unpacker.feed(data)
- for item in unpacker:
- result.append(item)
- return result
-
- def test_extra_garbage_no_sync(self):
- chunks = [
- (False, [self.make_chunks(["foo", "bar"])]),
- (False, [b"garbage"] + [self.make_chunks(["boo", "baz"])]),
- ]
- result = self.process(chunks)
- self.assert_equal(
- result, [{"path": "foo"}, {"path": "bar"}, 103, 97, 114, 98, 97, 103, 101, {"path": "boo"}, {"path": "baz"}]
- )
-
- def split(self, left, length):
- parts = []
- while left:
- parts.append(left[:length])
- left = left[length:]
- return parts
-
- def test_correct_stream(self):
- chunks = self.split(self.make_chunks(["foo", "bar", "boo", "baz"]), 2)
- input = [(False, chunks)]
- result = self.process(input)
- self.assert_equal(result, [{"path": "foo"}, {"path": "bar"}, {"path": "boo"}, {"path": "baz"}])
-
- def test_missing_chunk(self):
- chunks = self.split(self.make_chunks(["foo", "bar", "boo", "baz"]), 4)
- input = [(False, chunks[:3]), (True, chunks[4:])]
- result = self.process(input)
- self.assert_equal(result, [{"path": "foo"}, {"path": "boo"}, {"path": "baz"}])
-
- def test_corrupt_chunk(self):
- chunks = self.split(self.make_chunks(["foo", "bar", "boo", "baz"]), 4)
- input = [(False, chunks[:3]), (True, [b"gar", b"bage"] + chunks[3:])]
- result = self.process(input)
- self.assert_equal(result, [{"path": "foo"}, {"path": "boo"}, {"path": "baz"}])
+def test_cache_chunk_buffer():
+ data = [Item(path="p1"), Item(path="p2")]
+ cache = MockCache()
+ key = PlaintextKey(None)
+ chunks = CacheChunkBuffer(cache, key, None)
+ for d in data:
+ chunks.add(d)
+ chunks.flush()
+ chunks.flush(flush=True)
+ assert len(chunks.chunks) == 2
+ unpacker = msgpack.Unpacker()
+ for id in chunks.chunks:
+ unpacker.feed(cache.objects[id])
+ assert data == [Item(internal_dict=d) for d in unpacker]
+
+
+def test_partial_cache_chunk_buffer():
+ big = "0123456789abcdefghijklmnopqrstuvwxyz" * 25000
+ data = [Item(path="full", target=big), Item(path="partial", target=big)]
+ cache = MockCache()
+ key = PlaintextKey(None)
+ chunks = CacheChunkBuffer(cache, key, None)
+ for d in data:
+ chunks.add(d)
+ chunks.flush(flush=False)
+ # the code is expected to leave the last partial chunk in the buffer
+ assert len(chunks.chunks) == 3
+ assert chunks.buffer.tell() > 0
+ # now really flush
+ chunks.flush(flush=True)
+ assert len(chunks.chunks) == 4
+ assert chunks.buffer.tell() == 0
+ unpacker = msgpack.Unpacker()
+ for id in chunks.chunks:
+ unpacker.feed(cache.objects[id])
+ assert data == [Item(internal_dict=d) for d in unpacker]
+
+
+def make_chunks(items):
+ return b"".join(msgpack.packb({"path": item}) for item in items)
+
+
+def _validator(value):
+ return isinstance(value, dict) and value.get("path") in ("foo", "bar", "boo", "baz")
+
+
+def process(input):
+ unpacker = RobustUnpacker(validator=_validator, item_keys=ITEM_KEYS)
+ result = []
+ for should_sync, chunks in input:
+ if should_sync:
+ unpacker.resync()
+ for data in chunks:
+ unpacker.feed(data)
+ for item in unpacker:
+ result.append(item)
+ return result
+
+
+def test_extra_garbage_no_sync():
+ chunks = [(False, [make_chunks(["foo", "bar"])]), (False, [b"garbage"] + [make_chunks(["boo", "baz"])])]
+ res = process(chunks)
+ assert res == [{"path": "foo"}, {"path": "bar"}, 103, 97, 114, 98, 97, 103, 101, {"path": "boo"}, {"path": "baz"}]
+
+
+def split(left, length):
+ parts = []
+ while left:
+ parts.append(left[:length])
+ left = left[length:]
+ return parts
+
+
+def test_correct_stream():
+ chunks = split(make_chunks(["foo", "bar", "boo", "baz"]), 2)
+ input = [(False, chunks)]
+ result = process(input)
+ assert result == [{"path": "foo"}, {"path": "bar"}, {"path": "boo"}, {"path": "baz"}]
+
+
+def test_missing_chunk():
+ chunks = split(make_chunks(["foo", "bar", "boo", "baz"]), 4)
+ input = [(False, chunks[:3]), (True, chunks[4:])]
+ result = process(input)
+ assert result == [{"path": "foo"}, {"path": "boo"}, {"path": "baz"}]
+
+
+def test_corrupt_chunk():
+ chunks = split(make_chunks(["foo", "bar", "boo", "baz"]), 4)
+ input = [(False, chunks[:3]), (True, [b"gar", b"bage"] + chunks[3:])]
+ result = process(input)
+ assert result == [{"path": "foo"}, {"path": "boo"}, {"path": "baz"}]
@pytest.fixture