# Source: libretro/scripts/generate_pack.py
# Last modified: 2026-03-17 11:38:52 +01:00 (425 lines, 16 KiB, Python)

#!/usr/bin/env python3
"""Generate platform-specific BIOS ZIP packs.
Usage:
python scripts/generate_pack.py --platform retroarch [--output-dir dist/]
python scripts/generate_pack.py --all [--output-dir dist/]
Reads platform YAML config + database.json -> creates ZIP with correct
file layout for each platform. Handles inheritance, shared groups, variants,
and 3-tier storage (embedded/external/user_provided).
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import sys
import tempfile
import urllib.request
import urllib.error
import zipfile
from pathlib import Path
sys.path.insert(0, os.path.dirname(__file__))
from common import load_platform_config
try:
import yaml
except ImportError:
print("Error: PyYAML required (pip install pyyaml)", file=sys.stderr)
sys.exit(1)
# Default values for the command-line options declared in main().
DEFAULT_PLATFORMS_DIR = "platforms"
DEFAULT_DB_FILE = "database.json"
DEFAULT_OUTPUT_DIR = "dist"
DEFAULT_BIOS_DIR = "bios"
# GitHub release tag and repository used by fetch_large_file() to build the
# asset download URL.
LARGE_FILES_RELEASE = "large-files"
LARGE_FILES_REPO = "Abdess/retrobios"
def load_database(db_path: str) -> dict:
    """Load the BIOS database from a JSON file.

    Args:
        db_path: Path to the JSON database file.

    Returns:
        The parsed JSON document.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # JSON is UTF-8 by specification; do not depend on the locale's default
    # encoding, which is not UTF-8 on every platform.
    with open(db_path, encoding="utf-8") as f:
        return json.load(f)
def fetch_large_file(name: str, dest_dir: str = ".cache/large") -> str | None:
    """Download a large file from the 'large-files' GitHub release if not cached.

    Args:
        name: Asset file name inside the release.
        dest_dir: Local cache directory; created when a download succeeds.

    Returns:
        Path of the cached file, or None when ``name`` is empty or the
        download fails.
    """
    from http.client import HTTPException
    from urllib.parse import quote

    if not name:
        # os.path.join(dest_dir, "") is the cache directory itself; without this
        # guard an existing cache directory would be reported as a cache hit.
        return None

    cached = os.path.join(dest_dir, name)
    if os.path.isfile(cached):
        return cached

    encoded_name = quote(name)
    url = f"https://github.com/{LARGE_FILES_REPO}/releases/download/{LARGE_FILES_RELEASE}/{encoded_name}"
    req = urllib.request.Request(url, headers={"User-Agent": "retrobios-pack/1.0"})
    try:
        with urllib.request.urlopen(req, timeout=300) as resp:
            data = resp.read()
    except (OSError, HTTPException):
        # OSError covers URLError/HTTPError and socket timeouts raised while
        # reading the body; HTTPException covers truncated responses.
        return None

    os.makedirs(os.path.dirname(cached) or ".", exist_ok=True)
    # Write to a sibling file and rename, so an interrupted write can never
    # leave a truncated file that later runs would treat as a valid cache hit.
    partial = f"{cached}.part"
    try:
        with open(partial, "wb") as f:
            f.write(data)
        os.replace(partial, cached)
    finally:
        if os.path.exists(partial):
            os.unlink(partial)
    return cached
