"""Shared utilities for retrobios scripts.

Single source of truth for platform config loading, hash computation,
and file resolution - eliminates DRY violations across scripts.
"""

from __future__ import annotations

import hashlib
import json
import os
import urllib.error
import urllib.request
import zipfile
import zlib
from pathlib import Path

try:
    import yaml
except ImportError:
    yaml = None

# Largest uncompressed ZIP member (in bytes) that the ZIP helpers will read.
_MAX_ZIP_ENTRY_SIZE = 512 * 1024 * 1024


def _load_yaml(path: str | Path) -> dict:
    """Parse the YAML file at *path*; an empty document yields ``{}``.

    Raises FileNotFoundError (from ``open``) if the file is missing and
    ImportError if PyYAML is not installed, so every loader in this module
    fails the same way ``load_platform_config`` does.
    """
    with open(path, encoding="utf-8") as fh:
        if yaml is None:
            raise ImportError("PyYAML required: pip install pyyaml")
        return yaml.safe_load(fh) or {}


def compute_hashes(filepath: str | Path) -> dict[str, str]:
    """Compute SHA1, MD5, SHA256, CRC32, Adler32 for a file.

    The file is read once in 64 KiB chunks and every digest is updated from
    the same chunk. Returns a dict with keys ``sha1``, ``md5``, ``sha256``,
    ``crc32`` and ``adler32``; the two checksums are zero-padded 8-digit
    lowercase hex strings.
    """
    sha1 = hashlib.sha1()
    md5 = hashlib.md5()
    sha256 = hashlib.sha256()
    crc = 0
    adler = 1  # zlib.adler32 initial value
    with open(filepath, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            sha1.update(chunk)
            md5.update(chunk)
            sha256.update(chunk)
            crc = zlib.crc32(chunk, crc)
            adler = zlib.adler32(chunk, adler)
    return {
        "sha1": sha1.hexdigest(),
        "md5": md5.hexdigest(),
        "sha256": sha256.hexdigest(),
        "crc32": format(crc & 0xFFFFFFFF, "08x"),
        "adler32": format(adler & 0xFFFFFFFF, "08x"),
    }


def load_database(db_path: str) -> dict:
    """Load database.json and return parsed dict."""
    with open(db_path, encoding="utf-8") as f:
        return json.load(f)


def md5sum(source: str | Path | object) -> str:
    """Compute MD5 of a file path or file-like object - matches Batocera's md5sum().

    Anything exposing ``read`` is consumed as a binary stream (and left open);
    any other value is opened as a path in binary mode.
    """
    h = hashlib.md5()
    if hasattr(source, "read"):
        for chunk in iter(lambda: source.read(65536), b""):
            h.update(chunk)
    else:
        with open(source, "rb") as f:
            for chunk in iter(lambda: f.read(65536), b""):
                h.update(chunk)
    return h.hexdigest()


_md5_composite_cache: dict[str, str] = {}


def md5_composite(filepath: str | Path) -> str:
    """Compute composite MD5 of a ZIP - matches Recalbox's Zip::Md5Composite().

    Sorts filenames alphabetically, reads each file's contents in order,
    feeds everything into a single MD5 hasher. The result is independent
    of ZIP compression level or metadata.

    Directory entries are ignored and members larger than 512 MiB are
    skipped. Results are cached per path (the cache is keyed on
    ``str(filepath)`` and is never invalidated).
    """
    key = str(filepath)
    cached = _md5_composite_cache.get(key)
    if cached is not None:
        return cached
    with zipfile.ZipFile(filepath) as zf:
        names = sorted(n for n in zf.namelist() if not n.endswith("/"))
        h = hashlib.md5()
        for name in names:
            info = zf.getinfo(name)
            if info.file_size > _MAX_ZIP_ENTRY_SIZE:
                continue  # skip oversized entries
            h.update(zf.read(name))
    result = h.hexdigest()
    _md5_composite_cache[key] = result
    return result


def parse_md5_list(raw: str) -> list[str]:
    """Parse comma-separated MD5 string into normalized lowercase list."""
    return [m.strip().lower() for m in raw.split(",") if m.strip()] if raw else []


def load_platform_config(platform_name: str, platforms_dir: str = "platforms") -> dict:
    """Load a platform config with inheritance and shared group resolution.

    This is the SINGLE implementation used by generate_pack, generate_readme,
    verify, and auto_fetch. No other copy should exist.

    Resolution steps:
      1. ``inherits``: the parent config is loaded recursively, the child's
         top-level keys replace the parent's, and ``overrides.systems`` is
         merged per system on top of the result.
      2. ``includes``: files from the named groups in ``_shared.yml`` are
         appended to each system unless a file with the same
         (name, destination) pair or the same case-insensitive destination
         is already present.

    Raises ImportError if PyYAML is missing and FileNotFoundError if the
    platform YAML does not exist.
    """
    if yaml is None:
        raise ImportError("PyYAML required: pip install pyyaml")

    config_file = os.path.join(platforms_dir, f"{platform_name}.yml")
    if not os.path.exists(config_file):
        raise FileNotFoundError(f"Platform config not found: {config_file}")
    config = _load_yaml(config_file)

    # Resolve inheritance
    if "inherits" in config:
        parent = load_platform_config(config["inherits"], platforms_dir)
        merged = {**parent}
        merged.update(
            {k: v for k, v in config.items() if k not in ("inherits", "overrides")}
        )
        if "overrides" in config and "systems" in config["overrides"]:
            merged.setdefault("systems", {})
            for sys_id, override in config["overrides"]["systems"].items():
                if sys_id in merged["systems"]:
                    merged["systems"][sys_id] = {**merged["systems"][sys_id], **override}
                else:
                    merged["systems"][sys_id] = override
        config = merged

    # Resolve shared group includes (cached to avoid re-parsing per call)
    shared_path = os.path.join(platforms_dir, "_shared.yml")
    if os.path.exists(shared_path):
        if not hasattr(load_platform_config, "_shared_cache"):
            load_platform_config._shared_cache = {}
        cache_key = os.path.realpath(shared_path)
        if cache_key not in load_platform_config._shared_cache:
            load_platform_config._shared_cache[cache_key] = _load_yaml(shared_path)
        shared = load_platform_config._shared_cache[cache_key]
        shared_groups = shared.get("shared_groups", {})
        for system in config.get("systems", {}).values():
            for group_name in system.get("includes", []):
                if group_name not in shared_groups:
                    continue
                current_files = system.get("files", [])
                existing = {
                    (f.get("name"), f.get("destination", f.get("name")))
                    for f in current_files
                }
                existing_lower = {
                    f.get("destination", f.get("name", "")).lower()
                    for f in current_files
                }
                for gf in shared_groups[group_name]:
                    key = (gf.get("name"), gf.get("destination", gf.get("name")))
                    dest_lower = gf.get("destination", gf.get("name", "")).lower()
                    if key in existing or dest_lower in existing_lower:
                        continue
                    system.setdefault("files", []).append(gf)
                    existing.add(key)
                    # Track the lowercase destination too, so two entries of
                    # the same group that differ only by case are not both added.
                    existing_lower.add(dest_lower)

    return config


def load_data_dir_registry(platforms_dir: str = "platforms") -> dict:
    """Load the data directory registry from _data_dirs.yml.

    Returns ``{}`` when the registry file does not exist.
    """
    registry_path = os.path.join(platforms_dir, "_data_dirs.yml")
    if not os.path.exists(registry_path):
        return {}
    data = _load_yaml(registry_path)
    return data.get("data_directories", {})


def list_registered_platforms(
    platforms_dir: str = "platforms",
    include_archived: bool = False,
) -> list[str]:
    """List platforms registered in _registry.yml.

    Only registered platforms generate packs and appear in CI.
    Unregistered YAMLs (e.g., emulatorjs.yml) are base configs for inheritance.

    Names are returned in sorted order. Entries whose status is ``archived``
    are skipped unless *include_archived* is true, and entries whose config
    file does not exist are skipped. Returns ``[]`` when there is no registry.
    """
    registry_path = os.path.join(platforms_dir, "_registry.yml")
    if not os.path.exists(registry_path):
        return []
    registry = _load_yaml(registry_path)
    platforms = []
    for name, meta in sorted(registry.get("platforms", {}).items()):
        meta = meta or {}  # tolerate "name:" with no mapping underneath
        status = meta.get("status", "active")
        if status == "archived" and not include_archived:
            continue
        config_path = os.path.join(platforms_dir, meta.get("config", f"{name}.yml"))
        if os.path.exists(config_path):
            platforms.append(name)
    return platforms


def _load_target_overrides(targets_dir: str, platform_name: str) -> dict:
    """Return the per-target overrides of *platform_name* from _overrides.yml."""
    overrides_file = os.path.join(targets_dir, "_overrides.yml")
    if not os.path.exists(overrides_file):
        return {}
    all_overrides = _load_yaml(overrides_file)
    return all_overrides.get(platform_name, {}).get("targets", {})


def load_target_config(
    platform_name: str,
    target: str,
    platforms_dir: str = "platforms",
) -> set[str]:
    """Load target config and return the set of core names for the given target.

    Resolves aliases from _overrides.yml, applies add_cores/remove_cores.
    Raises ValueError if target is unknown (with list of available targets).
    Raises FileNotFoundError if no target file exists for the platform.
    """
    targets_dir = os.path.join(platforms_dir, "targets")
    target_file = os.path.join(targets_dir, f"{platform_name}.yml")
    if not os.path.exists(target_file):
        raise FileNotFoundError(
            f"No target config for platform '{platform_name}': {target_file}"
        )
    data = _load_yaml(target_file)
    targets = data.get("targets", {})
    overrides = _load_target_overrides(targets_dir, platform_name)

    # Every target resolves to itself; its aliases resolve to it as well.
    alias_index: dict[str, str] = {}
    for tname in targets:
        alias_index[tname] = tname
        for alias in overrides.get(tname, {}).get("aliases", []):
            alias_index[alias] = tname

    canonical = alias_index.get(target)
    if canonical is None:
        available = sorted(targets.keys())
        aliases = []
        for tname, ovr in overrides.items():
            for a in ovr.get("aliases", []):
                aliases.append(f"{a} -> {tname}")
        msg = f"Unknown target '{target}' for platform '{platform_name}'.\n"
        msg += f"Available targets: {', '.join(available)}"
        if aliases:
            msg += f"\nAliases: {', '.join(sorted(aliases))}"
        raise ValueError(msg)

    cores = {str(c) for c in targets[canonical].get("cores", [])}
    ovr = overrides.get(canonical, {})
    for c in ovr.get("add_cores", []):
        cores.add(str(c))
    for c in ovr.get("remove_cores", []):
        cores.discard(str(c))
    return cores


def list_available_targets(
    platform_name: str,
    platforms_dir: str = "platforms",
) -> list[dict]:
    """List available targets for a platform with their aliases.

    Returns list of dicts with keys: name, architecture, core_count, aliases.
    Returns empty list if no target file exists.
    """
    targets_dir = os.path.join(platforms_dir, "targets")
    target_file = os.path.join(targets_dir, f"{platform_name}.yml")
    if not os.path.exists(target_file):
        return []
    data = _load_yaml(target_file)
    overrides = _load_target_overrides(targets_dir, platform_name)

    result = []
    for tname, tdata in sorted(data.get("targets", {}).items()):
        aliases = overrides.get(tname, {}).get("aliases", [])
        result.append({
            "name": tname,
            "architecture": tdata.get("architecture", ""),
            "core_count": len(tdata.get("cores", [])),
            "aliases": aliases,
        })
    return result


def resolve_local_file(
    file_entry: dict,
    db: dict,
    zip_contents: dict | None = None,
    dest_hint: str = "",
    _depth: int = 0,
    data_dir_registry: dict | None = None,
) -> tuple[str | None, str]:
    """Resolve a BIOS file to its local path using database.json.

    Single source of truth for file resolution, used by both verify.py
    and generate_pack.py. Does NOT handle storage tiers (external/user_provided)
    or release assets - callers handle those.

    dest_hint: optional destination path (e.g., "GC/USA/IPL.bin") used to
    disambiguate when multiple files share the same name. Matched against
    the by_path_suffix index built from the repo's directory structure.

    zip_contents: optional {inner_md5: zip_sha1} index (see
    build_zip_contents_index), consulted only for entries with zipped_file.
    _depth: recursion guard for the MAME clone fallback (stops at 3).
    data_dir_registry: optional registry whose ``local_cache`` directories
    are scanned as a last resort.

    Returns (local_path, status) where status is one of:
    exact, md5_exact, zip_exact, hash_mismatch, mame_clone, data_dir,
    not_found. local_path is None only for not_found.
    """
    sha1 = file_entry.get("sha1")
    md5_raw = file_entry.get("md5", "")
    name = file_entry.get("name", "")
    zipped_file = file_entry.get("zipped_file")
    aliases = file_entry.get("aliases") or []
    names_to_try = [name] + [a for a in aliases if a != name]

    # When name contains a path separator (e.g. "res/tilemap.bin"), also
    # try the basename since by_name indexes filenames without directories
    if "/" in name:
        name_base = name.rsplit("/", 1)[-1]
        if name_base and name_base not in names_to_try:
            names_to_try.append(name_base)

    # When dest_hint contains a path, also try its basename as a name
    # (handles emulator profiles where name: is descriptive and path: is
    # the actual filename, e.g. name: "MDA font ROM", path: "mda.rom")
    if dest_hint:
        hint_base = dest_hint.rsplit("/", 1)[-1] if "/" in dest_hint else dest_hint
        if hint_base and hint_base not in names_to_try:
            names_to_try.append(hint_base)

    md5_list = parse_md5_list(md5_raw)
    files_db = db.get("files", {})
    by_md5 = db.get("indexes", {}).get("by_md5", {})
    by_name = db.get("indexes", {}).get("by_name", {})
    by_path_suffix = db.get("indexes", {}).get("by_path_suffix", {})

    # 0. Path suffix exact match (for regional variants with same filename)
    if dest_hint and by_path_suffix:
        for match_sha1 in by_path_suffix.get(dest_hint, []):
            if match_sha1 in files_db:
                path = files_db[match_sha1]["path"]
                if os.path.exists(path):
                    return path, "exact"

    # 1. SHA1 exact match
    if sha1 and sha1 in files_db:
        path = files_db[sha1]["path"]
        if os.path.exists(path):
            return path, "exact"

    # 2. MD5 direct lookup (skip for zipped_file: md5 is inner ROM, not container)
    # Guard: only accept if the found file's name matches the requested name
    # (or is a .variants/ derivative). Prevents cross-contamination when an
    # unrelated file happens to share the same MD5 in the index.
    _name_set = set(names_to_try)

    def _md5_name_ok(candidate_path: str) -> bool:
        bn = os.path.basename(candidate_path)
        if bn in _name_set:
            return True
        # .variants/ pattern: filename like "neogeo.zip.fc398ab4"
        return any(bn.startswith(n + ".") for n in _name_set)

    if md5_list and not zipped_file:
        for md5_candidate in md5_list:
            sha1_match = by_md5.get(md5_candidate)
            if sha1_match and sha1_match in files_db:
                path = files_db[sha1_match]["path"]
                if os.path.exists(path) and _md5_name_ok(path):
                    return path, "md5_exact"
            # A candidate shorter than a full MD5 is matched as a prefix.
            if len(md5_candidate) < 32:
                for db_md5, db_sha1 in by_md5.items():
                    if db_md5.startswith(md5_candidate) and db_sha1 in files_db:
                        path = files_db[db_sha1]["path"]
                        if os.path.exists(path) and _md5_name_ok(path):
                            return path, "md5_exact"

    # 3. No MD5 = any file with that name or alias (existence check)
    if not md5_list:
        candidates = []
        for try_name in names_to_try:
            for match_sha1 in by_name.get(try_name, []):
                if match_sha1 in files_db:
                    path = files_db[match_sha1]["path"]
                    if os.path.exists(path) and path not in candidates:
                        candidates.append(path)
        if candidates:
            if zipped_file:
                candidates = [p for p in candidates if ".zip" in os.path.basename(p)]
            primary = [p for p in candidates if "/.variants/" not in p]
            if primary or candidates:
                return (primary[0] if primary else candidates[0]), "exact"

    # 4. Name + alias fallback with md5_composite + direct MD5 per candidate
    md5_set = set(md5_list)
    candidates = []
    seen_paths = set()
    for try_name in names_to_try:
        for match_sha1 in by_name.get(try_name, []):
            if match_sha1 in files_db:
                entry = files_db[match_sha1]
                path = entry["path"]
                if os.path.exists(path) and path not in seen_paths:
                    seen_paths.add(path)
                    candidates.append((path, entry.get("md5", "")))

    if candidates:
        if zipped_file:
            candidates = [(p, m) for p, m in candidates if ".zip" in os.path.basename(p)]
        if md5_set:
            for path, db_md5 in candidates:
                if ".zip" in os.path.basename(path):
                    try:
                        composite = md5_composite(path).lower()
                        if composite in md5_set:
                            return path, "exact"
                    except (zipfile.BadZipFile, OSError):
                        pass  # unreadable archive: fall back to the indexed MD5
                if db_md5.lower() in md5_set:
                    return path, "exact"
        # When zipped_file is set, only accept candidates that contain it
        if zipped_file:
            valid = []
            for path, m in candidates:
                try:
                    with zipfile.ZipFile(path) as zf:
                        inner_names = {n.casefold() for n in zf.namelist()}
                    if zipped_file.casefold() in inner_names:
                        valid.append((path, m))
                except (zipfile.BadZipFile, OSError):
                    pass  # unreadable archive cannot be a valid container
            if valid:
                primary = [p for p, _ in valid if "/.variants/" not in p]
                return (primary[0] if primary else valid[0][0]), "hash_mismatch"
            # No candidate contains the zipped_file — fall through to step 5
        else:
            primary = [p for p, _ in candidates if "/.variants/" not in p]
            return (primary[0] if primary else candidates[0][0]), "hash_mismatch"

    # 5. zipped_file content match via pre-built index (last resort:
    # matches inner ROM MD5 across ALL ZIPs in the repo, so only use
    # when name-based resolution failed entirely)
    if zipped_file and md5_list and zip_contents:
        for md5_candidate in md5_list:
            if md5_candidate in zip_contents:
                zip_sha1 = zip_contents[md5_candidate]
                if zip_sha1 in files_db:
                    path = files_db[zip_sha1]["path"]
                    if os.path.exists(path):
                        return path, "zip_exact"

    # MAME clone fallback: if a file was deduped, resolve via canonical
    if _depth < 3:
        clone_map = _get_mame_clone_map()
        canonical = clone_map.get(name)
        if canonical and canonical != name:
            canonical_entry = {"name": canonical}
            result = resolve_local_file(
                canonical_entry, db, zip_contents, dest_hint,
                _depth=_depth + 1, data_dir_registry=data_dir_registry,
            )
            if result[0]:
                return result[0], "mame_clone"

    # Data directory fallback: scan data/ caches for matching filename
    if data_dir_registry:
        basename_targets = {
            (n.rsplit("/", 1)[-1] if "/" in n else n) for n in names_to_try
        }
        for _dd_key, dd_entry in data_dir_registry.items():
            cache_dir = dd_entry.get("local_cache", "")
            if not cache_dir or not os.path.isdir(cache_dir):
                continue
            for try_name in names_to_try:
                # Exact relative path
                candidate = os.path.join(cache_dir, try_name)
                if os.path.isfile(candidate):
                    return candidate, "data_dir"
            # Basename walk: find file anywhere in cache tree
            for root, _dirs, fnames in os.walk(cache_dir):
                for fn in fnames:
                    if fn in basename_targets:
                        return os.path.join(root, fn), "data_dir"

    return None, "not_found"


def _get_mame_clone_map() -> dict[str, str]:
    """Load and cache the MAME clone map (clone_name -> canonical_name).

    The map is read from ``_mame_clones.json`` in the parent of this file's
    directory; a missing file yields an empty map. The result is stored on
    the function object, so the file is read at most once per process.
    """
    if not hasattr(_get_mame_clone_map, "_cache"):
        clone_path = os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            "_mame_clones.json",
        )
        clone_map: dict[str, str] = {}
        if os.path.exists(clone_path):
            with open(clone_path, encoding="utf-8") as f:
                data = json.load(f)
            for canonical, info in data.items():
                for clone in info.get("clones", []):
                    clone_map[clone] = canonical
        _get_mame_clone_map._cache = clone_map
    return _get_mame_clone_map._cache


