mirror of
https://github.com/Abdess/retroarch_system.git
synced 2026-04-13 12:22:33 -05:00
- fix urllib.parse.quote import (was urllib.request.quote) - add operator precedence parens in generate_pack dedup check - narrow bare except to specific types in batocera target scraper - cache load_platform_config and build_zip_contents_index results - add selective algorithm support to compute_hashes - atomic write for fetch_large_file (tmp + rename) - add response size limit to base scraper fetch - extract build_target_cores_cache to common.py (dedup verify/pack) - hoist _build_supplemental_index out of per-platform loop - migrate function-attribute caches to module-level dicts - add @abstractmethod to BaseTargetScraper.fetch_targets - remove backward-compat re-exports from common.py - replace em-dashes and unicode arrows with ASCII equivalents - remove decorative section dividers and obvious comments
198 lines
6.8 KiB
Python
198 lines
6.8 KiB
Python
"""Deterministic ZIP builder for MAME BIOS archives.

Creates byte-identical ZIP files from individual ROM atoms, enabling:
- Reproducible builds: same ROMs -> same ZIP hash, always
- Version-agnostic assembly: build neogeo.zip for any MAME version
- Deduplication: store ROM atoms once, assemble any ZIP on demand

A ZIP's hash depends on: file content, filenames, order, timestamps,
compression, and permissions. This module fixes all metadata to produce
deterministic output.

Usage:
    from deterministic_zip import build_deterministic_zip, extract_atoms

    # Extract atoms from an existing ZIP
    atom_store = extract_atoms("neogeo.zip")

    # Build a ZIP from a recipe
    recipe = [
        {"name": "sp-s2.sp1", "crc32": "9036d879"},
        {"name": "000-lo.lo", "crc32": "5a86cff2"},
    ]
    build_deterministic_zip("neogeo.zip", recipe, atom_store)
"""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import struct
|
|
import zipfile
|
|
import zlib
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
|
|
# Metadata pinned on every archive entry so that rebuilt ZIPs hash identically.
_FIXED_DATE_TIME = (1980, 1, 1, 0, 0, 0)  # minimum ZIP timestamp (1980-01-01 00:00:00)
_FIXED_CREATE_SYSTEM = 0  # 0 = FAT/DOS (most compatible)
_FIXED_EXTERNAL_ATTR = (0o100000 | 0o644) << 16  # == 0o100644 << 16, i.e. -rw-r--r--
_COMPRESS_LEVEL = 9  # deflate level handed to zipfile.ZipFile(compresslevel=...)
|
|
|
|
|
|
def build_deterministic_zip(
|
|
output_path: str | Path,
|
|
recipe: list[dict],
|
|
atom_store: dict[str, bytes],
|
|
compression: int = zipfile.ZIP_DEFLATED,
|
|
) -> str:
|
|
"""Build a deterministic ZIP from a recipe and atom store.
|
|
|
|
Args:
|
|
output_path: Path for the output ZIP file.
|
|
recipe: List of dicts with 'name' and 'crc32' (lowercase hex, no 0x).
|
|
Files are sorted by name for determinism.
|
|
atom_store: Dict mapping CRC32 (lowercase hex) to ROM binary data.
|
|
compression: ZIP_DEFLATED (default) or ZIP_STORED.
|
|
|
|
Returns:
|
|
SHA1 hex digest of the generated ZIP.
|
|
|
|
Raises:
|
|
KeyError: If a recipe CRC32 is not found in the atom store.
|
|
ValueError: If a ROM's actual CRC32 doesn't match the recipe.
|
|
"""
|
|
# Sort by filename for deterministic order
|
|
sorted_recipe = sorted(recipe, key=lambda r: r["name"])
|
|
|
|
with zipfile.ZipFile(str(output_path), "w", compression, compresslevel=_COMPRESS_LEVEL) as zf:
|
|
for entry in sorted_recipe:
|
|
name = entry["name"]
|
|
expected_crc = entry.get("crc32", "").lower()
|
|
|
|
if expected_crc not in atom_store:
|
|
raise KeyError(
|
|
f"ROM atom not found: {name} (crc32={expected_crc}). "
|
|
f"Available: {len(atom_store)} atoms"
|
|
)
|
|
|
|
data = atom_store[expected_crc]
|
|
|
|
# Verify CRC32 of the atom data
|
|
actual_crc = format(zlib.crc32(data) & 0xFFFFFFFF, "08x")
|
|
if expected_crc and actual_crc != expected_crc:
|
|
raise ValueError(
|
|
f"CRC32 mismatch for {name}: expected {expected_crc}, got {actual_crc}"
|
|
)
|
|
|
|
# Create ZipInfo with fixed metadata
|
|
info = zipfile.ZipInfo(filename=name, date_time=_FIXED_DATE_TIME)
|
|
info.compress_type = compression
|
|
info.create_system = _FIXED_CREATE_SYSTEM
|
|
info.external_attr = _FIXED_EXTERNAL_ATTR
|
|
|
|
zf.writestr(info, data)
|
|
|
|
# Compute and return the ZIP's SHA1
|
|
sha1 = hashlib.sha1()
|
|
with open(output_path, "rb") as f:
|
|
for chunk in iter(lambda: f.read(65536), b""):
|
|
sha1.update(chunk)
|
|
return sha1.hexdigest()
|
|
|
|
|
|
def extract_atoms(zip_path: str | Path) -> dict[str, bytes]:
|
|
"""Extract all ROM atoms from a ZIP, indexed by CRC32.
|
|
|
|
Returns: Dict mapping CRC32 (lowercase hex) to raw ROM data.
|
|
"""
|
|
atoms: dict[str, bytes] = {}
|
|
with zipfile.ZipFile(str(zip_path), "r") as zf:
|
|
for info in zf.infolist():
|
|
if info.is_dir():
|
|
continue
|
|
data = zf.read(info.filename)
|
|
crc = format(zlib.crc32(data) & 0xFFFFFFFF, "08x")
|
|
atoms[crc] = data
|
|
return atoms
|
|
|
|
|
|
def extract_atoms_with_names(zip_path: str | Path) -> list[dict]:
|
|
"""Extract atoms with full metadata from a ZIP.
|
|
|
|
Returns: List of dicts with 'name', 'crc32', 'size', 'data'.
|
|
"""
|
|
result = []
|
|
with zipfile.ZipFile(str(zip_path), "r") as zf:
|
|
for info in sorted(zf.infolist(), key=lambda i: i.filename):
|
|
if info.is_dir():
|
|
continue
|
|
data = zf.read(info.filename)
|
|
crc = format(zlib.crc32(data) & 0xFFFFFFFF, "08x")
|
|
result.append({
|
|
"name": info.filename,
|
|
"crc32": crc,
|
|
"size": len(data),
|
|
"data": data,
|
|
})
|
|
return result
|
|
|
|
|
|
def verify_zip_determinism(zip_path: str | Path) -> tuple[bool, str, str]:
    """Check whether a ZIP is reproduced exactly by a fixed-metadata rebuild.

    The archive's members are extracted, re-assembled in memory (sorted by
    name, deflated, with the module's fixed timestamp/system/attributes) and
    the SHA1 of the result is compared with the SHA1 of the file on disk.

    Returns: (is_deterministic, original_sha1, rebuilt_sha1)
    """
    original_digest = hashlib.sha1(Path(zip_path).read_bytes()).hexdigest()

    members = extract_atoms_with_names(zip_path)

    # CRC32 -> payload lookup; a later member with the same CRC replaces an
    # earlier one.
    data_by_crc = {}
    for member in members:
        data_by_crc[member["crc32"]] = member["data"]

    ordered = sorted(
        ({"name": m["name"], "crc32": m["crc32"]} for m in members),
        key=lambda item: item["name"],
    )

    # Re-assemble entirely in memory; nothing is written to disk.
    memory_zip = BytesIO()
    archive = zipfile.ZipFile(
        memory_zip, "w", zipfile.ZIP_DEFLATED, compresslevel=_COMPRESS_LEVEL
    )
    with archive:
        for item in ordered:
            zinfo = zipfile.ZipInfo(filename=item["name"], date_time=_FIXED_DATE_TIME)
            zinfo.compress_type = zipfile.ZIP_DEFLATED
            zinfo.create_system = _FIXED_CREATE_SYSTEM
            zinfo.external_attr = _FIXED_EXTERNAL_ATTR
            archive.writestr(zinfo, data_by_crc[item["crc32"]])

    rebuilt_digest = hashlib.sha1(memory_zip.getvalue()).hexdigest()
    matches = original_digest == rebuilt_digest
    return matches, original_digest, rebuilt_digest
