Files
libretro/scripts/truth.py
2026-03-30 07:53:59 +02:00

494 lines
18 KiB
Python

"""Platform truth generation and diffing.
Generates ground-truth YAML from emulator profiles for gap analysis,
and diffs truth against scraped platform data to find divergences.
"""
from __future__ import annotations
import sys
from common import _norm_system_id, resolve_platform_cores
from validation import filter_files_by_mode
def _determine_core_mode(
emu_name: str, profile: dict,
cores_config: str | list | None,
standalone_set: set[str] | None,
) -> str:
"""Determine effective mode (libretro/standalone) for a resolved core."""
if cores_config == "all_libretro":
return "libretro"
if standalone_set is not None:
profile_names = {emu_name} | {str(c) for c in profile.get("cores", [])}
if profile_names & standalone_set:
return "standalone"
return "libretro"
ptype = profile.get("type", "libretro")
if "standalone" in ptype and "libretro" in ptype:
return "both"
if "standalone" in ptype:
return "standalone"
return "libretro"
def _enrich_hashes(entry: dict, db: dict) -> None:
"""Fill missing hash fields from the database."""
sha1 = entry.get("sha1", "")
md5 = entry.get("md5", "")
# Hashes can be lists (multi-hash) — use first string value
if isinstance(sha1, list):
sha1 = sha1[0] if sha1 else ""
if isinstance(md5, list):
md5 = md5[0] if md5 else ""
record = None
if sha1 and isinstance(sha1, str) and db.get("files"):
record = db["files"].get(sha1)
if record is None and md5:
by_md5 = db.get("by_md5", {})
md5_str = md5 if isinstance(md5, str) else md5[0] if md5 else ""
ref_sha1 = by_md5.get(md5_str.lower()) if md5_str else None
if ref_sha1 and db.get("files"):
record = db["files"].get(ref_sha1)
if record is None:
return
for field in ("sha1", "md5", "sha256", "crc32"):
if not entry.get(field) and record.get(field):
entry[field] = record[field]
def _merge_file_into_system(
system: dict, file_entry: dict, emu_name: str, db: dict | None,
) -> None:
"""Merge a file entry into a system's file list, deduplicating by name."""
files = system.setdefault("files", [])
name_lower = file_entry["name"].lower()
existing = None
for f in files:
if f["name"].lower() == name_lower:
existing = f
break
if existing is not None:
existing["_cores"] = existing.get("_cores", set()) | {emu_name}
sr = file_entry.get("source_ref")
if sr is not None:
sr_key = str(sr) if not isinstance(sr, str) else sr
existing["_source_refs"] = existing.get("_source_refs", set()) | {sr_key}
else:
existing.setdefault("_source_refs", set())
if file_entry.get("required") and not existing.get("required"):
existing["required"] = True
for h in ("sha1", "md5", "sha256", "crc32"):
theirs = file_entry.get(h, "")
ours = existing.get(h, "")
if theirs and ours and theirs.lower() != ours.lower():
print(
f"WARNING: hash conflict for {file_entry['name']} "
f"({h}: {ours} vs {theirs}, core {emu_name})",
file=sys.stderr,
)
elif theirs and not ours:
existing[h] = theirs
return
entry: dict = {"name": file_entry["name"]}
if file_entry.get("required") is not None:
entry["required"] = file_entry["required"]
for field in ("sha1", "md5", "sha256", "crc32", "size", "path",
"description", "hle_fallback", "category", "note",
"validation", "min_size", "max_size", "aliases"):
val = file_entry.get(field)
if val is not None:
entry[field] = val
entry["_cores"] = {emu_name}
sr = file_entry.get("source_ref")
if sr is not None:
sr_key = str(sr) if not isinstance(sr, str) else sr
entry["_source_refs"] = {sr_key}
else:
entry["_source_refs"] = set()
if db:
_enrich_hashes(entry, db)
files.append(entry)
def generate_platform_truth(
    platform_name: str,
    config: dict,
    registry_entry: dict,
    profiles: dict[str, dict],
    db: dict | None = None,
    target_cores: set[str] | None = None,
) -> dict:
    """Generate ground-truth system data for a platform from emulator profiles.

    Args:
        platform_name: platform identifier
        config: loaded platform config (via load_platform_config), has cores,
            systems, standalone_cores with inheritance resolved
        registry_entry: registry metadata for hash_type, verification_mode, etc.
            (not read in this function body — kept for interface stability)
        profiles: all loaded emulator profiles
        db: optional database for hash enrichment
        target_cores: optional hardware target core filter

    Returns a dict with platform metadata, systems, and per-file details
    including which cores reference each file.
    """
    cores_config = config.get("cores")
    # Resolve the explicit standalone-core list (if any) for mode determination.
    standalone_set: set[str] | None = None
    standalone_cores = config.get("standalone_cores")
    if isinstance(standalone_cores, list):
        standalone_set = {str(c) for c in standalone_cores}
    resolved = resolve_platform_cores(config, profiles, target_cores)
    # Build mapping: profile system ID -> platform system ID
    # Three strategies, tried in order:
    # 1. File-based: if the scraped platform already has this file, use its system
    # 2. Exact match: profile system ID == platform system ID
    # 3. Normalized match: strip manufacturer prefix + separators
    platform_sys_ids = set(config.get("systems", {}).keys())
    # File name (and alias) -> platform system ID, reverse index built from
    # the scraped config; keys lowercased for case-insensitive lookup.
    file_to_plat_sys: dict[str, str] = {}
    for psid, sys_data in config.get("systems", {}).items():
        for fe in sys_data.get("files", []):
            fname = fe.get("name", "").lower()
            if fname:
                file_to_plat_sys[fname] = psid
            for alias in fe.get("aliases", []):
                file_to_plat_sys[alias.lower()] = psid
    # Normalized ID -> platform system ID (strategy 3 lookup table)
    norm_to_platform: dict[str, str] = {}
    for psid in platform_sys_ids:
        norm_to_platform[_norm_system_id(psid)] = psid

    def _map_sys_id(profile_sid: str, file_name: str = "") -> str:
        """Map a profile system ID to the platform's system ID."""
        # 1. File-based lookup (handles composites and name mismatches)
        if file_name:
            plat_sys = file_to_plat_sys.get(file_name.lower())
            if plat_sys:
                return plat_sys
        # 2. Exact match
        if profile_sid in platform_sys_ids:
            return profile_sid
        # 3. Normalized match; falls back to the profile ID unchanged when
        # nothing matches.
        normed = _norm_system_id(profile_sid)
        return norm_to_platform.get(normed, profile_sid)

    systems: dict[str, dict] = {}
    cores_profiled: set[str] = set()
    cores_unprofiled: set[str] = set()
    # Track which cores contribute to each system (profiled vs unprofiled)
    system_cores: dict[str, dict[str, set[str]]] = {}
    for emu_name in sorted(resolved):
        profile = profiles.get(emu_name)
        if not profile:
            # Resolved core with no profile: its file needs are unknown.
            cores_unprofiled.add(emu_name)
            continue
        cores_profiled.add(emu_name)
        mode = _determine_core_mode(emu_name, profile, cores_config, standalone_set)
        raw_files = profile.get("files", [])
        if mode == "both":
            # Core runs as both libretro and standalone: keep every file.
            filtered = raw_files
        else:
            filtered = filter_files_by_mode(raw_files, standalone=(mode == "standalone"))
        for fe in filtered:
            profile_sid = fe.get("system", "")
            if not profile_sid:
                # File entry without a system: fall back to the profile's
                # first declared system, or "unknown".
                sys_ids = profile.get("systems", [])
                profile_sid = sys_ids[0] if sys_ids else "unknown"
            sys_id = _map_sys_id(profile_sid, fe.get("name", ""))
            system = systems.setdefault(sys_id, {})
            _merge_file_into_system(system, fe, emu_name, db)
            # Track core contribution per system
            sys_cov = system_cores.setdefault(sys_id, {
                "profiled": set(), "unprofiled": set(),
            })
            sys_cov["profiled"].add(emu_name)
    # Ensure all systems of resolved cores have entries (even with 0 files).
    # This documents that the system is covered — the core was analyzed and
    # needs no external files for this system.
    for emu_name in cores_profiled:
        profile = profiles[emu_name]
        for prof_sid in profile.get("systems", []):
            sys_id = _map_sys_id(prof_sid)
            systems.setdefault(sys_id, {})
            sys_cov = system_cores.setdefault(sys_id, {
                "profiled": set(), "unprofiled": set(),
            })
            sys_cov["profiled"].add(emu_name)
    # Unprofiled cores carry no system information, so record them against
    # every known system as potential (unverified) contributors.
    # NOTE(review): this marks ALL systems, not just systems the core
    # supports — confirm this over-approximation is intended.
    for emu_name in cores_unprofiled:
        for sys_id in systems:
            sys_cov = system_cores.setdefault(sys_id, {
                "profiled": set(), "unprofiled": set(),
            })
            sys_cov["unprofiled"].add(emu_name)
    # Convert internal set fields to sorted lists for stable serialization
    for sys_id, sys_data in systems.items():
        for fe in sys_data.get("files", []):
            fe["_cores"] = sorted(fe.get("_cores", set()))
            fe["_source_refs"] = sorted(fe.get("_source_refs", set()))
        # Add per-system coverage
        cov = system_cores.get(sys_id, {})
        sys_data["_coverage"] = {
            "cores_profiled": sorted(cov.get("profiled", set())),
            "cores_unprofiled": sorted(cov.get("unprofiled", set())),
        }
    return {
        "platform": platform_name,
        "generated": True,
        "systems": systems,
        "_coverage": {
            "cores_resolved": len(resolved),
            "cores_profiled": len(cores_profiled),
            "cores_unprofiled": sorted(cores_unprofiled),
        },
    }
