fix: case-insensitive data dir basename resolution

Abdessamad Derraz
2026-03-29 23:01:32 +02:00
parent 84decad08d
commit a08c730805


@@ -10,6 +10,7 @@ import hashlib
import json
import os
import urllib.error
import urllib.parse
import urllib.request
import zipfile
import zlib
@@ -32,27 +33,46 @@ def require_yaml():
sys.exit(1)
def compute_hashes(filepath: str | Path) -> dict[str, str]:
"""Compute SHA1, MD5, SHA256, CRC32, Adler32 for a file."""
sha1 = hashlib.sha1()
md5 = hashlib.md5()
sha256 = hashlib.sha256()
_ALL_ALGORITHMS = frozenset({"sha1", "md5", "sha256", "crc32", "adler32"})
def compute_hashes(
filepath: str | Path,
algorithms: frozenset[str] | None = None,
) -> dict[str, str]:
"""Compute file hashes. Pass *algorithms* to limit which are computed."""
algos = algorithms or _ALL_ALGORITHMS
sha1 = hashlib.sha1() if "sha1" in algos else None
md5 = hashlib.md5() if "md5" in algos else None
sha256 = hashlib.sha256() if "sha256" in algos else None
do_crc = "crc32" in algos
do_adler = "adler32" in algos
crc = 0
adler = 1 # zlib.adler32 initial value
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
if sha1:
sha1.update(chunk)
if md5:
md5.update(chunk)
if sha256:
sha256.update(chunk)
if do_crc:
crc = zlib.crc32(chunk, crc)
if do_adler:
adler = zlib.adler32(chunk, adler)
return {
"sha1": sha1.hexdigest(),
"md5": md5.hexdigest(),
"sha256": sha256.hexdigest(),
"crc32": format(crc & 0xFFFFFFFF, "08x"),
"adler32": format(adler & 0xFFFFFFFF, "08x"),
}
result: dict[str, str] = {}
if sha1:
result["sha1"] = sha1.hexdigest()
if md5:
result["md5"] = md5.hexdigest()
if sha256:
result["sha256"] = sha256.hexdigest()
if do_crc:
result["crc32"] = format(crc & 0xFFFFFFFF, "08x")
if do_adler:
result["adler32"] = format(adler & 0xFFFFFFFF, "08x")
return result
def load_database(db_path: str) -> dict:
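A minimal usage sketch of the new keyword (the ROM path is hypothetical): limiting the algorithm set skips the unneeded digests entirely.

hashes = compute_hashes("cache/neogeo.zip", algorithms=frozenset({"md5", "crc32"}))
# -> {"md5": "...", "crc32": "..."}; sha1/sha256/adler32 are neither created nor updated

Note that with the `algorithms or _ALL_ALGORITHMS` guard, passing an empty frozenset falls back to computing all five digests, the same as passing None.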
@@ -106,12 +126,20 @@ def parse_md5_list(raw: str) -> list[str]:
return [m.strip().lower() for m in raw.split(",") if m.strip()] if raw else []
_shared_yml_cache: dict[str, dict] = {}
_platform_config_cache: dict[tuple[str, str], dict] = {}
def load_platform_config(platform_name: str, platforms_dir: str = "platforms") -> dict:
"""Load a platform config with inheritance and shared group resolution.
This is the SINGLE implementation used by generate_pack, generate_readme,
verify, and auto_fetch. No other copy should exist.
"""
cache_key = (platform_name, os.path.realpath(platforms_dir))
if cache_key in _platform_config_cache:
return _platform_config_cache[cache_key]
if yaml is None:
raise ImportError("PyYAML required: pip install pyyaml")
@@ -136,16 +164,14 @@ def load_platform_config(platform_name: str, platforms_dir: str = "platforms") -
merged["systems"][sys_id] = override
config = merged
# Resolve shared group includes (cached to avoid re-parsing per call)
# Resolve shared group includes
shared_path = os.path.join(platforms_dir, "_shared.yml")
if os.path.exists(shared_path):
if not hasattr(load_platform_config, "_shared_cache"):
load_platform_config._shared_cache = {}
cache_key = os.path.realpath(shared_path)
if cache_key not in load_platform_config._shared_cache:
shared_real = os.path.realpath(shared_path)
if shared_real not in _shared_yml_cache:
with open(shared_path) as f:
load_platform_config._shared_cache[cache_key] = yaml.safe_load(f) or {}
shared = load_platform_config._shared_cache[cache_key]
_shared_yml_cache[shared_real] = yaml.safe_load(f) or {}
shared = _shared_yml_cache[shared_real]
shared_groups = shared.get("shared_groups", {})
for system in config.get("systems", {}).values():
for group_name in system.get("includes", []):
@@ -165,6 +191,7 @@ def load_platform_config(platform_name: str, platforms_dir: str = "platforms") -
system.setdefault("files", []).append(gf)
existing.add(key)
_platform_config_cache[cache_key] = config
return config
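A rough sketch of how the new module-level cache is keyed (the platform name and directories are hypothetical): the key uses os.path.realpath, so two spellings of the same platforms dir share one cached config.

import os
from common import load_platform_config, _platform_config_cache

cfg_a = load_platform_config("arcade", platforms_dir="platforms")
cfg_b = load_platform_config("arcade", platforms_dir="./platforms")  # same realpath -> cache hit
assert cfg_a is cfg_b
assert ("arcade", os.path.realpath("platforms")) in _platform_config_cache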
@@ -485,37 +512,41 @@ def resolve_local_file(
candidate = os.path.join(cache_dir, try_name)
if os.path.isfile(candidate):
return candidate, "data_dir"
# Basename walk: find file anywhere in cache tree
# Basename walk: find file anywhere in cache tree (case-insensitive)
basename_targets = {
(n.rsplit("/", 1)[-1] if "/" in n else n)
(n.rsplit("/", 1)[-1] if "/" in n else n).casefold()
for n in names_to_try
}
for root, _dirs, fnames in os.walk(cache_dir):
for fn in fnames:
if fn in basename_targets:
if fn.casefold() in basename_targets:
return os.path.join(root, fn), "data_dir"
return None, "not_found"
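A standalone sketch of the case-insensitive matching this hunk introduces (the candidate names and cache directory are hypothetical): both sides of the comparison are casefolded, so a file stored as NEOGEO.ZIP now resolves.

import os

names_to_try = ["bios/NeoGeo.zip", "neogeo.zip"]          # hypothetical candidates
targets = {(n.rsplit("/", 1)[-1] if "/" in n else n).casefold() for n in names_to_try}

for root, _dirs, fnames in os.walk("large_files_cache"):  # hypothetical cache dir
    for fn in fnames:
        if fn.casefold() in targets:                      # matches NEOGEO.ZIP, NeoGeo.zip, ...
            print(os.path.join(root, fn))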
_mame_clone_map_cache: dict[str, str] | None = None
def _get_mame_clone_map() -> dict[str, str]:
"""Load and cache the MAME clone map (clone_name -> canonical_name)."""
if not hasattr(_get_mame_clone_map, "_cache"):
global _mame_clone_map_cache
if _mame_clone_map_cache is not None:
return _mame_clone_map_cache
clone_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"_mame_clones.json",
)
if os.path.exists(clone_path):
import json as _json
with open(clone_path) as f:
data = _json.load(f)
_get_mame_clone_map._cache = {}
data = json.load(f)
_mame_clone_map_cache = {}
for canonical, info in data.items():
for clone in info.get("clones", []):
_get_mame_clone_map._cache[clone] = canonical
_mame_clone_map_cache[clone] = canonical
else:
_get_mame_clone_map._cache = {}
return _get_mame_clone_map._cache
_mame_clone_map_cache = {}
return _mame_clone_map_cache
def check_inside_zip(container: str, file_name: str, expected_md5: str) -> str:
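A short usage sketch for the clone-map cache above (the clone name is hypothetical): the first call parses _mame_clones.json, later calls reuse the module-level dict, and tests can reset it explicitly.

import common

clone_map = common._get_mame_clone_map()
canonical = clone_map.get("mslugx", "mslugx")  # hypothetical clone name; unknown names fall back to themselves

common._mame_clone_map_cache = None            # e.g. in a test teardown, forces the JSON to be re-read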
@@ -540,13 +571,32 @@ def check_inside_zip(container: str, file_name: str, expected_md5: str) -> str:
return "error"
_zip_contents_cache: tuple[frozenset[tuple[str, float]], dict] | None = None
def build_zip_contents_index(db: dict, max_entry_size: int = 512 * 1024 * 1024) -> dict:
"""Build {inner_rom_md5: zip_file_sha1} for ROMs inside ZIP files."""
index: dict[str, str] = {}
"""Build {inner_rom_md5: zip_file_sha1} for ROMs inside ZIP files.
Results are cached in-process; repeated calls with unchanged ZIPs return
the cached index.
"""
global _zip_contents_cache
# Build fingerprint from ZIP paths + mtimes for cache invalidation
zip_entries: list[tuple[str, str]] = []
for sha1, entry in db.get("files", {}).items():
path = entry["path"]
if not path.endswith(".zip") or not os.path.exists(path):
continue
if path.endswith(".zip") and os.path.exists(path):
zip_entries.append((path, sha1))
fingerprint = frozenset(
(path, os.path.getmtime(path)) for path, _ in zip_entries
)
if _zip_contents_cache is not None and _zip_contents_cache[0] == fingerprint:
return _zip_contents_cache[1]
index: dict[str, str] = {}
for path, sha1 in zip_entries:
try:
with zipfile.ZipFile(path, "r") as zf:
for info in zf.infolist():
@@ -559,6 +609,8 @@ def build_zip_contents_index(db: dict, max_entry_size: int = 512 * 1024 * 1024)
index[h.hexdigest()] = sha1
except (zipfile.BadZipFile, OSError):
continue
_zip_contents_cache = (fingerprint, index)
return index
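A rough sketch of the cache-invalidation behaviour (the db entry and ZIP path are hypothetical): identical ZIP paths and mtimes return the cached dict, while touching a ZIP changes the fingerprint and forces a rebuild.

import os
import common

db = {"files": {"deadbeef": {"path": "cache/neogeo.zip"}}}  # hypothetical entry keyed by zip sha1
first = common.build_zip_contents_index(db)
second = common.build_zip_contents_index(db)                # same fingerprint -> cached object
assert second is first

os.utime("cache/neogeo.zip")                                # bump mtime -> new fingerprint
third = common.build_zip_contents_index(db)                 # index rebuilt from the ZIP contents
assert third is not first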
@@ -859,31 +911,35 @@ def fetch_large_file(name: str, dest_dir: str = LARGE_FILES_CACHE,
else:
return cached
encoded_name = urllib.request.quote(name)
encoded_name = urllib.parse.quote(name)
url = f"https://github.com/{LARGE_FILES_REPO}/releases/download/{LARGE_FILES_RELEASE}/{encoded_name}"
os.makedirs(dest_dir, exist_ok=True)
tmp_path = cached + ".tmp"
try:
req = urllib.request.Request(url, headers={"User-Agent": "retrobios/1.0"})
with urllib.request.urlopen(req, timeout=300) as resp:
os.makedirs(dest_dir, exist_ok=True)
with open(cached, "wb") as f:
with open(tmp_path, "wb") as f:
while True:
chunk = resp.read(65536)
if not chunk:
break
f.write(chunk)
except (urllib.error.URLError, urllib.error.HTTPError):
if os.path.exists(tmp_path):
os.unlink(tmp_path)
return None
if expected_sha1 or expected_md5:
hashes = compute_hashes(cached)
hashes = compute_hashes(tmp_path)
if expected_sha1 and hashes["sha1"].lower() != expected_sha1.lower():
os.unlink(cached)
os.unlink(tmp_path)
return None
if expected_md5:
md5_list = [m.strip().lower() for m in expected_md5.split(",") if m.strip()]
if hashes["md5"].lower() not in md5_list:
os.unlink(cached)
os.unlink(tmp_path)
return None
os.replace(tmp_path, cached)
return cached
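This hunk switches the download to a .tmp file that is only promoted with os.replace after the hash checks pass, so an interrupted or corrupt download never sits in the cache under its final name. A generic sketch of that pattern (URL and destination are hypothetical):

import os
import urllib.request

def download_atomic(url: str, dest: str) -> None:
    tmp = dest + ".tmp"
    req = urllib.request.Request(url, headers={"User-Agent": "retrobios/1.0"})
    with urllib.request.urlopen(req, timeout=300) as resp, open(tmp, "wb") as f:
        while True:
            chunk = resp.read(65536)
            if not chunk:
                break
            f.write(chunk)
    os.replace(tmp, dest)  # atomic on the same filesystem; dest never holds partial data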
@@ -938,12 +994,31 @@ def list_platform_system_ids(platform_name: str, platforms_dir: str) -> None:
# Re-exports: validation and truth modules extracted for SoC.
# Existing consumers import from common — these preserve that contract.
from validation import ( # noqa: F401, E402
_build_validation_index, _parse_validation, build_ground_truth,
check_file_validation, filter_files_by_mode, validate_cli_modes,
)
from truth import ( # noqa: F401, E402
diff_platform_truth, generate_platform_truth,
)
def build_target_cores_cache(
platforms: list[str],
target: str,
platforms_dir: str,
is_all: bool = False,
) -> tuple[dict[str, set[str] | None], list[str]]:
"""Build target cores cache for a list of platforms.
Returns (cache dict, list of platforms to keep after skipping failures).
"""
cache: dict[str, set[str] | None] = {}
skip: list[str] = []
for p in platforms:
try:
cache[p] = load_target_config(p, target, platforms_dir)
except FileNotFoundError:
if is_all:
cache[p] = None
else:
raise
except ValueError as e:
if is_all:
print(f"INFO: Skipping {p}: {e}")
skip.append(p)
else:
raise
kept = [p for p in platforms if p not in skip]
return cache, kept
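A hedged usage sketch (platform and target names are hypothetical): with is_all=True a platform whose target config is missing is kept with a None entry, and one whose config is invalid is dropped from the returned list.

cache, platforms = build_target_cores_cache(
    ["psx", "dreamcast"],       # hypothetical platform list
    target="miyoo-mini",        # hypothetical target
    platforms_dir="platforms",
    is_all=True,
)
for p in platforms:
    cores = cache[p]            # a set of core names, or None if the target file was absent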