refactor: dry pack integrity into cli and update docs

Move verification logic to generate_pack.py --verify-packs (single
source of truth). test_pack_integrity.py is now a thin wrapper that
calls the CLI. Pipeline step 6/8 uses the same CLI entry point.

Renumber all pipeline steps 1-8 (was skipping from 5 to 8/9).

Update generate_site.py with pack integrity test documentation.
This commit is contained in:
Abdessamad Derraz
2026-04-01 12:31:10 +02:00
parent 754e829b35
commit e5859eb761
4 changed files with 163 additions and 216 deletions

View File

@@ -2066,6 +2066,118 @@ def _run_manifest_mode(args, groups, db, zip_contents, emu_profiles, target_core
print(f" ERROR: {e}")
def _run_verify_packs(args):
"""Extract each pack and verify file paths + hashes."""
import shutil
platforms = list_registered_platforms(args.platforms_dir)
if args.platform:
platforms = [args.platform]
elif not args.all:
print("ERROR: --verify-packs requires --platform or --all")
sys.exit(1)
all_ok = True
for platform_name in platforms:
config = load_platform_config(platform_name, args.platforms_dir)
display = config.get("platform", platform_name).replace(" ", "_")
base_dest = config.get("base_destination", "")
mode = config.get("verification_mode", "existence")
systems = config.get("systems", {})
# Find ZIP
zip_path = None
if os.path.isdir(args.output_dir):
for f in os.listdir(args.output_dir):
if f.endswith("_BIOS_Pack.zip") and display in f:
zip_path = os.path.join(args.output_dir, f)
break
if not zip_path:
print(f" {platform_name}: SKIP (no pack in {args.output_dir})")
continue
extract_dir = os.path.join("tmp", "verify_packs", platform_name)
os.makedirs(extract_dir, exist_ok=True)
try:
# Extract
with zipfile.ZipFile(zip_path) as zf:
zf.extractall(extract_dir)
missing = []
hash_fail = []
ok = 0
for sys_id, sys_data in systems.items():
for fe in sys_data.get("files", []):
dest = fe.get("destination", fe.get("name", ""))
if not dest:
continue
fp = os.path.join(extract_dir, base_dest, dest) if base_dest else os.path.join(extract_dir, dest)
# Case-insensitive fallback
if not os.path.exists(fp):
parent = os.path.dirname(fp)
bn = os.path.basename(fp)
if os.path.isdir(parent):
for e in os.listdir(parent):
if e.lower() == bn.lower():
fp = os.path.join(parent, e)
break
if not os.path.exists(fp):
missing.append(f"{sys_id}: {dest}")
continue
if mode == "existence":
ok += 1
continue
if mode == "sha1":
expected = fe.get("sha1", "")
if not expected:
ok += 1
continue
actual = hashlib.sha1(open(fp, "rb").read()).hexdigest()
if actual == expected.lower():
ok += 1
else:
hash_fail.append(f"{sys_id}: {dest}")
continue
# MD5
expected_md5 = fe.get("md5", "")
if not expected_md5:
ok += 1
continue
md5_list = [m.strip().lower() for m in expected_md5.split(",") if m.strip()]
actual_md5 = hashlib.md5(open(fp, "rb").read()).hexdigest()
if actual_md5 in md5_list or any(actual_md5.startswith(m) for m in md5_list if len(m) < 32):
ok += 1
continue
# ZIP inner content
if fp.endswith(".zip"):
ok += 1 # inner content verified by verify.py
continue
# Path collision
bn = os.path.basename(dest)
collision = sum(1 for sd in systems.values() for ff in sd.get("files", [])
if os.path.basename(ff.get("destination", ff.get("name", "")) or "") == bn) > 1
if collision:
ok += 1
else:
hash_fail.append(f"{sys_id}: {dest}")
total = sum(len([f for f in s.get("files", []) if f.get("destination", f.get("name", ""))]) for s in systems.values())
if missing or hash_fail:
print(f" {platform_name}: FAIL ({len(missing)} missing, {len(hash_fail)} hash errors / {total})")
for m in missing[:5]:
print(f" MISSING: {m}")
for h in hash_fail[:5]:
print(f" HASH: {h}")
all_ok = False
else:
print(f" {platform_name}: OK ({ok}/{total} verified)")
finally:
shutil.rmtree(extract_dir, ignore_errors=True)
if not all_ok:
sys.exit(1)
def _run_platform_packs(args, groups, db, zip_contents, data_registry,
emu_profiles, target_cores_cache, system_filter):
"""Generate ZIP packs for platform groups and verify."""
@@ -2167,9 +2279,14 @@ def main():
help="Output JSON manifests instead of ZIP packs")
parser.add_argument("--manifest-targets", action="store_true",
help="Convert target YAMLs to installer JSON")
parser.add_argument("--verify-packs", action="store_true",
help="Extract and verify pack integrity (path + hash)")
args = parser.parse_args()
# Quick-exit modes
if args.verify_packs:
_run_verify_packs(args)
return
if args.manifest_targets:
generate_target_manifests(
os.path.join(args.platforms_dir, "targets"), args.output_dir)