# Platform truth diffing
def _diff_system(truth_sys: dict, scraped_sys: dict) -> dict:
"""Compare files between truth and scraped for a single system."""
# Build truth index: name.lower() -> entry, alias.lower() -> entry
truth_index: dict[str, dict] = {}
for fe in truth_sys.get("files", []):
truth_index[fe["name"].lower()] = fe
for alias in fe.get("aliases", []):
truth_index[alias.lower()] = fe
# Build scraped index: name.lower() -> entry
scraped_index: dict[str, dict] = {}
for fe in scraped_sys.get("files", []):
scraped_index[fe["name"].lower()] = fe
missing: list[dict] = []
hash_mismatch: list[dict] = []
required_mismatch: list[dict] = []
extra_phantom: list[dict] = []
extra_unprofiled: list[dict] = []
matched_truth_names: set[str] = set()
# Compare scraped files against truth
for s_key, s_entry in scraped_index.items():
t_entry = truth_index.get(s_key)
if t_entry is None:
continue
matched_truth_names.add(t_entry["name"].lower())
# Hash comparison
for h in ("sha1", "md5", "crc32"):
t_hash = t_entry.get(h, "")
s_hash = s_entry.get(h, "")
if not t_hash or not s_hash:
continue
# Normalize to list for multi-hash support
t_list = t_hash if isinstance(t_hash, list) else [t_hash]
s_list = s_hash if isinstance(s_hash, list) else [s_hash]
t_set = {v.lower() for v in t_list}
s_set = {v.lower() for v in s_list}
if not t_set & s_set:
hash_mismatch.append({
"name": s_entry["name"],
"hash_type": h,
f"truth_{h}": t_hash,
f"scraped_{h}": s_hash,
"truth_cores": list(t_entry.get("_cores", [])),
})
break
# Required mismatch
t_req = t_entry.get("required")
s_req = s_entry.get("required")
if t_req is not None and s_req is not None and t_req != s_req:
required_mismatch.append({
"name": s_entry["name"],
"truth_required": t_req,
"scraped_required": s_req,
})
# Collect unmatched files from both sides
unmatched_truth = [
fe for fe in truth_sys.get("files", [])
if fe["name"].lower() not in matched_truth_names
]
unmatched_scraped = {
s_key: s_entry for s_key, s_entry in scraped_index.items()
if s_key not in truth_index
}
# Hash-based fallback: detect platform renames (e.g. Batocera ROM → ROM1)
# If an unmatched scraped file shares a hash with an unmatched truth file,
# it's the same file under a different name — a platform rename, not a gap.
rename_matched_truth: set[str] = set()
rename_matched_scraped: set[str] = set()
if unmatched_truth and unmatched_scraped:
# Build hash → truth file index for unmatched truth files
truth_hash_index: dict[str, dict] = {}
for fe in unmatched_truth:
for h in ("sha1", "md5", "crc32"):
val = fe.get(h)
if val and isinstance(val, str):
truth_hash_index[val.lower()] = fe
for s_key, s_entry in unmatched_scraped.items():
for h in ("sha1", "md5", "crc32"):
s_val = s_entry.get(h)
if not s_val or not isinstance(s_val, str):
continue
t_entry = truth_hash_index.get(s_val.lower())
if t_entry is not None:
# Rename detected — count as matched
rename_matched_truth.add(t_entry["name"].lower())
rename_matched_scraped.add(s_key)
break
# Truth files not matched (by name, alias, or hash) -> missing
for fe in unmatched_truth:
if fe["name"].lower() not in rename_matched_truth:
missing.append({
"name": fe["name"],
"cores": list(fe.get("_cores", [])),
"source_refs": list(fe.get("_source_refs", [])),
})
# Scraped files not in truth -> extra
coverage = truth_sys.get("_coverage", {})
has_unprofiled = bool(coverage.get("cores_unprofiled"))
for s_key, s_entry in unmatched_scraped.items():
if s_key in rename_matched_scraped:
continue
entry = {"name": s_entry["name"]}
if has_unprofiled:
extra_unprofiled.append(entry)
else:
extra_phantom.append(entry)
result: dict = {}
if missing:
result["missing"] = missing
if hash_mismatch:
result["hash_mismatch"] = hash_mismatch
if required_mismatch:
result["required_mismatch"] = required_mismatch
if extra_phantom:
result["extra_phantom"] = extra_phantom
if extra_unprofiled:
result["extra_unprofiled"] = extra_unprofiled
return result
def _has_divergences(sys_div: dict) -> bool:
"""Check if a system divergence dict contains any actual divergences."""
return bool(sys_div)
def _update_summary(summary: dict, sys_div: dict) -> None:
"""Update summary counters from a system divergence dict."""
summary["total_missing"] += len(sys_div.get("missing", []))
summary["total_extra_phantom"] += len(sys_div.get("extra_phantom", []))
summary["total_extra_unprofiled"] += len(sys_div.get("extra_unprofiled", []))
summary["total_hash_mismatch"] += len(sys_div.get("hash_mismatch", []))
summary["total_required_mismatch"] += len(sys_div.get("required_mismatch", []))
def diff_platform_truth(truth: dict, scraped: dict) -> dict:
    """Compare truth YAML against scraped YAML, returning divergences.

    System IDs are matched using normalized forms (via _norm_system_id) to
    handle naming differences between emulator profiles and scraped platforms
    (e.g. 'sega-game-gear' vs 'sega-gamegear').
    """
    truth_systems = truth.get("systems", {})
    scraped_systems = scraped.get("systems", {})
    summary = {
        "systems_compared": 0,
        "systems_fully_covered": 0,
        "systems_partially_covered": 0,
        "systems_uncovered": 0,
        "total_missing": 0,
        "total_extra_phantom": 0,
        "total_extra_unprofiled": 0,
        "total_hash_mismatch": 0,
        "total_required_mismatch": 0,
    }
    divergences: dict[str, dict] = {}
    uncovered_systems: list[str] = []

    # Normalized truth system ID -> original truth system ID.
    norm_to_truth = {_norm_system_id(sid): sid for sid in truth_systems}

    def _compare(dest_sid: str, t_sid: str, scraped_sys: dict) -> None:
        """Diff one matched pair and fold the outcome into the summary."""
        summary["systems_compared"] += 1
        sys_div = _diff_system(truth_systems[t_sid], scraped_sys)
        if _has_divergences(sys_div):
            divergences[dest_sid] = sys_div
            _update_summary(summary, sys_div)
            summary["systems_partially_covered"] += 1
        else:
            summary["systems_fully_covered"] += 1

    # Match scraped systems to truth via normalized IDs.
    matched_truth: set[str] = set()
    for s_sid in sorted(scraped_systems):
        t_sid = norm_to_truth.get(_norm_system_id(s_sid))
        if t_sid is None and s_sid in truth_systems:
            # Normalization may be lossy; fall back to an exact-ID match.
            t_sid = s_sid
        if t_sid is None:
            uncovered_systems.append(s_sid)
            summary["systems_uncovered"] += 1
            continue
        matched_truth.add(t_sid)
        _compare(s_sid, t_sid, scraped_systems[s_sid])

    # Truth systems no scraped system matched: every truth file is missing.
    for t_sid in sorted(truth_systems):
        if t_sid not in matched_truth:
            _compare(t_sid, t_sid, {"files": []})

    result: dict = {"summary": summary}
    if divergences:
        result["divergences"] = divergences
    if uncovered_systems:
        result["uncovered_systems"] = uncovered_systems
    return result