def check_inside_zip(container: str, file_name: str, expected_md5: str) -> str:
    """Check a ROM inside a ZIP — replicates Batocera checkInsideZip().

    The member name is compared case-insensitively. Returns "ok" when the
    member exists and either no MD5 is expected or the MD5 matches,
    "untested" when the MD5 differs, "not_in_zip" when no member matches,
    and "error" for an unreadable archive or a member above 512 MiB.
    """
    try:
        with zipfile.ZipFile(container) as archive:
            for fname in archive.namelist():
                if fname.casefold() == file_name.casefold():
                    info = archive.getinfo(fname)
                    if info.file_size > _MAX_ZIP_ENTRY_SIZE:
                        return "error"
                    if expected_md5 == "":
                        return "ok"
                    with archive.open(fname) as entry:
                        actual = md5sum(entry)
                    return "ok" if actual == expected_md5 else "untested"
            return "not_in_zip"
    except (zipfile.BadZipFile, OSError, KeyError):
        return "error"


def build_zip_contents_index(db: dict, max_entry_size: int = _MAX_ZIP_ENTRY_SIZE) -> dict:
    """Build {inner_rom_md5: zip_file_sha1} for ROMs inside ZIP files.

    Only database entries whose path ends in ``.zip`` and exists on disk are
    opened. Directory members and members larger than *max_entry_size* are
    skipped; an unreadable archive is skipped as a whole. When the same inner
    MD5 occurs in several archives, the one visited last wins.
    """
    index: dict[str, str] = {}
    for sha1, entry in db.get("files", {}).items():
        path = entry["path"]
        if not path.endswith(".zip") or not os.path.exists(path):
            continue
        try:
            with zipfile.ZipFile(path, "r") as zf:
                for info in zf.infolist():
                    if info.is_dir() or info.file_size > max_entry_size:
                        continue
                    h = hashlib.md5()
                    with zf.open(info.filename) as inner:
                        for chunk in iter(lambda: inner.read(65536), b""):
                            h.update(chunk)
                    index[h.hexdigest()] = sha1
        except (zipfile.BadZipFile, OSError):
            continue
    return index


