feat: re-profile 22 emulators, refactor validation to common.py

batch re-profiled nekop2 through pokemini. mupen64plus renamed to
mupen64plus_next. new profiles: nes, mupen64plus_next.
validation functions (_build_validation_index, check_file_validation)
consolidated in common.py — single source of truth for verify.py
and generate_pack.py. pipeline 100% consistent on all 6 platforms.
This commit is contained in:
Abdessamad Derraz
2026-03-24 22:31:22 +01:00
parent 94000bdaef
commit 0543165ed2
33 changed files with 1449 additions and 783 deletions

View File

@@ -79,12 +79,20 @@ def md5_composite(filepath: str | Path) -> str:
names = sorted(n for n in zf.namelist() if not n.endswith("/"))
h = hashlib.md5()
for name in names:
info = zf.getinfo(name)
if info.file_size > 512 * 1024 * 1024:
continue # skip oversized entries
h.update(zf.read(name))
result = h.hexdigest()
_md5_composite_cache[key] = result
return result
def parse_md5_list(raw: str) -> list[str]:
"""Parse comma-separated MD5 string into normalized lowercase list."""
return [m.strip().lower() for m in raw.split(",") if m.strip()] if raw else []
def load_platform_config(platform_name: str, platforms_dir: str = "platforms") -> dict:
"""Load a platform config with inheritance and shared group resolution.
@@ -162,6 +170,7 @@ def resolve_local_file(
db: dict,
zip_contents: dict | None = None,
dest_hint: str = "",
_depth: int = 0,
) -> tuple[str | None, str]:
"""Resolve a BIOS file to its local path using database.json.
@@ -293,13 +302,16 @@ def resolve_local_file(
return path, "zip_exact"
# MAME clone fallback: if a file was deduped, resolve via canonical
clone_map = _get_mame_clone_map()
canonical = clone_map.get(name)
if canonical and canonical != name:
canonical_entry = {"name": canonical}
result = resolve_local_file(canonical_entry, db, zip_contents, dest_hint)
if result[0]:
return result[0], "mame_clone"
if _depth < 3:
clone_map = _get_mame_clone_map()
canonical = clone_map.get(name)
if canonical and canonical != name:
canonical_entry = {"name": canonical}
result = resolve_local_file(
canonical_entry, db, zip_contents, dest_hint, _depth=_depth + 1,
)
if result[0]:
return result[0], "mame_clone"
return None, "not_found"
@@ -333,6 +345,9 @@ def check_inside_zip(container: str, file_name: str, expected_md5: str) -> str:
with zipfile.ZipFile(container) as archive:
for fname in archive.namelist():
if fname.casefold() == file_name.casefold():
info = archive.getinfo(fname)
if info.file_size > 512 * 1024 * 1024:
return "error"
if expected_md5 == "":
return "ok"
with archive.open(fname) as entry:
@@ -365,10 +380,16 @@ def build_zip_contents_index(db: dict, max_entry_size: int = 512 * 1024 * 1024)
return index
_emulator_profiles_cache: dict[tuple[str, bool], dict[str, dict]] = {}
def load_emulator_profiles(
emulators_dir: str, skip_aliases: bool = True,
) -> dict[str, dict]:
"""Load all emulator YAML profiles from a directory."""
"""Load all emulator YAML profiles from a directory (cached)."""
cache_key = (os.path.realpath(emulators_dir), skip_aliases)
if cache_key in _emulator_profiles_cache:
return _emulator_profiles_cache[cache_key]
try:
import yaml
except ImportError:
@@ -385,6 +406,7 @@ def load_emulator_profiles(
if skip_aliases and profile.get("type") == "alias":
continue
profiles[f.stem] = profile
_emulator_profiles_cache[cache_key] = profiles
return profiles
@@ -461,6 +483,192 @@ def resolve_platform_cores(
}
def _parse_validation(validation: list | dict | None) -> list[str]:
"""Extract the validation check list from a file's validation field.
Handles both simple list and divergent (core/upstream) dict forms.
For dicts, uses the ``core`` key since RetroArch users run the core.
"""
if validation is None:
return []
if isinstance(validation, list):
return validation
if isinstance(validation, dict):
return validation.get("core", [])
return []
# Validation types that require console-specific cryptographic keys.
# verify.py cannot reproduce these — size checks still apply if combined.
_CRYPTO_CHECKS = frozenset({"signature", "crypto"})
# All reproducible validation types.
_HASH_CHECKS = frozenset({"crc32", "md5", "sha1", "adler32"})
def _build_validation_index(profiles: dict) -> dict[str, dict]:
"""Build per-filename validation rules from emulator profiles.
Returns {filename: {"checks": [str], "size": int|None, "min_size": int|None,
"max_size": int|None, "crc32": str|None, "md5": str|None, "sha1": str|None,
"adler32": str|None, "crypto_only": [str]}}.
``crypto_only`` lists validation types we cannot reproduce (signature, crypto)
so callers can report them as non-verifiable rather than silently skipping.
When multiple emulators reference the same file, merges checks (union).
Raises ValueError if two profiles declare conflicting values.
"""
index: dict[str, dict] = {}
sources: dict[str, dict[str, str]] = {}
for emu_name, profile in profiles.items():
if profile.get("type") in ("launcher", "alias"):
continue
for f in profile.get("files", []):
fname = f.get("name", "")
if not fname:
continue
checks = _parse_validation(f.get("validation"))
if not checks:
continue
if fname not in index:
index[fname] = {
"checks": set(), "sizes": set(),
"min_size": None, "max_size": None,
"crc32": set(), "md5": set(), "sha1": set(), "sha256": set(),
"adler32": set(), "crypto_only": set(),
}
sources[fname] = {}
index[fname]["checks"].update(checks)
# Track non-reproducible crypto checks
index[fname]["crypto_only"].update(
c for c in checks if c in _CRYPTO_CHECKS
)
# Size checks
if "size" in checks:
if f.get("size") is not None:
index[fname]["sizes"].add(f["size"])
if f.get("min_size") is not None:
cur = index[fname]["min_size"]
index[fname]["min_size"] = min(cur, f["min_size"]) if cur is not None else f["min_size"]
if f.get("max_size") is not None:
cur = index[fname]["max_size"]
index[fname]["max_size"] = max(cur, f["max_size"]) if cur is not None else f["max_size"]
# Hash checks — collect all accepted hashes as sets (multiple valid
# versions of the same file, e.g. MT-32 ROM versions)
if "crc32" in checks and f.get("crc32"):
norm = f["crc32"].lower()
if norm.startswith("0x"):
norm = norm[2:]
index[fname]["crc32"].add(norm)
for hash_type in ("md5", "sha1", "sha256"):
if hash_type in checks and f.get(hash_type):
index[fname][hash_type].add(f[hash_type].lower())
# Adler32 — stored as known_hash_adler32 field (not in validation: list
# for Dolphin, but support it in both forms for future profiles)
adler_val = f.get("known_hash_adler32") or f.get("adler32")
if adler_val:
norm = adler_val.lower()
if norm.startswith("0x"):
norm = norm[2:]
index[fname]["adler32"].add(norm)
# Convert sets to sorted tuples/lists for determinism
for v in index.values():
v["checks"] = sorted(v["checks"])
v["crypto_only"] = sorted(v["crypto_only"])
# Keep hash sets as frozensets for O(1) lookup in check_file_validation
return index
def check_file_validation(
local_path: str, filename: str, validation_index: dict[str, dict],
bios_dir: str = "bios",
) -> str | None:
"""Check emulator-level validation on a resolved file.
Supports: size (exact/min/max), crc32, md5, sha1, adler32,
signature (RSA-2048 PKCS1v15 SHA256), crypto (AES-128-CBC + SHA256).
Returns None if all checks pass or no validation applies.
Returns a reason string if a check fails.
"""
entry = validation_index.get(filename)
if not entry:
return None
checks = entry["checks"]
# Size checks — sizes is a set of accepted values
if "size" in checks:
actual_size = os.path.getsize(local_path)
if entry["sizes"] and actual_size not in entry["sizes"]:
expected = ",".join(str(s) for s in sorted(entry["sizes"]))
return f"size mismatch: got {actual_size}, accepted [{expected}]"
if entry["min_size"] is not None and actual_size < entry["min_size"]:
return f"size too small: min {entry['min_size']}, got {actual_size}"
if entry["max_size"] is not None and actual_size > entry["max_size"]:
return f"size too large: max {entry['max_size']}, got {actual_size}"
# Hash checks — compute once, reuse for all hash types.
# Each hash field is a set of accepted values (multiple valid ROM versions).
need_hashes = (
any(h in checks and entry.get(h) for h in ("crc32", "md5", "sha1", "sha256"))
or entry.get("adler32")
)
if need_hashes:
hashes = compute_hashes(local_path)
if "crc32" in checks and entry["crc32"]:
if hashes["crc32"].lower() not in entry["crc32"]:
expected = ",".join(sorted(entry["crc32"]))
return f"crc32 mismatch: got {hashes['crc32']}, accepted [{expected}]"
if "md5" in checks and entry["md5"]:
if hashes["md5"].lower() not in entry["md5"]:
expected = ",".join(sorted(entry["md5"]))
return f"md5 mismatch: got {hashes['md5']}, accepted [{expected}]"
if "sha1" in checks and entry["sha1"]:
if hashes["sha1"].lower() not in entry["sha1"]:
expected = ",".join(sorted(entry["sha1"]))
return f"sha1 mismatch: got {hashes['sha1']}, accepted [{expected}]"
if "sha256" in checks and entry["sha256"]:
if hashes["sha256"].lower() not in entry["sha256"]:
expected = ",".join(sorted(entry["sha256"]))
return f"sha256 mismatch: got {hashes['sha256']}, accepted [{expected}]"
if entry["adler32"]:
if hashes["adler32"].lower() not in entry["adler32"]:
expected = ",".join(sorted(entry["adler32"]))
return f"adler32 mismatch: got 0x{hashes['adler32']}, accepted [{expected}]"
# Signature/crypto checks (3DS RSA, AES)
if entry["crypto_only"]:
from crypto_verify import check_crypto_validation
crypto_reason = check_crypto_validation(local_path, filename, bios_dir)
if crypto_reason:
return crypto_reason
return None
def validate_cli_modes(args, mode_attrs: list[str]) -> None:
"""Validate mutual exclusion of CLI mode arguments."""
modes = sum(1 for attr in mode_attrs if getattr(args, attr, None))
if modes == 0:
raise SystemExit(f"Specify one of: --{' --'.join(mode_attrs)}")
if modes > 1:
raise SystemExit(f"Options are mutually exclusive: --{' --'.join(mode_attrs)}")
def filter_files_by_mode(files: list[dict], standalone: bool) -> list[dict]:
"""Filter file entries by libretro/standalone mode."""
result = []
for f in files:
fmode = f.get("mode", "")
if standalone and fmode == "libretro":
continue
if not standalone and fmode == "standalone":
continue
result.append(f)
return result
def safe_extract_zip(zip_path: str, dest_dir: str) -> None:
"""Extract a ZIP file safely, preventing zip-slip path traversal."""
dest = os.path.realpath(dest_dir)
@@ -470,3 +678,31 @@ def safe_extract_zip(zip_path: str, dest_dir: str) -> None:
if not member_path.startswith(dest + os.sep) and member_path != dest:
raise ValueError(f"Zip slip detected: {member.filename}")
zf.extract(member, dest)
def list_emulator_profiles(emulators_dir: str, skip_aliases: bool = True) -> None:
"""Print available emulator profiles."""
profiles = load_emulator_profiles(emulators_dir, skip_aliases=False)
for name in sorted(profiles):
p = profiles[name]
if p.get("type") in ("alias", "test"):
continue
display = p.get("emulator", name)
ptype = p.get("type", "libretro")
systems = ", ".join(p.get("systems", [])[:3])
more = "..." if len(p.get("systems", [])) > 3 else ""
print(f" {name:30s} {display:40s} [{ptype}] {systems}{more}")
def list_system_ids(emulators_dir: str) -> None:
"""Print available system IDs with emulator count."""
profiles = load_emulator_profiles(emulators_dir)
system_emus: dict[str, list[str]] = {}
for name, p in profiles.items():
if p.get("type") in ("alias", "test", "launcher"):
continue
for sys_id in p.get("systems", []):
system_emus.setdefault(sys_id, []).append(name)
for sys_id in sorted(system_emus):
count = len(system_emus[sys_id])
print(f" {sys_id:35s} ({count} emulator{'s' if count > 1 else ''})")