feat: full ground truth validation in verify.py

adler32 hash via zlib.adler32(), min_size/max_size range checks,
signature/crypto tracked as non-reproducible (console-specific keys).
compute_hashes now returns adler32. 69 tests pass including 3 new
tests for adler32, size ranges, and crypto tracking.
This commit is contained in:
Abdessamad Derraz
2026-03-24 11:11:38 +01:00
parent 470bb6ceb9
commit 8141a34faa
3 changed files with 118 additions and 12 deletions

View File

@@ -20,22 +20,25 @@ except ImportError:
def compute_hashes(filepath: str | Path) -> dict[str, str]: def compute_hashes(filepath: str | Path) -> dict[str, str]:
"""Compute SHA1, MD5, SHA256, CRC32 for a file.""" """Compute SHA1, MD5, SHA256, CRC32, Adler32 for a file."""
sha1 = hashlib.sha1() sha1 = hashlib.sha1()
md5 = hashlib.md5() md5 = hashlib.md5()
sha256 = hashlib.sha256() sha256 = hashlib.sha256()
crc = 0 crc = 0
adler = 1 # zlib.adler32 initial value
with open(filepath, "rb") as f: with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""): for chunk in iter(lambda: f.read(65536), b""):
sha1.update(chunk) sha1.update(chunk)
md5.update(chunk) md5.update(chunk)
sha256.update(chunk) sha256.update(chunk)
crc = zlib.crc32(chunk, crc) crc = zlib.crc32(chunk, crc)
adler = zlib.adler32(chunk, adler)
return { return {
"sha1": sha1.hexdigest(), "sha1": sha1.hexdigest(),
"md5": md5.hexdigest(), "md5": md5.hexdigest(),
"sha256": sha256.hexdigest(), "sha256": sha256.hexdigest(),
"crc32": format(crc & 0xFFFFFFFF, "08x"), "crc32": format(crc & 0xFFFFFFFF, "08x"),
"adler32": format(adler & 0xFFFFFFFF, "08x"),
} }

View File

