refactor: extract validation and truth modules from common.py

This commit is contained in:
Abdessamad Derraz
2026-03-29 16:41:24 +02:00
parent 95e16c9e7a
commit 3c7fc26354
9 changed files with 747 additions and 710 deletions

View File

@@ -801,251 +801,9 @@ def filter_systems_by_target(
return filtered
def _parse_validation(validation: list | dict | None) -> list[str]:
"""Extract the validation check list from a file's validation field.
Handles both simple list and divergent (core/upstream) dict forms.
For dicts, uses the ``core`` key since RetroArch users run the core.
"""
if validation is None:
return []
if isinstance(validation, list):
return validation
if isinstance(validation, dict):
return validation.get("core", [])
return []
# Validation types that require console-specific cryptographic keys.
# verify.py cannot reproduce these — size checks still apply if combined.
_CRYPTO_CHECKS = frozenset({"signature", "crypto"})
# All reproducible validation types.
_HASH_CHECKS = frozenset({"crc32", "md5", "sha1", "adler32"})
def _build_validation_index(profiles: dict) -> dict[str, dict]:
"""Build per-filename validation rules from emulator profiles.
Returns {filename: {"checks": [str], "size": int|None, "min_size": int|None,
"max_size": int|None, "crc32": str|None, "md5": str|None, "sha1": str|None,
"adler32": str|None, "crypto_only": [str], "per_emulator": {emu: detail}}}.
``crypto_only`` lists validation types we cannot reproduce (signature, crypto)
so callers can report them as non-verifiable rather than silently skipping.
``per_emulator`` preserves each core's individual checks, source_ref, and
expected values before merging, for ground truth reporting.
When multiple emulators reference the same file, merges checks (union).
Raises ValueError if two profiles declare conflicting values.
"""
index: dict[str, dict] = {}
for emu_name, profile in profiles.items():
if profile.get("type") in ("launcher", "alias"):
continue
for f in profile.get("files", []):
fname = f.get("name", "")
if not fname:
continue
checks = _parse_validation(f.get("validation"))
if not checks:
continue
if fname not in index:
index[fname] = {
"checks": set(), "sizes": set(),
"min_size": None, "max_size": None,
"crc32": set(), "md5": set(), "sha1": set(), "sha256": set(),
"adler32": set(), "crypto_only": set(),
"emulators": set(), "per_emulator": {},
}
index[fname]["emulators"].add(emu_name)
index[fname]["checks"].update(checks)
# Track non-reproducible crypto checks
index[fname]["crypto_only"].update(
c for c in checks if c in _CRYPTO_CHECKS
)
# Size checks
if "size" in checks:
if f.get("size") is not None:
index[fname]["sizes"].add(f["size"])
if f.get("min_size") is not None:
cur = index[fname]["min_size"]
index[fname]["min_size"] = min(cur, f["min_size"]) if cur is not None else f["min_size"]
if f.get("max_size") is not None:
cur = index[fname]["max_size"]
index[fname]["max_size"] = max(cur, f["max_size"]) if cur is not None else f["max_size"]
# Hash checks — collect all accepted hashes as sets (multiple valid
# versions of the same file, e.g. MT-32 ROM versions)
if "crc32" in checks and f.get("crc32"):
crc_val = f["crc32"]
crc_list = crc_val if isinstance(crc_val, list) else [crc_val]
for cv in crc_list:
norm = str(cv).lower()
if norm.startswith("0x"):
norm = norm[2:]
index[fname]["crc32"].add(norm)
for hash_type in ("md5", "sha1", "sha256"):
if hash_type in checks and f.get(hash_type):
val = f[hash_type]
if isinstance(val, list):
for h in val:
index[fname][hash_type].add(str(h).lower())
else:
index[fname][hash_type].add(str(val).lower())
# Adler32 — stored as known_hash_adler32 field (not in validation: list
# for Dolphin, but support it in both forms for future profiles)
adler_val = f.get("known_hash_adler32") or f.get("adler32")
if adler_val:
norm = adler_val.lower()
if norm.startswith("0x"):
norm = norm[2:]
index[fname]["adler32"].add(norm)
# Per-emulator ground truth detail
expected: dict = {}
if "size" in checks:
for key in ("size", "min_size", "max_size"):
if f.get(key) is not None:
expected[key] = f[key]
for hash_type in ("crc32", "md5", "sha1", "sha256"):
if hash_type in checks and f.get(hash_type):
expected[hash_type] = f[hash_type]
adler_val_pe = f.get("known_hash_adler32") or f.get("adler32")
if adler_val_pe:
expected["adler32"] = adler_val_pe
pe_entry = {
"checks": sorted(checks),
"source_ref": f.get("source_ref"),
"expected": expected,
}
pe = index[fname]["per_emulator"]
if emu_name in pe:
# Merge checks from multiple file entries for same emulator
existing = pe[emu_name]
merged_checks = sorted(set(existing["checks"]) | set(pe_entry["checks"]))
existing["checks"] = merged_checks
existing["expected"].update(pe_entry["expected"])
if pe_entry["source_ref"] and not existing["source_ref"]:
existing["source_ref"] = pe_entry["source_ref"]
else:
pe[emu_name] = pe_entry
# Convert sets to sorted tuples/lists for determinism
for v in index.values():
v["checks"] = sorted(v["checks"])
v["crypto_only"] = sorted(v["crypto_only"])
v["emulators"] = sorted(v["emulators"])
# Keep hash sets as frozensets for O(1) lookup in check_file_validation
return index
def build_ground_truth(filename: str, validation_index: dict[str, dict]) -> list[dict]:
"""Format per-emulator ground truth for a file from the validation index.
Returns a sorted list of {emulator, checks, source_ref, expected} dicts.
Returns [] if the file has no emulator validation data.
"""
entry = validation_index.get(filename)
if not entry or not entry.get("per_emulator"):
return []
result = []
for emu_name in sorted(entry["per_emulator"]):
detail = entry["per_emulator"][emu_name]
result.append({
"emulator": emu_name,
"checks": detail["checks"],
"source_ref": detail.get("source_ref"),
"expected": detail.get("expected", {}),
})
return result
def check_file_validation(
local_path: str, filename: str, validation_index: dict[str, dict],
bios_dir: str = "bios",
) -> str | None:
"""Check emulator-level validation on a resolved file.
Supports: size (exact/min/max), crc32, md5, sha1, adler32,
signature (RSA-2048 PKCS1v15 SHA256), crypto (AES-128-CBC + SHA256).
Returns None if all checks pass or no validation applies.
Returns a reason string if a check fails.
"""
entry = validation_index.get(filename)
if not entry:
return None
checks = entry["checks"]
# Size checks — sizes is a set of accepted values
if "size" in checks:
actual_size = os.path.getsize(local_path)
if entry["sizes"] and actual_size not in entry["sizes"]:
expected = ",".join(str(s) for s in sorted(entry["sizes"]))
return f"size mismatch: got {actual_size}, accepted [{expected}]"
if entry["min_size"] is not None and actual_size < entry["min_size"]:
return f"size too small: min {entry['min_size']}, got {actual_size}"
if entry["max_size"] is not None and actual_size > entry["max_size"]:
return f"size too large: max {entry['max_size']}, got {actual_size}"
# Hash checks — compute once, reuse for all hash types.
# Each hash field is a set of accepted values (multiple valid ROM versions).
need_hashes = (
any(h in checks and entry.get(h) for h in ("crc32", "md5", "sha1", "sha256"))
or entry.get("adler32")
)
if need_hashes:
hashes = compute_hashes(local_path)
if "crc32" in checks and entry["crc32"]:
if hashes["crc32"].lower() not in entry["crc32"]:
expected = ",".join(sorted(entry["crc32"]))
return f"crc32 mismatch: got {hashes['crc32']}, accepted [{expected}]"
if "md5" in checks and entry["md5"]:
if hashes["md5"].lower() not in entry["md5"]:
expected = ",".join(sorted(entry["md5"]))
return f"md5 mismatch: got {hashes['md5']}, accepted [{expected}]"
if "sha1" in checks and entry["sha1"]:
if hashes["sha1"].lower() not in entry["sha1"]:
expected = ",".join(sorted(entry["sha1"]))
return f"sha1 mismatch: got {hashes['sha1']}, accepted [{expected}]"
if "sha256" in checks and entry["sha256"]:
if hashes["sha256"].lower() not in entry["sha256"]:
expected = ",".join(sorted(entry["sha256"]))
return f"sha256 mismatch: got {hashes['sha256']}, accepted [{expected}]"
if entry["adler32"]:
if hashes["adler32"].lower() not in entry["adler32"]:
expected = ",".join(sorted(entry["adler32"]))
return f"adler32 mismatch: got 0x{hashes['adler32']}, accepted [{expected}]"
# Signature/crypto checks (3DS RSA, AES)
if entry["crypto_only"]:
from crypto_verify import check_crypto_validation
crypto_reason = check_crypto_validation(local_path, filename, bios_dir)
if crypto_reason:
return crypto_reason
return None
def validate_cli_modes(args, mode_attrs: list[str]) -> None:
"""Validate mutual exclusion of CLI mode arguments."""
modes = sum(1 for attr in mode_attrs if getattr(args, attr, None))
if modes == 0:
raise SystemExit(f"Specify one of: --{' --'.join(mode_attrs)}")
if modes > 1:
raise SystemExit(f"Options are mutually exclusive: --{' --'.join(mode_attrs)}")
def filter_files_by_mode(files: list[dict], standalone: bool) -> list[dict]:
"""Filter file entries by libretro/standalone mode."""
result = []
for f in files:
fmode = f.get("mode", "")
if standalone and fmode == "libretro":
continue
if not standalone and fmode == "standalone":
continue
result.append(f)
return result
# Validation and mode filtering — extracted to validation.py for SoC.
# Re-exported below for backward compatibility.
LARGE_FILES_RELEASE = "large-files"
@@ -1151,445 +909,13 @@ def list_platform_system_ids(platform_name: str, platforms_dir: str) -> None:
print(f" {sys_id:35s} ({file_count} file{'s' if file_count != 1 else ''}){mfr_display}")
# ---------------------------------------------------------------
# Truth generation — build ground-truth YAML from emulator profiles
# ---------------------------------------------------------------
def _determine_core_mode(
emu_name: str, profile: dict,
cores_config: str | list | None,
standalone_set: set[str] | None,
) -> str:
"""Determine effective mode (libretro/standalone) for a resolved core."""
if cores_config == "all_libretro":
return "libretro"
if standalone_set is not None:
profile_names = {emu_name} | {str(c) for c in profile.get("cores", [])}
if profile_names & standalone_set:
return "standalone"
return "libretro"
ptype = profile.get("type", "libretro")
if "standalone" in ptype and "libretro" in ptype:
return "both"
if "standalone" in ptype:
return "standalone"
return "libretro"
def _enrich_hashes(entry: dict, db: dict) -> None:
"""Fill missing hash fields from the database."""
sha1 = entry.get("sha1", "")
md5 = entry.get("md5", "")
record = None
if sha1 and db.get("files"):
record = db["files"].get(sha1)
if record is None and md5:
by_md5 = db.get("by_md5", {})
md5_str = md5 if isinstance(md5, str) else md5[0] if md5 else ""
ref_sha1 = by_md5.get(md5_str.lower()) if md5_str else None
if ref_sha1 and db.get("files"):
record = db["files"].get(ref_sha1)
if record is None:
return
for field in ("sha1", "md5", "sha256", "crc32"):
if not entry.get(field) and record.get(field):
entry[field] = record[field]
def _merge_file_into_system(
system: dict, file_entry: dict, emu_name: str, db: dict | None,
) -> None:
"""Merge a file entry into a system's file list, deduplicating by name."""
files = system.setdefault("files", [])
name_lower = file_entry["name"].lower()
existing = None
for f in files:
if f["name"].lower() == name_lower:
existing = f
break
if existing is not None:
existing["_cores"] = existing.get("_cores", set()) | {emu_name}
sr = file_entry.get("source_ref")
if sr is not None:
sr_key = str(sr) if not isinstance(sr, str) else sr
existing["_source_refs"] = existing.get("_source_refs", set()) | {sr_key}
else:
existing.setdefault("_source_refs", set())
if file_entry.get("required") and not existing.get("required"):
existing["required"] = True
for h in ("sha1", "md5", "sha256", "crc32"):
theirs = file_entry.get(h, "")
ours = existing.get(h, "")
if theirs and ours and theirs.lower() != ours.lower():
import sys as _sys
print(
f"WARNING: hash conflict for {file_entry['name']} "
f"({h}: {ours} vs {theirs}, core {emu_name})",
file=_sys.stderr,
)
elif theirs and not ours:
existing[h] = theirs
return
entry: dict = {"name": file_entry["name"]}
if file_entry.get("required") is not None:
entry["required"] = file_entry["required"]
for field in ("sha1", "md5", "sha256", "crc32", "size", "path",
"description", "hle_fallback", "category", "note",
"validation", "min_size", "max_size", "aliases"):
val = file_entry.get(field)
if val is not None:
entry[field] = val
entry["_cores"] = {emu_name}
sr = file_entry.get("source_ref")
if sr is not None:
sr_key = str(sr) if not isinstance(sr, str) else sr
entry["_source_refs"] = {sr_key}
else:
entry["_source_refs"] = set()
if db:
_enrich_hashes(entry, db)
files.append(entry)
def generate_platform_truth(
platform_name: str,
config: dict,
registry_entry: dict,
profiles: dict[str, dict],
db: dict | None = None,
target_cores: set[str] | None = None,
) -> dict:
"""Generate ground-truth system data for a platform from emulator profiles.
Args:
platform_name: platform identifier
config: loaded platform config (via load_platform_config), has cores,
systems, standalone_cores with inheritance resolved
registry_entry: registry metadata for hash_type, verification_mode, etc.
profiles: all loaded emulator profiles
db: optional database for hash enrichment
target_cores: optional hardware target core filter
Returns a dict with platform metadata, systems, and per-file details
including which cores reference each file.
"""
cores_config = config.get("cores")
# Resolve standalone set for mode determination
standalone_set: set[str] | None = None
standalone_cores = config.get("standalone_cores")
if isinstance(standalone_cores, list):
standalone_set = {str(c) for c in standalone_cores}
resolved = resolve_platform_cores(config, profiles, target_cores)
# Build mapping: profile system ID -> platform system ID
# Three strategies, tried in order:
# 1. File-based: if the scraped platform already has this file, use its system
# 2. Exact match: profile system ID == platform system ID
# 3. Normalized match: strip manufacturer prefix + separators
platform_sys_ids = set(config.get("systems", {}).keys())
# File→platform_system reverse index from scraped config
file_to_plat_sys: dict[str, str] = {}
for psid, sys_data in config.get("systems", {}).items():
for fe in sys_data.get("files", []):
fname = fe.get("name", "").lower()
if fname:
file_to_plat_sys[fname] = psid
for alias in fe.get("aliases", []):
file_to_plat_sys[alias.lower()] = psid
# Normalized ID → platform system ID
norm_to_platform: dict[str, str] = {}
for psid in platform_sys_ids:
norm_to_platform[_norm_system_id(psid)] = psid
def _map_sys_id(profile_sid: str, file_name: str = "") -> str:
"""Map a profile system ID to the platform's system ID."""
# 1. File-based lookup (handles composites and name mismatches)
if file_name:
plat_sys = file_to_plat_sys.get(file_name.lower())
if plat_sys:
return plat_sys
# 2. Exact match
if profile_sid in platform_sys_ids:
return profile_sid
# 3. Normalized match
normed = _norm_system_id(profile_sid)
return norm_to_platform.get(normed, profile_sid)
systems: dict[str, dict] = {}
cores_profiled: set[str] = set()
cores_unprofiled: set[str] = set()
# Track which cores contribute to each system
system_cores: dict[str, dict[str, set[str]]] = {}
for emu_name in sorted(resolved):
profile = profiles.get(emu_name)
if not profile:
cores_unprofiled.add(emu_name)
continue
cores_profiled.add(emu_name)
mode = _determine_core_mode(emu_name, profile, cores_config, standalone_set)
raw_files = profile.get("files", [])
if mode == "both":
filtered = raw_files
else:
filtered = filter_files_by_mode(raw_files, standalone=(mode == "standalone"))
for fe in filtered:
profile_sid = fe.get("system", "")
if not profile_sid:
sys_ids = profile.get("systems", [])
profile_sid = sys_ids[0] if sys_ids else "unknown"
sys_id = _map_sys_id(profile_sid, fe.get("name", ""))
system = systems.setdefault(sys_id, {})
_merge_file_into_system(system, fe, emu_name, db)
# Track core contribution per system
sys_cov = system_cores.setdefault(sys_id, {
"profiled": set(), "unprofiled": set(),
})
sys_cov["profiled"].add(emu_name)
# Ensure all systems of resolved cores have entries (even with 0 files).
# This documents that the system is covered — the core was analyzed and
# needs no external files for this system.
for emu_name in cores_profiled:
profile = profiles[emu_name]
for prof_sid in profile.get("systems", []):
sys_id = _map_sys_id(prof_sid)
systems.setdefault(sys_id, {})
sys_cov = system_cores.setdefault(sys_id, {
"profiled": set(), "unprofiled": set(),
})
sys_cov["profiled"].add(emu_name)
# Track unprofiled cores per system based on profile system lists
for emu_name in cores_unprofiled:
for sys_id in systems:
sys_cov = system_cores.setdefault(sys_id, {
"profiled": set(), "unprofiled": set(),
})
sys_cov["unprofiled"].add(emu_name)
# Convert sets to sorted lists for serialization
for sys_id, sys_data in systems.items():
for fe in sys_data.get("files", []):
fe["_cores"] = sorted(fe.get("_cores", set()))
fe["_source_refs"] = sorted(fe.get("_source_refs", set()))
# Add per-system coverage
cov = system_cores.get(sys_id, {})
sys_data["_coverage"] = {
"cores_profiled": sorted(cov.get("profiled", set())),
"cores_unprofiled": sorted(cov.get("unprofiled", set())),
}
return {
"platform": platform_name,
"generated": True,
"systems": systems,
"_coverage": {
"cores_resolved": len(resolved),
"cores_profiled": len(cores_profiled),
"cores_unprofiled": sorted(cores_unprofiled),
},
}
# -------------------------------------------------------------------
# Platform truth diffing
# -------------------------------------------------------------------
def _diff_system(truth_sys: dict, scraped_sys: dict) -> dict:
"""Compare files between truth and scraped for a single system."""
# Build truth index: name.lower() -> entry, alias.lower() -> entry
truth_index: dict[str, dict] = {}
for fe in truth_sys.get("files", []):
truth_index[fe["name"].lower()] = fe
for alias in fe.get("aliases", []):
truth_index[alias.lower()] = fe
# Build scraped index: name.lower() -> entry
scraped_index: dict[str, dict] = {}
for fe in scraped_sys.get("files", []):
scraped_index[fe["name"].lower()] = fe
missing: list[dict] = []
hash_mismatch: list[dict] = []
required_mismatch: list[dict] = []
extra_phantom: list[dict] = []
extra_unprofiled: list[dict] = []
matched_truth_names: set[str] = set()
# Compare scraped files against truth
for s_key, s_entry in scraped_index.items():
t_entry = truth_index.get(s_key)
if t_entry is None:
continue
matched_truth_names.add(t_entry["name"].lower())
# Hash comparison
for h in ("sha1", "md5", "crc32"):
t_hash = t_entry.get(h, "")
s_hash = s_entry.get(h, "")
if not t_hash or not s_hash:
continue
# Normalize to list for multi-hash support
t_list = t_hash if isinstance(t_hash, list) else [t_hash]
s_list = s_hash if isinstance(s_hash, list) else [s_hash]
t_set = {v.lower() for v in t_list}
s_set = {v.lower() for v in s_list}
if not t_set & s_set:
hash_mismatch.append({
"name": s_entry["name"],
"hash_type": h,
f"truth_{h}": t_hash,
f"scraped_{h}": s_hash,
"truth_cores": list(t_entry.get("_cores", [])),
})
break
# Required mismatch
t_req = t_entry.get("required")
s_req = s_entry.get("required")
if t_req is not None and s_req is not None and t_req != s_req:
required_mismatch.append({
"name": s_entry["name"],
"truth_required": t_req,
"scraped_required": s_req,
})
# Truth files not matched -> missing
for fe in truth_sys.get("files", []):
if fe["name"].lower() not in matched_truth_names:
missing.append({
"name": fe["name"],
"cores": list(fe.get("_cores", [])),
"source_refs": list(fe.get("_source_refs", [])),
})
# Scraped files not in truth -> extra
coverage = truth_sys.get("_coverage", {})
has_unprofiled = bool(coverage.get("cores_unprofiled"))
for s_key, s_entry in scraped_index.items():
if s_key not in truth_index:
entry = {"name": s_entry["name"]}
if has_unprofiled:
extra_unprofiled.append(entry)
else:
extra_phantom.append(entry)
result: dict = {}
if missing:
result["missing"] = missing
if hash_mismatch:
result["hash_mismatch"] = hash_mismatch
if required_mismatch:
result["required_mismatch"] = required_mismatch
if extra_phantom:
result["extra_phantom"] = extra_phantom
if extra_unprofiled:
result["extra_unprofiled"] = extra_unprofiled
return result
def _has_divergences(sys_div: dict) -> bool:
"""Check if a system divergence dict contains any actual divergences."""
return bool(sys_div)
def _update_summary(summary: dict, sys_div: dict) -> None:
"""Update summary counters from a system divergence dict."""
summary["total_missing"] += len(sys_div.get("missing", []))
summary["total_extra_phantom"] += len(sys_div.get("extra_phantom", []))
summary["total_extra_unprofiled"] += len(sys_div.get("extra_unprofiled", []))
summary["total_hash_mismatch"] += len(sys_div.get("hash_mismatch", []))
summary["total_required_mismatch"] += len(sys_div.get("required_mismatch", []))
def diff_platform_truth(truth: dict, scraped: dict) -> dict:
"""Compare truth YAML against scraped YAML, returning divergences.
System IDs are matched using normalized forms (via _norm_system_id) to
handle naming differences between emulator profiles and scraped platforms
(e.g. 'sega-game-gear' vs 'sega-gamegear').
"""
truth_systems = truth.get("systems", {})
scraped_systems = scraped.get("systems", {})
summary = {
"systems_compared": 0,
"systems_fully_covered": 0,
"systems_partially_covered": 0,
"systems_uncovered": 0,
"total_missing": 0,
"total_extra_phantom": 0,
"total_extra_unprofiled": 0,
"total_hash_mismatch": 0,
"total_required_mismatch": 0,
}
divergences: dict[str, dict] = {}
uncovered_systems: list[str] = []
# Build normalized-ID lookup for truth systems
norm_to_truth: dict[str, str] = {}
for sid in truth_systems:
norm_to_truth[_norm_system_id(sid)] = sid
# Match scraped systems to truth via normalized IDs
matched_truth: set[str] = set()
for s_sid in sorted(scraped_systems):
norm = _norm_system_id(s_sid)
t_sid = norm_to_truth.get(norm)
if t_sid is None:
# Also try exact match (in case normalization is lossy)
if s_sid in truth_systems:
t_sid = s_sid
else:
uncovered_systems.append(s_sid)
summary["systems_uncovered"] += 1
continue
matched_truth.add(t_sid)
summary["systems_compared"] += 1
sys_div = _diff_system(truth_systems[t_sid], scraped_systems[s_sid])
if _has_divergences(sys_div):
divergences[s_sid] = sys_div
_update_summary(summary, sys_div)
summary["systems_partially_covered"] += 1
else:
summary["systems_fully_covered"] += 1
# Truth systems not matched by any scraped system — all files missing
for t_sid in sorted(truth_systems):
if t_sid in matched_truth:
continue
summary["systems_compared"] += 1
sys_div = _diff_system(truth_systems[t_sid], {"files": []})
if _has_divergences(sys_div):
divergences[t_sid] = sys_div
_update_summary(summary, sys_div)
summary["systems_partially_covered"] += 1
else:
summary["systems_fully_covered"] += 1
result: dict = {"summary": summary}
if divergences:
result["divergences"] = divergences
if uncovered_systems:
result["uncovered_systems"] = uncovered_systems
return result
# Re-exports: validation and truth modules extracted for SoC.
# Existing consumers import from common — these preserve that contract.
from validation import ( # noqa: F401, E402
_build_validation_index, _parse_validation, build_ground_truth,
check_file_validation, filter_files_by_mode, validate_cli_modes,
)
from truth import ( # noqa: F401, E402
diff_platform_truth, generate_platform_truth,
)

View File

@@ -16,7 +16,8 @@ import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from common import diff_platform_truth, list_registered_platforms, load_platform_config
from common import list_registered_platforms, load_platform_config
from truth import diff_platform_truth
try:
import yaml

View File

@@ -27,14 +27,16 @@ from pathlib import Path
sys.path.insert(0, os.path.dirname(__file__))
from common import (
MANUFACTURER_PREFIXES,
_build_validation_index, build_zip_contents_index, check_file_validation,
check_inside_zip, compute_hashes, fetch_large_file, filter_files_by_mode,
group_identical_platforms, list_emulator_profiles, list_platform_system_ids,
list_registered_platforms,
build_zip_contents_index, check_inside_zip, compute_hashes,
fetch_large_file, group_identical_platforms, list_emulator_profiles,
list_platform_system_ids, list_registered_platforms,
filter_systems_by_target, list_system_ids, load_database,
load_data_dir_registry, load_emulator_profiles, load_platform_config,
md5_composite, resolve_local_file,
)
from validation import (
_build_validation_index, check_file_validation, filter_files_by_mode,
)
from deterministic_zip import rebuild_zip_deterministic
try:

View File

@@ -14,13 +14,13 @@ import sys
sys.path.insert(0, os.path.dirname(__file__))
from common import (
generate_platform_truth,
list_registered_platforms,
load_database,
load_emulator_profiles,
load_platform_config,
load_target_config,
)
from truth import generate_platform_truth
try:
import yaml

451
scripts/truth.py Normal file
View File

@@ -0,0 +1,451 @@
"""Platform truth generation and diffing.
Generates ground-truth YAML from emulator profiles for gap analysis,
and diffs truth against scraped platform data to find divergences.
"""
from __future__ import annotations
import sys
from common import _norm_system_id, resolve_platform_cores
from validation import filter_files_by_mode
def _determine_core_mode(
emu_name: str, profile: dict,
cores_config: str | list | None,
standalone_set: set[str] | None,
) -> str:
"""Determine effective mode (libretro/standalone) for a resolved core."""
if cores_config == "all_libretro":
return "libretro"
if standalone_set is not None:
profile_names = {emu_name} | {str(c) for c in profile.get("cores", [])}
if profile_names & standalone_set:
return "standalone"
return "libretro"
ptype = profile.get("type", "libretro")
if "standalone" in ptype and "libretro" in ptype:
return "both"
if "standalone" in ptype:
return "standalone"
return "libretro"
def _enrich_hashes(entry: dict, db: dict) -> None:
"""Fill missing hash fields from the database."""
sha1 = entry.get("sha1", "")
md5 = entry.get("md5", "")
record = None
if sha1 and db.get("files"):
record = db["files"].get(sha1)
if record is None and md5:
by_md5 = db.get("by_md5", {})
md5_str = md5 if isinstance(md5, str) else md5[0] if md5 else ""
ref_sha1 = by_md5.get(md5_str.lower()) if md5_str else None
if ref_sha1 and db.get("files"):
record = db["files"].get(ref_sha1)
if record is None:
return
for field in ("sha1", "md5", "sha256", "crc32"):
if not entry.get(field) and record.get(field):
entry[field] = record[field]
def _merge_file_into_system(
system: dict, file_entry: dict, emu_name: str, db: dict | None,
) -> None:
"""Merge a file entry into a system's file list, deduplicating by name."""
files = system.setdefault("files", [])
name_lower = file_entry["name"].lower()
existing = None
for f in files:
if f["name"].lower() == name_lower:
existing = f
break
if existing is not None:
existing["_cores"] = existing.get("_cores", set()) | {emu_name}
sr = file_entry.get("source_ref")
if sr is not None:
sr_key = str(sr) if not isinstance(sr, str) else sr
existing["_source_refs"] = existing.get("_source_refs", set()) | {sr_key}
else:
existing.setdefault("_source_refs", set())
if file_entry.get("required") and not existing.get("required"):
existing["required"] = True
for h in ("sha1", "md5", "sha256", "crc32"):
theirs = file_entry.get(h, "")
ours = existing.get(h, "")
if theirs and ours and theirs.lower() != ours.lower():
print(
f"WARNING: hash conflict for {file_entry['name']} "
f"({h}: {ours} vs {theirs}, core {emu_name})",
file=sys.stderr,
)
elif theirs and not ours:
existing[h] = theirs
return
entry: dict = {"name": file_entry["name"]}
if file_entry.get("required") is not None:
entry["required"] = file_entry["required"]
for field in ("sha1", "md5", "sha256", "crc32", "size", "path",
"description", "hle_fallback", "category", "note",
"validation", "min_size", "max_size", "aliases"):
val = file_entry.get(field)
if val is not None:
entry[field] = val
entry["_cores"] = {emu_name}
sr = file_entry.get("source_ref")
if sr is not None:
sr_key = str(sr) if not isinstance(sr, str) else sr
entry["_source_refs"] = {sr_key}
else:
entry["_source_refs"] = set()
if db:
_enrich_hashes(entry, db)
files.append(entry)
def generate_platform_truth(
platform_name: str,
config: dict,
registry_entry: dict,
profiles: dict[str, dict],
db: dict | None = None,
target_cores: set[str] | None = None,
) -> dict:
"""Generate ground-truth system data for a platform from emulator profiles.
Args:
platform_name: platform identifier
config: loaded platform config (via load_platform_config), has cores,
systems, standalone_cores with inheritance resolved
registry_entry: registry metadata for hash_type, verification_mode, etc.
profiles: all loaded emulator profiles
db: optional database for hash enrichment
target_cores: optional hardware target core filter
Returns a dict with platform metadata, systems, and per-file details
including which cores reference each file.
"""
cores_config = config.get("cores")
# Resolve standalone set for mode determination
standalone_set: set[str] | None = None
standalone_cores = config.get("standalone_cores")
if isinstance(standalone_cores, list):
standalone_set = {str(c) for c in standalone_cores}
resolved = resolve_platform_cores(config, profiles, target_cores)
# Build mapping: profile system ID -> platform system ID
# Three strategies, tried in order:
# 1. File-based: if the scraped platform already has this file, use its system
# 2. Exact match: profile system ID == platform system ID
# 3. Normalized match: strip manufacturer prefix + separators
platform_sys_ids = set(config.get("systems", {}).keys())
# File->platform_system reverse index from scraped config
file_to_plat_sys: dict[str, str] = {}
for psid, sys_data in config.get("systems", {}).items():
for fe in sys_data.get("files", []):
fname = fe.get("name", "").lower()
if fname:
file_to_plat_sys[fname] = psid
for alias in fe.get("aliases", []):
file_to_plat_sys[alias.lower()] = psid
# Normalized ID -> platform system ID
norm_to_platform: dict[str, str] = {}
for psid in platform_sys_ids:
norm_to_platform[_norm_system_id(psid)] = psid
def _map_sys_id(profile_sid: str, file_name: str = "") -> str:
"""Map a profile system ID to the platform's system ID."""
# 1. File-based lookup (handles composites and name mismatches)
if file_name:
plat_sys = file_to_plat_sys.get(file_name.lower())
if plat_sys:
return plat_sys
# 2. Exact match
if profile_sid in platform_sys_ids:
return profile_sid
# 3. Normalized match
normed = _norm_system_id(profile_sid)
return norm_to_platform.get(normed, profile_sid)
systems: dict[str, dict] = {}
cores_profiled: set[str] = set()
cores_unprofiled: set[str] = set()
# Track which cores contribute to each system
system_cores: dict[str, dict[str, set[str]]] = {}
for emu_name in sorted(resolved):
profile = profiles.get(emu_name)
if not profile:
cores_unprofiled.add(emu_name)
continue
cores_profiled.add(emu_name)
mode = _determine_core_mode(emu_name, profile, cores_config, standalone_set)
raw_files = profile.get("files", [])
if mode == "both":
filtered = raw_files
else:
filtered = filter_files_by_mode(raw_files, standalone=(mode == "standalone"))
for fe in filtered:
profile_sid = fe.get("system", "")
if not profile_sid:
sys_ids = profile.get("systems", [])
profile_sid = sys_ids[0] if sys_ids else "unknown"
sys_id = _map_sys_id(profile_sid, fe.get("name", ""))
system = systems.setdefault(sys_id, {})
_merge_file_into_system(system, fe, emu_name, db)
# Track core contribution per system
sys_cov = system_cores.setdefault(sys_id, {
"profiled": set(), "unprofiled": set(),
})
sys_cov["profiled"].add(emu_name)
# Ensure all systems of resolved cores have entries (even with 0 files).
# This documents that the system is covered — the core was analyzed and
# needs no external files for this system.
for emu_name in cores_profiled:
profile = profiles[emu_name]
for prof_sid in profile.get("systems", []):
sys_id = _map_sys_id(prof_sid)
systems.setdefault(sys_id, {})
sys_cov = system_cores.setdefault(sys_id, {
"profiled": set(), "unprofiled": set(),
})
sys_cov["profiled"].add(emu_name)
# Track unprofiled cores per system based on profile system lists
for emu_name in cores_unprofiled:
for sys_id in systems:
sys_cov = system_cores.setdefault(sys_id, {
"profiled": set(), "unprofiled": set(),
})
sys_cov["unprofiled"].add(emu_name)
# Convert sets to sorted lists for serialization
for sys_id, sys_data in systems.items():
for fe in sys_data.get("files", []):
fe["_cores"] = sorted(fe.get("_cores", set()))
fe["_source_refs"] = sorted(fe.get("_source_refs", set()))
# Add per-system coverage
cov = system_cores.get(sys_id, {})
sys_data["_coverage"] = {
"cores_profiled": sorted(cov.get("profiled", set())),
"cores_unprofiled": sorted(cov.get("unprofiled", set())),
}
return {
"platform": platform_name,
"generated": True,
"systems": systems,
"_coverage": {
"cores_resolved": len(resolved),
"cores_profiled": len(cores_profiled),
"cores_unprofiled": sorted(cores_unprofiled),
},
}
# -------------------------------------------------------------------
# Platform truth diffing
# -------------------------------------------------------------------
def _diff_system(truth_sys: dict, scraped_sys: dict) -> dict:
"""Compare files between truth and scraped for a single system."""
# Build truth index: name.lower() -> entry, alias.lower() -> entry
truth_index: dict[str, dict] = {}
for fe in truth_sys.get("files", []):
truth_index[fe["name"].lower()] = fe
for alias in fe.get("aliases", []):
truth_index[alias.lower()] = fe
# Build scraped index: name.lower() -> entry
scraped_index: dict[str, dict] = {}
for fe in scraped_sys.get("files", []):
scraped_index[fe["name"].lower()] = fe
missing: list[dict] = []
hash_mismatch: list[dict] = []
required_mismatch: list[dict] = []
extra_phantom: list[dict] = []
extra_unprofiled: list[dict] = []
matched_truth_names: set[str] = set()
# Compare scraped files against truth
for s_key, s_entry in scraped_index.items():
t_entry = truth_index.get(s_key)
if t_entry is None:
continue
matched_truth_names.add(t_entry["name"].lower())
# Hash comparison
for h in ("sha1", "md5", "crc32"):
t_hash = t_entry.get(h, "")
s_hash = s_entry.get(h, "")
if not t_hash or not s_hash:
continue
# Normalize to list for multi-hash support
t_list = t_hash if isinstance(t_hash, list) else [t_hash]
s_list = s_hash if isinstance(s_hash, list) else [s_hash]
t_set = {v.lower() for v in t_list}
s_set = {v.lower() for v in s_list}
if not t_set & s_set:
hash_mismatch.append({
"name": s_entry["name"],
"hash_type": h,
f"truth_{h}": t_hash,
f"scraped_{h}": s_hash,
"truth_cores": list(t_entry.get("_cores", [])),
})
break
# Required mismatch
t_req = t_entry.get("required")
s_req = s_entry.get("required")
if t_req is not None and s_req is not None and t_req != s_req:
required_mismatch.append({
"name": s_entry["name"],
"truth_required": t_req,
"scraped_required": s_req,
})
# Truth files not matched -> missing
for fe in truth_sys.get("files", []):
if fe["name"].lower() not in matched_truth_names:
missing.append({
"name": fe["name"],
"cores": list(fe.get("_cores", [])),
"source_refs": list(fe.get("_source_refs", [])),
})
# Scraped files not in truth -> extra
coverage = truth_sys.get("_coverage", {})
has_unprofiled = bool(coverage.get("cores_unprofiled"))
for s_key, s_entry in scraped_index.items():
if s_key not in truth_index:
entry = {"name": s_entry["name"]}
if has_unprofiled:
extra_unprofiled.append(entry)
else:
extra_phantom.append(entry)
result: dict = {}
if missing:
result["missing"] = missing
if hash_mismatch:
result["hash_mismatch"] = hash_mismatch
if required_mismatch:
result["required_mismatch"] = required_mismatch
if extra_phantom:
result["extra_phantom"] = extra_phantom
if extra_unprofiled:
result["extra_unprofiled"] = extra_unprofiled
return result
def _has_divergences(sys_div: dict) -> bool:
"""Check if a system divergence dict contains any actual divergences."""
return bool(sys_div)
def _update_summary(summary: dict, sys_div: dict) -> None:
"""Update summary counters from a system divergence dict."""
summary["total_missing"] += len(sys_div.get("missing", []))
summary["total_extra_phantom"] += len(sys_div.get("extra_phantom", []))
summary["total_extra_unprofiled"] += len(sys_div.get("extra_unprofiled", []))
summary["total_hash_mismatch"] += len(sys_div.get("hash_mismatch", []))
summary["total_required_mismatch"] += len(sys_div.get("required_mismatch", []))
def diff_platform_truth(truth: dict, scraped: dict) -> dict:
"""Compare truth YAML against scraped YAML, returning divergences.
System IDs are matched using normalized forms (via _norm_system_id) to
handle naming differences between emulator profiles and scraped platforms
(e.g. 'sega-game-gear' vs 'sega-gamegear').
"""
truth_systems = truth.get("systems", {})
scraped_systems = scraped.get("systems", {})
summary = {
"systems_compared": 0,
"systems_fully_covered": 0,
"systems_partially_covered": 0,
"systems_uncovered": 0,
"total_missing": 0,
"total_extra_phantom": 0,
"total_extra_unprofiled": 0,
"total_hash_mismatch": 0,
"total_required_mismatch": 0,
}
divergences: dict[str, dict] = {}
uncovered_systems: list[str] = []
# Build normalized-ID lookup for truth systems
norm_to_truth: dict[str, str] = {}
for sid in truth_systems:
norm_to_truth[_norm_system_id(sid)] = sid
# Match scraped systems to truth via normalized IDs
matched_truth: set[str] = set()
for s_sid in sorted(scraped_systems):
norm = _norm_system_id(s_sid)
t_sid = norm_to_truth.get(norm)
if t_sid is None:
# Also try exact match (in case normalization is lossy)
if s_sid in truth_systems:
t_sid = s_sid
else:
uncovered_systems.append(s_sid)
summary["systems_uncovered"] += 1
continue
matched_truth.add(t_sid)
summary["systems_compared"] += 1
sys_div = _diff_system(truth_systems[t_sid], scraped_systems[s_sid])
if _has_divergences(sys_div):
divergences[s_sid] = sys_div
_update_summary(summary, sys_div)
summary["systems_partially_covered"] += 1
else:
summary["systems_fully_covered"] += 1
# Truth systems not matched by any scraped system — all files missing
for t_sid in sorted(truth_systems):
if t_sid in matched_truth:
continue
summary["systems_compared"] += 1
sys_div = _diff_system(truth_systems[t_sid], {"files": []})
if _has_divergences(sys_div):
divergences[t_sid] = sys_div
_update_summary(summary, sys_div)
summary["systems_partially_covered"] += 1
else:
summary["systems_fully_covered"] += 1
result: dict = {"summary": summary}
if divergences:
result["divergences"] = divergences
if uncovered_systems:
result["uncovered_systems"] = uncovered_systems
return result

View File

@@ -25,7 +25,7 @@ import sys
from pathlib import Path
sys.path.insert(0, os.path.dirname(__file__))
from common import compute_hashes, list_registered_platforms, load_database as _load_database
from common import compute_hashes, list_registered_platforms, load_database
try:
import yaml
@@ -90,16 +90,6 @@ class ValidationResult:
return "\n".join(lines)
def load_database(db_path: str) -> dict | None:
try:
return _load_database(db_path)
except FileNotFoundError:
return None
except json.JSONDecodeError as e:
print(f"WARNING: corrupt database.json: {e}", file=sys.stderr)
return None
def load_platform_hashes(platforms_dir: str) -> dict:
"""Load all known hashes from platform configs."""
known = {"sha1": set(), "md5": set(), "names": set()}
@@ -241,7 +231,13 @@ def main():
if not files:
parser.error("No files specified. Use --changed or provide file paths.")
db = load_database(args.db)
try:
db = load_database(args.db)
except FileNotFoundError:
db = None
except json.JSONDecodeError as e:
print(f"WARNING: corrupt database.json: {e}", file=sys.stderr)
db = None
platform_hashes = load_platform_hashes(args.platforms_dir)
results = []

258
scripts/validation.py Normal file
View File

@@ -0,0 +1,258 @@
"""Emulator-level file validation logic.
Builds validation indexes from emulator profiles, checks files against
emulator-declared constraints (size, hash, crypto), and formats ground
truth data for reporting.
"""
from __future__ import annotations
import os
from common import compute_hashes
# Validation types that require console-specific cryptographic keys.
# verify.py cannot reproduce these — size checks still apply if combined.
_CRYPTO_CHECKS = frozenset({"signature", "crypto"})
# All reproducible validation types.
_HASH_CHECKS = frozenset({"crc32", "md5", "sha1", "adler32"})
def _parse_validation(validation: list | dict | None) -> list[str]:
"""Extract the validation check list from a file's validation field.
Handles both simple list and divergent (core/upstream) dict forms.
For dicts, uses the ``core`` key since RetroArch users run the core.
"""
if validation is None:
return []
if isinstance(validation, list):
return validation
if isinstance(validation, dict):
return validation.get("core", [])
return []
def _build_validation_index(profiles: dict) -> dict[str, dict]:
"""Build per-filename validation rules from emulator profiles.
Returns {filename: {"checks": [str], "size": int|None, "min_size": int|None,
"max_size": int|None, "crc32": str|None, "md5": str|None, "sha1": str|None,
"adler32": str|None, "crypto_only": [str], "per_emulator": {emu: detail}}}.
``crypto_only`` lists validation types we cannot reproduce (signature, crypto)
so callers can report them as non-verifiable rather than silently skipping.
``per_emulator`` preserves each core's individual checks, source_ref, and
expected values before merging, for ground truth reporting.
When multiple emulators reference the same file, merges checks (union).
Raises ValueError if two profiles declare conflicting values.
"""
index: dict[str, dict] = {}
for emu_name, profile in profiles.items():
if profile.get("type") in ("launcher", "alias"):
continue
for f in profile.get("files", []):
fname = f.get("name", "")
if not fname:
continue
checks = _parse_validation(f.get("validation"))
if not checks:
continue
if fname not in index:
index[fname] = {
"checks": set(), "sizes": set(),
"min_size": None, "max_size": None,
"crc32": set(), "md5": set(), "sha1": set(), "sha256": set(),
"adler32": set(), "crypto_only": set(),
"emulators": set(), "per_emulator": {},
}
index[fname]["emulators"].add(emu_name)
index[fname]["checks"].update(checks)
# Track non-reproducible crypto checks
index[fname]["crypto_only"].update(
c for c in checks if c in _CRYPTO_CHECKS
)
# Size checks
if "size" in checks:
if f.get("size") is not None:
index[fname]["sizes"].add(f["size"])
if f.get("min_size") is not None:
cur = index[fname]["min_size"]
index[fname]["min_size"] = min(cur, f["min_size"]) if cur is not None else f["min_size"]
if f.get("max_size") is not None:
cur = index[fname]["max_size"]
index[fname]["max_size"] = max(cur, f["max_size"]) if cur is not None else f["max_size"]
# Hash checks — collect all accepted hashes as sets (multiple valid
# versions of the same file, e.g. MT-32 ROM versions)
if "crc32" in checks and f.get("crc32"):
crc_val = f["crc32"]
crc_list = crc_val if isinstance(crc_val, list) else [crc_val]
for cv in crc_list:
norm = str(cv).lower()
if norm.startswith("0x"):
norm = norm[2:]
index[fname]["crc32"].add(norm)
for hash_type in ("md5", "sha1", "sha256"):
if hash_type in checks and f.get(hash_type):
val = f[hash_type]
if isinstance(val, list):
for h in val:
index[fname][hash_type].add(str(h).lower())
else:
index[fname][hash_type].add(str(val).lower())
# Adler32 — stored as known_hash_adler32 field (not in validation: list
# for Dolphin, but support it in both forms for future profiles)
adler_val = f.get("known_hash_adler32") or f.get("adler32")
if adler_val:
norm = adler_val.lower()
if norm.startswith("0x"):
norm = norm[2:]
index[fname]["adler32"].add(norm)
# Per-emulator ground truth detail
expected: dict = {}
if "size" in checks:
for key in ("size", "min_size", "max_size"):
if f.get(key) is not None:
expected[key] = f[key]
for hash_type in ("crc32", "md5", "sha1", "sha256"):
if hash_type in checks and f.get(hash_type):
expected[hash_type] = f[hash_type]
adler_val_pe = f.get("known_hash_adler32") or f.get("adler32")
if adler_val_pe:
expected["adler32"] = adler_val_pe
pe_entry = {
"checks": sorted(checks),
"source_ref": f.get("source_ref"),
"expected": expected,
}
pe = index[fname]["per_emulator"]
if emu_name in pe:
# Merge checks from multiple file entries for same emulator
existing = pe[emu_name]
merged_checks = sorted(set(existing["checks"]) | set(pe_entry["checks"]))
existing["checks"] = merged_checks
existing["expected"].update(pe_entry["expected"])
if pe_entry["source_ref"] and not existing["source_ref"]:
existing["source_ref"] = pe_entry["source_ref"]
else:
pe[emu_name] = pe_entry
# Convert sets to sorted tuples/lists for determinism
for v in index.values():
v["checks"] = sorted(v["checks"])
v["crypto_only"] = sorted(v["crypto_only"])
v["emulators"] = sorted(v["emulators"])
# Keep hash sets as frozensets for O(1) lookup in check_file_validation
return index
def build_ground_truth(filename: str, validation_index: dict[str, dict]) -> list[dict]:
"""Format per-emulator ground truth for a file from the validation index.
Returns a sorted list of {emulator, checks, source_ref, expected} dicts.
Returns [] if the file has no emulator validation data.
"""
entry = validation_index.get(filename)
if not entry or not entry.get("per_emulator"):
return []
result = []
for emu_name in sorted(entry["per_emulator"]):
detail = entry["per_emulator"][emu_name]
result.append({
"emulator": emu_name,
"checks": detail["checks"],
"source_ref": detail.get("source_ref"),
"expected": detail.get("expected", {}),
})
return result
def check_file_validation(
local_path: str, filename: str, validation_index: dict[str, dict],
bios_dir: str = "bios",
) -> str | None:
"""Check emulator-level validation on a resolved file.
Supports: size (exact/min/max), crc32, md5, sha1, adler32,
signature (RSA-2048 PKCS1v15 SHA256), crypto (AES-128-CBC + SHA256).
Returns None if all checks pass or no validation applies.
Returns a reason string if a check fails.
"""
entry = validation_index.get(filename)
if not entry:
return None
checks = entry["checks"]
# Size checks — sizes is a set of accepted values
if "size" in checks:
actual_size = os.path.getsize(local_path)
if entry["sizes"] and actual_size not in entry["sizes"]:
expected = ",".join(str(s) for s in sorted(entry["sizes"]))
return f"size mismatch: got {actual_size}, accepted [{expected}]"
if entry["min_size"] is not None and actual_size < entry["min_size"]:
return f"size too small: min {entry['min_size']}, got {actual_size}"
if entry["max_size"] is not None and actual_size > entry["max_size"]:
return f"size too large: max {entry['max_size']}, got {actual_size}"
# Hash checks — compute once, reuse for all hash types.
# Each hash field is a set of accepted values (multiple valid ROM versions).
need_hashes = (
any(h in checks and entry.get(h) for h in ("crc32", "md5", "sha1", "sha256"))
or entry.get("adler32")
)
if need_hashes:
hashes = compute_hashes(local_path)
if "crc32" in checks and entry["crc32"]:
if hashes["crc32"].lower() not in entry["crc32"]:
expected = ",".join(sorted(entry["crc32"]))
return f"crc32 mismatch: got {hashes['crc32']}, accepted [{expected}]"
if "md5" in checks and entry["md5"]:
if hashes["md5"].lower() not in entry["md5"]:
expected = ",".join(sorted(entry["md5"]))
return f"md5 mismatch: got {hashes['md5']}, accepted [{expected}]"
if "sha1" in checks and entry["sha1"]:
if hashes["sha1"].lower() not in entry["sha1"]:
expected = ",".join(sorted(entry["sha1"]))
return f"sha1 mismatch: got {hashes['sha1']}, accepted [{expected}]"
if "sha256" in checks and entry["sha256"]:
if hashes["sha256"].lower() not in entry["sha256"]:
expected = ",".join(sorted(entry["sha256"]))
return f"sha256 mismatch: got {hashes['sha256']}, accepted [{expected}]"
if entry["adler32"]:
if hashes["adler32"].lower() not in entry["adler32"]:
expected = ",".join(sorted(entry["adler32"]))
return f"adler32 mismatch: got 0x{hashes['adler32']}, accepted [{expected}]"
# Signature/crypto checks (3DS RSA, AES)
if entry["crypto_only"]:
from crypto_verify import check_crypto_validation
crypto_reason = check_crypto_validation(local_path, filename, bios_dir)
if crypto_reason:
return crypto_reason
return None
def validate_cli_modes(args, mode_attrs: list[str]) -> None:
"""Validate mutual exclusion of CLI mode arguments."""
modes = sum(1 for attr in mode_attrs if getattr(args, attr, None))
if modes == 0:
raise SystemExit(f"Specify one of: --{' --'.join(mode_attrs)}")
if modes > 1:
raise SystemExit(f"Options are mutually exclusive: --{' --'.join(mode_attrs)}")
def filter_files_by_mode(files: list[dict], standalone: bool) -> list[dict]:
"""Filter file entries by libretro/standalone mode."""
result = []
for f in files:
fmode = f.get("mode", "")
if standalone and fmode == "libretro":
continue
if not standalone and fmode == "standalone":
continue
result.append(f)
return result

View File

@@ -36,14 +36,16 @@ except ImportError:
sys.path.insert(0, os.path.dirname(__file__))
from common import (
_build_validation_index, _parse_validation, build_ground_truth,
build_zip_contents_index, check_file_validation,
check_inside_zip, compute_hashes, filter_files_by_mode,
build_zip_contents_index, check_inside_zip, compute_hashes,
filter_systems_by_target, group_identical_platforms, list_emulator_profiles,
list_system_ids, load_data_dir_registry, load_emulator_profiles,
load_platform_config, md5sum, md5_composite, resolve_local_file,
resolve_platform_cores,
)
from validation import (
_build_validation_index, _parse_validation, build_ground_truth,
check_file_validation, filter_files_by_mode,
)
DEFAULT_DB = "database.json"
DEFAULT_PLATFORMS_DIR = "platforms"
DEFAULT_EMULATORS_DIR = "emulators"

View File

@@ -30,14 +30,15 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts"))
import yaml
from common import (
_build_validation_index, build_zip_contents_index, check_file_validation,
check_inside_zip, compute_hashes, diff_platform_truth,
filter_files_by_mode,
generate_platform_truth,
build_zip_contents_index, check_inside_zip, compute_hashes,
group_identical_platforms, load_emulator_profiles, load_platform_config,
md5_composite, md5sum, parse_md5_list, resolve_local_file,
resolve_platform_cores, safe_extract_zip,
)
from validation import (
_build_validation_index, check_file_validation, filter_files_by_mode,
)
from truth import diff_platform_truth, generate_platform_truth
from verify import (
Severity, Status, verify_platform, find_undeclared_files, find_exclusion_notes,
verify_emulator, _effective_validation_label,