def resolve_file(file_entry: dict, db: dict, bios_dir: str,
                 zip_contents: dict | None = None) -> tuple[str | None, str]:
    """Resolve a BIOS file to its local path using database.json.

    Lookup order: SHA1, MD5 (including truncated MD5 prefixes), MD5 of a ROM
    stored inside a known ZIP, file name, and finally a download from the
    'large-files' release.

    Args:
        file_entry: Platform file entry; reads ``storage``, ``sha1``, ``md5``,
            ``name`` and ``zipped_file``.
        db: Parsed database with ``files`` and ``indexes`` (``by_md5``,
            ``by_name``) sections; missing sections are treated as empty.
        bios_dir: Not used by this function; kept for interface compatibility.
        zip_contents: Optional ``{inner_md5: zip_sha1}`` index.

    Returns:
        ``(local_path, status)`` where status is one of: exact, zip_exact,
        hash_mismatch, release_asset, external, user_provided, not_found.
        ``local_path`` is None for external, user_provided and not_found.
    """
    storage = file_entry.get("storage", "embedded")
    if storage in ("user_provided", "external"):
        return None, storage

    sha1 = file_entry.get("sha1")
    md5 = file_entry.get("md5")
    name = file_entry.get("name", "")
    zipped_file = file_entry.get("zipped_file")

    files = db.get("files", {})
    indexes = db.get("indexes", {})
    by_md5 = indexes.get("by_md5", {})

    def existing_path(file_sha1):
        """Return the on-disk path recorded for file_sha1, or None."""
        if not file_sha1 or file_sha1 not in files:
            return None
        path = files[file_sha1]["path"]
        return path if os.path.exists(path) else None

    local_path = existing_path(sha1)
    if local_path:
        return local_path, "exact"

    if md5:
        local_path = existing_path(by_md5.get(md5))
        if local_path:
            return local_path, "exact"
        # Truncated MD5 match (batocera-systems bug: 29 chars instead of 32)
        if len(md5) < 32:
            for db_md5, db_sha1 in by_md5.items():
                if db_md5.startswith(md5):
                    local_path = existing_path(db_sha1)
                    if local_path:
                        return local_path, "exact"

    if zipped_file and md5 and zip_contents:
        local_path = existing_path(zip_contents.get(md5))
        if local_path:
            return local_path, "zip_exact"

    # Name fallback. No MD5 specified = any file with that name is acceptable;
    # with an MD5 that matched nothing above, a same-named file is only a
    # hash mismatch.
    name_status = "hash_mismatch" if md5 else "exact"
    for match_sha1 in indexes.get("by_name", {}).get(name, []):
        local_path = existing_path(match_sha1)
        if local_path:
            return local_path, name_status

    # Last resort: try downloading from large-files release. Skipped for an
    # empty name, which cannot identify a release asset.
    if name:
        cached = fetch_large_file(name)
        if cached:
            return cached, "release_asset"
    return None, "not_found"
def build_zip_contents_index(db: dict) -> dict:
    """Build index of {inner_rom_md5: zip_file_sha1} for ROMs inside ZIP files.

    Only database entries whose path ends with ``.zip`` and exists on disk are
    scanned. Archives that cannot be read are skipped; members hashed before
    the failure stay in the index. When the same inner MD5 occurs in several
    archives, the archive visited last wins.

    Args:
        db: Parsed database; its ``files`` section maps SHA1 to an entry with
            a ``path`` key.

    Returns:
        Mapping from the MD5 hex digest of each archive member to the SHA1 key
        of the archive that contains it.
    """
    import zlib

    chunk_size = 1 << 20  # hash in 1 MiB pieces instead of loading whole members
    index = {}
    for sha1, entry in db.get("files", {}).items():
        path = entry["path"]
        if not path.endswith(".zip") or not os.path.exists(path):
            continue
        try:
            with zipfile.ZipFile(path, "r") as zf:
                for info in zf.infolist():
                    if info.is_dir():
                        continue
                    digest = hashlib.md5()
                    # Open by ZipInfo so the exact member is read even when
                    # the archive contains duplicate names.
                    with zf.open(info) as member:
                        while True:
                            chunk = member.read(chunk_size)
                            if not chunk:
                                break
                            digest.update(chunk)
                    index[digest.hexdigest()] = sha1
        except (zipfile.BadZipFile, OSError, RuntimeError, EOFError, zlib.error):
            # RuntimeError: encrypted member or unsupported compression
            # (NotImplementedError); EOFError / zlib.error: truncated or
            # corrupt data. One bad archive must not abort indexing.
            continue
    return index
