refactor: centralize hash logic, fix circular imports and perf bottlenecks

Author: Abdessamad Derraz
Date:   2026-03-18 11:51:12 +01:00
parent becd0efb33
commit 08f68e792d
11 changed files with 132 additions and 113 deletions

View File

@@ -1,5 +1,5 @@
 {
-  "generated_at": "2026-03-18T07:24:56Z",
+  "generated_at": "2026-03-18T10:49:11Z",
   "total_files": 5357,
   "total_size": 4887546948,
   "files": {
@@ -68044,14 +68044,6 @@
"disk2-16boot.rom": [
"d4181c9f046aafc3fb326b381baac809d9e38d16"
],
"tos100uk.img": [
"da39a3ee5e6b4b0d3255bfef95601890afd80709",
"9a6e4c88533a9eaa4d55cdc040e47443e0226eb2"
],
"tos206us.img": [
"da39a3ee5e6b4b0d3255bfef95601890afd80709",
"ee58768bdfc602c9b14942ce5481e97dd24e7c83"
],
"tos102uk.img": [
"87900a40a890fdf03bd08be6c60cc645855cbce5"
],
@@ -68952,9 +68944,15 @@
"sega-saturn:0306c0e408d6682dd2d86324bd4ac661": [
"8c031bf9908fd0142fdd10a9cdd79389f8a3f2fc"
],
"tos100uk.img": [
"9a6e4c88533a9eaa4d55cdc040e47443e0226eb2"
],
"tos106de.img": [
"3b8cf5ffa41b252eb67f8824f94608fa4005d6dd"
],
"tos206us.img": [
"ee58768bdfc602c9b14942ce5481e97dd24e7c83"
],
"bios7.bin": [
"24f67bdea115a2c847c8813a262502ee1607b7df"
],

View File

@@ -45,12 +45,16 @@ def load_database(db_path: str) -> dict:
         return json.load(f)

-def md5sum(filepath: str | Path) -> str:
-    """Compute MD5 of a file - matches Batocera's md5sum()."""
+def md5sum(source: str | Path | object) -> str:
+    """Compute MD5 of a file path or file-like object - matches Batocera's md5sum()."""
     h = hashlib.md5()
-    with open(filepath, "rb") as f:
-        for chunk in iter(lambda: f.read(65536), b""):
+    if hasattr(source, "read"):
+        for chunk in iter(lambda: source.read(65536), b""):
             h.update(chunk)
+    else:
+        with open(source, "rb") as f:
+            for chunk in iter(lambda: f.read(65536), b""):
+                h.update(chunk)
     return h.hexdigest()
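Note: md5sum now accepts an already-open stream as well as a path, which is what later lets verify.py hash a ZIP member without extracting it (see check_inside_zip below). A minimal usage sketch; the file names are invented:

import zipfile
from common import md5sum

md5sum("bios/neogeo.zip")                  # path: opened and streamed in 64 KiB chunks
with zipfile.ZipFile("bios/neogeo.zip") as zf:
    with zf.open("000-lo.lo") as entry:    # hypothetical archive member
        print(md5sum(entry))               # file-like: its read() is consumed directly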
@@ -154,13 +158,13 @@ def resolve_local_file(
     if sha1_match and sha1_match in files_db:
         path = files_db[sha1_match]["path"]
         if os.path.exists(path):
-            return path, "exact"
+            return path, "md5_exact"

     if len(md5_candidate) < 32:
         for db_md5, db_sha1 in by_md5.items():
             if db_md5.startswith(md5_candidate) and db_sha1 in files_db:
                 path = files_db[db_sha1]["path"]
                 if os.path.exists(path):
-                    return path, "exact"
+                    return path, "md5_exact"

     # 3. zipped_file content match via pre-built index
     if zipped_file and md5_list and zip_contents:
@@ -217,27 +221,6 @@ def resolve_local_file(
return None, "not_found"
def compute_coverage(platform_name: str, platforms_dir: str, db: dict) -> dict:
"""Compute BIOS coverage for a platform using verify logic."""
from verify import verify_platform
config = load_platform_config(platform_name, platforms_dir)
result = verify_platform(config, db)
present = result["ok"] + result["untested"]
pct = (present / result["total"] * 100) if result["total"] > 0 else 0
return {
"platform": config.get("platform", platform_name),
"total": result["total"],
"verified": result["ok"],
"untested": result["untested"],
"missing": result["missing"],
"present": present,
"percentage": pct,
"mode": config.get("verification_mode", "existence"),
"details": result["details"],
"config": config,
}
def safe_extract_zip(zip_path: str, dest_dir: str) -> None:
"""Extract a ZIP file safely, preventing zip-slip path traversal."""
dest = os.path.realpath(dest_dir)
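Note: several hunks below import a new compute_hashes helper from common, but its body never appears in this diff. Judging from the call sites (compute_hashes(local_file)["sha1"], hashes["md5"]), a plausible single-pass sketch would be the following; the real implementation may differ:

import hashlib
from pathlib import Path

def compute_hashes(filepath: str | Path) -> dict[str, str]:
    """Hypothetical reconstruction, not shown in this commit."""
    md5, sha1 = hashlib.md5(), hashlib.sha1()
    with open(filepath, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            md5.update(chunk)   # one read feeds both digests,
            sha1.update(chunk)  # halving I/O versus hashing the file twice
    return {"md5": md5.hexdigest(), "sha1": sha1.hexdigest()}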

View File

@@ -25,7 +25,7 @@ except ImportError:
     sys.exit(1)

 sys.path.insert(0, os.path.dirname(__file__))
-from common import load_database
+from common import load_database, load_platform_config

 DEFAULT_EMULATORS_DIR = "emulators"
 DEFAULT_PLATFORMS_DIR = "platforms"
@@ -52,8 +52,7 @@ def load_platform_files(platforms_dir: str) -> dict[str, set[str]]:
for f in sorted(Path(platforms_dir).glob("*.yml")):
if f.name.startswith("_"):
continue
with open(f) as fh:
config = yaml.safe_load(fh) or {}
config = load_platform_config(f.stem, platforms_dir)
for sys_id, system in config.get("systems", {}).items():
for fe in system.get("files", []):
name = fe.get("name", "")

View File

@@ -13,7 +13,6 @@ Usage:
 from __future__ import annotations

 import argparse
-import hashlib
 import json
 import os
 import sys
@@ -23,7 +22,7 @@ import zipfile
 from pathlib import Path

 sys.path.insert(0, os.path.dirname(__file__))
-from common import safe_extract_zip
+from common import compute_hashes, safe_extract_zip

 GITHUB_API = "https://api.github.com"
 REPO = "Abdess/retrobios"
@@ -135,21 +134,15 @@ def verify_files(platform: str, dest_dir: str, release: dict):
         found = False
         for local_file in dest.rglob(name):
             if local_file.is_file():
-                h = hashlib.sha1()
-                with open(local_file, "rb") as f:
-                    while True:
-                        chunk = f.read(65536)
-                        if not chunk:
-                            break
-                        h.update(chunk)
+                local_sha1 = compute_hashes(local_file)["sha1"]

-                if h.hexdigest() == sha1:
+                if local_sha1 == sha1:
                     verified += 1
                     found = True
                     break
                 else:
                     mismatched += 1
-                    print(f" MISMATCH: {name} (expected {sha1[:12]}..., got {h.hexdigest()[:12]}...)")
+                    print(f" MISMATCH: {name} (expected {sha1[:12]}..., got {local_sha1[:12]}...)")
                     found = True
                     break

View File

@@ -24,7 +24,7 @@ import zipfile
 from pathlib import Path

 sys.path.insert(0, os.path.dirname(__file__))
-from common import load_database, load_platform_config, md5_composite, resolve_local_file
+from common import compute_hashes, load_database, load_platform_config, md5_composite, resolve_local_file

 try:
     import yaml
@@ -44,17 +44,12 @@ MAX_ENTRY_SIZE = 512 * 1024 * 1024 # 512MB
def _verify_file_hash(path: str, expected_sha1: str = "",
expected_md5: str = "") -> bool:
"""Compute and compare hash of a local file."""
if not expected_sha1 and not expected_md5:
return True
h = hashlib.sha1() if expected_sha1 else hashlib.md5()
with open(path, "rb") as f:
while True:
chunk = f.read(65536)
if not chunk:
break
h.update(chunk)
return h.hexdigest() == (expected_sha1 or expected_md5)
hashes = compute_hashes(path)
if expected_sha1:
return hashes["sha1"] == expected_sha1
return hashes["md5"] == expected_md5
def fetch_large_file(name: str, dest_dir: str = ".cache/large",
@@ -267,26 +262,20 @@ def _load_emulator_extras(
 def generate_pack(
     platform_name: str,
     platforms_dir: str,
-    db_path: str,
+    db: dict,
     bios_dir: str,
     output_dir: str,
     include_extras: bool = False,
     emulators_dir: str = "emulators",
+    zip_contents: dict | None = None,
 ) -> str | None:
     """Generate a ZIP pack for a platform.

     Returns the path to the generated ZIP, or None on failure.
     """
     config = load_platform_config(platform_name, platforms_dir)
-    db = load_database(db_path)
-
-    # Only build the expensive ZIP contents index if the platform has zipped_file entries
-    has_zipped = any(
-        fe.get("zipped_file")
-        for sys in config.get("systems", {}).values()
-        for fe in sys.get("files", [])
-    )
-    zip_contents = build_zip_contents_index(db) if has_zipped else {}
+    if zip_contents is None:
+        zip_contents = {}

     verification_mode = config.get("verification_mode", "existence")
     platform_display = config.get("platform", platform_name)
@@ -468,6 +457,9 @@ def main():
parser.error("Specify --platform or --all")
return
db = load_database(args.db)
zip_contents = build_zip_contents_index(db)
groups = _group_identical_platforms(platforms, args.platforms_dir)
for group_platforms, representative in groups:
@@ -480,8 +472,9 @@ def main():
         try:
             zip_path = generate_pack(
-                representative, args.platforms_dir, args.db, args.bios_dir, args.output_dir,
+                representative, args.platforms_dir, db, args.bios_dir, args.output_dir,
                 include_extras=args.include_extras, emulators_dir=args.emulators_dir,
+                zip_contents=zip_contents,
             )
             if zip_path and len(group_platforms) > 1:
                 # Rename ZIP to include all platform names
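Note: the net effect in main() is that the database parse and the ZIP-contents scan now run once per invocation instead of once per platform. A condensed sketch of the resulting call pattern, with argument handling elided:

db = load_database(args.db)                  # one JSON parse for the whole run
zip_contents = build_zip_contents_index(db)  # one pass over the on-disk ZIPs
for group_platforms, representative in _group_identical_platforms(platforms, args.platforms_dir):
    generate_pack(representative, args.platforms_dir, db, args.bios_dir,
                  args.output_dir, zip_contents=zip_contents)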

View File

@@ -18,7 +18,27 @@ from datetime import datetime, timezone
 from pathlib import Path

 sys.path.insert(0, os.path.dirname(__file__))
-from common import load_database, compute_coverage
+from common import load_database, load_platform_config
+from verify import verify_platform
+
+
+def compute_coverage(platform_name: str, platforms_dir: str, db: dict) -> dict:
+    config = load_platform_config(platform_name, platforms_dir)
+    result = verify_platform(config, db)
+    present = result["ok"] + result["untested"]
+    pct = (present / result["total"] * 100) if result["total"] > 0 else 0
+    return {
+        "platform": config.get("platform", platform_name),
+        "total": result["total"],
+        "verified": result["ok"],
+        "untested": result["untested"],
+        "missing": result["missing"],
+        "present": present,
+        "percentage": pct,
+        "mode": config.get("verification_mode", "existence"),
+        "details": result["details"],
+        "config": config,
+    }

 SITE_URL = "https://abdess.github.io/retrobios/"
 RELEASE_URL = "../../releases/latest"

View File

@@ -26,7 +26,8 @@ except ImportError:
     sys.exit(1)

 sys.path.insert(0, os.path.dirname(__file__))
-from common import load_database, load_platform_config, compute_coverage
+from common import load_database, load_platform_config
+from verify import verify_platform

 DOCS_DIR = "docs"
 SITE_NAME = "RetroBIOS"
@@ -64,9 +65,23 @@ def _status_icon(pct: float) -> str:
return "partial"
# ---------------------------------------------------------------------------
# Coverage computation (reuses verify.py logic)
# ---------------------------------------------------------------------------
def compute_coverage(platform_name: str, platforms_dir: str, db: dict) -> dict:
config = load_platform_config(platform_name, platforms_dir)
result = verify_platform(config, db)
present = result["ok"] + result["untested"]
pct = (present / result["total"] * 100) if result["total"] > 0 else 0
return {
"platform": config.get("platform", platform_name),
"total": result["total"],
"verified": result["ok"],
"untested": result["untested"],
"missing": result["missing"],
"present": present,
"percentage": pct,
"mode": config.get("verification_mode", "existence"),
"details": result["details"],
"config": config,
}
# ---------------------------------------------------------------------------

View File

@@ -3,6 +3,7 @@
 from __future__ import annotations

 import json
 import sys
+import urllib.request
 import urllib.error
 from abc import ABC, abstractmethod
@@ -168,7 +169,6 @@ def scraper_cli(scraper_class: type, description: str = "Scrape BIOS requirement
         reqs = scraper.fetch_requirements()
     except (ConnectionError, ValueError) as e:
         print(f"Error: {e}", file=sys.stderr)
-        import sys
         sys.exit(1)

     if args.dry_run:

View File

@@ -211,28 +211,12 @@ class Scraper(BaseScraper):
systems[req.system]["files"].append(entry)
# Sort numerically since API returns by commit date, not version
import json as _json
tag = fetch_github_latest_tag("batocera-linux/batocera.linux", prefix="batocera-")
batocera_version = ""
try:
_url = "https://api.github.com/repos/batocera-linux/batocera.linux/tags?per_page=50"
_req = urllib.request.Request(_url, headers={
"User-Agent": "retrobios-scraper/1.0",
"Accept": "application/vnd.github.v3+json",
})
with urllib.request.urlopen(_req, timeout=15) as _resp:
_tags = _json.loads(_resp.read())
_versions = []
for _t in _tags:
_name = _t["name"]
if _name.startswith("batocera-"):
_num = _name.replace("batocera-", "")
if _num.isdigit():
_versions.append(int(_num))
if _versions:
batocera_version = str(max(_versions))
except (ConnectionError, ValueError, OSError):
pass
if tag:
num = tag.removeprefix("batocera-")
if num.isdigit():
batocera_version = num
return {
"platform": "Batocera",

View File

@@ -262,7 +262,6 @@ def main():
         print(json.dumps(config, indent=2))
         return

     reqs = scraper.fetch_requirements()
-
     by_system = {}
     for r in reqs:
         by_system.setdefault(r.system, []).append(r)

View File

@@ -54,14 +54,9 @@ def check_inside_zip(container: str, file_name: str, expected_md5: str) -> str:
                 return Status.OK

             with archive.open(fname) as entry:
-                h = hashlib.md5()
-                while True:
-                    block = entry.read(65536)
-                    if not block:
-                        break
-                    h.update(block)
+                actual = md5sum(entry)

-            if h.hexdigest() == expected_md5:
+            if actual == expected_md5:
                 return Status.OK
             else:
                 return Status.UNTESTED
@@ -71,10 +66,13 @@ def check_inside_zip(container: str, file_name: str, expected_md5: str) -> str:
return "error"
def resolve_to_local_path(file_entry: dict, db: dict) -> str | None:
def resolve_to_local_path(
file_entry: dict,
db: dict,
zip_contents: dict | None = None,
) -> tuple[str | None, str]:
"""Find the local file path for a BIOS entry. Delegates to common.resolve_local_file."""
path, _ = resolve_local_file(file_entry, db)
return path
return resolve_local_file(file_entry, db, zip_contents)
def verify_entry_existence(file_entry: dict, local_path: str | None) -> dict:
@@ -85,7 +83,11 @@ def verify_entry_existence(file_entry: dict, local_path: str | None) -> dict:
return {"name": name, "status": Status.MISSING}
def verify_entry_md5(file_entry: dict, local_path: str | None) -> dict:
def verify_entry_md5(
file_entry: dict,
local_path: str | None,
resolve_status: str = "",
) -> dict:
"""MD5 verification - supports single MD5 (Batocera) and multi-MD5 (Recalbox)."""
name = file_entry.get("name", "")
expected_md5 = file_entry.get("md5", "")
@@ -125,6 +127,9 @@ def verify_entry_md5(file_entry: dict, local_path: str | None) -> dict:
     if not md5_list:
         return {"name": name, "status": Status.OK, "path": local_path}

+    if resolve_status == "md5_exact":
+        return {"name": name, "status": Status.OK, "path": local_path}
+
     actual_md5 = md5sum(local_path)
     # Case-insensitive - Recalbox uses uppercase MD5s
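Note: the "md5_exact" short-circuit is sound because resolve_local_file only reports that status when the database lookup was keyed on the expected hash itself (see the common.py hunk above), so re-reading the file to recompute its MD5 would prove nothing new. The contract, roughly:

path, status = resolve_local_file(entry, db, zip_contents)
result = verify_entry_md5(entry, path, resolve_status=status)
# status == "md5_exact" -> the index already matched on the hash; md5sum(path) is skipped
# any other status      -> fall through and hash the file as before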
@@ -153,6 +158,26 @@ def verify_entry_md5(file_entry: dict, local_path: str | None) -> dict:
     }

+
+def _build_zip_contents_index(db: dict) -> dict:
+    """Build index of {inner_rom_md5: zip_file_sha1} for ROMs inside ZIP files."""
+    index: dict[str, str] = {}
+    for sha1, entry in db.get("files", {}).items():
+        path = entry["path"]
+        if not path.endswith(".zip") or not os.path.exists(path):
+            continue
+        try:
+            with zipfile.ZipFile(path, "r") as zf:
+                for info in zf.infolist():
+                    if info.is_dir() or info.file_size > 512 * 1024 * 1024:
+                        continue
+                    data = zf.read(info.filename)
+                    inner_md5 = hashlib.md5(data).hexdigest()
+                    index[inner_md5] = sha1
+        except (zipfile.BadZipFile, OSError):
+            continue
+    return index
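Note: the index maps each inner ROM's MD5 to the SHA-1 of the ZIP that contains it, the same shape resolve_local_file's step 3 consumes. A small lookup sketch; the entry and digest are invented for illustration:

index = _build_zip_contents_index(db)
entry = {"name": "000-lo.lo", "zipped_file": "000-lo.lo",
         "md5": "00dead00beef00dead00beef00dead00"}   # made-up MD5
container_sha1 = index.get(entry["md5"].lower())      # hexdigest() keys are lowercase
if container_sha1:
    print("found inside:", db["files"][container_sha1]["path"])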
def verify_platform(config: dict, db: dict) -> dict:
"""Verify all BIOS files for a platform using its verification_mode.
@@ -170,13 +195,23 @@ def verify_platform(config: dict, db: dict) -> dict:
mode = config.get("verification_mode", "existence")
platform = config.get("platform", "unknown")
verify_fn = verify_entry_existence if mode == "existence" else verify_entry_md5
has_zipped = any(
fe.get("zipped_file")
for sys in config.get("systems", {}).values()
for fe in sys.get("files", [])
)
zip_contents = _build_zip_contents_index(db) if has_zipped else {}
results = []
for sys_id, system in config.get("systems", {}).items():
for file_entry in system.get("files", []):
local_path = resolve_to_local_path(file_entry, db)
result = verify_fn(file_entry, local_path)
local_path, resolve_status = resolve_to_local_path(
file_entry, db, zip_contents,
)
if mode == "existence":
result = verify_entry_existence(file_entry, local_path)
else:
result = verify_entry_md5(file_entry, local_path, resolve_status)
result["system"] = sys_id
results.append(result)