_emulator_profiles_cache: dict[tuple[str, bool], dict[str, dict]] = {}


def load_emulator_profiles(
    emulators_dir: str,
    skip_aliases: bool = True,
) -> dict[str, dict]:
    """Load all emulator YAML profiles from a directory (cached).

    Returns {file_stem: profile} for every ``*.yml`` that has an ``emulator``
    key; profiles of type ``alias`` are left out when *skip_aliases* is true.
    Returns ``{}`` (uncached) when PyYAML is unavailable or the directory
    does not exist.
    """
    cache_key = (os.path.realpath(emulators_dir), skip_aliases)
    if cache_key in _emulator_profiles_cache:
        return _emulator_profiles_cache[cache_key]
    if yaml is None:
        return {}
    profiles: dict[str, dict] = {}
    emu_path = Path(emulators_dir)
    if not emu_path.exists():
        return profiles
    for f in sorted(emu_path.glob("*.yml")):
        profile = _load_yaml(f)
        if "emulator" not in profile:
            continue
        if skip_aliases and profile.get("type") == "alias":
            continue
        profiles[f.stem] = profile
    _emulator_profiles_cache[cache_key] = profiles
    return profiles


def group_identical_platforms(
    platforms: list[str],
    platforms_dir: str,
    target_cores_cache: dict[str, set[str] | None] | None = None,
) -> list[tuple[list[str], str]]:
    """Group platforms that produce identical packs (same files + base_destination).

    Returns [(group_of_platform_names, representative), ...].
    The representative is the root platform (one that does not inherit).

    A platform's fingerprint is the SHA1 of its sorted
    ``destination|sha1|md5`` entries, further mixed with its sorted target
    cores when *target_cores_cache* supplies a non-None set for it. A platform
    whose YAML cannot be found forms a group of its own.
    """
    fingerprints: dict[str, list[str]] = {}
    representatives: dict[str, str] = {}
    inherits: dict[str, bool] = {}

    for platform in platforms:
        try:
            raw = _load_yaml(os.path.join(platforms_dir, f"{platform}.yml"))
            inherits[platform] = "inherits" in raw
            config = load_platform_config(platform, platforms_dir)
        except FileNotFoundError:
            # The platform name itself serves as the fingerprint key.
            fingerprints.setdefault(platform, []).append(platform)
            representatives.setdefault(platform, platform)
            inherits[platform] = False
            continue

        base_dest = config.get("base_destination", "")
        entries = []
        for sys_id, system in sorted(config.get("systems", {}).items()):
            for fe in system.get("files", []):
                dest = fe.get("destination", fe.get("name", ""))
                full_dest = f"{base_dest}/{dest}" if base_dest else dest
                sha1 = fe.get("sha1", "")
                md5 = fe.get("md5", "")
                entries.append(f"{full_dest}|{sha1}|{md5}")

        fp = hashlib.sha1("|".join(sorted(entries)).encode()).hexdigest()
        if target_cores_cache:
            tc = target_cores_cache.get(platform)
            if tc is not None:
                tc_str = "|".join(sorted(tc))
                fp = hashlib.sha1(f"{fp}|{tc_str}".encode()).hexdigest()
        fingerprints.setdefault(fp, []).append(platform)
        # Prefer the root platform (no inherits) as representative
        if fp not in representatives or (
            not inherits[platform] and inherits.get(representatives[fp], False)
        ):
            representatives[fp] = platform

    result = []
    for fp, group in fingerprints.items():
        rep = representatives[fp]
        ordered = [rep] + [p for p in group if p != rep]
        result.append((ordered, rep))
    return result