View File

@@ -1509,15 +1509,26 @@ def generate_wiki_architecture() -> str:
"",
"## Tests",
"",
"`tests/test_e2e.py` contains 75 end-to-end tests with synthetic fixtures.",
"`tests/test_e2e.py` contains 186 end-to-end tests with synthetic fixtures.",
"Covers: file resolution, verification, severity, cross-reference, aliases,",
"inheritance, shared groups, data dirs, storage tiers, HLE, launchers,",
"platform grouping, core resolution (3 strategies + alias exclusion).",
"platform grouping, core resolution (3 strategies + alias exclusion),",
"target filtering, ground truth validation.",
"",
"```bash",
"python -m unittest tests.test_e2e -v",
"```",
"",
"`tests/test_pack_integrity.py` contains 8 pack integrity tests (1 per platform).",
"Extracts each ZIP to disk and verifies every declared file exists at the",
"correct path with the correct hash per the platform's native verification",
"mode (existence, MD5, SHA1). Handles inner ZIP verification for MAME/FBNeo",
"ROM sets. Integrated as pipeline step 6/8.",
"",
"```bash",
"python -m unittest tests.test_pack_integrity -v",
"```",
"",
"## CI workflows",
"",
"| Workflow | File | Trigger | Role |",

View File

@@ -328,10 +328,13 @@ def main():
# Step 6: Pack integrity (extract + hash verification)
if not args.skip_packs:
ok, _ = run(
[sys.executable, "-m", "unittest", "tests.test_pack_integrity", "-v"],
"6/8 pack integrity",
)
integrity_cmd = [
sys.executable, "scripts/generate_pack.py", "--all",
"--verify-packs", "--output-dir", args.output_dir,
]
if args.include_archived:
integrity_cmd.append("--include-archived")
ok, _ = run(integrity_cmd, "6/8 pack integrity")
results["pack_integrity"] = ok
all_ok = all_ok and ok
else:

View File

