diff --git a/scripts/common.py b/scripts/common.py index 47a7e1fb..7b662c4c 100644 --- a/scripts/common.py +++ b/scripts/common.py @@ -10,6 +10,7 @@ import hashlib import json import os import urllib.error +import urllib.parse import urllib.request import zipfile import zlib @@ -32,27 +33,46 @@ def require_yaml(): sys.exit(1) -def compute_hashes(filepath: str | Path) -> dict[str, str]: - """Compute SHA1, MD5, SHA256, CRC32, Adler32 for a file.""" - sha1 = hashlib.sha1() - md5 = hashlib.md5() - sha256 = hashlib.sha256() +_ALL_ALGORITHMS = frozenset({"sha1", "md5", "sha256", "crc32", "adler32"}) + + +def compute_hashes( + filepath: str | Path, + algorithms: frozenset[str] | None = None, +) -> dict[str, str]: + """Compute file hashes. Pass *algorithms* to limit which are computed.""" + algos = algorithms or _ALL_ALGORITHMS + sha1 = hashlib.sha1() if "sha1" in algos else None + md5 = hashlib.md5() if "md5" in algos else None + sha256 = hashlib.sha256() if "sha256" in algos else None + do_crc = "crc32" in algos + do_adler = "adler32" in algos crc = 0 adler = 1 # zlib.adler32 initial value with open(filepath, "rb") as f: for chunk in iter(lambda: f.read(65536), b""): - sha1.update(chunk) - md5.update(chunk) - sha256.update(chunk) - crc = zlib.crc32(chunk, crc) - adler = zlib.adler32(chunk, adler) - return { - "sha1": sha1.hexdigest(), - "md5": md5.hexdigest(), - "sha256": sha256.hexdigest(), - "crc32": format(crc & 0xFFFFFFFF, "08x"), - "adler32": format(adler & 0xFFFFFFFF, "08x"), - } + if sha1: + sha1.update(chunk) + if md5: + md5.update(chunk) + if sha256: + sha256.update(chunk) + if do_crc: + crc = zlib.crc32(chunk, crc) + if do_adler: + adler = zlib.adler32(chunk, adler) + result: dict[str, str] = {} + if sha1: + result["sha1"] = sha1.hexdigest() + if md5: + result["md5"] = md5.hexdigest() + if sha256: + result["sha256"] = sha256.hexdigest() + if do_crc: + result["crc32"] = format(crc & 0xFFFFFFFF, "08x") + if do_adler: + result["adler32"] = format(adler & 0xFFFFFFFF, "08x") + return result def load_database(db_path: str) -> dict: @@ -106,12 +126,20 @@ def parse_md5_list(raw: str) -> list[str]: return [m.strip().lower() for m in raw.split(",") if m.strip()] if raw else [] +_shared_yml_cache: dict[str, dict] = {} +_platform_config_cache: dict[tuple[str, str], dict] = {} + + def load_platform_config(platform_name: str, platforms_dir: str = "platforms") -> dict: """Load a platform config with inheritance and shared group resolution. This is the SINGLE implementation used by generate_pack, generate_readme, verify, and auto_fetch. No other copy should exist. """ + cache_key = (platform_name, os.path.realpath(platforms_dir)) + if cache_key in _platform_config_cache: + return _platform_config_cache[cache_key] + if yaml is None: raise ImportError("PyYAML required: pip install pyyaml") @@ -136,16 +164,14 @@ def load_platform_config(platform_name: str, platforms_dir: str = "platforms") - merged["systems"][sys_id] = override config = merged - # Resolve shared group includes (cached to avoid re-parsing per call) + # Resolve shared group includes shared_path = os.path.join(platforms_dir, "_shared.yml") if os.path.exists(shared_path): - if not hasattr(load_platform_config, "_shared_cache"): - load_platform_config._shared_cache = {} - cache_key = os.path.realpath(shared_path) - if cache_key not in load_platform_config._shared_cache: + shared_real = os.path.realpath(shared_path) + if shared_real not in _shared_yml_cache: with open(shared_path) as f: - load_platform_config._shared_cache[cache_key] = yaml.safe_load(f) or {} - shared = load_platform_config._shared_cache[cache_key] + _shared_yml_cache[shared_real] = yaml.safe_load(f) or {} + shared = _shared_yml_cache[shared_real] shared_groups = shared.get("shared_groups", {}) for system in config.get("systems", {}).values(): for group_name in system.get("includes", []): @@ -165,6 +191,7 @@ def load_platform_config(platform_name: str, platforms_dir: str = "platforms") - system.setdefault("files", []).append(gf) existing.add(key) + _platform_config_cache[cache_key] = config return config @@ -485,37 +512,41 @@ def resolve_local_file( candidate = os.path.join(cache_dir, try_name) if os.path.isfile(candidate): return candidate, "data_dir" - # Basename walk: find file anywhere in cache tree + # Basename walk: find file anywhere in cache tree (case-insensitive) basename_targets = { - (n.rsplit("/", 1)[-1] if "/" in n else n) + (n.rsplit("/", 1)[-1] if "/" in n else n).casefold() for n in names_to_try } for root, _dirs, fnames in os.walk(cache_dir): for fn in fnames: - if fn in basename_targets: + if fn.casefold() in basename_targets: return os.path.join(root, fn), "data_dir" return None, "not_found" +_mame_clone_map_cache: dict[str, str] | None = None + + def _get_mame_clone_map() -> dict[str, str]: """Load and cache the MAME clone map (clone_name -> canonical_name).""" - if not hasattr(_get_mame_clone_map, "_cache"): - clone_path = os.path.join( - os.path.dirname(os.path.dirname(os.path.abspath(__file__))), - "_mame_clones.json", - ) - if os.path.exists(clone_path): - import json as _json - with open(clone_path) as f: - data = _json.load(f) - _get_mame_clone_map._cache = {} - for canonical, info in data.items(): - for clone in info.get("clones", []): - _get_mame_clone_map._cache[clone] = canonical - else: - _get_mame_clone_map._cache = {} - return _get_mame_clone_map._cache + global _mame_clone_map_cache + if _mame_clone_map_cache is not None: + return _mame_clone_map_cache + clone_path = os.path.join( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))), + "_mame_clones.json", + ) + if os.path.exists(clone_path): + with open(clone_path) as f: + data = json.load(f) + _mame_clone_map_cache = {} + for canonical, info in data.items(): + for clone in info.get("clones", []): + _mame_clone_map_cache[clone] = canonical + else: + _mame_clone_map_cache = {} + return _mame_clone_map_cache def check_inside_zip(container: str, file_name: str, expected_md5: str) -> str: @@ -540,13 +571,32 @@ def check_inside_zip(container: str, file_name: str, expected_md5: str) -> str: return "error" +_zip_contents_cache: tuple[frozenset[tuple[str, float]], dict] | None = None + + def build_zip_contents_index(db: dict, max_entry_size: int = 512 * 1024 * 1024) -> dict: - """Build {inner_rom_md5: zip_file_sha1} for ROMs inside ZIP files.""" - index: dict[str, str] = {} + """Build {inner_rom_md5: zip_file_sha1} for ROMs inside ZIP files. + + Results are cached in-process; repeated calls with unchanged ZIPs return + the cached index. + """ + global _zip_contents_cache + + # Build fingerprint from ZIP paths + mtimes for cache invalidation + zip_entries: list[tuple[str, str]] = [] for sha1, entry in db.get("files", {}).items(): path = entry["path"] - if not path.endswith(".zip") or not os.path.exists(path): - continue + if path.endswith(".zip") and os.path.exists(path): + zip_entries.append((path, sha1)) + + fingerprint = frozenset( + (path, os.path.getmtime(path)) for path, _ in zip_entries + ) + if _zip_contents_cache is not None and _zip_contents_cache[0] == fingerprint: + return _zip_contents_cache[1] + + index: dict[str, str] = {} + for path, sha1 in zip_entries: try: with zipfile.ZipFile(path, "r") as zf: for info in zf.infolist(): @@ -559,6 +609,8 @@ def build_zip_contents_index(db: dict, max_entry_size: int = 512 * 1024 * 1024) index[h.hexdigest()] = sha1 except (zipfile.BadZipFile, OSError): continue + + _zip_contents_cache = (fingerprint, index) return index @@ -859,31 +911,35 @@ def fetch_large_file(name: str, dest_dir: str = LARGE_FILES_CACHE, else: return cached - encoded_name = urllib.request.quote(name) + encoded_name = urllib.parse.quote(name) url = f"https://github.com/{LARGE_FILES_REPO}/releases/download/{LARGE_FILES_RELEASE}/{encoded_name}" + os.makedirs(dest_dir, exist_ok=True) + tmp_path = cached + ".tmp" try: req = urllib.request.Request(url, headers={"User-Agent": "retrobios/1.0"}) with urllib.request.urlopen(req, timeout=300) as resp: - os.makedirs(dest_dir, exist_ok=True) - with open(cached, "wb") as f: + with open(tmp_path, "wb") as f: while True: chunk = resp.read(65536) if not chunk: break f.write(chunk) except (urllib.error.URLError, urllib.error.HTTPError): + if os.path.exists(tmp_path): + os.unlink(tmp_path) return None if expected_sha1 or expected_md5: - hashes = compute_hashes(cached) + hashes = compute_hashes(tmp_path) if expected_sha1 and hashes["sha1"].lower() != expected_sha1.lower(): - os.unlink(cached) + os.unlink(tmp_path) return None if expected_md5: md5_list = [m.strip().lower() for m in expected_md5.split(",") if m.strip()] if hashes["md5"].lower() not in md5_list: - os.unlink(cached) + os.unlink(tmp_path) return None + os.replace(tmp_path, cached) return cached @@ -938,12 +994,31 @@ def list_platform_system_ids(platform_name: str, platforms_dir: str) -> None: -# Re-exports: validation and truth modules extracted for SoC. -# Existing consumers import from common — these preserve that contract. -from validation import ( # noqa: F401, E402 - _build_validation_index, _parse_validation, build_ground_truth, - check_file_validation, filter_files_by_mode, validate_cli_modes, -) -from truth import ( # noqa: F401, E402 - diff_platform_truth, generate_platform_truth, -) +def build_target_cores_cache( + platforms: list[str], + target: str, + platforms_dir: str, + is_all: bool = False, +) -> tuple[dict[str, set[str] | None], list[str]]: + """Build target cores cache for a list of platforms. + + Returns (cache dict, list of platforms to keep after skipping failures). + """ + cache: dict[str, set[str] | None] = {} + skip: list[str] = [] + for p in platforms: + try: + cache[p] = load_target_config(p, target, platforms_dir) + except FileNotFoundError: + if is_all: + cache[p] = None + else: + raise + except ValueError as e: + if is_all: + print(f"INFO: Skipping {p}: {e}") + skip.append(p) + else: + raise + kept = [p for p in platforms if p not in skip] + return cache, kept