def _upstream_core_index(profiles: dict[str, dict]) -> dict[str, str]:
    """Map every profile key and every name in its ``cores:`` list to the key.

    Upstream sources (buildbot, es_systems) may use different names than our
    profile keys (e.g., mednafen_psx vs beetle_psx); the profiles' cores:
    field lists these alternate names.
    """
    index: dict[str, str] = {}
    for name, p in profiles.items():
        index[name] = name
        for alias in p.get("cores", []):
            index[str(alias)] = name
    return index


def resolve_platform_cores(
    config: dict,
    profiles: dict[str, dict],
    target_cores: set[str] | None = None,
) -> set[str]:
    """Resolve which emulator profiles are relevant for a platform.

    Resolution strategies (by priority):
    1. cores: "all_libretro" -- all profiles with libretro in type
    2. cores: [list] -- profiles whose dict key matches a core name
    3. cores: absent -- fallback to systems intersection

    Alias profiles are always excluded (they point to another profile).
    If target_cores is provided, result is intersected with it.
    """
    cores_config = config.get("cores")
    if cores_config == "all_libretro":
        result = {
            name for name, p in profiles.items()
            if "libretro" in p.get("type", "") and p.get("type") != "alias"
        }
    elif isinstance(cores_config, list):
        core_set = {str(c) for c in cores_config}
        core_to_profile: dict[str, str] = {}
        for name, p in profiles.items():
            if p.get("type") == "alias":
                continue
            core_to_profile[name] = name
            for core_name in p.get("cores", []):
                core_to_profile[str(core_name)] = name
        result = {core_to_profile[c] for c in core_set if c in core_to_profile}
    else:
        # Fallback: system ID intersection with normalization
        norm_plat_systems = {_norm_system_id(s) for s in config.get("systems", {})}
        result = {
            name for name, p in profiles.items()
            if {_norm_system_id(s) for s in p.get("systems", [])} & norm_plat_systems
            and p.get("type") != "alias"
        }

    if target_cores is not None:
        # Expand target_cores to profile keys before intersecting
        upstream_to_profile = _upstream_core_index(profiles)
        expanded = {upstream_to_profile.get(c, c) for c in target_cores}
        result = result & expanded
    return result


MANUFACTURER_PREFIXES = (
    "acorn-", "apple-", "microsoft-", "nintendo-", "sony-", "sega-",
    "snk-", "panasonic-", "nec-", "epoch-", "mattel-", "fairchild-",
    "hartung-", "tiger-", "magnavox-", "philips-", "bandai-", "casio-",
    "coleco-", "commodore-", "sharp-", "sinclair-", "atari-", "sammy-",
    "gce-", "texas-instruments-",
)


def derive_manufacturer(system_id: str, system_data: dict) -> str:
    """Derive manufacturer name for a system.

    Priority: explicit manufacturer field > system ID prefix > 'Other'.
    An explicit value of "Various" or "Other" is ignored, and for a
    "|"-separated value only the first part is returned.
    """
    mfr = system_data.get("manufacturer", "")
    if mfr and mfr not in ("Various", "Other"):
        return mfr.split("|")[0].strip()
    s = system_id.lower().replace("_", "-")
    for prefix in MANUFACTURER_PREFIXES:
        if s.startswith(prefix):
            return prefix.rstrip("-").title()
    return "Other"


def _norm_system_id(sid: str) -> str:
    """Normalize system ID for cross-platform matching.

    Strips manufacturer prefixes and separators so that platform-specific
    IDs (e.g., "xbox", "nintendo-wiiu") match profile IDs
    (e.g., "microsoft-xbox", "nintendo-wii-u").
    """
    s = sid.lower().replace("_", "-")
    for prefix in MANUFACTURER_PREFIXES:
        if s.startswith(prefix):
            s = s[len(prefix):]
            break
    return s.replace("-", "")