def download_external(file_entry: dict, dest_path: str) -> bool:
    """Download an external BIOS file, verify hash, save to dest_path.

    The first hash present in the entry, in the order sha256, sha1, md5, is
    verified (case-insensitively). If the entry carries no hash, the download
    is saved unverified.

    Args:
        file_entry: Platform file entry; reads ``source_url``, ``sha256``,
            ``sha1``, ``md5`` and ``name``.
        dest_path: Where to write the downloaded bytes.

    Returns:
        True if the file was downloaded, passed verification and was written;
        False if there is no ``source_url``, the download failed, or the hash
        did not match.
    """
    from http.client import HTTPException

    url = file_entry.get("source_url")
    if not url:
        return False

    try:
        req = urllib.request.Request(url, headers={"User-Agent": "retrobios-pack-gen/1.0"})
        with urllib.request.urlopen(req, timeout=120) as resp:
            data = resp.read()
    except (OSError, HTTPException, ValueError) as e:
        # OSError: URLError/HTTPError and socket timeouts during the read;
        # HTTPException: truncated response; ValueError: malformed URL.
        print(f" WARNING: Failed to download {url}: {e}")
        return False

    # Verify hash
    display_name = file_entry.get("name", url)
    for field, hasher in (("sha256", hashlib.sha256), ("sha1", hashlib.sha1), ("md5", hashlib.md5)):
        expected = file_entry.get(field)
        if not expected:
            continue
        # hexdigest() is lowercase; accept upper-case hashes from the config.
        if hasher(data).hexdigest() != str(expected).strip().lower():
            print(f" WARNING: {field.upper()} mismatch for {display_name}")
            return False
        break  # only the strongest available hash is checked

    parent = os.path.dirname(dest_path)
    if parent:
        # os.makedirs("") raises, so skip it for a bare file name.
        os.makedirs(parent, exist_ok=True)
    with open(dest_path, "wb") as f:
        f.write(data)
    return True
def generate_pack(
    platform_name: str,
    platforms_dir: str,
    db_path: str,
    bios_dir: str,
    output_dir: str,
) -> str | None:
    """Generate a ZIP pack for a platform.

    Every file entry of every system in the platform config is placed in the
    archive at ``base_destination/destination``; the first entry for a given
    destination wins. User-provided entries are replaced by an
    ``INSTRUCTIONS_<name>.txt`` note, external entries are downloaded, and all
    other entries are resolved through the database.

    The archive is built in a ``.partial`` file next to the final path and
    renamed only on success, so a failed run never leaves a truncated pack or
    overwrites a previously generated one.

    Args:
        platform_name: Platform identifier passed to load_platform_config().
        platforms_dir: Directory holding the platform configs.
        db_path: Path to database.json.
        bios_dir: Passed through to resolve_file().
        output_dir: Directory for the generated ZIP; created if missing.

    Returns:
        The path to the generated ZIP. Errors raised by the helpers or by
        writing the archive propagate to the caller.
    """
    config = load_platform_config(platform_name, platforms_dir)
    db = load_database(db_path)
    zip_contents = build_zip_contents_index(db)

    verification_mode = config.get("verification_mode", "existence")
    platform_display = config.get("platform", platform_name)
    base_dest = config.get("base_destination", "")

    zip_name = f"{platform_display.replace(' ', '_')}_BIOS_Pack.zip"
    zip_path = os.path.join(output_dir, zip_name)
    os.makedirs(output_dir, exist_ok=True)
    partial_path = f"{zip_path}.partial"

    total_files = 0
    missing_files = []
    untested_files = []
    user_provided = []
    seen_destinations = set()

    try:
        with zipfile.ZipFile(partial_path, "w", zipfile.ZIP_DEFLATED) as zf:
            for _sys_id, system in sorted(config.get("systems", {}).items()):
                for file_entry in system.get("files", []):
                    dest = file_entry.get("destination", file_entry["name"])
                    full_dest = f"{base_dest}/{dest}" if base_dest else dest

                    # Several systems may list the same destination; keep the first.
                    if full_dest in seen_destinations:
                        continue
                    seen_destinations.add(full_dest)

                    storage = file_entry.get("storage", "embedded")
                    if storage == "user_provided":
                        # Ship instructions in place of a file we cannot distribute.
                        instructions = file_entry.get("instructions", "Please provide this file manually.")
                        instr_name = f"INSTRUCTIONS_{file_entry['name']}.txt"
                        instr_path = f"{base_dest}/{instr_name}" if base_dest else instr_name
                        zf.writestr(instr_path, f"File needed: {file_entry['name']}\n\n{instructions}\n")
                        user_provided.append(file_entry["name"])
                        total_files += 1
                        continue

                    local_path, status = resolve_file(file_entry, db, bios_dir, zip_contents)

                    if status == "external":
                        # Keep the original extension so the ".zip" check below works.
                        suffix = os.path.splitext(file_entry["name"])[1]
                        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
                            tmp_path = tmp.name
                        try:
                            if download_external(file_entry, tmp_path):
                                extract = file_entry.get("extract", False)
                                if extract and tmp_path.endswith(".zip"):
                                    _extract_zip_to_archive(tmp_path, full_dest, zf)
                                else:
                                    zf.write(tmp_path, full_dest)
                                total_files += 1
                            else:
                                missing_files.append(file_entry["name"])
                        finally:
                            if os.path.exists(tmp_path):
                                os.unlink(tmp_path)
                        continue

                    if status == "not_found":
                        missing_files.append(file_entry["name"])
                        continue

                    # A name-only match is still packed; it is reported as
                    # untested only when the platform verifies hashes.
                    if status == "hash_mismatch" and verification_mode != "existence":
                        untested_files.append(file_entry["name"])

                    extract = file_entry.get("extract", False)
                    if extract and local_path.endswith(".zip"):
                        _extract_zip_to_archive(local_path, full_dest, zf)
                    else:
                        zf.write(local_path, full_dest)
                    total_files += 1
        # Publish the finished archive in one step.
        os.replace(partial_path, zip_path)
    finally:
        # Only present here if building or publishing failed.
        if os.path.exists(partial_path):
            os.unlink(partial_path)

    if missing_files:
        print(f" Missing ({len(missing_files)}): {', '.join(missing_files[:10])}")
        if len(missing_files) > 10:
            print(f" ... and {len(missing_files) - 10} more")
    if untested_files:
        print(f" Untested ({len(untested_files)}): {', '.join(untested_files[:10])}")
        if len(untested_files) > 10:
            print(f" ... and {len(untested_files) - 10} more")
    if user_provided:
        print(f" User-provided ({len(user_provided)}): {', '.join(user_provided)}")

    if verification_mode == "existence":
        # RetroArch-family: only existence matters
        print(f" Generated {zip_path}: {total_files} files ({total_files} present, {len(missing_files)} missing) [verification: existence]")
    else:
        # Batocera-family: hash verification matters
        verified = total_files - len(untested_files)
        print(f" Generated {zip_path}: {total_files} files ({verified} verified, {len(untested_files)} untested, {len(missing_files)} missing) [verification: {verification_mode}]")
    return zip_path