|
|
|
|
|
|
def rebuild_zip_deterministic(
    source_zip: str | Path,
    output_zip: str | Path,
) -> str:
    """Re-create an existing ZIP with fixed, reproducible metadata.

    All members of ``source_zip`` are read into memory, then handed to
    build_deterministic_zip() as a recipe plus a CRC32-keyed atom store.

    Returns the SHA1 of the new ZIP.
    """
    store = {}
    entries = []
    for member in extract_atoms_with_names(source_zip):
        checksum = member["crc32"]
        store[checksum] = member["data"]
        entries.append({"name": member["name"], "crc32": checksum})
    return build_deterministic_zip(output_zip, entries, store)
|
|
|
|
|
|
def build_atom_store_from_zips(zip_dir: str | Path) -> dict[str, bytes]:
|
|
"""Build a global atom store from all ZIPs in a directory.
|
|
|
|
Scans all .zip files, extracts every ROM, indexes by CRC32.
|
|
Identical ROMs (same CRC32) from different ZIPs are stored once.
|
|
"""
|
|
store: dict[str, bytes] = {}
|
|
for zip_path in sorted(Path(zip_dir).rglob("*.zip")):
|
|
try:
|
|
atoms = extract_atoms(zip_path)
|
|
store.update(atoms)
|
|
except zipfile.BadZipFile:
|
|
continue
|
|
return store
|