def filter_systems_by_target(
    systems: dict[str, dict],
    profiles: dict[str, dict],
    target_cores: set[str] | None,
    platform_cores: set[str] | None = None,
) -> dict[str, dict]:
    """Filter platform systems to only those reachable by target cores.

    A system is reachable if at least one core that emulates it is available
    on the target. Only considers cores relevant to the platform (from
    platform_cores). Systems whose cores are all outside the platform's
    scope are kept (no information to exclude them).

    Returns the filtered systems dict (or all if no target).
    """
    if target_cores is None:
        return systems

    # Resolve upstream target core names to profile keys
    upstream_to_profile = _upstream_core_index(profiles)
    expanded_target = {upstream_to_profile.get(c, c) for c in target_cores}

    # Build normalized system -> cores from ALL profiles
    norm_system_cores: dict[str, set[str]] = {}
    for name, p in profiles.items():
        if p.get("type") == "alias":
            continue
        for sid in p.get("systems", []):
            norm_system_cores.setdefault(_norm_system_id(sid), set()).add(name)

    # Platform-scoped mapping (for distinguishing "no info" from "known but off-target")
    norm_plat_system_cores: dict[str, set[str]] = {}
    if platform_cores is not None:
        for name in platform_cores:
            p = profiles.get(name, {})
            for sid in p.get("systems", []):
                norm_plat_system_cores.setdefault(_norm_system_id(sid), set()).add(name)

    filtered = {}
    for sys_id, sys_data in systems.items():
        norm_key = _norm_system_id(sys_id)
        all_cores = norm_system_cores.get(norm_key, set())
        plat_cores_here = norm_plat_system_cores.get(norm_key, set())

        if not all_cores and not plat_cores_here:
            # No profile maps to this system — keep it
            filtered[sys_id] = sys_data
        elif all_cores & expanded_target:
            # At least one core is on the target
            filtered[sys_id] = sys_data
        elif not plat_cores_here:
            # Platform resolution didn't find cores for this system — keep it
            filtered[sys_id] = sys_data
        # else: known cores exist but none are on the target — exclude

    return filtered


def _parse_validation(validation: list | dict | None) -> list[str]:
    """Extract the validation check list from a file's validation field.

    Handles both simple list and divergent (core/upstream) dict forms.
    For dicts, uses the ``core`` key since RetroArch users run the core.
    Any other value (including None) yields an empty list.
    """
    if validation is None:
        return []
    if isinstance(validation, list):
        return validation
    if isinstance(validation, dict):
        return validation.get("core", [])
    return []


# Validation types that require console-specific cryptographic keys.
# verify.py cannot reproduce these — size checks still apply if combined.
_CRYPTO_CHECKS = frozenset({"signature", "crypto"})

# All reproducible validation types.
_HASH_CHECKS = frozenset({"crc32", "md5", "sha1", "adler32"})


def _norm_hex(value: object) -> str:
    """Return *value* as lowercase text without a leading ``0x``.

    Values go through ``str()`` first, so a checksum that YAML parsed as an
    int (an unquoted all-digit value) does not crash the caller.
    """
    norm = str(value).lower()
    if norm.startswith("0x"):
        norm = norm[2:]
    return norm


def _build_validation_index(profiles: dict) -> dict[str, dict]:
    """Build per-filename validation rules from emulator profiles.

    Returns {filename: {"checks": [str], "sizes": set, "min_size": int|None,
    "max_size": int|None, "crc32": set, "md5": set, "sha1": set,
    "sha256": set, "adler32": set, "crypto_only": [str],
    "emulators": [str], "per_emulator": {emu: detail}}}.

    ``crypto_only`` lists validation types we cannot reproduce (signature, crypto)
    so callers can report them as non-verifiable rather than silently skipping.

    ``per_emulator`` preserves each core's individual checks, source_ref, and
    expected values before merging, for ground truth reporting.

    When multiple emulators reference the same file, checks and accepted
    values are merged (union); ``min_size`` keeps the smallest and
    ``max_size`` the largest declared bound. Differing values are all
    accepted; no conflict error is raised. Profiles of type ``launcher`` or
    ``alias`` and files without a name or without checks are ignored.
    """
    index: dict[str, dict] = {}
    for emu_name, profile in profiles.items():
        if profile.get("type") in ("launcher", "alias"):
            continue
        for f in profile.get("files", []):
            fname = f.get("name", "")
            if not fname:
                continue
            checks = _parse_validation(f.get("validation"))
            if not checks:
                continue
            if fname not in index:
                index[fname] = {
                    "checks": set(), "sizes": set(),
                    "min_size": None, "max_size": None,
                    "crc32": set(), "md5": set(), "sha1": set(), "sha256": set(),
                    "adler32": set(), "crypto_only": set(),
                    "emulators": set(), "per_emulator": {},
                }
            rules = index[fname]
            rules["emulators"].add(emu_name)
            rules["checks"].update(checks)
            # Track non-reproducible crypto checks
            rules["crypto_only"].update(c for c in checks if c in _CRYPTO_CHECKS)

            # Size checks
            if "size" in checks:
                if f.get("size") is not None:
                    rules["sizes"].add(f["size"])
                if f.get("min_size") is not None:
                    cur = rules["min_size"]
                    rules["min_size"] = (
                        min(cur, f["min_size"]) if cur is not None else f["min_size"]
                    )
                if f.get("max_size") is not None:
                    cur = rules["max_size"]
                    rules["max_size"] = (
                        max(cur, f["max_size"]) if cur is not None else f["max_size"]
                    )

            # Hash checks — collect all accepted hashes as sets (multiple valid
            # versions of the same file, e.g. MT-32 ROM versions)
            if "crc32" in checks and f.get("crc32"):
                crc_val = f["crc32"]
                crc_list = crc_val if isinstance(crc_val, list) else [crc_val]
                for cv in crc_list:
                    rules["crc32"].add(_norm_hex(cv))
            for hash_type in ("md5", "sha1", "sha256"):
                if hash_type in checks and f.get(hash_type):
                    val = f[hash_type]
                    val_list = val if isinstance(val, list) else [val]
                    for h in val_list:
                        rules[hash_type].add(str(h).lower())

            # Adler32 — stored as known_hash_adler32 field (not in validation: list
            # for Dolphin, but support it in both forms for future profiles)
            adler_val = f.get("known_hash_adler32") or f.get("adler32")
            if adler_val:
                adler_list = adler_val if isinstance(adler_val, list) else [adler_val]
                for av in adler_list:
                    rules["adler32"].add(_norm_hex(av))

            # Per-emulator ground truth detail
            expected: dict = {}
            if "size" in checks:
                for key in ("size", "min_size", "max_size"):
                    if f.get(key) is not None:
                        expected[key] = f[key]
            for hash_type in ("crc32", "md5", "sha1", "sha256"):
                if hash_type in checks and f.get(hash_type):
                    expected[hash_type] = f[hash_type]
            if adler_val:
                expected["adler32"] = adler_val
            pe_entry = {
                "checks": sorted(checks),
                "source_ref": f.get("source_ref"),
                "expected": expected,
            }
            pe = rules["per_emulator"]
            if emu_name in pe:
                # Merge checks from multiple file entries for same emulator
                existing = pe[emu_name]
                existing["checks"] = sorted(
                    set(existing["checks"]) | set(pe_entry["checks"])
                )
                existing["expected"].update(pe_entry["expected"])
                if pe_entry["source_ref"] and not existing["source_ref"]:
                    existing["source_ref"] = pe_entry["source_ref"]
            else:
                pe[emu_name] = pe_entry

    # Convert the name sets to sorted lists for determinism; the hash and
    # size fields stay sets for O(1) lookup in check_file_validation
    for v in index.values():
        v["checks"] = sorted(v["checks"])
        v["crypto_only"] = sorted(v["crypto_only"])
        v["emulators"] = sorted(v["emulators"])
    return index