@@ -86,17 +86,28 @@ def _parse_validation(validation: list | dict | None) -> list[str]:
return [] return []
# Validation types that require console-specific cryptographic keys.
# verify.py cannot reproduce these — size checks still apply if combined.
_CRYPTO_CHECKS = frozenset({"signature", "crypto"})
# Hash-based validation types that verify.py can recompute locally.
# ("size" is also reproducible but is not a hash, so it is not listed here.)
_HASH_CHECKS = frozenset({"crc32", "md5", "sha1", "adler32"})
def _build_validation_index(profiles: dict) -> dict[str, dict]: def _build_validation_index(profiles: dict) -> dict[str, dict]:
"""Build per-filename validation rules from emulator profiles. """Build per-filename validation rules from emulator profiles.
Returns {filename: {"checks": [str], "size": int|None, "crc32": str|None, Returns {filename: {"checks": [str], "size": int|None, "min_size": int|None,
"md5": str|None, "sha1": str|None}}. "max_size": int|None, "crc32": str|None, "md5": str|None, "sha1": str|None,
"adler32": str|None, "crypto_only": [str]}}.
``crypto_only`` lists validation types we cannot reproduce (signature, crypto)
so callers can report them as non-verifiable rather than silently skipping.
When multiple emulators reference the same file, merges checks (union). When multiple emulators reference the same file, merges checks (union).
Raises ValueError if two profiles declare conflicting values for Raises ValueError if two profiles declare conflicting values.
the same filename (indicates a profile bug).
""" """
index: dict[str, dict] = {} index: dict[str, dict] = {}
# Track which emulator set each value, for conflict reporting
sources: dict[str, dict[str, str]] = {} sources: dict[str, dict[str, str]] = {}
for emu_name, profile in profiles.items(): for emu_name, profile in profiles.items():
if profile.get("type") in ("launcher", "alias"): if profile.get("type") in ("launcher", "alias"):
@@ -113,9 +124,15 @@ def _build_validation_index(profiles: dict) -> dict[str, dict]:
"checks": set(), "size": None, "checks": set(), "size": None,
"min_size": None, "max_size": None, "min_size": None, "max_size": None,
"crc32": None, "md5": None, "sha1": None, "crc32": None, "md5": None, "sha1": None,
"adler32": None, "crypto_only": set(),
} }
sources[fname] = {} sources[fname] = {}
index[fname]["checks"].update(checks) index[fname]["checks"].update(checks)
# Track non-reproducible crypto checks
index[fname]["crypto_only"].update(
c for c in checks if c in _CRYPTO_CHECKS
)
# Size checks
if "size" in checks: if "size" in checks:
if f.get("size") is not None: if f.get("size") is not None:
new_size = f["size"] new_size = f["size"]
@@ -132,6 +149,7 @@ def _build_validation_index(profiles: dict) -> dict[str, dict]:
index[fname]["min_size"] = f["min_size"] index[fname]["min_size"] = f["min_size"]
if f.get("max_size") is not None: if f.get("max_size") is not None:
index[fname]["max_size"] = f["max_size"] index[fname]["max_size"] = f["max_size"]
# Hash checks (crc32, md5, sha1, adler32)
if "crc32" in checks and f.get("crc32"): if "crc32" in checks and f.get("crc32"):
new_crc = f["crc32"].lower() new_crc = f["crc32"].lower()
if new_crc.startswith("0x"): if new_crc.startswith("0x"):
@@ -162,24 +180,46 @@ def _build_validation_index(profiles: dict) -> dict[str, dict]:
) )
index[fname][hash_type] = f[hash_type] index[fname][hash_type] = f[hash_type]
sources[fname][hash_type] = emu_name sources[fname][hash_type] = emu_name
# Adler32 — read from the known_hash_adler32 field (used by Dolphin) or,
# for future profiles, a plain adler32 field. It is indexed whenever either
# field is set, even if "adler32" is not in the validation: list.
adler_val = f.get("known_hash_adler32") or f.get("adler32")
if adler_val:
norm = adler_val.lower()
if norm.startswith("0x"):
norm = norm[2:]
prev_adler = index[fname]["adler32"]
if prev_adler is not None and prev_adler != norm:
prev_emu = sources[fname].get("adler32", "?")
raise ValueError(
f"validation conflict for '{fname}': "
f"adler32={prev_adler} ({prev_emu}) vs adler32={norm} ({emu_name})"
)
index[fname]["adler32"] = norm
sources[fname]["adler32"] = emu_name
# Convert sets to sorted lists for determinism # Convert sets to sorted lists for determinism
for v in index.values(): for v in index.values():
v["checks"] = sorted(v["checks"]) v["checks"] = sorted(v["checks"])
v["crypto_only"] = sorted(v["crypto_only"])
return index return index
def check_file_validation( def check_file_validation(
local_path: str, filename: str, validation_index: dict[str, dict], local_path: str, filename: str, validation_index: dict[str, dict],
) -> str | None: ) -> str | None:
"""Check emulator-level validation (size, crc32, md5, sha1) on a resolved file. """Check emulator-level validation on a resolved file.
Returns None if all checks pass or no validation applies. Supports: size (exact/min/max), crc32, md5, sha1, adler32.
Not verified here: signature, crypto (they need console-specific keys); these are only tracked in the index entry's ``crypto_only`` list.
Returns None if all reproducible checks pass or no validation applies.
Returns a reason string if a check fails. Returns a reason string if a check fails.
""" """
entry = validation_index.get(filename) entry = validation_index.get(filename)
if not entry: if not entry:
return None return None
checks = entry["checks"] checks = entry["checks"]
# Size checks
if "size" in checks: if "size" in checks:
actual_size = os.path.getsize(local_path) actual_size = os.path.getsize(local_path)
if entry["size"] is not None and actual_size != entry["size"]: if entry["size"] is not None and actual_size != entry["size"]:
@@ -188,9 +228,11 @@ def check_file_validation(
return f"size too small: min {entry['min_size']}, got {actual_size}" return f"size too small: min {entry['min_size']}, got {actual_size}"
if entry["max_size"] is not None and actual_size > entry["max_size"]: if entry["max_size"] is not None and actual_size > entry["max_size"]:
return f"size too large: max {entry['max_size']}, got {actual_size}" return f"size too large: max {entry['max_size']}, got {actual_size}"
# Hash checks — compute once, reuse
need_hashes = any( # Hash checks — compute once, reuse for all hash types
h in checks and entry.get(h) for h in ("crc32", "md5", "sha1") need_hashes = (
any(h in checks and entry.get(h) for h in ("crc32", "md5", "sha1"))
or entry.get("adler32")
) )
if need_hashes: if need_hashes:
hashes = compute_hashes(local_path) hashes = compute_hashes(local_path)
@@ -206,6 +248,18 @@ def check_file_validation(
if "sha1" in checks and entry["sha1"]: if "sha1" in checks and entry["sha1"]:
if hashes["sha1"].lower() != entry["sha1"].lower(): if hashes["sha1"].lower() != entry["sha1"].lower():
return f"sha1 mismatch: expected {entry['sha1']}, got {hashes['sha1']}" return f"sha1 mismatch: expected {entry['sha1']}, got {hashes['sha1']}"
# Adler32 — check if known_hash_adler32 is available (even if not
# in the validation: list, Dolphin uses it as informational check)
if entry["adler32"]:
if hashes["adler32"].lower() != entry["adler32"]:
return (
f"adler32 mismatch: expected 0x{entry['adler32']}, "
f"got 0x{hashes['adler32']}"
)
# Note: signature/crypto checks require console-specific keys and
# cannot be reproduced. Size checks above still apply when combined
# (e.g. validation: [size, signature]).
return None return None

View File

@@ -370,8 +370,27 @@ class TestE2E(unittest.TestCase):
# MD5 validation — wrong md5 # MD5 validation — wrong md5
{"name": "alias_target.bin", "required": False, {"name": "alias_target.bin", "required": False,
"validation": ["md5"], "md5": "0000000000000000000000000000dead"}, "validation": ["md5"], "md5": "0000000000000000000000000000dead"},
# Adler32 — known_hash_adler32 field
{"name": "present_req.bin", "required": True,
"known_hash_adler32": None}, # placeholder, set below
# Min/max size range validation
{"name": "present_req.bin", "required": True,
"validation": ["size"], "min_size": 10, "max_size": 100},
# Signature — crypto check we can't reproduce, but size applies
{"name": "correct_hash.bin", "required": True,
"validation": ["size", "signature"], "size": 17},
], ],
} }
# Compute the actual adler32 of present_req.bin for the test fixture
import zlib as _zlib
with open(self.files["present_req.bin"]["path"], "rb") as _f:
_data = _f.read()
_adler = format(_zlib.adler32(_data) & 0xFFFFFFFF, "08x")
# Set the adler32 entry (the one with known_hash_adler32=None)
for entry in emu_val["files"]:
if entry.get("known_hash_adler32") is None and "known_hash_adler32" in entry:
entry["known_hash_adler32"] = f"0x{_adler}"
break
with open(os.path.join(self.emulators_dir, "test_validation.yml"), "w") as fh: with open(os.path.join(self.emulators_dir, "test_validation.yml"), "w") as fh:
yaml.dump(emu_val, fh) yaml.dump(emu_val, fh)
@@ -805,6 +824,36 @@ class TestE2E(unittest.TestCase):
self.assertIsNotNone(index["correct_hash.bin"]["md5"]) self.assertIsNotNone(index["correct_hash.bin"]["md5"])
self.assertIsNotNone(index["correct_hash.bin"]["sha1"]) self.assertIsNotNone(index["correct_hash.bin"]["sha1"])
def test_82_validation_adler32_pass(self):
    """File with correct adler32 passes validation.

    Also asserts that the adler32 value really reached the index. Without
    that, the test would pass vacuously whenever the ``known_hash_adler32``
    field was ignored, because a missing adler32 makes validation return
    None as well.
    """
    import zlib  # local import: only this test needs it

    profiles = load_emulator_profiles(self.emulators_dir)
    index = _build_validation_index(profiles)
    path = self.files["present_req.bin"]["path"]

    # The index stores the checksum normalised: lowercase hex, no "0x".
    with open(path, "rb") as fh:
        expected = format(zlib.adler32(fh.read()) & 0xFFFFFFFF, "08x")
    self.assertEqual(index["present_req.bin"]["adler32"], expected)

    reason = check_file_validation(path, "present_req.bin", index)
    self.assertIsNone(reason)
