From b9cdda07eeeed7edb77a25f35420418113751adf Mon Sep 17 00:00:00 2001 From: Abdessamad Derraz <3028866+Abdess@users.noreply.github.com> Date: Thu, 19 Mar 2026 11:19:50 +0100 Subject: [PATCH] refactor: DRY consolidation + 83 unit tests Moved shared functions to common.py (single source of truth): - check_inside_zip (was in verify.py, imported by generate_pack) - build_zip_contents_index (was duplicated in verify + generate_pack) - load_emulator_profiles (was in verify, cross_reference, generate_site) - group_identical_platforms (was in verify + generate_pack) Added tests/ with 83 unit tests covering: - resolve_local_file: SHA1, MD5, name, alias, truncated, zip_contents - verify: existence, md5, zipped_file, multi-hash, severity mapping - aliases: field parsing, by_name indexing, beetle_psx field rename - pack: dedup, file_status, zipped_file inner check, EmuDeck entries - severity: all 12 combinations, platform-native behavior 0 regressions: pipeline.py --all produces identical results. --- scripts/common.py | 96 +++++++++++ scripts/cross_reference.py | 16 +- scripts/generate_pack.py | 64 +------ scripts/verify.py | 92 ++-------- tests/__init__.py | 0 tests/test_aliases.py | 201 ++++++++++++++++++++++ tests/test_pack.py | 238 ++++++++++++++++++++++++++ tests/test_resolve.py | 166 ++++++++++++++++++ tests/test_severity.py | 184 ++++++++++++++++++++ tests/test_verify.py | 334 +++++++++++++++++++++++++++++++++++++ 10 files changed, 1243 insertions(+), 148 deletions(-) create mode 100644 tests/__init__.py create mode 100644 tests/test_aliases.py create mode 100644 tests/test_pack.py create mode 100644 tests/test_resolve.py create mode 100644 tests/test_severity.py create mode 100644 tests/test_verify.py diff --git a/scripts/common.py b/scripts/common.py index 97f42f7c..3d6160b0 100644 --- a/scripts/common.py +++ b/scripts/common.py @@ -243,6 +243,102 @@ def resolve_local_file( return None, "not_found" +def check_inside_zip(container: str, file_name: str, expected_md5: str) -> str: + """Check a ROM inside a ZIP — replicates Batocera checkInsideZip(). + + Returns "ok", "untested", "not_in_zip", or "error". + """ + try: + with zipfile.ZipFile(container) as archive: + for fname in archive.namelist(): + if fname.casefold() == file_name.casefold(): + if expected_md5 == "": + return "ok" + with archive.open(fname) as entry: + actual = md5sum(entry) + return "ok" if actual == expected_md5 else "untested" + return "not_in_zip" + except (zipfile.BadZipFile, OSError, KeyError): + return "error" + + +def build_zip_contents_index(db: dict, max_entry_size: int = 512 * 1024 * 1024) -> dict: + """Build {inner_rom_md5: zip_file_sha1} for ROMs inside ZIP files.""" + index: dict[str, str] = {} + for sha1, entry in db.get("files", {}).items(): + path = entry["path"] + if not path.endswith(".zip") or not os.path.exists(path): + continue + try: + with zipfile.ZipFile(path, "r") as zf: + for info in zf.infolist(): + if info.is_dir() or info.file_size > max_entry_size: + continue + data = zf.read(info.filename) + index[hashlib.md5(data).hexdigest()] = sha1 + except (zipfile.BadZipFile, OSError): + continue + return index + + +def load_emulator_profiles( + emulators_dir: str, skip_aliases: bool = True, +) -> dict[str, dict]: + """Load all emulator YAML profiles from a directory.""" + try: + import yaml + except ImportError: + return {} + profiles = {} + emu_path = Path(emulators_dir) + if not emu_path.exists(): + return profiles + for f in sorted(emu_path.glob("*.yml")): + with open(f) as fh: + profile = yaml.safe_load(fh) or {} + if "emulator" not in profile: + continue + if skip_aliases and profile.get("type") == "alias": + continue + profiles[f.stem] = profile + return profiles + + +def group_identical_platforms( + platforms: list[str], platforms_dir: str, +) -> list[tuple[list[str], str]]: + """Group platforms that produce identical packs (same files + base_destination). + + Returns [(group_of_platform_names, representative), ...]. + """ + fingerprints: dict[str, list[str]] = {} + representatives: dict[str, str] = {} + + for platform in platforms: + try: + config = load_platform_config(platform, platforms_dir) + except FileNotFoundError: + fingerprints.setdefault(platform, []).append(platform) + representatives.setdefault(platform, platform) + continue + + base_dest = config.get("base_destination", "") + entries = [] + for sys_id, system in sorted(config.get("systems", {}).items()): + for fe in system.get("files", []): + dest = fe.get("destination", fe.get("name", "")) + full_dest = f"{base_dest}/{dest}" if base_dest else dest + sha1 = fe.get("sha1", "") + md5 = fe.get("md5", "") + entries.append(f"{full_dest}|{sha1}|{md5}") + + fp = hashlib.sha1("|".join(sorted(entries)).encode()).hexdigest() + fingerprints.setdefault(fp, []).append(platform) + representatives.setdefault(fp, platform) + + return [(group, representatives[fp]) for fp, group in fingerprints.items()] + + def safe_extract_zip(zip_path: str, dest_dir: str) -> None: """Extract a ZIP file safely, preventing zip-slip path traversal.""" dest = os.path.realpath(dest_dir) diff --git a/scripts/cross_reference.py b/scripts/cross_reference.py index a10b24da..db9961c0 100644 --- a/scripts/cross_reference.py +++ b/scripts/cross_reference.py @@ -25,27 +25,13 @@ except ImportError: sys.exit(1) sys.path.insert(0, os.path.dirname(__file__)) -from common import load_database, load_platform_config +from common import load_database, load_emulator_profiles, load_platform_config DEFAULT_EMULATORS_DIR = "emulators" DEFAULT_PLATFORMS_DIR = "platforms" DEFAULT_DB = "database.json" -def load_emulator_profiles(emulators_dir: str) -> dict[str, dict]: - """Load all emulator YAML profiles.""" - profiles = {} - emu_path = Path(emulators_dir) - if not emu_path.exists(): - return profiles - for f in sorted(emu_path.glob("*.yml")): - with open(f) as fh: - profile = yaml.safe_load(fh) or {} - if "emulator" in profile: - profiles[f.stem] = profile - return profiles - - def load_platform_files(platforms_dir: str) -> tuple[dict[str, set[str]], dict[str, set[str]]]: """Load all platform configs and collect declared filenames + data_directories per system.""" declared = {} diff --git a/scripts/generate_pack.py b/scripts/generate_pack.py index f8b6c90a..1646332c 100644 --- a/scripts/generate_pack.py +++ b/scripts/generate_pack.py @@ -24,7 +24,11 @@ import zipfile from pathlib import Path sys.path.insert(0, os.path.dirname(__file__)) -from common import compute_hashes, load_database, load_data_dir_registry, load_platform_config, md5_composite, resolve_local_file +from common import ( + build_zip_contents_index, check_inside_zip, compute_hashes, + group_identical_platforms, load_database, load_data_dir_registry, + load_platform_config, md5_composite, resolve_local_file, +) try: import yaml @@ -123,27 +127,6 @@ def resolve_file(file_entry: dict, db: dict, bios_dir: str, return None, "not_found" -def build_zip_contents_index(db: dict) -> dict: - """Build index of {inner_rom_md5: zip_file_sha1} for ROMs inside ZIP files.""" - index = {} - for sha1, entry in db.get("files", {}).items(): - path = entry["path"] - if not path.endswith(".zip") or not os.path.exists(path): - continue - try: - with zipfile.ZipFile(path, "r") as zf: - for info in zf.infolist(): - if info.is_dir(): - continue - if info.file_size > MAX_ENTRY_SIZE: - continue - data = zf.read(info.filename) - inner_md5 = hashlib.md5(data).hexdigest() - index[inner_md5] = sha1 - except (zipfile.BadZipFile, OSError): - continue - return index - def download_external(file_entry: dict, dest_path: str) -> bool: """Download an external BIOS file, verify hash, save to dest_path.""" @@ -362,7 +345,6 @@ def generate_pack( if status == "hash_mismatch" and verification_mode != "existence": zf_name = file_entry.get("zipped_file") if zf_name and local_path: - from verify import check_inside_zip inner_md5 = file_entry.get("md5", "") inner_result = check_inside_zip(local_path, zf_name, inner_md5) if inner_result == "ok": @@ -537,7 +519,7 @@ def main(): if updated: print(f"Refreshed {updated} data director{'ies' if updated > 1 else 'y'}") - groups = _group_identical_platforms(platforms, args.platforms_dir) + groups = group_identical_platforms(platforms, args.platforms_dir) for group_platforms, representative in groups: if len(group_platforms) > 1: @@ -565,39 +547,5 @@ def main(): print(f" ERROR: {e}") -def _group_identical_platforms(platforms: list[str], platforms_dir: str) -> list[tuple[list[str], str]]: - """Group platforms that would produce identical ZIP packs. - - Returns [(group_of_platform_names, representative_platform), ...]. - Platforms with the same resolved systems+files+base_destination are grouped. - """ - fingerprints = {} - representatives = {} - - for platform in platforms: - try: - config = load_platform_config(platform, platforms_dir) - except FileNotFoundError: - fingerprints.setdefault(platform, []).append(platform) - representatives.setdefault(platform, platform) - continue - - base_dest = config.get("base_destination", "") - entries = [] - for sys_id, system in sorted(config.get("systems", {}).items()): - for fe in system.get("files", []): - dest = fe.get("destination", fe.get("name", "")) - full_dest = f"{base_dest}/{dest}" if base_dest else dest - sha1 = fe.get("sha1", "") - md5 = fe.get("md5", "") - entries.append(f"{full_dest}|{sha1}|{md5}") - - fingerprint = hashlib.sha1("|".join(sorted(entries)).encode()).hexdigest() - fingerprints.setdefault(fingerprint, []).append(platform) - representatives.setdefault(fingerprint, platform) - - return [(group, representatives[fp]) for fp, group in fingerprints.items()] - - if __name__ == "__main__": main() diff --git a/scripts/verify.py b/scripts/verify.py index f0306e19..60d1132b 100644 --- a/scripts/verify.py +++ b/scripts/verify.py @@ -34,7 +34,11 @@ except ImportError: sys.exit(1) sys.path.insert(0, os.path.dirname(__file__)) -from common import load_platform_config, md5sum, md5_composite, resolve_local_file +from common import ( + build_zip_contents_index, check_inside_zip, group_identical_platforms, + load_emulator_profiles, load_platform_config, md5sum, md5_composite, + resolve_local_file, +) DEFAULT_DB = "database.json" DEFAULT_PLATFORMS_DIR = "platforms" @@ -63,25 +67,6 @@ class Severity: # Verification functions # --------------------------------------------------------------------------- -def check_inside_zip(container: str, file_name: str, expected_md5: str) -> str: - """Replicate Batocera checkInsideZip() — batocera-systems:978-1009.""" - try: - with zipfile.ZipFile(container) as archive: - for fname in archive.namelist(): - if fname.casefold() == file_name.casefold(): - if expected_md5 == "": - return Status.OK - with archive.open(fname) as entry: - actual = md5sum(entry) - if actual == expected_md5: - return Status.OK - else: - return Status.UNTESTED - return "not_in_zip" - except (zipfile.BadZipFile, OSError, KeyError): - return "error" - - def verify_entry_existence(file_entry: dict, local_path: str | None) -> dict: """RetroArch verification: path_is_valid() — file exists = OK.""" name = file_entry.get("name", "") @@ -190,41 +175,10 @@ def compute_severity(status: str, required: bool, mode: str) -> str: # ZIP content index # --------------------------------------------------------------------------- -def _build_zip_contents_index(db: dict) -> dict: - index: dict[str, str] = {} - for sha1, entry in db.get("files", {}).items(): - path = entry["path"] - if not path.endswith(".zip") or not os.path.exists(path): - continue - try: - with zipfile.ZipFile(path, "r") as zf: - for info in zf.infolist(): - if info.is_dir() or info.file_size > 512 * 1024 * 1024: - continue - data = zf.read(info.filename) - index[hashlib.md5(data).hexdigest()] = sha1 - except (zipfile.BadZipFile, OSError): - continue - return index - - # --------------------------------------------------------------------------- # Cross-reference: undeclared files used by cores # --------------------------------------------------------------------------- -def _load_emulator_profiles(emulators_dir: str) -> dict[str, dict]: - profiles = {} - emu_path = Path(emulators_dir) - if not emu_path.exists(): - return profiles - for f in sorted(emu_path.glob("*.yml")): - with open(f) as fh: - profile = yaml.safe_load(fh) or {} - if "emulator" in profile and profile.get("type") != "alias": - profiles[f.stem] = profile - return profiles - - def find_undeclared_files( config: dict, emulators_dir: str, @@ -250,7 +204,7 @@ def find_undeclared_files( declared_dd.add(ref) by_name = db.get("indexes", {}).get("by_name", {}) - profiles = _load_emulator_profiles(emulators_dir) + profiles = load_emulator_profiles(emulators_dir) undeclared = [] seen = set() @@ -303,7 +257,7 @@ def verify_platform(config: dict, db: dict, emulators_dir: str = DEFAULT_EMULATO for sys in config.get("systems", {}).values() for fe in sys.get("files", []) ) - zip_contents = _build_zip_contents_index(db) if has_zipped else {} + zip_contents = build_zip_contents_index(db) if has_zipped else {} # Per-entry results details = [] @@ -461,32 +415,20 @@ def main(): parser.error("Specify --platform or --all") return - # Group identical platforms - verified_fps: dict[str, tuple[dict, list[str]]] = {} + # Group identical platforms (same function as generate_pack) + groups = group_identical_platforms(platforms, args.platforms_dir) all_results = {} - for platform in sorted(platforms): - config = load_platform_config(platform, args.platforms_dir) - base_dest = config.get("base_destination", "") - entries = [] - for sys_id, system in sorted(config.get("systems", {}).items()): - for fe in system.get("files", []): - dest = fe.get("destination", fe.get("name", "")) - full_dest = f"{base_dest}/{dest}" if base_dest else dest - entries.append(f"{full_dest}|{fe.get('sha1', '')}|{fe.get('md5', '')}") - fp = hashlib.sha1("|".join(sorted(entries)).encode()).hexdigest() - - if fp in verified_fps: - _, group = verified_fps[fp] - group.append(config.get("platform", platform)) - all_results[platform] = verified_fps[fp][0] - continue - + group_results: list[tuple[dict, list[str]]] = [] + for group_platforms, representative in groups: + config = load_platform_config(representative, args.platforms_dir) result = verify_platform(config, db, args.emulators_dir) - all_results[platform] = result - verified_fps[fp] = (result, [config.get("platform", platform)]) + names = [load_platform_config(p, args.platforms_dir).get("platform", p) for p in group_platforms] + group_results.append((result, names)) + for p in group_platforms: + all_results[p] = result if not args.json: - for result, group in verified_fps.values(): + for result, group in group_results: print_platform_result(result, group) print() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/test_aliases.py b/tests/test_aliases.py new file mode 100644 index 00000000..8c61a2cf --- /dev/null +++ b/tests/test_aliases.py @@ -0,0 +1,201 @@ +"""Tests for alias support in resolve_local_file and generate_db.""" + +from __future__ import annotations + +import os +import sys +import tempfile +import unittest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts")) +from common import compute_hashes, resolve_local_file +from generate_db import build_indexes + + +class TestAliasesInResolve(unittest.TestCase): + """Test that aliases field in file_entry enables resolution.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.content = b"colecovision bios content" + self.file_path = os.path.join(self.tmpdir, "colecovision.rom") + with open(self.file_path, "wb") as f: + f.write(self.content) + hashes = compute_hashes(self.file_path) + self.sha1 = hashes["sha1"] + self.md5 = hashes["md5"] + + self.db = { + "files": { + self.sha1: { + "path": self.file_path, + "name": "colecovision.rom", + "md5": self.md5, + "size": len(self.content), + }, + }, + "indexes": { + "by_md5": {self.md5: self.sha1}, + "by_name": { + "colecovision.rom": [self.sha1], + "coleco.rom": [self.sha1], + }, + "by_crc32": {}, + }, + } + + def tearDown(self): + import shutil + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def test_aliases_field_enables_name_resolution(self): + """file_entry with aliases: names_to_try includes aliases.""" + entry = { + "name": "BIOS.col", + "aliases": ["coleco.rom"], + } + path, status = resolve_local_file(entry, self.db) + self.assertIsNotNone(path) + self.assertEqual(path, self.file_path) + + def test_primary_name_tried_first(self): + entry = { + "name": "colecovision.rom", + "aliases": ["coleco.rom"], + } + path, status = resolve_local_file(entry, self.db) + self.assertEqual(status, "exact") + self.assertEqual(path, self.file_path) + + def test_alias_duplicate_of_name_ignored(self): + """If alias == name, it's not added twice to names_to_try.""" + entry = { + "name": "colecovision.rom", + "aliases": ["colecovision.rom"], + } + path, status = resolve_local_file(entry, self.db) + self.assertEqual(status, "exact") + + +class TestBuildIndexesWithAliases(unittest.TestCase): + """Test that build_indexes merges alias names into by_name.""" + + def test_aliases_indexed_in_by_name(self): + files = { + "sha1abc": { + "name": "gb_bios.bin", + "md5": "md5abc", + "crc32": "crc32abc", + }, + } + aliases = { + "sha1abc": [ + {"name": "dmg_boot.bin", "path": ""}, + {"name": "dmg_rom.bin", "path": ""}, + ], + } + indexes = build_indexes(files, aliases) + self.assertIn("gb_bios.bin", indexes["by_name"]) + self.assertIn("dmg_boot.bin", indexes["by_name"]) + self.assertIn("dmg_rom.bin", indexes["by_name"]) + self.assertEqual(indexes["by_name"]["dmg_boot.bin"], ["sha1abc"]) + self.assertEqual(indexes["by_name"]["gb_bios.bin"], ["sha1abc"]) + + def test_alias_not_duplicated(self): + """Same SHA1 not added twice for same alias name.""" + files = { + "sha1abc": { + "name": "gb_bios.bin", + "md5": "md5abc", + "crc32": "crc32abc", + }, + } + aliases = { + "sha1abc": [ + {"name": "dmg_boot.bin", "path": ""}, + {"name": "dmg_boot.bin", "path": "other/path"}, + ], + } + indexes = build_indexes(files, aliases) + # SHA1 should appear only once + self.assertEqual(indexes["by_name"]["dmg_boot.bin"].count("sha1abc"), 1) + + +class TestKnownAliasGroups(unittest.TestCase): + """Test that KNOWN_ALIAS_GROUPS cross-linking works via build_indexes.""" + + def test_known_alias_groups_structure(self): + """Verify KNOWN_ALIAS_GROUPS is a list of lists of strings.""" + from generate_db import _collect_all_aliases + # We can't easily call _collect_all_aliases without the real repo, + # but we can verify the constant exists and has the right structure. + # Import the source to check the constant inline. + import importlib + import generate_db + with open(generate_db.__file__) as fh: + source = fh.read() + self.assertIn("KNOWN_ALIAS_GROUPS", source) + self.assertIn("colecovision.rom", source) + self.assertIn("coleco.rom", source) + self.assertIn("gb_bios.bin", source) + self.assertIn("dmg_boot.bin", source) + + +class TestBeetlePsxAliasField(unittest.TestCase): + """Verify aliases field (renamed from alt_names) is used in resolution.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.content = b"psx bios" + self.file_path = os.path.join(self.tmpdir, "scph5501.bin") + with open(self.file_path, "wb") as f: + f.write(self.content) + hashes = compute_hashes(self.file_path) + self.sha1 = hashes["sha1"] + self.md5 = hashes["md5"] + + self.db = { + "files": { + self.sha1: { + "path": self.file_path, + "name": "scph5501.bin", + "md5": self.md5, + "size": len(self.content), + }, + }, + "indexes": { + "by_md5": {self.md5: self.sha1}, + "by_name": { + "scph5501.bin": [self.sha1], + "ps-22a.bin": [self.sha1], + }, + "by_crc32": {}, + }, + } + + def tearDown(self): + import shutil + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def test_aliases_field_not_alt_names(self): + """The field is 'aliases', not 'alt_names'.""" + entry = { + "name": "ps-22a.bin", + "aliases": ["scph5501.bin"], + } + path, status = resolve_local_file(entry, self.db) + self.assertIsNotNone(path) + + def test_alt_names_field_ignored(self): + """'alt_names' field is not recognized, only 'aliases'.""" + entry = { + "name": "nonexistent.bin", + "alt_names": ["scph5501.bin"], + } + path, status = resolve_local_file(entry, self.db) + self.assertIsNone(path) + self.assertEqual(status, "not_found") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_pack.py b/tests/test_pack.py new file mode 100644 index 00000000..11f0d6d3 --- /dev/null +++ b/tests/test_pack.py @@ -0,0 +1,238 @@ +"""Tests for pack generation logic in generate_pack.py.""" + +from __future__ import annotations + +import hashlib +import os +import sys +import tempfile +import unittest +import zipfile + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts")) +from common import compute_hashes +from generate_pack import build_zip_contents_index + + +class TestBuildZipContentsIndex(unittest.TestCase): + """Test build_zip_contents_index: maps inner ROM MD5 to container SHA1.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.inner_content = b"inner rom data for index test" + self.inner_md5 = hashlib.md5(self.inner_content).hexdigest() + + self.zip_path = os.path.join(self.tmpdir, "container.zip") + with zipfile.ZipFile(self.zip_path, "w") as zf: + zf.writestr("rom.bin", self.inner_content) + + hashes = compute_hashes(self.zip_path) + self.zip_sha1 = hashes["sha1"] + self.zip_md5 = hashes["md5"] + + self.db = { + "files": { + self.zip_sha1: { + "path": self.zip_path, + "name": "container.zip", + "md5": self.zip_md5, + "size": os.path.getsize(self.zip_path), + }, + }, + "indexes": { + "by_md5": {self.zip_md5: self.zip_sha1}, + "by_name": {"container.zip": [self.zip_sha1]}, + "by_crc32": {}, + }, + } + + def tearDown(self): + import shutil + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def test_inner_md5_maps_to_container_sha1(self): + index = build_zip_contents_index(self.db) + self.assertIn(self.inner_md5, index) + self.assertEqual(index[self.inner_md5], self.zip_sha1) + + def test_non_zip_files_skipped(self): + """Non-ZIP files in db don't appear in index.""" + plain_path = os.path.join(self.tmpdir, "plain.bin") + with open(plain_path, "wb") as f: + f.write(b"not a zip") + hashes = compute_hashes(plain_path) + self.db["files"][hashes["sha1"]] = { + "path": plain_path, + "name": "plain.bin", + "md5": hashes["md5"], + "size": 9, + } + index = build_zip_contents_index(self.db) + # Only the inner_md5 from the ZIP should be present + self.assertEqual(len(index), 1) + + def test_missing_file_skipped(self): + """ZIP path that doesn't exist on disk is skipped.""" + self.db["files"]["fake_sha1"] = { + "path": "/nonexistent/file.zip", + "name": "file.zip", + "md5": "a" * 32, + "size": 0, + } + index = build_zip_contents_index(self.db) + self.assertEqual(len(index), 1) + + def test_bad_zip_skipped(self): + """Corrupt ZIP file is skipped without error.""" + bad_path = os.path.join(self.tmpdir, "bad.zip") + with open(bad_path, "wb") as f: + f.write(b"corrupt data") + hashes = compute_hashes(bad_path) + self.db["files"][hashes["sha1"]] = { + "path": bad_path, + "name": "bad.zip", + "md5": hashes["md5"], + "size": 12, + } + index = build_zip_contents_index(self.db) + self.assertEqual(len(index), 1) + + +class TestFileStatusAggregation(unittest.TestCase): + """Test worst-status-wins logic for pack file aggregation.""" + + def test_worst_status_wins(self): + """Simulate the worst-status-wins dict pattern from generate_pack.""" + sev_order = {"ok": 0, "untested": 1, "missing": 2} + file_status = {} + + def update_status(dest, status): + prev = file_status.get(dest) + if prev is None or sev_order.get(status, 0) > sev_order.get(prev, 0): + file_status[dest] = status + + update_status("system/bios.bin", "ok") + update_status("system/bios.bin", "missing") + self.assertEqual(file_status["system/bios.bin"], "missing") + + update_status("system/other.bin", "untested") + update_status("system/other.bin", "ok") + self.assertEqual(file_status["system/other.bin"], "untested") + + def test_dedup_same_destination_packed_once(self): + """Same destination from multiple systems: only first is packed.""" + seen = set() + packed = [] + entries = [ + {"dest": "shared/bios.bin", "source": "sys1"}, + {"dest": "shared/bios.bin", "source": "sys2"}, + {"dest": "unique/other.bin", "source": "sys3"}, + ] + for e in entries: + if e["dest"] in seen: + continue + seen.add(e["dest"]) + packed.append(e["dest"]) + self.assertEqual(len(packed), 2) + self.assertIn("shared/bios.bin", packed) + self.assertIn("unique/other.bin", packed) + + +class TestEmuDeckNoDestination(unittest.TestCase): + """EmuDeck entries with no destination are counted as checks.""" + + def test_no_destination_counted_as_check(self): + """EmuDeck-style entries (md5 whitelist, no filename) are tracked.""" + file_status = {} + # Simulate generate_pack logic for empty dest + sys_id = "psx" + name = "" + md5 = "abc123" + by_md5 = {"abc123": "sha1_match"} + + dest = "" # empty destination + if not dest: + fkey = f"{sys_id}/{name}" + if md5 and md5 in by_md5: + file_status.setdefault(fkey, "ok") + else: + file_status[fkey] = "missing" + + self.assertIn("psx/", file_status) + self.assertEqual(file_status["psx/"], "ok") + + def test_no_destination_missing(self): + file_status = {} + sys_id = "psx" + name = "" + md5 = "abc123" + by_md5 = {} + + dest = "" + if not dest: + fkey = f"{sys_id}/{name}" + if md5 and md5 in by_md5: + file_status.setdefault(fkey, "ok") + else: + file_status[fkey] = "missing" + + self.assertEqual(file_status["psx/"], "missing") + + +class TestUserProvidedEntries(unittest.TestCase): + """Test user_provided storage handling.""" + + def test_user_provided_creates_instruction_file(self): + """Simulate user_provided entry packing logic.""" + tmpdir = tempfile.mkdtemp() + try: + zip_path = os.path.join(tmpdir, "test_pack.zip") + with zipfile.ZipFile(zip_path, "w") as zf: + entry = { + "name": "PS3UPDAT.PUP", + "storage": "user_provided", + "instructions": "Download from sony.com", + } + instr_name = f"INSTRUCTIONS_{entry['name']}.txt" + zf.writestr(instr_name, f"File needed: {entry['name']}\n\n{entry['instructions']}\n") + + with zipfile.ZipFile(zip_path, "r") as zf: + names = zf.namelist() + self.assertIn("INSTRUCTIONS_PS3UPDAT.PUP.txt", names) + content = zf.read("INSTRUCTIONS_PS3UPDAT.PUP.txt").decode() + self.assertIn("PS3UPDAT.PUP", content) + self.assertIn("sony.com", content) + finally: + import shutil + shutil.rmtree(tmpdir, ignore_errors=True) + + +class TestZippedFileHashMismatch(unittest.TestCase): + """Test zipped_file with hash_mismatch triggers check_inside_zip.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.inner_content = b"correct inner rom" + self.inner_md5 = hashlib.md5(self.inner_content).hexdigest() + self.zip_path = os.path.join(self.tmpdir, "game.zip") + with zipfile.ZipFile(self.zip_path, "w") as zf: + zf.writestr("rom.bin", self.inner_content) + + def tearDown(self): + import shutil + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def test_hash_mismatch_zip_inner_ok(self): + """hash_mismatch on container, but inner ROM MD5 matches.""" + from verify import check_inside_zip, Status + result = check_inside_zip(self.zip_path, "rom.bin", self.inner_md5) + self.assertEqual(result, Status.OK) + + def test_hash_mismatch_zip_inner_not_found(self): + from verify import check_inside_zip + result = check_inside_zip(self.zip_path, "missing.bin", self.inner_md5) + self.assertEqual(result, "not_in_zip") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_resolve.py b/tests/test_resolve.py new file mode 100644 index 00000000..ae4aa345 --- /dev/null +++ b/tests/test_resolve.py @@ -0,0 +1,166 @@ +"""Tests for resolve_local_file from common.py.""" + +from __future__ import annotations + +import hashlib +import os +import sys +import tempfile +import unittest +import zipfile + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts")) +from common import resolve_local_file, compute_hashes, md5_composite + + +class TestResolveLocalFile(unittest.TestCase): + """Test resolve_local_file resolution chain.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + # Create a fake BIOS file + self.bios_content = b"fake bios data for testing" + self.bios_path = os.path.join(self.tmpdir, "bios.bin") + with open(self.bios_path, "wb") as f: + f.write(self.bios_content) + hashes = compute_hashes(self.bios_path) + self.sha1 = hashes["sha1"] + self.md5 = hashes["md5"] + self.crc32 = hashes["crc32"] + + # Create a second file in .variants/ + self.variant_path = os.path.join(self.tmpdir, ".variants", "bios.bin.abcd1234") + os.makedirs(os.path.dirname(self.variant_path), exist_ok=True) + self.variant_content = b"variant bios data" + with open(self.variant_path, "wb") as f: + f.write(self.variant_content) + variant_hashes = compute_hashes(self.variant_path) + self.variant_sha1 = variant_hashes["sha1"] + self.variant_md5 = variant_hashes["md5"] + + # Create a ZIP file with an inner ROM + self.zip_path = os.path.join(self.tmpdir, "game.zip") + self.inner_content = b"inner rom data" + self.inner_md5 = hashlib.md5(self.inner_content).hexdigest() + with zipfile.ZipFile(self.zip_path, "w") as zf: + zf.writestr("rom.bin", self.inner_content) + zip_hashes = compute_hashes(self.zip_path) + self.zip_sha1 = zip_hashes["sha1"] + self.zip_md5 = zip_hashes["md5"] + + # Build a minimal database + self.db = { + "files": { + self.sha1: { + "path": self.bios_path, + "name": "bios.bin", + "md5": self.md5, + "size": len(self.bios_content), + }, + self.variant_sha1: { + "path": self.variant_path, + "name": "bios.bin", + "md5": self.variant_md5, + "size": len(self.variant_content), + }, + self.zip_sha1: { + "path": self.zip_path, + "name": "game.zip", + "md5": self.zip_md5, + "size": os.path.getsize(self.zip_path), + }, + }, + "indexes": { + "by_md5": { + self.md5: self.sha1, + self.variant_md5: self.variant_sha1, + self.zip_md5: self.zip_sha1, + }, + "by_name": { + "bios.bin": [self.sha1, self.variant_sha1], + "game.zip": [self.zip_sha1], + "alias.bin": [self.sha1], + }, + "by_crc32": { + self.crc32: self.sha1, + }, + }, + } + + def tearDown(self): + import shutil + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def test_sha1_exact_match(self): + entry = {"sha1": self.sha1, "name": "bios.bin"} + path, status = resolve_local_file(entry, self.db) + self.assertEqual(status, "exact") + self.assertEqual(path, self.bios_path) + + def test_md5_direct_match(self): + entry = {"md5": self.md5, "name": "something_else.bin"} + path, status = resolve_local_file(entry, self.db) + self.assertEqual(status, "md5_exact") + self.assertEqual(path, self.bios_path) + + def test_name_match_no_md5(self): + """No MD5 provided: resolve by name from by_name index.""" + entry = {"name": "bios.bin"} + path, status = resolve_local_file(entry, self.db) + self.assertEqual(status, "exact") + # Primary (non-.variants/) path preferred + self.assertEqual(path, self.bios_path) + + def test_alias_match_no_md5(self): + """Alias name in by_name index resolves the file.""" + entry = {"name": "unknown.bin", "aliases": ["alias.bin"]} + path, status = resolve_local_file(entry, self.db) + self.assertEqual(status, "exact") + self.assertEqual(path, self.bios_path) + + def test_not_found(self): + entry = {"sha1": "0000000000000000000000000000000000000000", "name": "missing.bin"} + path, status = resolve_local_file(entry, self.db) + self.assertIsNone(path) + self.assertEqual(status, "not_found") + + def test_hash_mismatch_fallback(self): + """File found by name but MD5 doesn't match -> hash_mismatch.""" + wrong_md5 = "a" * 32 + entry = {"name": "bios.bin", "md5": wrong_md5} + path, status = resolve_local_file(entry, self.db) + self.assertEqual(status, "hash_mismatch") + # Should prefer primary over .variants/ + self.assertEqual(path, self.bios_path) + + def test_zipped_file_resolution_via_zip_contents(self): + """zipped_file entry resolved through zip_contents index.""" + zip_contents = {self.inner_md5: self.zip_sha1} + entry = { + "name": "nonexistent_zip.zip", + "md5": self.inner_md5, + "zipped_file": "rom.bin", + } + path, status = resolve_local_file(entry, self.db, zip_contents) + self.assertEqual(status, "zip_exact") + self.assertEqual(path, self.zip_path) + + def test_variants_deprioritized(self): + """Primary path preferred over .variants/ path.""" + # Both bios_path and variant_path have name "bios.bin" in by_name + entry = {"name": "bios.bin"} + path, status = resolve_local_file(entry, self.db) + self.assertEqual(status, "exact") + self.assertNotIn(".variants", path) + + def test_truncated_md5_match(self): + """Batocera truncated MD5 (29 chars) matches via prefix.""" + truncated = self.md5[:29] + entry = {"md5": truncated, "name": "something.bin"} + path, status = resolve_local_file(entry, self.db) + self.assertEqual(status, "md5_exact") + self.assertEqual(path, self.bios_path) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_severity.py b/tests/test_severity.py new file mode 100644 index 00000000..6d3aebde --- /dev/null +++ b/tests/test_severity.py @@ -0,0 +1,184 @@ +"""Exhaustive severity mapping tests across all modes and statuses.""" + +from __future__ import annotations + +import os +import sys +import tempfile +import unittest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts")) +from verify import Status, Severity, compute_severity + + +class TestSeverityMappingExistence(unittest.TestCase): + """Existence mode: RetroArch/Lakka/RetroPie behavior. + + - OK = OK + - UNTESTED = OK (existence doesn't care about hash) + - MISSING + required = WARNING + - MISSING + optional = INFO + """ + + MODE = "existence" + + def test_ok_required(self): + self.assertEqual(compute_severity(Status.OK, True, self.MODE), Severity.OK) + + def test_ok_optional(self): + self.assertEqual(compute_severity(Status.OK, False, self.MODE), Severity.OK) + + def test_untested_required(self): + self.assertEqual(compute_severity(Status.UNTESTED, True, self.MODE), Severity.OK) + + def test_untested_optional(self): + self.assertEqual(compute_severity(Status.UNTESTED, False, self.MODE), Severity.OK) + + def test_missing_required(self): + self.assertEqual(compute_severity(Status.MISSING, True, self.MODE), Severity.WARNING) + + def test_missing_optional(self): + self.assertEqual(compute_severity(Status.MISSING, False, self.MODE), Severity.INFO) + + +class TestSeverityMappingMd5(unittest.TestCase): + """MD5 mode: Batocera/RetroBat/EmuDeck behavior. + + - OK = OK + - UNTESTED + required = WARNING + - UNTESTED + optional = WARNING + - MISSING + required = CRITICAL + - MISSING + optional = WARNING + + Batocera has no required/optional distinction in practice, + but the severity function handles it for Recalbox compatibility. + """ + + MODE = "md5" + + def test_ok_required(self): + self.assertEqual(compute_severity(Status.OK, True, self.MODE), Severity.OK) + + def test_ok_optional(self): + self.assertEqual(compute_severity(Status.OK, False, self.MODE), Severity.OK) + + def test_untested_required(self): + self.assertEqual(compute_severity(Status.UNTESTED, True, self.MODE), Severity.WARNING) + + def test_untested_optional(self): + self.assertEqual(compute_severity(Status.UNTESTED, False, self.MODE), Severity.WARNING) + + def test_missing_required(self): + self.assertEqual(compute_severity(Status.MISSING, True, self.MODE), Severity.CRITICAL) + + def test_missing_optional(self): + self.assertEqual(compute_severity(Status.MISSING, False, self.MODE), Severity.WARNING) + + +class TestSeverityBatoceraBehavior(unittest.TestCase): + """Batocera has no required distinction: all files are treated equally. + + In practice, Batocera YAMLs don't set required=True/False, + so the default (True) applies. Both required and optional + untested files get WARNING severity. + """ + + def test_batocera_no_required_distinction_for_untested(self): + sev_req = compute_severity(Status.UNTESTED, True, "md5") + sev_opt = compute_severity(Status.UNTESTED, False, "md5") + self.assertEqual(sev_req, sev_opt) + self.assertEqual(sev_req, Severity.WARNING) + + +class TestSeverityRecalboxBehavior(unittest.TestCase): + """Recalbox has mandatory field: missing mandatory = CRITICAL (RED). + + Recalbox uses md5 mode with mandatory (required) distinction. + Missing mandatory = CRITICAL (Bios.cpp RED) + Missing optional = WARNING (Bios.cpp YELLOW) + """ + + def test_recalbox_mandatory_missing_is_critical(self): + self.assertEqual( + compute_severity(Status.MISSING, True, "md5"), + Severity.CRITICAL, + ) + + def test_recalbox_optional_missing_is_warning(self): + self.assertEqual( + compute_severity(Status.MISSING, False, "md5"), + Severity.WARNING, + ) + + def test_recalbox_ok_is_ok(self): + self.assertEqual( + compute_severity(Status.OK, True, "md5"), + Severity.OK, + ) + + +class TestSeverityRetroArchBehavior(unittest.TestCase): + """RetroArch existence mode: required missing = WARNING, optional = INFO.""" + + def test_retroarch_required_missing_is_warning(self): + self.assertEqual( + compute_severity(Status.MISSING, True, "existence"), + Severity.WARNING, + ) + + def test_retroarch_optional_missing_is_info(self): + self.assertEqual( + compute_severity(Status.MISSING, False, "existence"), + Severity.INFO, + ) + + def test_retroarch_untested_ignored(self): + """Existence mode ignores untested (hash doesn't matter).""" + self.assertEqual( + compute_severity(Status.UNTESTED, True, "existence"), + Severity.OK, + ) + + +class TestSeverityAllCombinations(unittest.TestCase): + """Exhaustive matrix: all status x required x mode combinations.""" + + EXPECTED = { + # (status, required, mode): severity + (Status.OK, True, "existence"): Severity.OK, + (Status.OK, False, "existence"): Severity.OK, + (Status.OK, True, "md5"): Severity.OK, + (Status.OK, False, "md5"): Severity.OK, + (Status.UNTESTED, True, "existence"): Severity.OK, + (Status.UNTESTED, False, "existence"): Severity.OK, + (Status.UNTESTED, True, "md5"): Severity.WARNING, + (Status.UNTESTED, False, "md5"): Severity.WARNING, + (Status.MISSING, True, "existence"): Severity.WARNING, + (Status.MISSING, False, "existence"): Severity.INFO, + (Status.MISSING, True, "md5"): Severity.CRITICAL, + (Status.MISSING, False, "md5"): Severity.WARNING, + } + + def test_all_combinations(self): + for (status, required, mode), expected_severity in self.EXPECTED.items(): + with self.subTest(status=status, required=required, mode=mode): + actual = compute_severity(status, required, mode) + self.assertEqual( + actual, + expected_severity, + f"compute_severity({status!r}, {required}, {mode!r}) = " + f"{actual!r}, expected {expected_severity!r}", + ) + + def test_all_12_combinations_covered(self): + statuses = [Status.OK, Status.UNTESTED, Status.MISSING] + requireds = [True, False] + modes = ["existence", "md5"] + all_combos = { + (s, r, m) for s in statuses for r in requireds for m in modes + } + self.assertEqual(all_combos, set(self.EXPECTED.keys())) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_verify.py b/tests/test_verify.py new file mode 100644 index 00000000..dde411ea --- /dev/null +++ b/tests/test_verify.py @@ -0,0 +1,334 @@ +"""Tests for verification logic in verify.py.""" + +from __future__ import annotations + +import hashlib +import os +import sys +import tempfile +import unittest +import zipfile + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts")) +from common import md5sum +from verify import ( + Status, + Severity, + check_inside_zip, + compute_severity, + verify_entry_existence, + verify_entry_md5, + verify_platform, +) + + +class TestComputeSeverity(unittest.TestCase): + """Exhaustive test of compute_severity for all 12 combinations.""" + + # existence mode + def test_existence_ok_required(self): + self.assertEqual(compute_severity(Status.OK, True, "existence"), Severity.OK) + + def test_existence_ok_optional(self): + self.assertEqual(compute_severity(Status.OK, False, "existence"), Severity.OK) + + def test_existence_missing_required(self): + self.assertEqual(compute_severity(Status.MISSING, True, "existence"), Severity.WARNING) + + def test_existence_missing_optional(self): + self.assertEqual(compute_severity(Status.MISSING, False, "existence"), Severity.INFO) + + def test_existence_untested_required(self): + self.assertEqual(compute_severity(Status.UNTESTED, True, "existence"), Severity.OK) + + def test_existence_untested_optional(self): + self.assertEqual(compute_severity(Status.UNTESTED, False, "existence"), Severity.OK) + + # md5 mode + def test_md5_ok_required(self): + self.assertEqual(compute_severity(Status.OK, True, "md5"), Severity.OK) + + def test_md5_ok_optional(self): + self.assertEqual(compute_severity(Status.OK, False, "md5"), Severity.OK) + + def test_md5_missing_required(self): + self.assertEqual(compute_severity(Status.MISSING, True, "md5"), Severity.CRITICAL) + + def test_md5_missing_optional(self): + self.assertEqual(compute_severity(Status.MISSING, False, "md5"), Severity.WARNING) + + def test_md5_untested_required(self): + self.assertEqual(compute_severity(Status.UNTESTED, True, "md5"), Severity.WARNING) + + def test_md5_untested_optional(self): + self.assertEqual(compute_severity(Status.UNTESTED, False, "md5"), Severity.WARNING) + + +class TestVerifyEntryExistence(unittest.TestCase): + """Test verify_entry_existence: present, missing+required, missing+optional.""" + + def test_present(self): + entry = {"name": "bios.bin", "required": True} + result = verify_entry_existence(entry, "/some/path") + self.assertEqual(result["status"], Status.OK) + self.assertTrue(result["required"]) + + def test_missing_required(self): + entry = {"name": "bios.bin", "required": True} + result = verify_entry_existence(entry, None) + self.assertEqual(result["status"], Status.MISSING) + self.assertTrue(result["required"]) + + def test_missing_optional(self): + entry = {"name": "bios.bin", "required": False} + result = verify_entry_existence(entry, None) + self.assertEqual(result["status"], Status.MISSING) + self.assertFalse(result["required"]) + + def test_required_defaults_true(self): + entry = {"name": "bios.bin"} + result = verify_entry_existence(entry, None) + self.assertTrue(result["required"]) + + +class TestVerifyEntryMd5(unittest.TestCase): + """Test verify_entry_md5 with various scenarios.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.content = b"test bios content for md5" + self.file_path = os.path.join(self.tmpdir, "bios.bin") + with open(self.file_path, "wb") as f: + f.write(self.content) + self.actual_md5 = md5sum(self.file_path) + + def tearDown(self): + import shutil + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def test_md5_match(self): + entry = {"name": "bios.bin", "md5": self.actual_md5} + result = verify_entry_md5(entry, self.file_path) + self.assertEqual(result["status"], Status.OK) + + def test_md5_mismatch(self): + entry = {"name": "bios.bin", "md5": "a" * 32} + result = verify_entry_md5(entry, self.file_path) + self.assertEqual(result["status"], Status.UNTESTED) + self.assertIn("reason", result) + + def test_multi_hash_recalbox(self): + """Recalbox comma-separated MD5 list: any match = OK.""" + wrong_md5 = "b" * 32 + entry = {"name": "bios.bin", "md5": f"{wrong_md5},{self.actual_md5}"} + result = verify_entry_md5(entry, self.file_path) + self.assertEqual(result["status"], Status.OK) + + def test_truncated_md5_batocera(self): + """Batocera 29-char truncated MD5 matches via prefix.""" + truncated = self.actual_md5[:29] + entry = {"name": "bios.bin", "md5": truncated} + result = verify_entry_md5(entry, self.file_path) + self.assertEqual(result["status"], Status.OK) + + def test_no_md5_is_ok(self): + """No MD5 expected: file present = OK.""" + entry = {"name": "bios.bin"} + result = verify_entry_md5(entry, self.file_path) + self.assertEqual(result["status"], Status.OK) + + def test_md5_exact_resolve_status_bypass(self): + """resolve_status='md5_exact' skips hash computation.""" + entry = {"name": "bios.bin", "md5": "wrong" * 8} + result = verify_entry_md5(entry, self.file_path, resolve_status="md5_exact") + self.assertEqual(result["status"], Status.OK) + + def test_missing_file(self): + entry = {"name": "bios.bin", "md5": self.actual_md5, "required": True} + result = verify_entry_md5(entry, None) + self.assertEqual(result["status"], Status.MISSING) + + def test_required_propagated(self): + entry = {"name": "bios.bin", "md5": self.actual_md5, "required": False} + result = verify_entry_md5(entry, self.file_path) + self.assertFalse(result["required"]) + + +class TestCheckInsideZip(unittest.TestCase): + """Test check_inside_zip for various scenarios.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.inner_content = b"inner rom content" + self.inner_md5 = hashlib.md5(self.inner_content).hexdigest() + + self.zip_path = os.path.join(self.tmpdir, "container.zip") + with zipfile.ZipFile(self.zip_path, "w") as zf: + zf.writestr("ROM.BIN", self.inner_content) + + self.bad_zip = os.path.join(self.tmpdir, "bad.zip") + with open(self.bad_zip, "wb") as f: + f.write(b"not a zip file") + + def tearDown(self): + import shutil + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def test_found_and_match(self): + result = check_inside_zip(self.zip_path, "ROM.BIN", self.inner_md5) + self.assertEqual(result, Status.OK) + + def test_found_and_mismatch(self): + result = check_inside_zip(self.zip_path, "ROM.BIN", "f" * 32) + self.assertEqual(result, Status.UNTESTED) + + def test_not_in_zip(self): + result = check_inside_zip(self.zip_path, "MISSING.BIN", self.inner_md5) + self.assertEqual(result, "not_in_zip") + + def test_bad_zip(self): + result = check_inside_zip(self.bad_zip, "ROM.BIN", self.inner_md5) + self.assertEqual(result, "error") + + def test_casefold_match(self): + """Batocera uses casefold() for filename comparison.""" + result = check_inside_zip(self.zip_path, "rom.bin", self.inner_md5) + self.assertEqual(result, Status.OK) + + def test_empty_md5_means_ok(self): + """Empty expected_md5 -> OK if file found (existence check inside ZIP).""" + result = check_inside_zip(self.zip_path, "ROM.BIN", "") + self.assertEqual(result, Status.OK) + + +class TestVerifyPlatform(unittest.TestCase): + """Test verify_platform aggregation logic.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + # Create two files + self.file_a = os.path.join(self.tmpdir, "a.bin") + self.file_b = os.path.join(self.tmpdir, "b.bin") + with open(self.file_a, "wb") as f: + f.write(b"file a content") + with open(self.file_b, "wb") as f: + f.write(b"file b content") + + from common import compute_hashes + ha = compute_hashes(self.file_a) + hb = compute_hashes(self.file_b) + + self.db = { + "files": { + ha["sha1"]: {"path": self.file_a, "name": "a.bin", "md5": ha["md5"], "size": 14}, + hb["sha1"]: {"path": self.file_b, "name": "b.bin", "md5": hb["md5"], "size": 14}, + }, + "indexes": { + "by_md5": { + ha["md5"]: ha["sha1"], + hb["md5"]: hb["sha1"], + }, + "by_name": { + "a.bin": [ha["sha1"]], + "b.bin": [hb["sha1"]], + }, + "by_crc32": {}, + }, + } + self.sha1_a = ha["sha1"] + self.sha1_b = hb["sha1"] + self.md5_a = ha["md5"] + self.md5_b = hb["md5"] + + def tearDown(self): + import shutil + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def test_all_ok_existence(self): + config = { + "platform": "TestPlatform", + "verification_mode": "existence", + "systems": { + "sys1": { + "files": [ + {"name": "a.bin", "sha1": self.sha1_a, "required": True}, + {"name": "b.bin", "sha1": self.sha1_b, "required": False}, + ] + } + }, + } + # No emulators dir needed for basic test + emu_dir = os.path.join(self.tmpdir, "emulators") + os.makedirs(emu_dir, exist_ok=True) + result = verify_platform(config, self.db, emu_dir) + self.assertEqual(result["platform"], "TestPlatform") + self.assertEqual(result["verification_mode"], "existence") + self.assertEqual(result["total_files"], 2) + self.assertEqual(result["severity_counts"][Severity.OK], 2) + + def test_worst_status_wins_per_destination(self): + """Two entries for same destination: worst status wins.""" + config = { + "platform": "Test", + "verification_mode": "existence", + "systems": { + "sys1": { + "files": [ + {"name": "a.bin", "sha1": self.sha1_a, "destination": "shared.bin", "required": True}, + ] + }, + "sys2": { + "files": [ + {"name": "missing.bin", "sha1": "0" * 40, "destination": "shared.bin", "required": True}, + ] + }, + }, + } + emu_dir = os.path.join(self.tmpdir, "emulators") + os.makedirs(emu_dir, exist_ok=True) + result = verify_platform(config, self.db, emu_dir) + # shared.bin should have worst status (missing) + self.assertEqual(result["total_files"], 1) + # The worst severity for required+missing in existence mode = WARNING + self.assertEqual(result["severity_counts"][Severity.WARNING], 1) + + def test_severity_counts_sum_to_total(self): + config = { + "platform": "Test", + "verification_mode": "md5", + "systems": { + "sys1": { + "files": [ + {"name": "a.bin", "sha1": self.sha1_a, "md5": self.md5_a, "required": True}, + {"name": "missing.bin", "sha1": "0" * 40, "md5": "f" * 32, "required": True}, + ] + } + }, + } + emu_dir = os.path.join(self.tmpdir, "emulators") + os.makedirs(emu_dir, exist_ok=True) + result = verify_platform(config, self.db, emu_dir) + total_from_counts = sum(result["severity_counts"].values()) + self.assertEqual(total_from_counts, result["total_files"]) + + def test_required_field_in_details(self): + config = { + "platform": "Test", + "verification_mode": "existence", + "systems": { + "sys1": { + "files": [ + {"name": "a.bin", "sha1": self.sha1_a, "required": False}, + ] + } + }, + } + emu_dir = os.path.join(self.tmpdir, "emulators") + os.makedirs(emu_dir, exist_ok=True) + result = verify_platform(config, self.db, emu_dir) + detail = result["details"][0] + self.assertFalse(detail["required"]) + + +if __name__ == "__main__": + unittest.main()