def build_ground_truth(filename: str, validation_index: dict[str, dict]) -> list[dict]:
    """Format per-emulator ground truth for a file from the validation index.

    Returns a sorted list of {emulator, checks, source_ref, expected} dicts.
    Returns [] if the file has no emulator validation data.
    """
    entry = validation_index.get(filename)
    if not entry or not entry.get("per_emulator"):
        return []
    result = []
    for emu_name in sorted(entry["per_emulator"]):
        detail = entry["per_emulator"][emu_name]
        result.append({
            "emulator": emu_name,
            "checks": detail["checks"],
            "source_ref": detail.get("source_ref"),
            "expected": detail.get("expected", {}),
        })
    return result


def check_file_validation(
    local_path: str,
    filename: str,
    validation_index: dict[str, dict],
    bios_dir: str = "bios",
) -> str | None:
    """Check emulator-level validation on a resolved file.

    Supports: size (exact/min/max), crc32, md5, sha1, sha256, adler32,
    signature (RSA-2048 PKCS1v15 SHA256), crypto (AES-128-CBC + SHA256).

    Checks run in that order and the first failure is reported. Signature
    and crypto checks are delegated to ``crypto_verify``, which is imported
    only when an entry requires them.

    Returns None if all checks pass or no validation applies.
    Returns a reason string if a check fails.
    """
    entry = validation_index.get(filename)
    if not entry:
        return None
    checks = entry["checks"]

    # Size checks — sizes is a set of accepted values
    if "size" in checks:
        actual_size = os.path.getsize(local_path)
        if entry["sizes"] and actual_size not in entry["sizes"]:
            expected = ",".join(str(s) for s in sorted(entry["sizes"]))
            return f"size mismatch: got {actual_size}, accepted [{expected}]"
        if entry["min_size"] is not None and actual_size < entry["min_size"]:
            return f"size too small: min {entry['min_size']}, got {actual_size}"
        if entry["max_size"] is not None and actual_size > entry["max_size"]:
            return f"size too large: max {entry['max_size']}, got {actual_size}"

    # Hash checks — compute once, reuse for all hash types.
    # Each hash field is a set of accepted values (multiple valid ROM versions).
    need_hashes = (
        any(h in checks and entry.get(h) for h in ("crc32", "md5", "sha1", "sha256"))
        or entry.get("adler32")
    )
    if need_hashes:
        hashes = compute_hashes(local_path)
        if "crc32" in checks and entry["crc32"]:
            if hashes["crc32"].lower() not in entry["crc32"]:
                expected = ",".join(sorted(entry["crc32"]))
                return f"crc32 mismatch: got {hashes['crc32']}, accepted [{expected}]"
        if "md5" in checks and entry["md5"]:
            if hashes["md5"].lower() not in entry["md5"]:
                expected = ",".join(sorted(entry["md5"]))
                return f"md5 mismatch: got {hashes['md5']}, accepted [{expected}]"
        if "sha1" in checks and entry["sha1"]:
            if hashes["sha1"].lower() not in entry["sha1"]:
                expected = ",".join(sorted(entry["sha1"]))
                return f"sha1 mismatch: got {hashes['sha1']}, accepted [{expected}]"
        if "sha256" in checks and entry["sha256"]:
            if hashes["sha256"].lower() not in entry["sha256"]:
                expected = ",".join(sorted(entry["sha256"]))
                return f"sha256 mismatch: got {hashes['sha256']}, accepted [{expected}]"
        # Adler32 is enforced whenever a value was recorded, even if it is
        # not named in the checks list.
        if entry["adler32"]:
            if hashes["adler32"].lower() not in entry["adler32"]:
                expected = ",".join(sorted(entry["adler32"]))
                return f"adler32 mismatch: got 0x{hashes['adler32']}, accepted [{expected}]"

    # Signature/crypto checks (3DS RSA, AES)
    if entry["crypto_only"]:
        from crypto_verify import check_crypto_validation
        crypto_reason = check_crypto_validation(local_path, filename, bios_dir)
        if crypto_reason:
            return crypto_reason

    return None


def validate_cli_modes(args, mode_attrs: list[str]) -> None:
    """Validate mutual exclusion of CLI mode arguments.

    Exactly one attribute named in *mode_attrs* must be truthy on *args*;
    otherwise SystemExit is raised with a message listing the options.
    """
    modes = sum(1 for attr in mode_attrs if getattr(args, attr, None))
    if modes == 0:
        raise SystemExit(f"Specify one of: --{' --'.join(mode_attrs)}")
    if modes > 1:
        raise SystemExit(f"Options are mutually exclusive: --{' --'.join(mode_attrs)}")