def _extract_zip_to_archive(source_zip: str, dest_prefix: str, target_zf: zipfile.ZipFile):
    """Extract contents of a source ZIP into target ZIP under dest_prefix.

    Directory entries are skipped. Member names are normalised (empty and
    ``.`` components dropped, leading slashes removed), and members whose name
    contains a ``..`` component are skipped with a warning, so that a crafted
    source archive cannot place entries outside ``dest_prefix`` in the pack.

    Args:
        source_zip: Path of the archive to read.
        dest_prefix: Directory prefix inside the target archive; may be empty.
        target_zf: Archive opened for writing that receives the members.
    """
    with zipfile.ZipFile(source_zip, "r") as src:
        for info in src.infolist():
            if info.is_dir():
                continue
            # Treat backslashes as separators too: some tools write them, and
            # extractors on Windows would interpret them as path components.
            parts = [
                part
                for part in info.filename.replace("\\", "/").split("/")
                if part not in ("", ".")
            ]
            if not parts or ".." in parts:
                print(f" WARNING: Skipping unsafe archive entry {info.filename!r}")
                continue
            inner_name = "/".join(parts)
            target_path = f"{dest_prefix}/{inner_name}" if dest_prefix else inner_name
            # Read via the ZipInfo so duplicate names resolve to this exact member.
            target_zf.writestr(target_path, src.read(info))
def list_platforms(platforms_dir: str) -> list[str]:
    """Return the stems of the ``*.yml`` files found directly in platforms_dir.

    Results follow the sorted order of the file paths. Files whose name
    begins with an underscore are left out.
    """
    yaml_files = sorted(Path(platforms_dir).glob("*.yml"))
    return [entry.stem for entry in yaml_files if not entry.name.startswith("_")]