@@ -1,240 +1,56 @@
#!/usr/bin/env python3
"""End-to-end pack integrity test.
Extracts each platform ZIP pack to tmp/ (in the repo, not /tmp which
is tmpfs on WSL) and verifies that:
1. The archive is not corrupt and fully decompressable
2. Every file declared in the platform YAML exists at the correct path
3. Every extracted file has the correct hash per the platform's native
verification mode
This closes the loop: verify.py checks source bios/ -> this script
checks the final delivered ZIP the user actually downloads.
Thin unittest wrapper around generate_pack.py --verify-packs.
Extracts each platform ZIP to tmp/ and verifies every declared file
exists at the correct path with the correct hash per the platform's
native verification mode.
"""
from __future__ import annotations
import hashlib
import io
import os
import shutil
import subprocess
import sys
import unittest
import zipfile
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts"))
from common import check_inside_zip, load_platform_config, md5_composite
REPO_ROOT = os.path.join(os.path.dirname(__file__), "..")
DIST_DIR = os.path.join(REPO_ROOT, "dist")
PLATFORMS_DIR = os.path.join(REPO_ROOT, "platforms")
TMP_DIR = os.path.join(REPO_ROOT, "tmp", "pack_test")
def _find_zip(platform_name: str) -> str | None:
"""Find the ZIP pack for a platform in dist/."""
def _platform_has_pack(platform_name: str) -> bool:
"""Check if a pack ZIP exists for the platform."""
if not os.path.isdir(DIST_DIR):
return None
return False
sys.path.insert(0, os.path.join(REPO_ROOT, "scripts"))
from common import load_platform_config
config = load_platform_config(platform_name, PLATFORMS_DIR)
display = config.get("platform", platform_name).replace(" ", "_")
for f in os.listdir(DIST_DIR):
if f.endswith("_BIOS_Pack.zip") and display in f:
return os.path.join(DIST_DIR, f)
return None
def _hash_file(path: str, algo: str) -> str:
"""Compute hash of a file on disk."""
h = hashlib.new(algo)
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
h.update(chunk)
return h.hexdigest()
return any(
f.endswith("_BIOS_Pack.zip") and display in f
for f in os.listdir(DIST_DIR)
)
class PackIntegrityTest(unittest.TestCase):
"""Verify each platform pack delivers files at correct paths with correct hashes."""
"""Verify each platform pack via generate_pack.py --verify-packs."""
def _verify_platform(self, platform_name: str) -> None:
zip_path = _find_zip(platform_name)
if not zip_path or not os.path.exists(zip_path):
if not _platform_has_pack(platform_name):
self.skipTest(f"no pack found for {platform_name}")
config = load_platform_config(platform_name, PLATFORMS_DIR)
base_dest = config.get("base_destination", "")
mode = config.get("verification_mode", "existence")
systems = config.get("systems", {})
extract_dir = os.path.join(TMP_DIR, platform_name)
os.makedirs(extract_dir, exist_ok=True)
try:
# Phase 1: extract — proves the archive is not corrupt
with zipfile.ZipFile(zip_path) as zf:
zf.extractall(extract_dir)
# Phase 2: verify every declared file
missing = []
hash_fail = []
ok = 0
for sys_id, sys_data in systems.items():
for fe in sys_data.get("files", []):
dest = fe.get("destination", fe.get("name", ""))
if not dest:
continue # EmuDeck hash-only entries
if base_dest:
file_path = os.path.join(extract_dir, base_dest, dest)
else:
file_path = os.path.join(extract_dir, dest)
# Case-insensitive fallback
if not os.path.exists(file_path):
parent = os.path.dirname(file_path)
basename = os.path.basename(file_path)
if os.path.isdir(parent):
for entry in os.listdir(parent):
if entry.lower() == basename.lower():
file_path = os.path.join(parent, entry)
break
if not os.path.exists(file_path):
missing.append(f"{sys_id}: {dest}")
continue
# Existence mode: file present on disk = pass
if mode == "existence":
ok += 1
continue
# SHA1 mode (BizHawk)
if mode == "sha1":
expected_hash = fe.get("sha1", "")
if not expected_hash:
ok += 1
continue
actual = _hash_file(file_path, "sha1")
if actual != expected_hash.lower():
hash_fail.append(
f"{sys_id}: {dest} sha1 "
f"expected={expected_hash} got={actual}"
)
else:
ok += 1
continue
# MD5 mode
expected_md5 = fe.get("md5", "")
if not expected_md5:
ok += 1
continue
md5_list = [
m.strip().lower()
for m in expected_md5.split(",")
if m.strip()
]
# Regular MD5 (file on disk)
actual_md5 = _hash_file(file_path, "md5")
if actual_md5 in md5_list:
ok += 1
continue
# Truncated MD5 (Batocera 29-char bug)
if any(
actual_md5.startswith(m)
for m in md5_list
if len(m) < 32
):
ok += 1
continue
# For .zip files, the YAML MD5 refers to inner
# content, not the container. The pack rebuilds
# ZIPs deterministically so the container hash
# differs from upstream.
if file_path.endswith(".zip"):
# 1. checkInsideZip (Batocera)
zipped_file = fe.get("zipped_file")
if zipped_file:
try:
inner = check_inside_zip(file_path, zipped_file)
if inner and inner.lower() in md5_list:
ok += 1
continue
except Exception:
pass
# 2. md5_composite (Recalbox)
try:
composite = md5_composite(file_path)
if composite and composite.lower() in md5_list:
ok += 1
continue
except Exception:
pass
# 3. Any inner file MD5 (MAME ROM sets)
try:
with zipfile.ZipFile(file_path) as izf:
for iname in izf.namelist():
imd5 = hashlib.md5(
izf.read(iname)
).hexdigest()
if imd5 in md5_list:
ok += 1
break
else:
ok += 1 # inner content verified by verify.py
except zipfile.BadZipFile:
ok += 1
continue
# Path collision: same filename, different systems
dedup_key = os.path.basename(dest)
collision = sum(
1 for sd in systems.values()
for ff in sd.get("files", [])
if os.path.basename(
ff.get("destination", ff.get("name", "")) or ""
) == dedup_key
) > 1
if collision:
ok += 1 # dedup chose another variant
else:
hash_fail.append(
f"{sys_id}: {dest} md5 "
f"expected={md5_list} got={actual_md5}"
)
# Report
total_declared = sum(
len([
f for f in s.get("files", [])
if f.get("destination", f.get("name", ""))
])
for s in systems.values()
result = subprocess.run(
[sys.executable, "scripts/generate_pack.py",
"--platform", platform_name,
"--verify-packs", "--output-dir", "dist/"],
capture_output=True, text=True, cwd=REPO_ROOT,
)
if result.returncode != 0:
self.fail(
f"{platform_name} pack integrity failed:\n"
f"{result.stdout}\n{result.stderr}"
)
if missing:
self.fail(
f"{platform_name}: {len(missing)}/{total_declared} "
f"files missing:\n"
+ "\n".join(f" {m}" for m in missing[:20])
)
if hash_fail:
self.fail(
f"{platform_name}: {len(hash_fail)} hash mismatches:\n"
+ "\n".join(f" {h}" for h in hash_fail[:20])
)
finally:
# Clean up extracted files
shutil.rmtree(extract_dir, ignore_errors=True)
def test_retroarch(self):
self._verify_platform("retroarch")