def test_83_validation_min_max_size_pass(self):
    """A file whose size lies inside the min/max range validates cleanly."""
    name = "present_req.bin"
    index = _build_validation_index(load_emulator_profiles(self.emulators_dir))
    # An in-range file must not produce a failure reason.
    outcome = check_file_validation(self.files[name]["path"], name, index)
    self.assertIsNone(outcome)
    # The bounds declared in the fixture profile must be carried into the index.
    bounds = index[name]
    self.assertEqual(bounds["min_size"], 10)
    self.assertEqual(bounds["max_size"], 100)
def test_84_validation_crypto_tracked(self):
    """Signature/crypto checks are recorded as non-reproducible."""
    name = "correct_hash.bin"
    index = _build_validation_index(load_emulator_profiles(self.emulators_dir))
    # The fixture declares validation: [size, signature] for this file.
    self.assertIn("signature", index[name]["crypto_only"])
    # The size rule from the same entry is still enforced even though the
    # signature cannot be reproduced; the fixture declares size 17.
    outcome = check_file_validation(self.files[name]["path"], name, index)
    self.assertIsNone(outcome)
def test_76_validation_no_effect_when_no_field(self): def test_76_validation_no_effect_when_no_field(self):
"""Files without validation field are unaffected.""" """Files without validation field are unaffected."""
profiles = load_emulator_profiles(self.emulators_dir) profiles = load_emulator_profiles(self.emulators_dir)
@@ -918,7 +967,7 @@ class TestE2E(unittest.TestCase):
"""Validation label reflects the checks used.""" """Validation label reflects the checks used."""
result = verify_emulator(["test_validation"], self.emulators_dir, self.db) result = verify_emulator(["test_validation"], self.emulators_dir, self.db)
# test_validation has crc32, md5, sha1, signature, size → all listed
self.assertEqual(result["verification_mode"], "crc32+md5+sha1+size") self.assertEqual(result["verification_mode"], "crc32+md5+sha1+signature+size")
def test_99_filter_files_by_mode(self): def test_99_filter_files_by_mode(self):
"""_filter_files_by_mode correctly filters standalone/libretro.""" """_filter_files_by_mode correctly filters standalone/libretro."""