From 3c7fc26354b91f8a89709e5f7846b862c4921884 Mon Sep 17 00:00:00 2001 From: Abdessamad Derraz <3028866+Abdess@users.noreply.github.com> Date: Sun, 29 Mar 2026 16:41:24 +0200 Subject: [PATCH] refactor: extract validation and truth modules from common.py --- scripts/common.py | 696 +------------------------------------- scripts/diff_truth.py | 3 +- scripts/generate_pack.py | 10 +- scripts/generate_truth.py | 2 +- scripts/truth.py | 451 ++++++++++++++++++++++++ scripts/validate_pr.py | 20 +- scripts/validation.py | 258 ++++++++++++++ scripts/verify.py | 8 +- tests/test_e2e.py | 9 +- 9 files changed, 747 insertions(+), 710 deletions(-) create mode 100644 scripts/truth.py create mode 100644 scripts/validation.py diff --git a/scripts/common.py b/scripts/common.py index 72bc27bf..52d1d6bc 100644 --- a/scripts/common.py +++ b/scripts/common.py @@ -801,251 +801,9 @@ def filter_systems_by_target( return filtered -def _parse_validation(validation: list | dict | None) -> list[str]: - """Extract the validation check list from a file's validation field. - Handles both simple list and divergent (core/upstream) dict forms. - For dicts, uses the ``core`` key since RetroArch users run the core. - """ - if validation is None: - return [] - if isinstance(validation, list): - return validation - if isinstance(validation, dict): - return validation.get("core", []) - return [] - - -# Validation types that require console-specific cryptographic keys. -# verify.py cannot reproduce these — size checks still apply if combined. -_CRYPTO_CHECKS = frozenset({"signature", "crypto"}) - -# All reproducible validation types. -_HASH_CHECKS = frozenset({"crc32", "md5", "sha1", "adler32"}) - - -def _build_validation_index(profiles: dict) -> dict[str, dict]: - """Build per-filename validation rules from emulator profiles. 
- - Returns {filename: {"checks": [str], "size": int|None, "min_size": int|None, - "max_size": int|None, "crc32": str|None, "md5": str|None, "sha1": str|None, - "adler32": str|None, "crypto_only": [str], "per_emulator": {emu: detail}}}. - - ``crypto_only`` lists validation types we cannot reproduce (signature, crypto) - so callers can report them as non-verifiable rather than silently skipping. - - ``per_emulator`` preserves each core's individual checks, source_ref, and - expected values before merging, for ground truth reporting. - - When multiple emulators reference the same file, merges checks (union). - Raises ValueError if two profiles declare conflicting values. - """ - index: dict[str, dict] = {} - for emu_name, profile in profiles.items(): - if profile.get("type") in ("launcher", "alias"): - continue - for f in profile.get("files", []): - fname = f.get("name", "") - if not fname: - continue - checks = _parse_validation(f.get("validation")) - if not checks: - continue - if fname not in index: - index[fname] = { - "checks": set(), "sizes": set(), - "min_size": None, "max_size": None, - "crc32": set(), "md5": set(), "sha1": set(), "sha256": set(), - "adler32": set(), "crypto_only": set(), - "emulators": set(), "per_emulator": {}, - } - index[fname]["emulators"].add(emu_name) - index[fname]["checks"].update(checks) - # Track non-reproducible crypto checks - index[fname]["crypto_only"].update( - c for c in checks if c in _CRYPTO_CHECKS - ) - # Size checks - if "size" in checks: - if f.get("size") is not None: - index[fname]["sizes"].add(f["size"]) - if f.get("min_size") is not None: - cur = index[fname]["min_size"] - index[fname]["min_size"] = min(cur, f["min_size"]) if cur is not None else f["min_size"] - if f.get("max_size") is not None: - cur = index[fname]["max_size"] - index[fname]["max_size"] = max(cur, f["max_size"]) if cur is not None else f["max_size"] - # Hash checks — collect all accepted hashes as sets (multiple valid - # versions of the same file, 
e.g. MT-32 ROM versions) - if "crc32" in checks and f.get("crc32"): - crc_val = f["crc32"] - crc_list = crc_val if isinstance(crc_val, list) else [crc_val] - for cv in crc_list: - norm = str(cv).lower() - if norm.startswith("0x"): - norm = norm[2:] - index[fname]["crc32"].add(norm) - for hash_type in ("md5", "sha1", "sha256"): - if hash_type in checks and f.get(hash_type): - val = f[hash_type] - if isinstance(val, list): - for h in val: - index[fname][hash_type].add(str(h).lower()) - else: - index[fname][hash_type].add(str(val).lower()) - # Adler32 — stored as known_hash_adler32 field (not in validation: list - # for Dolphin, but support it in both forms for future profiles) - adler_val = f.get("known_hash_adler32") or f.get("adler32") - if adler_val: - norm = adler_val.lower() - if norm.startswith("0x"): - norm = norm[2:] - index[fname]["adler32"].add(norm) - # Per-emulator ground truth detail - expected: dict = {} - if "size" in checks: - for key in ("size", "min_size", "max_size"): - if f.get(key) is not None: - expected[key] = f[key] - for hash_type in ("crc32", "md5", "sha1", "sha256"): - if hash_type in checks and f.get(hash_type): - expected[hash_type] = f[hash_type] - adler_val_pe = f.get("known_hash_adler32") or f.get("adler32") - if adler_val_pe: - expected["adler32"] = adler_val_pe - pe_entry = { - "checks": sorted(checks), - "source_ref": f.get("source_ref"), - "expected": expected, - } - pe = index[fname]["per_emulator"] - if emu_name in pe: - # Merge checks from multiple file entries for same emulator - existing = pe[emu_name] - merged_checks = sorted(set(existing["checks"]) | set(pe_entry["checks"])) - existing["checks"] = merged_checks - existing["expected"].update(pe_entry["expected"]) - if pe_entry["source_ref"] and not existing["source_ref"]: - existing["source_ref"] = pe_entry["source_ref"] - else: - pe[emu_name] = pe_entry - # Convert sets to sorted tuples/lists for determinism - for v in index.values(): - v["checks"] = sorted(v["checks"]) - 
v["crypto_only"] = sorted(v["crypto_only"]) - v["emulators"] = sorted(v["emulators"]) - # Keep hash sets as frozensets for O(1) lookup in check_file_validation - return index - - -def build_ground_truth(filename: str, validation_index: dict[str, dict]) -> list[dict]: - """Format per-emulator ground truth for a file from the validation index. - - Returns a sorted list of {emulator, checks, source_ref, expected} dicts. - Returns [] if the file has no emulator validation data. - """ - entry = validation_index.get(filename) - if not entry or not entry.get("per_emulator"): - return [] - result = [] - for emu_name in sorted(entry["per_emulator"]): - detail = entry["per_emulator"][emu_name] - result.append({ - "emulator": emu_name, - "checks": detail["checks"], - "source_ref": detail.get("source_ref"), - "expected": detail.get("expected", {}), - }) - return result - - -def check_file_validation( - local_path: str, filename: str, validation_index: dict[str, dict], - bios_dir: str = "bios", -) -> str | None: - """Check emulator-level validation on a resolved file. - - Supports: size (exact/min/max), crc32, md5, sha1, adler32, - signature (RSA-2048 PKCS1v15 SHA256), crypto (AES-128-CBC + SHA256). - - Returns None if all checks pass or no validation applies. - Returns a reason string if a check fails. 
- """ - entry = validation_index.get(filename) - if not entry: - return None - checks = entry["checks"] - - # Size checks — sizes is a set of accepted values - if "size" in checks: - actual_size = os.path.getsize(local_path) - if entry["sizes"] and actual_size not in entry["sizes"]: - expected = ",".join(str(s) for s in sorted(entry["sizes"])) - return f"size mismatch: got {actual_size}, accepted [{expected}]" - if entry["min_size"] is not None and actual_size < entry["min_size"]: - return f"size too small: min {entry['min_size']}, got {actual_size}" - if entry["max_size"] is not None and actual_size > entry["max_size"]: - return f"size too large: max {entry['max_size']}, got {actual_size}" - - # Hash checks — compute once, reuse for all hash types. - # Each hash field is a set of accepted values (multiple valid ROM versions). - need_hashes = ( - any(h in checks and entry.get(h) for h in ("crc32", "md5", "sha1", "sha256")) - or entry.get("adler32") - ) - if need_hashes: - hashes = compute_hashes(local_path) - if "crc32" in checks and entry["crc32"]: - if hashes["crc32"].lower() not in entry["crc32"]: - expected = ",".join(sorted(entry["crc32"])) - return f"crc32 mismatch: got {hashes['crc32']}, accepted [{expected}]" - if "md5" in checks and entry["md5"]: - if hashes["md5"].lower() not in entry["md5"]: - expected = ",".join(sorted(entry["md5"])) - return f"md5 mismatch: got {hashes['md5']}, accepted [{expected}]" - if "sha1" in checks and entry["sha1"]: - if hashes["sha1"].lower() not in entry["sha1"]: - expected = ",".join(sorted(entry["sha1"])) - return f"sha1 mismatch: got {hashes['sha1']}, accepted [{expected}]" - if "sha256" in checks and entry["sha256"]: - if hashes["sha256"].lower() not in entry["sha256"]: - expected = ",".join(sorted(entry["sha256"])) - return f"sha256 mismatch: got {hashes['sha256']}, accepted [{expected}]" - if entry["adler32"]: - if hashes["adler32"].lower() not in entry["adler32"]: - expected = ",".join(sorted(entry["adler32"])) - 
return f"adler32 mismatch: got 0x{hashes['adler32']}, accepted [{expected}]" - - # Signature/crypto checks (3DS RSA, AES) - if entry["crypto_only"]: - from crypto_verify import check_crypto_validation - crypto_reason = check_crypto_validation(local_path, filename, bios_dir) - if crypto_reason: - return crypto_reason - - return None - - -def validate_cli_modes(args, mode_attrs: list[str]) -> None: - """Validate mutual exclusion of CLI mode arguments.""" - modes = sum(1 for attr in mode_attrs if getattr(args, attr, None)) - if modes == 0: - raise SystemExit(f"Specify one of: --{' --'.join(mode_attrs)}") - if modes > 1: - raise SystemExit(f"Options are mutually exclusive: --{' --'.join(mode_attrs)}") - - -def filter_files_by_mode(files: list[dict], standalone: bool) -> list[dict]: - """Filter file entries by libretro/standalone mode.""" - result = [] - for f in files: - fmode = f.get("mode", "") - if standalone and fmode == "libretro": - continue - if not standalone and fmode == "standalone": - continue - result.append(f) - return result +# Validation and mode filtering — extracted to validation.py for separation of concerns. +# Re-exported below for backward compatibility. 
LARGE_FILES_RELEASE = "large-files" @@ -1151,445 +909,13 @@ def list_platform_system_ids(platform_name: str, platforms_dir: str) -> None: print(f" {sys_id:35s} ({file_count} file{'s' if file_count != 1 else ''}){mfr_display}") -# --------------------------------------------------------------- -# Truth generation — build ground-truth YAML from emulator profiles -# --------------------------------------------------------------- -def _determine_core_mode( - emu_name: str, profile: dict, - cores_config: str | list | None, - standalone_set: set[str] | None, -) -> str: - """Determine effective mode (libretro/standalone) for a resolved core.""" - if cores_config == "all_libretro": - return "libretro" - if standalone_set is not None: - profile_names = {emu_name} | {str(c) for c in profile.get("cores", [])} - if profile_names & standalone_set: - return "standalone" - return "libretro" - ptype = profile.get("type", "libretro") - if "standalone" in ptype and "libretro" in ptype: - return "both" - if "standalone" in ptype: - return "standalone" - return "libretro" - - -def _enrich_hashes(entry: dict, db: dict) -> None: - """Fill missing hash fields from the database.""" - sha1 = entry.get("sha1", "") - md5 = entry.get("md5", "") - - record = None - if sha1 and db.get("files"): - record = db["files"].get(sha1) - if record is None and md5: - by_md5 = db.get("by_md5", {}) - md5_str = md5 if isinstance(md5, str) else md5[0] if md5 else "" - ref_sha1 = by_md5.get(md5_str.lower()) if md5_str else None - if ref_sha1 and db.get("files"): - record = db["files"].get(ref_sha1) - if record is None: - return - - for field in ("sha1", "md5", "sha256", "crc32"): - if not entry.get(field) and record.get(field): - entry[field] = record[field] - - -def _merge_file_into_system( - system: dict, file_entry: dict, emu_name: str, db: dict | None, -) -> None: - """Merge a file entry into a system's file list, deduplicating by name.""" - files = system.setdefault("files", []) - name_lower = 
file_entry["name"].lower() - - existing = None - for f in files: - if f["name"].lower() == name_lower: - existing = f - break - - if existing is not None: - existing["_cores"] = existing.get("_cores", set()) | {emu_name} - sr = file_entry.get("source_ref") - if sr is not None: - sr_key = str(sr) if not isinstance(sr, str) else sr - existing["_source_refs"] = existing.get("_source_refs", set()) | {sr_key} - else: - existing.setdefault("_source_refs", set()) - if file_entry.get("required") and not existing.get("required"): - existing["required"] = True - for h in ("sha1", "md5", "sha256", "crc32"): - theirs = file_entry.get(h, "") - ours = existing.get(h, "") - if theirs and ours and theirs.lower() != ours.lower(): - import sys as _sys - print( - f"WARNING: hash conflict for {file_entry['name']} " - f"({h}: {ours} vs {theirs}, core {emu_name})", - file=_sys.stderr, - ) - elif theirs and not ours: - existing[h] = theirs - return - - entry: dict = {"name": file_entry["name"]} - if file_entry.get("required") is not None: - entry["required"] = file_entry["required"] - for field in ("sha1", "md5", "sha256", "crc32", "size", "path", - "description", "hle_fallback", "category", "note", - "validation", "min_size", "max_size", "aliases"): - val = file_entry.get(field) - if val is not None: - entry[field] = val - entry["_cores"] = {emu_name} - sr = file_entry.get("source_ref") - if sr is not None: - sr_key = str(sr) if not isinstance(sr, str) else sr - entry["_source_refs"] = {sr_key} - else: - entry["_source_refs"] = set() - - if db: - _enrich_hashes(entry, db) - - files.append(entry) - - -def generate_platform_truth( - platform_name: str, - config: dict, - registry_entry: dict, - profiles: dict[str, dict], - db: dict | None = None, - target_cores: set[str] | None = None, -) -> dict: - """Generate ground-truth system data for a platform from emulator profiles. 
- - Args: - platform_name: platform identifier - config: loaded platform config (via load_platform_config), has cores, - systems, standalone_cores with inheritance resolved - registry_entry: registry metadata for hash_type, verification_mode, etc. - profiles: all loaded emulator profiles - db: optional database for hash enrichment - target_cores: optional hardware target core filter - - Returns a dict with platform metadata, systems, and per-file details - including which cores reference each file. - """ - cores_config = config.get("cores") - - # Resolve standalone set for mode determination - standalone_set: set[str] | None = None - standalone_cores = config.get("standalone_cores") - if isinstance(standalone_cores, list): - standalone_set = {str(c) for c in standalone_cores} - - resolved = resolve_platform_cores(config, profiles, target_cores) - - # Build mapping: profile system ID -> platform system ID - # Three strategies, tried in order: - # 1. File-based: if the scraped platform already has this file, use its system - # 2. Exact match: profile system ID == platform system ID - # 3. Normalized match: strip manufacturer prefix + separators - platform_sys_ids = set(config.get("systems", {}).keys()) - - # File→platform_system reverse index from scraped config - file_to_plat_sys: dict[str, str] = {} - for psid, sys_data in config.get("systems", {}).items(): - for fe in sys_data.get("files", []): - fname = fe.get("name", "").lower() - if fname: - file_to_plat_sys[fname] = psid - for alias in fe.get("aliases", []): - file_to_plat_sys[alias.lower()] = psid - - # Normalized ID → platform system ID - norm_to_platform: dict[str, str] = {} - for psid in platform_sys_ids: - norm_to_platform[_norm_system_id(psid)] = psid - - def _map_sys_id(profile_sid: str, file_name: str = "") -> str: - """Map a profile system ID to the platform's system ID.""" - # 1. 
File-based lookup (handles composites and name mismatches) - if file_name: - plat_sys = file_to_plat_sys.get(file_name.lower()) - if plat_sys: - return plat_sys - # 2. Exact match - if profile_sid in platform_sys_ids: - return profile_sid - # 3. Normalized match - normed = _norm_system_id(profile_sid) - return norm_to_platform.get(normed, profile_sid) - - systems: dict[str, dict] = {} - cores_profiled: set[str] = set() - cores_unprofiled: set[str] = set() - # Track which cores contribute to each system - system_cores: dict[str, dict[str, set[str]]] = {} - - for emu_name in sorted(resolved): - profile = profiles.get(emu_name) - if not profile: - cores_unprofiled.add(emu_name) - continue - cores_profiled.add(emu_name) - - mode = _determine_core_mode(emu_name, profile, cores_config, standalone_set) - raw_files = profile.get("files", []) - if mode == "both": - filtered = raw_files - else: - filtered = filter_files_by_mode(raw_files, standalone=(mode == "standalone")) - - for fe in filtered: - profile_sid = fe.get("system", "") - if not profile_sid: - sys_ids = profile.get("systems", []) - profile_sid = sys_ids[0] if sys_ids else "unknown" - sys_id = _map_sys_id(profile_sid, fe.get("name", "")) - system = systems.setdefault(sys_id, {}) - _merge_file_into_system(system, fe, emu_name, db) - # Track core contribution per system - sys_cov = system_cores.setdefault(sys_id, { - "profiled": set(), "unprofiled": set(), - }) - sys_cov["profiled"].add(emu_name) - - # Ensure all systems of resolved cores have entries (even with 0 files). - # This documents that the system is covered — the core was analyzed and - # needs no external files for this system. 
- for emu_name in cores_profiled: - profile = profiles[emu_name] - for prof_sid in profile.get("systems", []): - sys_id = _map_sys_id(prof_sid) - systems.setdefault(sys_id, {}) - sys_cov = system_cores.setdefault(sys_id, { - "profiled": set(), "unprofiled": set(), - }) - sys_cov["profiled"].add(emu_name) - - # Track unprofiled cores per system based on profile system lists - for emu_name in cores_unprofiled: - for sys_id in systems: - sys_cov = system_cores.setdefault(sys_id, { - "profiled": set(), "unprofiled": set(), - }) - sys_cov["unprofiled"].add(emu_name) - - # Convert sets to sorted lists for serialization - for sys_id, sys_data in systems.items(): - for fe in sys_data.get("files", []): - fe["_cores"] = sorted(fe.get("_cores", set())) - fe["_source_refs"] = sorted(fe.get("_source_refs", set())) - # Add per-system coverage - cov = system_cores.get(sys_id, {}) - sys_data["_coverage"] = { - "cores_profiled": sorted(cov.get("profiled", set())), - "cores_unprofiled": sorted(cov.get("unprofiled", set())), - } - - return { - "platform": platform_name, - "generated": True, - "systems": systems, - "_coverage": { - "cores_resolved": len(resolved), - "cores_profiled": len(cores_profiled), - "cores_unprofiled": sorted(cores_unprofiled), - }, - } - - -# ------------------------------------------------------------------- -# Platform truth diffing -# ------------------------------------------------------------------- - -def _diff_system(truth_sys: dict, scraped_sys: dict) -> dict: - """Compare files between truth and scraped for a single system.""" - # Build truth index: name.lower() -> entry, alias.lower() -> entry - truth_index: dict[str, dict] = {} - for fe in truth_sys.get("files", []): - truth_index[fe["name"].lower()] = fe - for alias in fe.get("aliases", []): - truth_index[alias.lower()] = fe - - # Build scraped index: name.lower() -> entry - scraped_index: dict[str, dict] = {} - for fe in scraped_sys.get("files", []): - scraped_index[fe["name"].lower()] = fe - - 
missing: list[dict] = [] - hash_mismatch: list[dict] = [] - required_mismatch: list[dict] = [] - extra_phantom: list[dict] = [] - extra_unprofiled: list[dict] = [] - - matched_truth_names: set[str] = set() - - # Compare scraped files against truth - for s_key, s_entry in scraped_index.items(): - t_entry = truth_index.get(s_key) - if t_entry is None: - continue - matched_truth_names.add(t_entry["name"].lower()) - - # Hash comparison - for h in ("sha1", "md5", "crc32"): - t_hash = t_entry.get(h, "") - s_hash = s_entry.get(h, "") - if not t_hash or not s_hash: - continue - # Normalize to list for multi-hash support - t_list = t_hash if isinstance(t_hash, list) else [t_hash] - s_list = s_hash if isinstance(s_hash, list) else [s_hash] - t_set = {v.lower() for v in t_list} - s_set = {v.lower() for v in s_list} - if not t_set & s_set: - hash_mismatch.append({ - "name": s_entry["name"], - "hash_type": h, - f"truth_{h}": t_hash, - f"scraped_{h}": s_hash, - "truth_cores": list(t_entry.get("_cores", [])), - }) - break - - # Required mismatch - t_req = t_entry.get("required") - s_req = s_entry.get("required") - if t_req is not None and s_req is not None and t_req != s_req: - required_mismatch.append({ - "name": s_entry["name"], - "truth_required": t_req, - "scraped_required": s_req, - }) - - # Truth files not matched -> missing - for fe in truth_sys.get("files", []): - if fe["name"].lower() not in matched_truth_names: - missing.append({ - "name": fe["name"], - "cores": list(fe.get("_cores", [])), - "source_refs": list(fe.get("_source_refs", [])), - }) - - # Scraped files not in truth -> extra - coverage = truth_sys.get("_coverage", {}) - has_unprofiled = bool(coverage.get("cores_unprofiled")) - for s_key, s_entry in scraped_index.items(): - if s_key not in truth_index: - entry = {"name": s_entry["name"]} - if has_unprofiled: - extra_unprofiled.append(entry) - else: - extra_phantom.append(entry) - - result: dict = {} - if missing: - result["missing"] = missing - if 
hash_mismatch: - result["hash_mismatch"] = hash_mismatch - if required_mismatch: - result["required_mismatch"] = required_mismatch - if extra_phantom: - result["extra_phantom"] = extra_phantom - if extra_unprofiled: - result["extra_unprofiled"] = extra_unprofiled - return result - - -def _has_divergences(sys_div: dict) -> bool: - """Check if a system divergence dict contains any actual divergences.""" - return bool(sys_div) - - -def _update_summary(summary: dict, sys_div: dict) -> None: - """Update summary counters from a system divergence dict.""" - summary["total_missing"] += len(sys_div.get("missing", [])) - summary["total_extra_phantom"] += len(sys_div.get("extra_phantom", [])) - summary["total_extra_unprofiled"] += len(sys_div.get("extra_unprofiled", [])) - summary["total_hash_mismatch"] += len(sys_div.get("hash_mismatch", [])) - summary["total_required_mismatch"] += len(sys_div.get("required_mismatch", [])) - - -def diff_platform_truth(truth: dict, scraped: dict) -> dict: - """Compare truth YAML against scraped YAML, returning divergences. - - System IDs are matched using normalized forms (via _norm_system_id) to - handle naming differences between emulator profiles and scraped platforms - (e.g. 'sega-game-gear' vs 'sega-gamegear'). 
- """ - truth_systems = truth.get("systems", {}) - scraped_systems = scraped.get("systems", {}) - - summary = { - "systems_compared": 0, - "systems_fully_covered": 0, - "systems_partially_covered": 0, - "systems_uncovered": 0, - "total_missing": 0, - "total_extra_phantom": 0, - "total_extra_unprofiled": 0, - "total_hash_mismatch": 0, - "total_required_mismatch": 0, - } - - divergences: dict[str, dict] = {} - uncovered_systems: list[str] = [] - - # Build normalized-ID lookup for truth systems - norm_to_truth: dict[str, str] = {} - for sid in truth_systems: - norm_to_truth[_norm_system_id(sid)] = sid - - # Match scraped systems to truth via normalized IDs - matched_truth: set[str] = set() - - for s_sid in sorted(scraped_systems): - norm = _norm_system_id(s_sid) - t_sid = norm_to_truth.get(norm) - - if t_sid is None: - # Also try exact match (in case normalization is lossy) - if s_sid in truth_systems: - t_sid = s_sid - else: - uncovered_systems.append(s_sid) - summary["systems_uncovered"] += 1 - continue - - matched_truth.add(t_sid) - summary["systems_compared"] += 1 - sys_div = _diff_system(truth_systems[t_sid], scraped_systems[s_sid]) - - if _has_divergences(sys_div): - divergences[s_sid] = sys_div - _update_summary(summary, sys_div) - summary["systems_partially_covered"] += 1 - else: - summary["systems_fully_covered"] += 1 - - # Truth systems not matched by any scraped system — all files missing - for t_sid in sorted(truth_systems): - if t_sid in matched_truth: - continue - summary["systems_compared"] += 1 - sys_div = _diff_system(truth_systems[t_sid], {"files": []}) - if _has_divergences(sys_div): - divergences[t_sid] = sys_div - _update_summary(summary, sys_div) - summary["systems_partially_covered"] += 1 - else: - summary["systems_fully_covered"] += 1 - - result: dict = {"summary": summary} - if divergences: - result["divergences"] = divergences - if uncovered_systems: - result["uncovered_systems"] = uncovered_systems - return result +# Re-exports: validation 
and truth modules extracted for separation of concerns. +# Existing consumers import from common — these preserve that contract. +from validation import ( # noqa: F401, E402 + _build_validation_index, _parse_validation, build_ground_truth, + check_file_validation, filter_files_by_mode, validate_cli_modes, +) +from truth import ( # noqa: F401, E402 + diff_platform_truth, generate_platform_truth, +) diff --git a/scripts/diff_truth.py b/scripts/diff_truth.py index 56bd3f54..ab01e05a 100644 --- a/scripts/diff_truth.py +++ b/scripts/diff_truth.py @@ -16,7 +16,8 @@ import os import sys sys.path.insert(0, os.path.dirname(__file__)) -from common import diff_platform_truth, list_registered_platforms, load_platform_config +from common import list_registered_platforms, load_platform_config +from truth import diff_platform_truth try: import yaml diff --git a/scripts/generate_pack.py b/scripts/generate_pack.py index af5a81c4..2e77bd3e 100644 --- a/scripts/generate_pack.py +++ b/scripts/generate_pack.py @@ -27,14 +27,16 @@ from pathlib import Path sys.path.insert(0, os.path.dirname(__file__)) from common import ( MANUFACTURER_PREFIXES, - _build_validation_index, build_zip_contents_index, check_file_validation, - check_inside_zip, compute_hashes, fetch_large_file, filter_files_by_mode, - group_identical_platforms, list_emulator_profiles, list_platform_system_ids, - list_registered_platforms, + build_zip_contents_index, check_inside_zip, compute_hashes, + fetch_large_file, group_identical_platforms, list_emulator_profiles, + list_platform_system_ids, list_registered_platforms, filter_systems_by_target, list_system_ids, load_database, load_data_dir_registry, load_emulator_profiles, load_platform_config, md5_composite, resolve_local_file, ) +from validation import ( + _build_validation_index, check_file_validation, filter_files_by_mode, +) from deterministic_zip import rebuild_zip_deterministic try: diff --git a/scripts/generate_truth.py b/scripts/generate_truth.py index 6fb6692b..0d0fb9cc 100644 --- 
a/scripts/generate_truth.py +++ b/scripts/generate_truth.py @@ -14,13 +14,13 @@ import sys sys.path.insert(0, os.path.dirname(__file__)) from common import ( - generate_platform_truth, list_registered_platforms, load_database, load_emulator_profiles, load_platform_config, load_target_config, ) +from truth import generate_platform_truth try: import yaml diff --git a/scripts/truth.py b/scripts/truth.py new file mode 100644 index 00000000..fd2e4034 --- /dev/null +++ b/scripts/truth.py @@ -0,0 +1,451 @@ +"""Platform truth generation and diffing. + +Generates ground-truth YAML from emulator profiles for gap analysis, +and diffs truth against scraped platform data to find divergences. +""" + +from __future__ import annotations + +import sys + +from common import _norm_system_id, resolve_platform_cores +from validation import filter_files_by_mode + + +def _determine_core_mode( + emu_name: str, profile: dict, + cores_config: str | list | None, + standalone_set: set[str] | None, +) -> str: + """Determine effective mode (libretro/standalone) for a resolved core.""" + if cores_config == "all_libretro": + return "libretro" + if standalone_set is not None: + profile_names = {emu_name} | {str(c) for c in profile.get("cores", [])} + if profile_names & standalone_set: + return "standalone" + return "libretro" + ptype = profile.get("type", "libretro") + if "standalone" in ptype and "libretro" in ptype: + return "both" + if "standalone" in ptype: + return "standalone" + return "libretro" + + +def _enrich_hashes(entry: dict, db: dict) -> None: + """Fill missing hash fields from the database.""" + sha1 = entry.get("sha1", "") + md5 = entry.get("md5", "") + + record = None + if sha1 and db.get("files"): + record = db["files"].get(sha1) + if record is None and md5: + by_md5 = db.get("by_md5", {}) + md5_str = md5 if isinstance(md5, str) else md5[0] if md5 else "" + ref_sha1 = by_md5.get(md5_str.lower()) if md5_str else None + if ref_sha1 and db.get("files"): + record = 
db["files"].get(ref_sha1) + if record is None: + return + + for field in ("sha1", "md5", "sha256", "crc32"): + if not entry.get(field) and record.get(field): + entry[field] = record[field] + + +def _merge_file_into_system( + system: dict, file_entry: dict, emu_name: str, db: dict | None, +) -> None: + """Merge a file entry into a system's file list, deduplicating by name.""" + files = system.setdefault("files", []) + name_lower = file_entry["name"].lower() + + existing = None + for f in files: + if f["name"].lower() == name_lower: + existing = f + break + + if existing is not None: + existing["_cores"] = existing.get("_cores", set()) | {emu_name} + sr = file_entry.get("source_ref") + if sr is not None: + sr_key = str(sr) if not isinstance(sr, str) else sr + existing["_source_refs"] = existing.get("_source_refs", set()) | {sr_key} + else: + existing.setdefault("_source_refs", set()) + if file_entry.get("required") and not existing.get("required"): + existing["required"] = True + for h in ("sha1", "md5", "sha256", "crc32"): + theirs = file_entry.get(h, "") + ours = existing.get(h, "") + if theirs and ours and theirs.lower() != ours.lower(): + print( + f"WARNING: hash conflict for {file_entry['name']} " + f"({h}: {ours} vs {theirs}, core {emu_name})", + file=sys.stderr, + ) + elif theirs and not ours: + existing[h] = theirs + return + + entry: dict = {"name": file_entry["name"]} + if file_entry.get("required") is not None: + entry["required"] = file_entry["required"] + for field in ("sha1", "md5", "sha256", "crc32", "size", "path", + "description", "hle_fallback", "category", "note", + "validation", "min_size", "max_size", "aliases"): + val = file_entry.get(field) + if val is not None: + entry[field] = val + entry["_cores"] = {emu_name} + sr = file_entry.get("source_ref") + if sr is not None: + sr_key = str(sr) if not isinstance(sr, str) else sr + entry["_source_refs"] = {sr_key} + else: + entry["_source_refs"] = set() + + if db: + _enrich_hashes(entry, db) + + 
files.append(entry) + + +def generate_platform_truth( + platform_name: str, + config: dict, + registry_entry: dict, + profiles: dict[str, dict], + db: dict | None = None, + target_cores: set[str] | None = None, +) -> dict: + """Generate ground-truth system data for a platform from emulator profiles. + + Args: + platform_name: platform identifier + config: loaded platform config (via load_platform_config), has cores, + systems, standalone_cores with inheritance resolved + registry_entry: registry metadata for hash_type, verification_mode, etc. + profiles: all loaded emulator profiles + db: optional database for hash enrichment + target_cores: optional hardware target core filter + + Returns a dict with platform metadata, systems, and per-file details + including which cores reference each file. + """ + cores_config = config.get("cores") + + # Resolve standalone set for mode determination + standalone_set: set[str] | None = None + standalone_cores = config.get("standalone_cores") + if isinstance(standalone_cores, list): + standalone_set = {str(c) for c in standalone_cores} + + resolved = resolve_platform_cores(config, profiles, target_cores) + + # Build mapping: profile system ID -> platform system ID + # Three strategies, tried in order: + # 1. File-based: if the scraped platform already has this file, use its system + # 2. Exact match: profile system ID == platform system ID + # 3. 
Normalized match: strip manufacturer prefix + separators + platform_sys_ids = set(config.get("systems", {}).keys()) + + # File->platform_system reverse index from scraped config + file_to_plat_sys: dict[str, str] = {} + for psid, sys_data in config.get("systems", {}).items(): + for fe in sys_data.get("files", []): + fname = fe.get("name", "").lower() + if fname: + file_to_plat_sys[fname] = psid + for alias in fe.get("aliases", []): + file_to_plat_sys[alias.lower()] = psid + + # Normalized ID -> platform system ID + norm_to_platform: dict[str, str] = {} + for psid in platform_sys_ids: + norm_to_platform[_norm_system_id(psid)] = psid + + def _map_sys_id(profile_sid: str, file_name: str = "") -> str: + """Map a profile system ID to the platform's system ID.""" + # 1. File-based lookup (handles composites and name mismatches) + if file_name: + plat_sys = file_to_plat_sys.get(file_name.lower()) + if plat_sys: + return plat_sys + # 2. Exact match + if profile_sid in platform_sys_ids: + return profile_sid + # 3. 
Normalized match + normed = _norm_system_id(profile_sid) + return norm_to_platform.get(normed, profile_sid) + + systems: dict[str, dict] = {} + cores_profiled: set[str] = set() + cores_unprofiled: set[str] = set() + # Track which cores contribute to each system + system_cores: dict[str, dict[str, set[str]]] = {} + + for emu_name in sorted(resolved): + profile = profiles.get(emu_name) + if not profile: + cores_unprofiled.add(emu_name) + continue + cores_profiled.add(emu_name) + + mode = _determine_core_mode(emu_name, profile, cores_config, standalone_set) + raw_files = profile.get("files", []) + if mode == "both": + filtered = raw_files + else: + filtered = filter_files_by_mode(raw_files, standalone=(mode == "standalone")) + + for fe in filtered: + profile_sid = fe.get("system", "") + if not profile_sid: + sys_ids = profile.get("systems", []) + profile_sid = sys_ids[0] if sys_ids else "unknown" + sys_id = _map_sys_id(profile_sid, fe.get("name", "")) + system = systems.setdefault(sys_id, {}) + _merge_file_into_system(system, fe, emu_name, db) + # Track core contribution per system + sys_cov = system_cores.setdefault(sys_id, { + "profiled": set(), "unprofiled": set(), + }) + sys_cov["profiled"].add(emu_name) + + # Ensure all systems of resolved cores have entries (even with 0 files). + # This documents that the system is covered — the core was analyzed and + # needs no external files for this system. 
+ for emu_name in cores_profiled: + profile = profiles[emu_name] + for prof_sid in profile.get("systems", []): + sys_id = _map_sys_id(prof_sid) + systems.setdefault(sys_id, {}) + sys_cov = system_cores.setdefault(sys_id, { + "profiled": set(), "unprofiled": set(), + }) + sys_cov["profiled"].add(emu_name) + + # Track unprofiled cores per system based on profile system lists + for emu_name in cores_unprofiled: + for sys_id in systems: + sys_cov = system_cores.setdefault(sys_id, { + "profiled": set(), "unprofiled": set(), + }) + sys_cov["unprofiled"].add(emu_name) + + # Convert sets to sorted lists for serialization + for sys_id, sys_data in systems.items(): + for fe in sys_data.get("files", []): + fe["_cores"] = sorted(fe.get("_cores", set())) + fe["_source_refs"] = sorted(fe.get("_source_refs", set())) + # Add per-system coverage + cov = system_cores.get(sys_id, {}) + sys_data["_coverage"] = { + "cores_profiled": sorted(cov.get("profiled", set())), + "cores_unprofiled": sorted(cov.get("unprofiled", set())), + } + + return { + "platform": platform_name, + "generated": True, + "systems": systems, + "_coverage": { + "cores_resolved": len(resolved), + "cores_profiled": len(cores_profiled), + "cores_unprofiled": sorted(cores_unprofiled), + }, + } + + +# ------------------------------------------------------------------- +# Platform truth diffing +# ------------------------------------------------------------------- + +def _diff_system(truth_sys: dict, scraped_sys: dict) -> dict: + """Compare files between truth and scraped for a single system.""" + # Build truth index: name.lower() -> entry, alias.lower() -> entry + truth_index: dict[str, dict] = {} + for fe in truth_sys.get("files", []): + truth_index[fe["name"].lower()] = fe + for alias in fe.get("aliases", []): + truth_index[alias.lower()] = fe + + # Build scraped index: name.lower() -> entry + scraped_index: dict[str, dict] = {} + for fe in scraped_sys.get("files", []): + scraped_index[fe["name"].lower()] = fe + + 
missing: list[dict] = [] + hash_mismatch: list[dict] = [] + required_mismatch: list[dict] = [] + extra_phantom: list[dict] = [] + extra_unprofiled: list[dict] = [] + + matched_truth_names: set[str] = set() + + # Compare scraped files against truth + for s_key, s_entry in scraped_index.items(): + t_entry = truth_index.get(s_key) + if t_entry is None: + continue + matched_truth_names.add(t_entry["name"].lower()) + + # Hash comparison + for h in ("sha1", "md5", "crc32"): + t_hash = t_entry.get(h, "") + s_hash = s_entry.get(h, "") + if not t_hash or not s_hash: + continue + # Normalize to list for multi-hash support + t_list = t_hash if isinstance(t_hash, list) else [t_hash] + s_list = s_hash if isinstance(s_hash, list) else [s_hash] + t_set = {v.lower() for v in t_list} + s_set = {v.lower() for v in s_list} + if not t_set & s_set: + hash_mismatch.append({ + "name": s_entry["name"], + "hash_type": h, + f"truth_{h}": t_hash, + f"scraped_{h}": s_hash, + "truth_cores": list(t_entry.get("_cores", [])), + }) + break + + # Required mismatch + t_req = t_entry.get("required") + s_req = s_entry.get("required") + if t_req is not None and s_req is not None and t_req != s_req: + required_mismatch.append({ + "name": s_entry["name"], + "truth_required": t_req, + "scraped_required": s_req, + }) + + # Truth files not matched -> missing + for fe in truth_sys.get("files", []): + if fe["name"].lower() not in matched_truth_names: + missing.append({ + "name": fe["name"], + "cores": list(fe.get("_cores", [])), + "source_refs": list(fe.get("_source_refs", [])), + }) + + # Scraped files not in truth -> extra + coverage = truth_sys.get("_coverage", {}) + has_unprofiled = bool(coverage.get("cores_unprofiled")) + for s_key, s_entry in scraped_index.items(): + if s_key not in truth_index: + entry = {"name": s_entry["name"]} + if has_unprofiled: + extra_unprofiled.append(entry) + else: + extra_phantom.append(entry) + + result: dict = {} + if missing: + result["missing"] = missing + if 
hash_mismatch: + result["hash_mismatch"] = hash_mismatch + if required_mismatch: + result["required_mismatch"] = required_mismatch + if extra_phantom: + result["extra_phantom"] = extra_phantom + if extra_unprofiled: + result["extra_unprofiled"] = extra_unprofiled + return result + + +def _has_divergences(sys_div: dict) -> bool: + """Check if a system divergence dict contains any actual divergences.""" + return bool(sys_div) + + +def _update_summary(summary: dict, sys_div: dict) -> None: + """Update summary counters from a system divergence dict.""" + summary["total_missing"] += len(sys_div.get("missing", [])) + summary["total_extra_phantom"] += len(sys_div.get("extra_phantom", [])) + summary["total_extra_unprofiled"] += len(sys_div.get("extra_unprofiled", [])) + summary["total_hash_mismatch"] += len(sys_div.get("hash_mismatch", [])) + summary["total_required_mismatch"] += len(sys_div.get("required_mismatch", [])) + + +def diff_platform_truth(truth: dict, scraped: dict) -> dict: + """Compare truth YAML against scraped YAML, returning divergences. + + System IDs are matched using normalized forms (via _norm_system_id) to + handle naming differences between emulator profiles and scraped platforms + (e.g. 'sega-game-gear' vs 'sega-gamegear'). 
+ """ + truth_systems = truth.get("systems", {}) + scraped_systems = scraped.get("systems", {}) + + summary = { + "systems_compared": 0, + "systems_fully_covered": 0, + "systems_partially_covered": 0, + "systems_uncovered": 0, + "total_missing": 0, + "total_extra_phantom": 0, + "total_extra_unprofiled": 0, + "total_hash_mismatch": 0, + "total_required_mismatch": 0, + } + + divergences: dict[str, dict] = {} + uncovered_systems: list[str] = [] + + # Build normalized-ID lookup for truth systems + norm_to_truth: dict[str, str] = {} + for sid in truth_systems: + norm_to_truth[_norm_system_id(sid)] = sid + + # Match scraped systems to truth via normalized IDs + matched_truth: set[str] = set() + + for s_sid in sorted(scraped_systems): + norm = _norm_system_id(s_sid) + t_sid = norm_to_truth.get(norm) + + if t_sid is None: + # Also try exact match (in case normalization is lossy) + if s_sid in truth_systems: + t_sid = s_sid + else: + uncovered_systems.append(s_sid) + summary["systems_uncovered"] += 1 + continue + + matched_truth.add(t_sid) + summary["systems_compared"] += 1 + sys_div = _diff_system(truth_systems[t_sid], scraped_systems[s_sid]) + + if _has_divergences(sys_div): + divergences[s_sid] = sys_div + _update_summary(summary, sys_div) + summary["systems_partially_covered"] += 1 + else: + summary["systems_fully_covered"] += 1 + + # Truth systems not matched by any scraped system — all files missing + for t_sid in sorted(truth_systems): + if t_sid in matched_truth: + continue + summary["systems_compared"] += 1 + sys_div = _diff_system(truth_systems[t_sid], {"files": []}) + if _has_divergences(sys_div): + divergences[t_sid] = sys_div + _update_summary(summary, sys_div) + summary["systems_partially_covered"] += 1 + else: + summary["systems_fully_covered"] += 1 + + result: dict = {"summary": summary} + if divergences: + result["divergences"] = divergences + if uncovered_systems: + result["uncovered_systems"] = uncovered_systems + return result diff --git 
a/scripts/validate_pr.py b/scripts/validate_pr.py index f0d64215..62a448fe 100644 --- a/scripts/validate_pr.py +++ b/scripts/validate_pr.py @@ -25,7 +25,7 @@ import sys from pathlib import Path sys.path.insert(0, os.path.dirname(__file__)) -from common import compute_hashes, list_registered_platforms, load_database as _load_database +from common import compute_hashes, list_registered_platforms, load_database try: import yaml @@ -90,16 +90,6 @@ class ValidationResult: return "\n".join(lines) -def load_database(db_path: str) -> dict | None: - try: - return _load_database(db_path) - except FileNotFoundError: - return None - except json.JSONDecodeError as e: - print(f"WARNING: corrupt database.json: {e}", file=sys.stderr) - return None - - def load_platform_hashes(platforms_dir: str) -> dict: """Load all known hashes from platform configs.""" known = {"sha1": set(), "md5": set(), "names": set()} @@ -241,7 +231,13 @@ def main(): if not files: parser.error("No files specified. Use --changed or provide file paths.") - db = load_database(args.db) + try: + db = load_database(args.db) + except FileNotFoundError: + db = None + except json.JSONDecodeError as e: + print(f"WARNING: corrupt database.json: {e}", file=sys.stderr) + db = None platform_hashes = load_platform_hashes(args.platforms_dir) results = [] diff --git a/scripts/validation.py b/scripts/validation.py new file mode 100644 index 00000000..9e4d40d0 --- /dev/null +++ b/scripts/validation.py @@ -0,0 +1,258 @@ +"""Emulator-level file validation logic. + +Builds validation indexes from emulator profiles, checks files against +emulator-declared constraints (size, hash, crypto), and formats ground +truth data for reporting. +""" + +from __future__ import annotations + +import os + +from common import compute_hashes + +# Validation types that require console-specific cryptographic keys. +# verify.py cannot reproduce these — size checks still apply if combined. 
+_CRYPTO_CHECKS = frozenset({"signature", "crypto"}) + +# All reproducible validation types. +_HASH_CHECKS = frozenset({"crc32", "md5", "sha1", "adler32"}) + + +def _parse_validation(validation: list | dict | None) -> list[str]: + """Extract the validation check list from a file's validation field. + + Handles both simple list and divergent (core/upstream) dict forms. + For dicts, uses the ``core`` key since RetroArch users run the core. + """ + if validation is None: + return [] + if isinstance(validation, list): + return validation + if isinstance(validation, dict): + return validation.get("core", []) + return [] + + +def _build_validation_index(profiles: dict) -> dict[str, dict]: + """Build per-filename validation rules from emulator profiles. + + Returns {filename: {"checks": [str], "size": int|None, "min_size": int|None, + "max_size": int|None, "crc32": str|None, "md5": str|None, "sha1": str|None, + "adler32": str|None, "crypto_only": [str], "per_emulator": {emu: detail}}}. + + ``crypto_only`` lists validation types we cannot reproduce (signature, crypto) + so callers can report them as non-verifiable rather than silently skipping. + + ``per_emulator`` preserves each core's individual checks, source_ref, and + expected values before merging, for ground truth reporting. + + When multiple emulators reference the same file, merges checks (union). + Raises ValueError if two profiles declare conflicting values. 
+ """ + index: dict[str, dict] = {} + for emu_name, profile in profiles.items(): + if profile.get("type") in ("launcher", "alias"): + continue + for f in profile.get("files", []): + fname = f.get("name", "") + if not fname: + continue + checks = _parse_validation(f.get("validation")) + if not checks: + continue + if fname not in index: + index[fname] = { + "checks": set(), "sizes": set(), + "min_size": None, "max_size": None, + "crc32": set(), "md5": set(), "sha1": set(), "sha256": set(), + "adler32": set(), "crypto_only": set(), + "emulators": set(), "per_emulator": {}, + } + index[fname]["emulators"].add(emu_name) + index[fname]["checks"].update(checks) + # Track non-reproducible crypto checks + index[fname]["crypto_only"].update( + c for c in checks if c in _CRYPTO_CHECKS + ) + # Size checks + if "size" in checks: + if f.get("size") is not None: + index[fname]["sizes"].add(f["size"]) + if f.get("min_size") is not None: + cur = index[fname]["min_size"] + index[fname]["min_size"] = min(cur, f["min_size"]) if cur is not None else f["min_size"] + if f.get("max_size") is not None: + cur = index[fname]["max_size"] + index[fname]["max_size"] = max(cur, f["max_size"]) if cur is not None else f["max_size"] + # Hash checks — collect all accepted hashes as sets (multiple valid + # versions of the same file, e.g. 
MT-32 ROM versions) + if "crc32" in checks and f.get("crc32"): + crc_val = f["crc32"] + crc_list = crc_val if isinstance(crc_val, list) else [crc_val] + for cv in crc_list: + norm = str(cv).lower() + if norm.startswith("0x"): + norm = norm[2:] + index[fname]["crc32"].add(norm) + for hash_type in ("md5", "sha1", "sha256"): + if hash_type in checks and f.get(hash_type): + val = f[hash_type] + if isinstance(val, list): + for h in val: + index[fname][hash_type].add(str(h).lower()) + else: + index[fname][hash_type].add(str(val).lower()) + # Adler32 — stored as known_hash_adler32 field (not in validation: list + # for Dolphin, but support it in both forms for future profiles) + adler_val = f.get("known_hash_adler32") or f.get("adler32") + if adler_val: + norm = adler_val.lower() + if norm.startswith("0x"): + norm = norm[2:] + index[fname]["adler32"].add(norm) + # Per-emulator ground truth detail + expected: dict = {} + if "size" in checks: + for key in ("size", "min_size", "max_size"): + if f.get(key) is not None: + expected[key] = f[key] + for hash_type in ("crc32", "md5", "sha1", "sha256"): + if hash_type in checks and f.get(hash_type): + expected[hash_type] = f[hash_type] + adler_val_pe = f.get("known_hash_adler32") or f.get("adler32") + if adler_val_pe: + expected["adler32"] = adler_val_pe + pe_entry = { + "checks": sorted(checks), + "source_ref": f.get("source_ref"), + "expected": expected, + } + pe = index[fname]["per_emulator"] + if emu_name in pe: + # Merge checks from multiple file entries for same emulator + existing = pe[emu_name] + merged_checks = sorted(set(existing["checks"]) | set(pe_entry["checks"])) + existing["checks"] = merged_checks + existing["expected"].update(pe_entry["expected"]) + if pe_entry["source_ref"] and not existing["source_ref"]: + existing["source_ref"] = pe_entry["source_ref"] + else: + pe[emu_name] = pe_entry + # Convert sets to sorted tuples/lists for determinism + for v in index.values(): + v["checks"] = sorted(v["checks"]) + 
v["crypto_only"] = sorted(v["crypto_only"]) + v["emulators"] = sorted(v["emulators"]) + # Keep hash sets as frozensets for O(1) lookup in check_file_validation + return index + + +def build_ground_truth(filename: str, validation_index: dict[str, dict]) -> list[dict]: + """Format per-emulator ground truth for a file from the validation index. + + Returns a sorted list of {emulator, checks, source_ref, expected} dicts. + Returns [] if the file has no emulator validation data. + """ + entry = validation_index.get(filename) + if not entry or not entry.get("per_emulator"): + return [] + result = [] + for emu_name in sorted(entry["per_emulator"]): + detail = entry["per_emulator"][emu_name] + result.append({ + "emulator": emu_name, + "checks": detail["checks"], + "source_ref": detail.get("source_ref"), + "expected": detail.get("expected", {}), + }) + return result + + +def check_file_validation( + local_path: str, filename: str, validation_index: dict[str, dict], + bios_dir: str = "bios", +) -> str | None: + """Check emulator-level validation on a resolved file. + + Supports: size (exact/min/max), crc32, md5, sha1, adler32, + signature (RSA-2048 PKCS1v15 SHA256), crypto (AES-128-CBC + SHA256). + + Returns None if all checks pass or no validation applies. + Returns a reason string if a check fails. 
+ """ + entry = validation_index.get(filename) + if not entry: + return None + checks = entry["checks"] + + # Size checks — sizes is a set of accepted values + if "size" in checks: + actual_size = os.path.getsize(local_path) + if entry["sizes"] and actual_size not in entry["sizes"]: + expected = ",".join(str(s) for s in sorted(entry["sizes"])) + return f"size mismatch: got {actual_size}, accepted [{expected}]" + if entry["min_size"] is not None and actual_size < entry["min_size"]: + return f"size too small: min {entry['min_size']}, got {actual_size}" + if entry["max_size"] is not None and actual_size > entry["max_size"]: + return f"size too large: max {entry['max_size']}, got {actual_size}" + + # Hash checks — compute once, reuse for all hash types. + # Each hash field is a set of accepted values (multiple valid ROM versions). + need_hashes = ( + any(h in checks and entry.get(h) for h in ("crc32", "md5", "sha1", "sha256")) + or entry.get("adler32") + ) + if need_hashes: + hashes = compute_hashes(local_path) + if "crc32" in checks and entry["crc32"]: + if hashes["crc32"].lower() not in entry["crc32"]: + expected = ",".join(sorted(entry["crc32"])) + return f"crc32 mismatch: got {hashes['crc32']}, accepted [{expected}]" + if "md5" in checks and entry["md5"]: + if hashes["md5"].lower() not in entry["md5"]: + expected = ",".join(sorted(entry["md5"])) + return f"md5 mismatch: got {hashes['md5']}, accepted [{expected}]" + if "sha1" in checks and entry["sha1"]: + if hashes["sha1"].lower() not in entry["sha1"]: + expected = ",".join(sorted(entry["sha1"])) + return f"sha1 mismatch: got {hashes['sha1']}, accepted [{expected}]" + if "sha256" in checks and entry["sha256"]: + if hashes["sha256"].lower() not in entry["sha256"]: + expected = ",".join(sorted(entry["sha256"])) + return f"sha256 mismatch: got {hashes['sha256']}, accepted [{expected}]" + if entry["adler32"]: + if hashes["adler32"].lower() not in entry["adler32"]: + expected = ",".join(sorted(entry["adler32"])) + 
return f"adler32 mismatch: got 0x{hashes['adler32']}, accepted [{expected}]" + + # Signature/crypto checks (3DS RSA, AES) + if entry["crypto_only"]: + from crypto_verify import check_crypto_validation + crypto_reason = check_crypto_validation(local_path, filename, bios_dir) + if crypto_reason: + return crypto_reason + + return None + + +def validate_cli_modes(args, mode_attrs: list[str]) -> None: + """Validate mutual exclusion of CLI mode arguments.""" + modes = sum(1 for attr in mode_attrs if getattr(args, attr, None)) + if modes == 0: + raise SystemExit(f"Specify one of: --{' --'.join(mode_attrs)}") + if modes > 1: + raise SystemExit(f"Options are mutually exclusive: --{' --'.join(mode_attrs)}") + + +def filter_files_by_mode(files: list[dict], standalone: bool) -> list[dict]: + """Filter file entries by libretro/standalone mode.""" + result = [] + for f in files: + fmode = f.get("mode", "") + if standalone and fmode == "libretro": + continue + if not standalone and fmode == "standalone": + continue + result.append(f) + return result diff --git a/scripts/verify.py b/scripts/verify.py index 43f48ca8..ca936a55 100644 --- a/scripts/verify.py +++ b/scripts/verify.py @@ -36,14 +36,16 @@ except ImportError: sys.path.insert(0, os.path.dirname(__file__)) from common import ( - _build_validation_index, _parse_validation, build_ground_truth, - build_zip_contents_index, check_file_validation, - check_inside_zip, compute_hashes, filter_files_by_mode, + build_zip_contents_index, check_inside_zip, compute_hashes, filter_systems_by_target, group_identical_platforms, list_emulator_profiles, list_system_ids, load_data_dir_registry, load_emulator_profiles, load_platform_config, md5sum, md5_composite, resolve_local_file, resolve_platform_cores, ) +from validation import ( + _build_validation_index, _parse_validation, build_ground_truth, + check_file_validation, filter_files_by_mode, +) DEFAULT_DB = "database.json" DEFAULT_PLATFORMS_DIR = "platforms" DEFAULT_EMULATORS_DIR = 
"emulators" diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 6603307d..819405fb 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -30,14 +30,15 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts")) import yaml from common import ( - _build_validation_index, build_zip_contents_index, check_file_validation, - check_inside_zip, compute_hashes, diff_platform_truth, - filter_files_by_mode, - generate_platform_truth, + build_zip_contents_index, check_inside_zip, compute_hashes, group_identical_platforms, load_emulator_profiles, load_platform_config, md5_composite, md5sum, parse_md5_list, resolve_local_file, resolve_platform_cores, safe_extract_zip, ) +from validation import ( + _build_validation_index, check_file_validation, filter_files_by_mode, +) +from truth import diff_platform_truth, generate_platform_truth from verify import ( Severity, Status, verify_platform, find_undeclared_files, find_exclusion_notes, verify_emulator, _effective_validation_label,