refactor: DRY consolidation + 83 unit tests

Moved shared functions to common.py (single source of truth):
- check_inside_zip (was in verify.py, imported by generate_pack)
- build_zip_contents_index (was duplicated in verify + generate_pack)
- load_emulator_profiles (was in verify, cross_reference, generate_site)
- group_identical_platforms (was in verify + generate_pack)

Added tests/ with 83 unit tests covering:
- resolve_local_file: SHA1, MD5, name, alias, truncated, zip_contents
- verify: existence, md5, zipped_file, multi-hash, severity mapping
- aliases: field parsing, by_name indexing, beetle_psx field rename
- pack: dedup, file_status, zipped_file inner check, EmuDeck entries
- severity: all 12 combinations, platform-native behavior

Zero regressions: running pipeline.py --all produces results identical to the previous revision.
This commit is contained in:
Abdessamad Derraz
2026-03-19 11:19:50 +01:00
parent 011d0f9441
commit b9cdda07ee
10 changed files with 1243 additions and 148 deletions

View File

@@ -243,6 +243,102 @@ def resolve_local_file(
return None, "not_found"
def check_inside_zip(container: str, file_name: str, expected_md5: str) -> str:
    """Check a ROM inside a ZIP — replicates Batocera checkInsideZip().

    Returns "ok", "untested", "not_in_zip", or "error".
    """
    wanted = file_name.casefold()
    try:
        with zipfile.ZipFile(container) as archive:
            # First case-insensitive name match wins, as Batocera does.
            match = next(
                (name for name in archive.namelist() if name.casefold() == wanted),
                None,
            )
            if match is None:
                return "not_in_zip"
            if expected_md5 == "":
                # No reference hash declared: presence alone is enough.
                return "ok"
            with archive.open(match) as entry:
                actual = md5sum(entry)
            return "ok" if actual == expected_md5 else "untested"
    except (zipfile.BadZipFile, OSError, KeyError):
        return "error"
def build_zip_contents_index(db: dict, max_entry_size: int = 512 * 1024 * 1024) -> dict:
    """Build {inner_rom_md5: zip_file_sha1} for ROMs inside ZIP files.

    Args:
        db: database dict whose "files" maps sha1 -> {"path": local_path, ...}.
        max_entry_size: ZIP members larger than this many bytes are skipped.

    Returns:
        Mapping from the MD5 of each eligible ZIP member to the SHA1 of its
        containing archive. Corrupt or unreadable archives are skipped
        silently (best effort), matching the previous behavior.
    """
    index: dict[str, str] = {}
    for sha1, entry in db.get("files", {}).items():
        path = entry["path"]
        if not path.endswith(".zip") or not os.path.exists(path):
            continue
        try:
            with zipfile.ZipFile(path, "r") as zf:
                for info in zf.infolist():
                    if info.is_dir() or info.file_size > max_entry_size:
                        continue
                    # Stream the member through MD5 in 1 MiB chunks instead of
                    # materializing up to max_entry_size bytes with zf.read().
                    digest = hashlib.md5()
                    with zf.open(info) as member:
                        for chunk in iter(lambda: member.read(1 << 20), b""):
                            digest.update(chunk)
                    index[digest.hexdigest()] = sha1
        except (zipfile.BadZipFile, OSError):
            continue
    return index
def load_emulator_profiles(
    emulators_dir: str, skip_aliases: bool = True,
) -> dict[str, dict]:
    """Load all emulator YAML profiles from a directory.

    Args:
        emulators_dir: directory containing ``*.yml`` profile files.
        skip_aliases: when True, drop profiles whose ``type`` is ``"alias"``.

    Returns:
        {file_stem: parsed_profile} for every file declaring an "emulator"
        key. Empty dict when PyYAML is unavailable or the directory is
        missing.
    """
    try:
        import yaml
    except ImportError:
        # PyYAML is optional here; degrade gracefully rather than crash.
        return {}
    profiles: dict[str, dict] = {}
    emu_path = Path(emulators_dir)
    if not emu_path.exists():
        return profiles
    for f in sorted(emu_path.glob("*.yml")):
        # YAML is UTF-8 by spec — don't depend on the platform locale.
        with open(f, encoding="utf-8") as fh:
            profile = yaml.safe_load(fh) or {}
        if "emulator" not in profile:
            continue
        if skip_aliases and profile.get("type") == "alias":
            continue
        profiles[f.stem] = profile
    return profiles
def group_identical_platforms(
    platforms: list[str], platforms_dir: str,
) -> list[tuple[list[str], str]]:
    """Group platforms that produce identical packs (same files + base_destination).

    Returns [(group_of_platform_names, representative), ...].
    """
    buckets: dict[str, list[str]] = {}
    first_seen: dict[str, str] = {}

    def _fingerprint(name: str) -> str:
        # SHA1 over the sorted destination|sha1|md5 lines of the resolved
        # config; an unloadable platform keys on its own name (groups alone).
        try:
            cfg = load_platform_config(name, platforms_dir)
        except FileNotFoundError:
            return name
        base = cfg.get("base_destination", "")
        parts = []
        for _sys_id, system in sorted(cfg.get("systems", {}).items()):
            for fe in system.get("files", []):
                dest = fe.get("destination", fe.get("name", ""))
                full_dest = f"{base}/{dest}" if base else dest
                parts.append(f"{full_dest}|{fe.get('sha1', '')}|{fe.get('md5', '')}")
        return hashlib.sha1("|".join(sorted(parts)).encode()).hexdigest()

    for platform in platforms:
        key = _fingerprint(platform)
        buckets.setdefault(key, []).append(platform)
        first_seen.setdefault(key, platform)
    return [(members, first_seen[key]) for key, members in buckets.items()]
def safe_extract_zip(zip_path: str, dest_dir: str) -> None:
"""Extract a ZIP file safely, preventing zip-slip path traversal."""
dest = os.path.realpath(dest_dir)

View File

@@ -25,27 +25,13 @@ except ImportError:
sys.exit(1)
sys.path.insert(0, os.path.dirname(__file__))
from common import load_database, load_platform_config
from common import load_database, load_emulator_profiles, load_platform_config
DEFAULT_EMULATORS_DIR = "emulators"
DEFAULT_PLATFORMS_DIR = "platforms"
DEFAULT_DB = "database.json"
def load_emulator_profiles(emulators_dir: str) -> dict[str, dict]:
    """Load every emulator YAML profile found in *emulators_dir*.

    Only files declaring an "emulator" key are kept; the result maps each
    file stem to its parsed profile.
    """
    root = Path(emulators_dir)
    if not root.exists():
        return {}
    loaded: dict[str, dict] = {}
    for yml in sorted(root.glob("*.yml")):
        with open(yml) as handle:
            data = yaml.safe_load(handle) or {}
        if "emulator" in data:
            loaded[yml.stem] = data
    return loaded
def load_platform_files(platforms_dir: str) -> tuple[dict[str, set[str]], dict[str, set[str]]]:
"""Load all platform configs and collect declared filenames + data_directories per system."""
declared = {}

View File

@@ -24,7 +24,11 @@ import zipfile
from pathlib import Path
sys.path.insert(0, os.path.dirname(__file__))
from common import compute_hashes, load_database, load_data_dir_registry, load_platform_config, md5_composite, resolve_local_file
from common import (
build_zip_contents_index, check_inside_zip, compute_hashes,
group_identical_platforms, load_database, load_data_dir_registry,
load_platform_config, md5_composite, resolve_local_file,
)
try:
import yaml
@@ -123,27 +127,6 @@ def resolve_file(file_entry: dict, db: dict, bios_dir: str,
return None, "not_found"
def build_zip_contents_index(db: dict) -> dict:
    """Build index of {inner_rom_md5: zip_file_sha1} for ROMs inside ZIP files."""
    index = {}
    for sha1, rec in db.get("files", {}).items():
        zip_path = rec["path"]
        # Only existing .zip containers contribute entries.
        if not zip_path.endswith(".zip") or not os.path.exists(zip_path):
            continue
        try:
            with zipfile.ZipFile(zip_path, "r") as archive:
                eligible = [
                    m for m in archive.infolist()
                    if not m.is_dir() and m.file_size <= MAX_ENTRY_SIZE
                ]
                for member in eligible:
                    digest = hashlib.md5(archive.read(member.filename)).hexdigest()
                    index[digest] = sha1
        except (zipfile.BadZipFile, OSError):
            continue
    return index
def download_external(file_entry: dict, dest_path: str) -> bool:
"""Download an external BIOS file, verify hash, save to dest_path."""
@@ -362,7 +345,6 @@ def generate_pack(
if status == "hash_mismatch" and verification_mode != "existence":
zf_name = file_entry.get("zipped_file")
if zf_name and local_path:
from verify import check_inside_zip
inner_md5 = file_entry.get("md5", "")
inner_result = check_inside_zip(local_path, zf_name, inner_md5)
if inner_result == "ok":
@@ -537,7 +519,7 @@ def main():
if updated:
print(f"Refreshed {updated} data director{'ies' if updated > 1 else 'y'}")
groups = _group_identical_platforms(platforms, args.platforms_dir)
groups = group_identical_platforms(platforms, args.platforms_dir)
for group_platforms, representative in groups:
if len(group_platforms) > 1:
@@ -565,39 +547,5 @@ def main():
print(f" ERROR: {e}")
def _group_identical_platforms(platforms: list[str], platforms_dir: str) -> list[tuple[list[str], str]]:
    """Group platforms that would produce identical ZIP packs.

    Two platforms land in the same group when the SHA1 fingerprint of their
    resolved systems + files + base_destination matches.
    Returns [(group_of_platform_names, representative_platform), ...].
    """
    grouped: dict[str, list[str]] = {}
    reps: dict[str, str] = {}
    for name in platforms:
        try:
            cfg = load_platform_config(name, platforms_dir)
        except FileNotFoundError:
            # Unresolvable config: key on the platform name so it stands alone.
            grouped.setdefault(name, []).append(name)
            reps.setdefault(name, name)
            continue
        base_dest = cfg.get("base_destination", "")
        lines = []
        for _sys_id, system in sorted(cfg.get("systems", {}).items()):
            for fe in system.get("files", []):
                dest = fe.get("destination", fe.get("name", ""))
                full_dest = f"{base_dest}/{dest}" if base_dest else dest
                lines.append(f"{full_dest}|{fe.get('sha1', '')}|{fe.get('md5', '')}")
        key = hashlib.sha1("|".join(sorted(lines)).encode()).hexdigest()
        grouped.setdefault(key, []).append(name)
        reps.setdefault(key, name)
    return [(members, reps[key]) for key, members in grouped.items()]
if __name__ == "__main__":
main()

View File

@@ -34,7 +34,11 @@ except ImportError:
sys.exit(1)
sys.path.insert(0, os.path.dirname(__file__))
from common import load_platform_config, md5sum, md5_composite, resolve_local_file
from common import (
build_zip_contents_index, check_inside_zip, group_identical_platforms,
load_emulator_profiles, load_platform_config, md5sum, md5_composite,
resolve_local_file,
)
DEFAULT_DB = "database.json"
DEFAULT_PLATFORMS_DIR = "platforms"
@@ -63,25 +67,6 @@ class Severity:
# Verification functions
# ---------------------------------------------------------------------------
def check_inside_zip(container: str, file_name: str, expected_md5: str) -> str:
    """Replicate Batocera checkInsideZip() — batocera-systems:978-1009."""
    wanted = file_name.casefold()
    try:
        with zipfile.ZipFile(container) as archive:
            for member in archive.namelist():
                if member.casefold() != wanted:
                    continue
                # Present with no reference hash: presence alone is OK.
                if expected_md5 == "":
                    return Status.OK
                with archive.open(member) as entry:
                    actual = md5sum(entry)
                return Status.OK if actual == expected_md5 else Status.UNTESTED
            return "not_in_zip"
    except (zipfile.BadZipFile, OSError, KeyError):
        return "error"
def verify_entry_existence(file_entry: dict, local_path: str | None) -> dict:
"""RetroArch verification: path_is_valid() — file exists = OK."""
name = file_entry.get("name", "")
@@ -190,41 +175,10 @@ def compute_severity(status: str, required: bool, mode: str) -> str:
# ZIP content index
# ---------------------------------------------------------------------------
def _build_zip_contents_index(db: dict) -> dict:
    """Map the MD5 of each ZIP member to the SHA1 of its containing archive."""
    index: dict[str, str] = {}
    size_cap = 512 * 1024 * 1024
    for sha1, rec in db.get("files", {}).items():
        location = rec["path"]
        if not location.endswith(".zip") or not os.path.exists(location):
            continue
        try:
            with zipfile.ZipFile(location, "r") as archive:
                for member in archive.infolist():
                    if member.is_dir() or member.file_size > size_cap:
                        continue
                    payload = archive.read(member.filename)
                    index[hashlib.md5(payload).hexdigest()] = sha1
        except (zipfile.BadZipFile, OSError):
            continue
    return index
# ---------------------------------------------------------------------------
# Cross-reference: undeclared files used by cores
# ---------------------------------------------------------------------------
def _load_emulator_profiles(emulators_dir: str) -> dict[str, dict]:
    """Load emulator YAML profiles, keeping only non-alias "emulator" entries."""
    base = Path(emulators_dir)
    if not base.exists():
        return {}
    out: dict[str, dict] = {}
    for yml in sorted(base.glob("*.yml")):
        with open(yml) as handle:
            doc = yaml.safe_load(handle) or {}
        if "emulator" in doc and doc.get("type") != "alias":
            out[yml.stem] = doc
    return out
def find_undeclared_files(
config: dict,
emulators_dir: str,
@@ -250,7 +204,7 @@ def find_undeclared_files(
declared_dd.add(ref)
by_name = db.get("indexes", {}).get("by_name", {})
profiles = _load_emulator_profiles(emulators_dir)
profiles = load_emulator_profiles(emulators_dir)
undeclared = []
seen = set()
@@ -303,7 +257,7 @@ def verify_platform(config: dict, db: dict, emulators_dir: str = DEFAULT_EMULATO
for sys in config.get("systems", {}).values()
for fe in sys.get("files", [])
)
zip_contents = _build_zip_contents_index(db) if has_zipped else {}
zip_contents = build_zip_contents_index(db) if has_zipped else {}
# Per-entry results
details = []
@@ -461,32 +415,20 @@ def main():
parser.error("Specify --platform or --all")
return
# Group identical platforms
verified_fps: dict[str, tuple[dict, list[str]]] = {}
# Group identical platforms (same function as generate_pack)
groups = group_identical_platforms(platforms, args.platforms_dir)
all_results = {}
for platform in sorted(platforms):
config = load_platform_config(platform, args.platforms_dir)
base_dest = config.get("base_destination", "")
entries = []
for sys_id, system in sorted(config.get("systems", {}).items()):
for fe in system.get("files", []):
dest = fe.get("destination", fe.get("name", ""))
full_dest = f"{base_dest}/{dest}" if base_dest else dest
entries.append(f"{full_dest}|{fe.get('sha1', '')}|{fe.get('md5', '')}")
fp = hashlib.sha1("|".join(sorted(entries)).encode()).hexdigest()
if fp in verified_fps:
_, group = verified_fps[fp]
group.append(config.get("platform", platform))
all_results[platform] = verified_fps[fp][0]
continue
group_results: list[tuple[dict, list[str]]] = []
for group_platforms, representative in groups:
config = load_platform_config(representative, args.platforms_dir)
result = verify_platform(config, db, args.emulators_dir)
all_results[platform] = result
verified_fps[fp] = (result, [config.get("platform", platform)])
names = [load_platform_config(p, args.platforms_dir).get("platform", p) for p in group_platforms]
group_results.append((result, names))
for p in group_platforms:
all_results[p] = result
if not args.json:
for result, group in verified_fps.values():
for result, group in group_results:
print_platform_result(result, group)
print()