# NOTE(review): the reviewed source view ends partway through the next
# definition, filter_files_by_mode(files, standalone). Its visible opening is
# recorded below verbatim (as a comment, because it is not a complete
# statement) so that nothing is guessed or lost; rejoin it with its
# continuation when reassembling the file.
#
# def filter_files_by_mode(files: list[dict], standalone: bool) -> list[dict]:
#     """Filter file entries by libretro/standalone mode."""
#     result = []
#     for f in files:
#         fmode = f.get("mode", "")
#         if standalone and fmode == "libretro":
continue if not standalone and fmode == "standalone": continue result.append(f) return result LARGE_FILES_RELEASE = "large-files" LARGE_FILES_REPO = "Abdess/retrobios" LARGE_FILES_CACHE = ".cache/large" def fetch_large_file(name: str, dest_dir: str = LARGE_FILES_CACHE, expected_sha1: str = "", expected_md5: str = "") -> str | None: """Download a large file from the 'large-files' GitHub release if not cached.""" cached = os.path.join(dest_dir, name) if os.path.exists(cached): if expected_sha1 or expected_md5: hashes = compute_hashes(cached) if expected_sha1 and hashes["sha1"].lower() != expected_sha1.lower(): os.unlink(cached) elif expected_md5: md5_list = [m.strip().lower() for m in expected_md5.split(",") if m.strip()] if hashes["md5"].lower() not in md5_list: os.unlink(cached) else: return cached else: return cached else: return cached encoded_name = urllib.request.quote(name) url = f"https://github.com/{LARGE_FILES_REPO}/releases/download/{LARGE_FILES_RELEASE}/{encoded_name}" try: req = urllib.request.Request(url, headers={"User-Agent": "retrobios/1.0"}) with urllib.request.urlopen(req, timeout=300) as resp: os.makedirs(dest_dir, exist_ok=True) with open(cached, "wb") as f: while True: chunk = resp.read(65536) if not chunk: break f.write(chunk) except (urllib.error.URLError, urllib.error.HTTPError): return None if expected_sha1 or expected_md5: hashes = compute_hashes(cached) if expected_sha1 and hashes["sha1"].lower() != expected_sha1.lower(): os.unlink(cached) return None if expected_md5: md5_list = [m.strip().lower() for m in expected_md5.split(",") if m.strip()] if hashes["md5"].lower() not in md5_list: os.unlink(cached) return None return cached def safe_extract_zip(zip_path: str, dest_dir: str) -> None: """Extract a ZIP file safely, preventing zip-slip path traversal.""" dest = os.path.realpath(dest_dir) with zipfile.ZipFile(zip_path, "r") as zf: for member in zf.infolist(): member_path = os.path.realpath(os.path.join(dest, member.filename)) if not 
member_path.startswith(dest + os.sep) and member_path != dest: raise ValueError(f"Zip slip detected: {member.filename}") zf.extract(member, dest) def list_emulator_profiles(emulators_dir: str, skip_aliases: bool = True) -> None: """Print available emulator profiles.""" profiles = load_emulator_profiles(emulators_dir, skip_aliases=False) for name in sorted(profiles): p = profiles[name] if p.get("type") in ("alias", "test"): continue display = p.get("emulator", name) ptype = p.get("type", "libretro") systems = ", ".join(p.get("systems", [])[:3]) more = "..." if len(p.get("systems", [])) > 3 else "" print(f" {name:30s} {display:40s} [{ptype}] {systems}{more}") def list_system_ids(emulators_dir: str) -> None: """Print available system IDs with emulator count.""" profiles = load_emulator_profiles(emulators_dir) system_emus: dict[str, list[str]] = {} for name, p in profiles.items(): if p.get("type") in ("alias", "test", "launcher"): continue for sys_id in p.get("systems", []): system_emus.setdefault(sys_id, []).append(name) for sys_id in sorted(system_emus): count = len(system_emus[sys_id]) print(f" {sys_id:35s} ({count} emulator{'s' if count > 1 else ''})") def list_platform_system_ids(platform_name: str, platforms_dir: str) -> None: """Print system IDs from a platform's YAML config.""" config = load_platform_config(platform_name, platforms_dir) systems = config.get("systems", {}) for sys_id in sorted(systems): file_count = len(systems[sys_id].get("files", [])) mfr = systems[sys_id].get("manufacturer", "") mfr_display = f" [{mfr.split('|')[0]}]" if mfr else "" print(f" {sys_id:35s} ({file_count} file{'s' if file_count != 1 else ''}){mfr_display}") # --------------------------------------------------------------- # Truth generation — build ground-truth YAML from emulator profiles # --------------------------------------------------------------- def _determine_core_mode( emu_name: str, profile: dict, cores_config: str | list | None, standalone_set: set[str] | None, ) -> 
str: """Determine effective mode (libretro/standalone) for a resolved core.""" if cores_config == "all_libretro": return "libretro" if standalone_set is not None: profile_names = {emu_name} | {str(c) for c in profile.get("cores", [])} if profile_names & standalone_set: return "standalone" return "libretro" ptype = profile.get("type", "libretro") if "standalone" in ptype and "libretro" in ptype: return "both" if "standalone" in ptype: return "standalone" return "libretro" def _enrich_hashes(entry: dict, db: dict) -> None: """Fill missing hash fields from the database.""" sha1 = entry.get("sha1", "") md5 = entry.get("md5", "") record = None if sha1 and db.get("files"): record = db["files"].get(sha1) if record is None and md5: by_md5 = db.get("by_md5", {}) md5_str = md5 if isinstance(md5, str) else md5[0] if md5 else "" ref_sha1 = by_md5.get(md5_str.lower()) if md5_str else None if ref_sha1 and db.get("files"): record = db["files"].get(ref_sha1) if record is None: return for field in ("sha1", "md5", "sha256", "crc32"): if not entry.get(field) and record.get(field): entry[field] = record[field] def _merge_file_into_system( system: dict, file_entry: dict, emu_name: str, db: dict | None, ) -> None: """Merge a file entry into a system's file list, deduplicating by name.""" files = system.setdefault("files", []) name_lower = file_entry["name"].lower() existing = None for f in files: if f["name"].lower() == name_lower: existing = f break if existing is not None: existing["_cores"] = existing.get("_cores", set()) | {emu_name} sr = file_entry.get("source_ref") if sr is not None: sr_key = str(sr) if not isinstance(sr, str) else sr existing["_source_refs"] = existing.get("_source_refs", set()) | {sr_key} else: existing.setdefault("_source_refs", set()) if file_entry.get("required") and not existing.get("required"): existing["required"] = True for h in ("sha1", "md5", "sha256", "crc32"): theirs = file_entry.get(h, "") ours = existing.get(h, "") if theirs and ours and 
theirs.lower() != ours.lower(): import sys as _sys print( f"WARNING: hash conflict for {file_entry['name']} " f"({h}: {ours} vs {theirs}, core {emu_name})", file=_sys.stderr, ) elif theirs and not ours: existing[h] = theirs return entry: dict = {"name": file_entry["name"]} if file_entry.get("required") is not None: entry["required"] = file_entry["required"] for field in ("sha1", "md5", "sha256", "crc32", "size", "path", "description", "hle_fallback", "category", "note", "validation", "min_size", "max_size", "aliases"): val = file_entry.get(field) if val is not None: entry[field] = val entry["_cores"] = {emu_name} sr = file_entry.get("source_ref") if sr is not None: sr_key = str(sr) if not isinstance(sr, str) else sr entry["_source_refs"] = {sr_key} else: entry["_source_refs"] = set() if db: _enrich_hashes(entry, db) files.append(entry) def generate_platform_truth( platform_name: str, config: dict, registry_entry: dict, profiles: dict[str, dict], db: dict | None = None, target_cores: set[str] | None = None, ) -> dict: """Generate ground-truth system data for a platform from emulator profiles. Args: platform_name: platform identifier config: loaded platform config (via load_platform_config), has cores, systems, standalone_cores with inheritance resolved registry_entry: registry metadata for hash_type, verification_mode, etc. profiles: all loaded emulator profiles db: optional database for hash enrichment target_cores: optional hardware target core filter Returns a dict with platform metadata, systems, and per-file details including which cores reference each file. 
""" cores_config = config.get("cores") # Resolve standalone set for mode determination standalone_set: set[str] | None = None standalone_cores = config.get("standalone_cores") if isinstance(standalone_cores, list): standalone_set = {str(c) for c in standalone_cores} resolved = resolve_platform_cores(config, profiles, target_cores) # Build mapping: profile system ID -> platform system ID # Three strategies, tried in order: # 1. File-based: if the scraped platform already has this file, use its system # 2. Exact match: profile system ID == platform system ID # 3. Normalized match: strip manufacturer prefix + separators platform_sys_ids = set(config.get("systems", {}).keys()) # File→platform_system reverse index from scraped config file_to_plat_sys: dict[str, str] = {} for psid, sys_data in config.get("systems", {}).items(): for fe in sys_data.get("files", []): fname = fe.get("name", "").lower() if fname: file_to_plat_sys[fname] = psid for alias in fe.get("aliases", []): file_to_plat_sys[alias.lower()] = psid # Normalized ID → platform system ID norm_to_platform: dict[str, str] = {} for psid in platform_sys_ids: norm_to_platform[_norm_system_id(psid)] = psid def _map_sys_id(profile_sid: str, file_name: str = "") -> str: """Map a profile system ID to the platform's system ID.""" # 1. File-based lookup (handles composites and name mismatches) if file_name: plat_sys = file_to_plat_sys.get(file_name.lower()) if plat_sys: return plat_sys # 2. Exact match if profile_sid in platform_sys_ids: return profile_sid # 3. 
Normalized match normed = _norm_system_id(profile_sid) return norm_to_platform.get(normed, profile_sid) systems: dict[str, dict] = {} cores_profiled: set[str] = set() cores_unprofiled: set[str] = set() # Track which cores contribute to each system system_cores: dict[str, dict[str, set[str]]] = {} for emu_name in sorted(resolved): profile = profiles.get(emu_name) if not profile: cores_unprofiled.add(emu_name) continue cores_profiled.add(emu_name) mode = _determine_core_mode(emu_name, profile, cores_config, standalone_set) raw_files = profile.get("files", []) if mode == "both": filtered = raw_files else: filtered = filter_files_by_mode(raw_files, standalone=(mode == "standalone")) for fe in filtered: profile_sid = fe.get("system", "") if not profile_sid: sys_ids = profile.get("systems", []) profile_sid = sys_ids[0] if sys_ids else "unknown" sys_id = _map_sys_id(profile_sid, fe.get("name", "")) system = systems.setdefault(sys_id, {}) _merge_file_into_system(system, fe, emu_name, db) # Track core contribution per system sys_cov = system_cores.setdefault(sys_id, { "profiled": set(), "unprofiled": set(), }) sys_cov["profiled"].add(emu_name) # Track unprofiled cores per system based on profile system lists for emu_name in cores_unprofiled: for sys_id in systems: sys_cov = system_cores.setdefault(sys_id, { "profiled": set(), "unprofiled": set(), }) sys_cov["unprofiled"].add(emu_name) # Convert sets to sorted lists for serialization for sys_id, sys_data in systems.items(): for fe in sys_data.get("files", []): fe["_cores"] = sorted(fe.get("_cores", set())) fe["_source_refs"] = sorted(fe.get("_source_refs", set())) # Add per-system coverage cov = system_cores.get(sys_id, {}) sys_data["_coverage"] = { "cores_profiled": sorted(cov.get("profiled", set())), "cores_unprofiled": sorted(cov.get("unprofiled", set())), } return { "platform": platform_name, "generated": True, "systems": systems, "_coverage": { "cores_resolved": len(resolved), "cores_profiled": len(cores_profiled), 
"cores_unprofiled": sorted(cores_unprofiled), }, } # ------------------------------------------------------------------- # Platform truth diffing # ------------------------------------------------------------------- def _diff_system(truth_sys: dict, scraped_sys: dict) -> dict: """Compare files between truth and scraped for a single system.""" # Build truth index: name.lower() -> entry, alias.lower() -> entry truth_index: dict[str, dict] = {} for fe in truth_sys.get("files", []): truth_index[fe["name"].lower()] = fe for alias in fe.get("aliases", []): truth_index[alias.lower()] = fe # Build scraped index: name.lower() -> entry scraped_index: dict[str, dict] = {} for fe in scraped_sys.get("files", []): scraped_index[fe["name"].lower()] = fe missing: list[dict] = [] hash_mismatch: list[dict] = [] required_mismatch: list[dict] = [] extra_phantom: list[dict] = [] extra_unprofiled: list[dict] = [] matched_truth_names: set[str] = set() # Compare scraped files against truth for s_key, s_entry in scraped_index.items(): t_entry = truth_index.get(s_key) if t_entry is None: continue matched_truth_names.add(t_entry["name"].lower()) # Hash comparison for h in ("sha1", "md5", "crc32"): t_hash = t_entry.get(h, "") s_hash = s_entry.get(h, "") if not t_hash or not s_hash: continue # Normalize to list for multi-hash support t_list = t_hash if isinstance(t_hash, list) else [t_hash] s_list = s_hash if isinstance(s_hash, list) else [s_hash] t_set = {v.lower() for v in t_list} s_set = {v.lower() for v in s_list} if not t_set & s_set: hash_mismatch.append({ "name": s_entry["name"], "hash_type": h, f"truth_{h}": t_hash, f"scraped_{h}": s_hash, "truth_cores": list(t_entry.get("_cores", [])), }) break # Required mismatch t_req = t_entry.get("required") s_req = s_entry.get("required") if t_req is not None and s_req is not None and t_req != s_req: required_mismatch.append({ "name": s_entry["name"], "truth_required": t_req, "scraped_required": s_req, }) # Truth files not matched -> missing for 
fe in truth_sys.get("files", []): if fe["name"].lower() not in matched_truth_names: missing.append({ "name": fe["name"], "cores": list(fe.get("_cores", [])), "source_refs": list(fe.get("_source_refs", [])), }) # Scraped files not in truth -> extra coverage = truth_sys.get("_coverage", {}) has_unprofiled = bool(coverage.get("cores_unprofiled")) for s_key, s_entry in scraped_index.items(): if s_key not in truth_index: entry = {"name": s_entry["name"]} if has_unprofiled: extra_unprofiled.append(entry) else: extra_phantom.append(entry) result: dict = {} if missing: result["missing"] = missing if hash_mismatch: result["hash_mismatch"] = hash_mismatch if required_mismatch: result["required_mismatch"] = required_mismatch if extra_phantom: result["extra_phantom"] = extra_phantom if extra_unprofiled: result["extra_unprofiled"] = extra_unprofiled return result def _has_divergences(sys_div: dict) -> bool: """Check if a system divergence dict contains any actual divergences.""" return bool(sys_div) def _update_summary(summary: dict, sys_div: dict) -> None: """Update summary counters from a system divergence dict.""" summary["total_missing"] += len(sys_div.get("missing", [])) summary["total_extra_phantom"] += len(sys_div.get("extra_phantom", [])) summary["total_extra_unprofiled"] += len(sys_div.get("extra_unprofiled", [])) summary["total_hash_mismatch"] += len(sys_div.get("hash_mismatch", [])) summary["total_required_mismatch"] += len(sys_div.get("required_mismatch", [])) def diff_platform_truth(truth: dict, scraped: dict) -> dict: """Compare truth YAML against scraped YAML, returning divergences. System IDs are matched using normalized forms (via _norm_system_id) to handle naming differences between emulator profiles and scraped platforms (e.g. 'sega-game-gear' vs 'sega-gamegear'). 
""" truth_systems = truth.get("systems", {}) scraped_systems = scraped.get("systems", {}) summary = { "systems_compared": 0, "systems_fully_covered": 0, "systems_partially_covered": 0, "systems_uncovered": 0, "total_missing": 0, "total_extra_phantom": 0, "total_extra_unprofiled": 0, "total_hash_mismatch": 0, "total_required_mismatch": 0, } divergences: dict[str, dict] = {} uncovered_systems: list[str] = [] # Build normalized-ID lookup for truth systems norm_to_truth: dict[str, str] = {} for sid in truth_systems: norm_to_truth[_norm_system_id(sid)] = sid # Match scraped systems to truth via normalized IDs matched_truth: set[str] = set() for s_sid in sorted(scraped_systems): norm = _norm_system_id(s_sid) t_sid = norm_to_truth.get(norm) if t_sid is None: # Also try exact match (in case normalization is lossy) if s_sid in truth_systems: t_sid = s_sid else: uncovered_systems.append(s_sid) summary["systems_uncovered"] += 1 continue matched_truth.add(t_sid) summary["systems_compared"] += 1 sys_div = _diff_system(truth_systems[t_sid], scraped_systems[s_sid]) if _has_divergences(sys_div): divergences[s_sid] = sys_div _update_summary(summary, sys_div) summary["systems_partially_covered"] += 1 else: summary["systems_fully_covered"] += 1 # Truth systems not matched by any scraped system — all files missing for t_sid in sorted(truth_systems): if t_sid in matched_truth: continue summary["systems_compared"] += 1 sys_div = _diff_system(truth_systems[t_sid], {"files": []}) if _has_divergences(sys_div): divergences[t_sid] = sys_div _update_summary(summary, sys_div) summary["systems_partially_covered"] += 1 else: summary["systems_fully_covered"] += 1 result: dict = {"summary": summary} if divergences: result["divergences"] = divergences if uncovered_systems: result["uncovered_systems"] = uncovered_systems return result