def main():
    """Command-line entry point: parse arguments and generate the requested packs.

    ``--list`` prints the platform names found in ``--platforms-dir`` and
    returns. Otherwise packs are generated for ``--platform`` or for every
    platform returned by the ``list_platforms`` helper module (``--all``).
    Platforms grouped by _group_identical_platforms() share one pack, which is
    renamed to carry all of their display names. A failure for one platform is
    reported and does not stop the remaining ones.
    """
    parser = argparse.ArgumentParser(description="Generate platform BIOS ZIP packs")
    parser.add_argument("--platform", "-p", help="Platform name (e.g., retroarch)")
    parser.add_argument("--all", action="store_true", help="Generate packs for all active platforms")
    parser.add_argument("--include-archived", action="store_true", help="Include archived platforms")
    parser.add_argument("--platforms-dir", default=DEFAULT_PLATFORMS_DIR)
    parser.add_argument("--db", default=DEFAULT_DB_FILE, help="Path to database.json")
    parser.add_argument("--bios-dir", default=DEFAULT_BIOS_DIR)
    parser.add_argument("--output-dir", "-o", default=DEFAULT_OUTPUT_DIR)
    parser.add_argument("--list", action="store_true", help="List available platforms")
    args = parser.parse_args()

    if args.list:
        for p in list_platforms(args.platforms_dir):
            print(p)
        return

    if args.all:
        # The script directory is already on sys.path (inserted at import time),
        # so the sibling list_platforms module is importable here.
        from list_platforms import list_platforms as _list_active
        platforms = _list_active(include_archived=args.include_archived)
    elif args.platform:
        platforms = [args.platform]
    else:
        parser.error("Specify --platform or --all")  # exits; never returns

    groups = _group_identical_platforms(platforms, args.platforms_dir)

    for group_platforms, representative in groups:
        shared = len(group_platforms) > 1
        names = []
        if shared:
            # Display names are needed for both the banner and the file name;
            # load each config once.
            names = [load_platform_config(p, args.platforms_dir).get("platform", p) for p in group_platforms]
            combined_name = " + ".join(names)
            print(f"\nGenerating shared pack for {combined_name}...")
        else:
            print(f"\nGenerating pack for {representative}...")

        try:
            zip_path = generate_pack(representative, args.platforms_dir, args.db, args.bios_dir, args.output_dir)
            if zip_path and shared:
                # Rename ZIP to include all platform names
                combined_filename = "_".join(n.replace(" ", "") for n in names) + "_BIOS_Pack.zip"
                new_path = os.path.join(os.path.dirname(zip_path), combined_filename)
                if new_path != zip_path:
                    # os.replace overwrites a pack left by an earlier run on
                    # every OS; os.rename fails on Windows if it exists.
                    os.replace(zip_path, new_path)
                    print(f" Renamed -> {os.path.basename(new_path)}")
        except Exception as e:
            # Best effort: report and continue with the next platform.
            print(f" ERROR: {e}")
def _group_identical_platforms(platforms: list[str], platforms_dir: str) -> list[tuple[list[str], str]]:
    """Group platforms that would produce identical ZIP packs.

    Two platforms are grouped only when every file entry agrees on its full
    destination and on the fields that pack generation reads to choose and
    place the file (name, storage, hashes, zipped_file, source_url, extract,
    instructions). Matching on destinations alone would merge platforms that
    want different file variants at the same path.

    Args:
        platforms: Platform names to group.
        platforms_dir: Directory holding the platform configs.

    Returns:
        ``[(group_of_platform_names, representative_platform), ...]`` where
        the representative is the first platform seen for that group. A
        platform whose config cannot be found forms its own group.
    """
    optional_fields = ("sha1", "md5", "sha256", "zipped_file", "source_url", "instructions")
    fingerprints = {}
    representatives = {}
    for platform in platforms:
        try:
            config = load_platform_config(platform, platforms_dir)
        except FileNotFoundError:
            # Keep it as its own group; the tuple key cannot collide with a
            # hex-digest fingerprint.
            missing_key = ("missing-config", platform)
            fingerprints.setdefault(missing_key, []).append(platform)
            representatives.setdefault(missing_key, platform)
            continue

        base_dest = config.get("base_destination", "")
        entries = []
        for _sys_id, system in sorted(config.get("systems", {}).items()):
            for fe in system.get("files", []):
                dest = fe.get("destination", fe.get("name", ""))
                full_dest = f"{base_dest}/{dest}" if base_dest else dest
                record = {
                    "destination": full_dest,
                    "name": fe.get("name", ""),
                    # Same defaults as pack generation, so an omitted field
                    # and an explicit default fingerprint identically.
                    "storage": fe.get("storage", "embedded"),
                    "extract": bool(fe.get("extract", False)),
                }
                for field in optional_fields:
                    record[field] = fe.get(field)
                # JSON escapes newlines, so "\n" below is an unambiguous separator.
                entries.append(json.dumps(record, sort_keys=True, default=str))

        fingerprint = hashlib.sha1("\n".join(sorted(entries)).encode()).hexdigest()
        fingerprints.setdefault(fingerprint, []).append(platform)
        representatives.setdefault(fingerprint, platform)

    return [(group, representatives[fp]) for fp, group in fingerprints.items()]
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()