diff --git a/scripts/generate_pack.py b/scripts/generate_pack.py index 95af7ae5..5eef8cc3 100644 --- a/scripts/generate_pack.py +++ b/scripts/generate_pack.py @@ -924,6 +924,170 @@ def main(): except (FileNotFoundError, OSError, yaml.YAMLError) as e: print(f" ERROR: {e}") + # Post-generation: verify all packs + inject manifests + SHA256SUMS + if not args.list_emulators and not args.list_systems: + print("\nVerifying packs and generating manifests...") + all_ok = verify_and_finalize_packs(args.output_dir, db) + if not all_ok: + print("WARNING: some packs have verification errors") + sys.exit(1) + + +# --------------------------------------------------------------------------- +# Post-generation pack verification + manifest + SHA256SUMS +# --------------------------------------------------------------------------- + +def verify_pack(zip_path: str, db: dict) -> tuple[bool, dict]: + """Verify a generated pack ZIP by re-hashing every file inside. + + Opens the ZIP, computes SHA1 for each file, and checks against + database.json. Returns (all_ok, manifest_dict). + + The manifest contains per-file metadata for self-documentation. + """ + files_db = db.get("files", {}) # SHA1 -> file_info + by_md5 = db.get("indexes", {}).get("by_md5", {}) # MD5 -> SHA1 + manifest = { + "version": 1, + "generator": "retrobios generate_pack.py", + "generated": __import__("datetime").datetime.now( + __import__("datetime").timezone.utc + ).strftime("%Y-%m-%dT%H:%M:%SZ"), + "files": [], + } + errors = [] + + with zipfile.ZipFile(zip_path, "r") as zf: + for info in zf.infolist(): + if info.is_dir(): + continue + name = info.filename + if name.startswith("INSTRUCTIONS_") or name == "manifest.json": + continue + with zf.open(info) as f: + data = f.read() + sha1 = hashlib.sha1(data).hexdigest() + md5 = hashlib.md5(data).hexdigest() + size = len(data) + + # Look up in database: files_db keyed by SHA1 + db_entry = files_db.get(sha1) + status = "verified" + file_name = "" + if db_entry: + file_name = db_entry.get("name", "") + else: + # Try MD5 -> SHA1 lookup + ref_sha1 = by_md5.get(md5) + if ref_sha1: + db_entry = files_db.get(ref_sha1) + if db_entry: + file_name = db_entry.get("name", "") + status = "verified_md5" + else: + status = "untracked" + else: + status = "untracked" + + manifest["files"].append({ + "path": name, + "sha1": sha1, + "md5": md5, + "size": size, + "status": status, + "name": file_name, + }) + + # Corruption check: SHA1 in DB but doesn't match what we computed + # This should never happen (we looked up by SHA1), but catches + # edge cases where by_md5 resolved to a different SHA1 + if db_entry and status == "verified_md5": + expected_sha1 = db_entry.get("sha1", "") + if expected_sha1 and expected_sha1.lower() != sha1.lower(): + errors.append(f"{name}: SHA1 mismatch (expected {expected_sha1}, got {sha1})") + + verified = sum(1 for f in manifest["files"] if f["status"] == "verified") + untracked = sum(1 for f in manifest["files"] if f["status"] == "untracked") + total = len(manifest["files"]) + manifest["summary"] = { + "total_files": total, + "verified": verified, + "untracked": untracked, + "errors": len(errors), + } + manifest["errors"] = errors + + all_ok = len(errors) == 0 + return all_ok, manifest + + +def inject_manifest(zip_path: str, manifest: dict) -> None: + """Inject manifest.json into an existing ZIP pack.""" + import tempfile as _tempfile + manifest_json = json.dumps(manifest, indent=2, ensure_ascii=False) + + # ZipFile doesn't support appending to existing entries, + # so we rebuild with the manifest added + tmp_fd, tmp_path = _tempfile.mkstemp(suffix=".zip", dir=os.path.dirname(zip_path)) + os.close(tmp_fd) + try: + with zipfile.ZipFile(zip_path, "r") as src, \ + zipfile.ZipFile(tmp_path, "w", zipfile.ZIP_DEFLATED) as dst: + for item in src.infolist(): + if item.filename == "manifest.json": + continue # replace existing + dst.writestr(item, src.read(item.filename)) + dst.writestr("manifest.json", manifest_json) + os.replace(tmp_path, zip_path) + except Exception: + os.unlink(tmp_path) + raise + + +def generate_sha256sums(output_dir: str) -> str | None: + """Generate SHA256SUMS.txt for all ZIP files in output_dir.""" + sums_path = os.path.join(output_dir, "SHA256SUMS.txt") + entries = [] + for name in sorted(os.listdir(output_dir)): + if not name.endswith(".zip"): + continue + path = os.path.join(output_dir, name) + sha256 = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(65536), b""): + sha256.update(chunk) + entries.append(f"{sha256.hexdigest()} {name}") + if not entries: + return None + with open(sums_path, "w") as f: + f.write("\n".join(entries) + "\n") + print(f"\n{sums_path}: {len(entries)} pack checksums") + return sums_path + + +def verify_and_finalize_packs(output_dir: str, db: dict) -> bool: + """Verify all packs, inject manifests, generate SHA256SUMS. + + Returns True if all packs pass verification. + """ + all_ok = True + for name in sorted(os.listdir(output_dir)): + if not name.endswith(".zip"): + continue + zip_path = os.path.join(output_dir, name) + ok, manifest = verify_pack(zip_path, db) + summary = manifest["summary"] + status = "OK" if ok else "ERRORS" + print(f" verify {name}: {summary['verified']}/{summary['total_files']} verified, " + f"{summary['untracked']} untracked, {summary['errors']} errors [{status}]") + if not ok: + for err in manifest["errors"]: + print(f" ERROR: {err}") + all_ok = False + inject_manifest(zip_path, manifest) + generate_sha256sums(output_dir) + return all_ok + if __name__ == "__main__": main() diff --git a/scripts/verify.py b/scripts/verify.py index cd750c3a..76d162c8 100644 --- a/scripts/verify.py +++ b/scripts/verify.py @@ -122,10 +122,10 @@ def _build_validation_index(profiles: dict) -> dict[str, dict]: continue if fname not in index: index[fname] = { - "checks": set(), "size": None, + "checks": set(), "sizes": set(), "min_size": None, "max_size": None, - "crc32": None, "md5": None, "sha1": None, "sha256": None, - "adler32": None, "crypto_only": set(), + "crc32": set(), "md5": set(), "sha1": set(), "sha256": set(), + "adler32": set(), "crypto_only": set(), } sources[fname] = {} index[fname]["checks"].update(checks) @@ -136,51 +136,23 @@ def _build_validation_index(profiles: dict) -> dict[str, dict]: # Size checks if "size" in checks: if f.get("size") is not None: - new_size = f["size"] - prev_size = index[fname]["size"] - if prev_size is not None and prev_size != new_size: - prev_emu = sources[fname].get("size", "?") - raise ValueError( - f"validation conflict for '{fname}': " - f"size={prev_size} ({prev_emu}) vs size={new_size} ({emu_name})" - ) - index[fname]["size"] = new_size - sources[fname]["size"] = emu_name + index[fname]["sizes"].add(f["size"]) if f.get("min_size") is not None: - index[fname]["min_size"] = f["min_size"] + cur = index[fname]["min_size"] + index[fname]["min_size"] = min(cur, f["min_size"]) if cur is not None else f["min_size"] if f.get("max_size") is not None: - index[fname]["max_size"] = f["max_size"] - # Hash checks (crc32, md5, sha1, adler32) + cur = index[fname]["max_size"] + index[fname]["max_size"] = max(cur, f["max_size"]) if cur is not None else f["max_size"] + # Hash checks — collect all accepted hashes as sets (multiple valid + # versions of the same file, e.g. MT-32 ROM versions) if "crc32" in checks and f.get("crc32"): - new_crc = f["crc32"].lower() - if new_crc.startswith("0x"): - new_crc = new_crc[2:] - prev_crc = index[fname]["crc32"] - if prev_crc is not None: - norm_prev = prev_crc.lower() - if norm_prev.startswith("0x"): - norm_prev = norm_prev[2:] - if norm_prev != new_crc: - prev_emu = sources[fname].get("crc32", "?") - raise ValueError( - f"validation conflict for '{fname}': " - f"crc32={prev_crc} ({prev_emu}) vs crc32={f['crc32']} ({emu_name})" - ) - index[fname]["crc32"] = f["crc32"] - sources[fname]["crc32"] = emu_name + norm = f["crc32"].lower() + if norm.startswith("0x"): + norm = norm[2:] + index[fname]["crc32"].add(norm) for hash_type in ("md5", "sha1", "sha256"): if hash_type in checks and f.get(hash_type): - new_hash = f[hash_type].lower() - prev_hash = index[fname][hash_type] - if prev_hash is not None and prev_hash.lower() != new_hash: - prev_emu = sources[fname].get(hash_type, "?") - raise ValueError( - f"validation conflict for '{fname}': " - f"{hash_type}={prev_hash} ({prev_emu}) vs " - f"{hash_type}={f[hash_type]} ({emu_name})" - ) - index[fname][hash_type] = f[hash_type] - sources[fname][hash_type] = emu_name + index[fname][hash_type].add(f[hash_type].lower()) # Adler32 — stored as known_hash_adler32 field (not in validation: list # for Dolphin, but support it in both forms for future profiles) adler_val = f.get("known_hash_adler32") or f.get("adler32") @@ -188,19 +160,12 @@ def _build_validation_index(profiles: dict) -> dict[str, dict]: norm = adler_val.lower() if norm.startswith("0x"): norm = norm[2:] - prev_adler = index[fname]["adler32"] - if prev_adler is not None and prev_adler != norm: - prev_emu = sources[fname].get("adler32", "?") - raise ValueError( - f"validation conflict for '{fname}': " - f"adler32={prev_adler} ({prev_emu}) vs adler32={norm} ({emu_name})" - ) - index[fname]["adler32"] = norm - sources[fname]["adler32"] = emu_name - # Convert sets to sorted lists for determinism + index[fname]["adler32"].add(norm) + # Convert sets to sorted tuples/lists for determinism for v in index.values(): v["checks"] = sorted(v["checks"]) v["crypto_only"] = sorted(v["crypto_only"]) + # Keep hash sets as frozensets for O(1) lookup in check_file_validation return index @@ -221,46 +186,45 @@ def check_file_validation( return None checks = entry["checks"] - # Size checks + # Size checks — sizes is a set of accepted values if "size" in checks: actual_size = os.path.getsize(local_path) - if entry["size"] is not None and actual_size != entry["size"]: - return f"size mismatch: expected {entry['size']}, got {actual_size}" + if entry["sizes"] and actual_size not in entry["sizes"]: + expected = ",".join(str(s) for s in sorted(entry["sizes"])) + return f"size mismatch: got {actual_size}, accepted [{expected}]" if entry["min_size"] is not None and actual_size < entry["min_size"]: return f"size too small: min {entry['min_size']}, got {actual_size}" if entry["max_size"] is not None and actual_size > entry["max_size"]: return f"size too large: max {entry['max_size']}, got {actual_size}" - # Hash checks — compute once, reuse for all hash types + # Hash checks — compute once, reuse for all hash types. + # Each hash field is a set of accepted values (multiple valid ROM versions). need_hashes = ( - any(h in checks and entry.get(h) for h in ("crc32", "md5", "sha1")) + any(h in checks and entry.get(h) for h in ("crc32", "md5", "sha1", "sha256")) or entry.get("adler32") ) if need_hashes: hashes = compute_hashes(local_path) if "crc32" in checks and entry["crc32"]: - expected_crc = entry["crc32"].lower() - if expected_crc.startswith("0x"): - expected_crc = expected_crc[2:] - if hashes["crc32"].lower() != expected_crc: - return f"crc32 mismatch: expected {entry['crc32']}, got {hashes['crc32']}" + if hashes["crc32"].lower() not in entry["crc32"]: + expected = ",".join(sorted(entry["crc32"])) + return f"crc32 mismatch: got {hashes['crc32']}, accepted [{expected}]" if "md5" in checks and entry["md5"]: - if hashes["md5"].lower() != entry["md5"].lower(): - return f"md5 mismatch: expected {entry['md5']}, got {hashes['md5']}" + if hashes["md5"].lower() not in entry["md5"]: + expected = ",".join(sorted(entry["md5"])) + return f"md5 mismatch: got {hashes['md5']}, accepted [{expected}]" if "sha1" in checks and entry["sha1"]: - if hashes["sha1"].lower() != entry["sha1"].lower(): - return f"sha1 mismatch: expected {entry['sha1']}, got {hashes['sha1']}" + if hashes["sha1"].lower() not in entry["sha1"]: + expected = ",".join(sorted(entry["sha1"])) + return f"sha1 mismatch: got {hashes['sha1']}, accepted [{expected}]" if "sha256" in checks and entry["sha256"]: - if hashes["sha256"].lower() != entry["sha256"].lower(): - return f"sha256 mismatch: expected {entry['sha256']}, got {hashes['sha256']}" - # Adler32 — check if known_hash_adler32 is available (even if not - # in the validation: list, Dolphin uses it as informational check) + if hashes["sha256"].lower() not in entry["sha256"]: + expected = ",".join(sorted(entry["sha256"])) + return f"sha256 mismatch: got {hashes['sha256']}, accepted [{expected}]" if entry["adler32"]: - if hashes["adler32"].lower() != entry["adler32"]: - return ( - f"adler32 mismatch: expected 0x{entry['adler32']}, " - f"got 0x{hashes['adler32']}" - ) + if hashes["adler32"].lower() not in entry["adler32"]: + expected = ",".join(sorted(entry["adler32"])) + return f"adler32 mismatch: got 0x{hashes['adler32']}, accepted [{expected}]" # Signature/crypto checks (3DS RSA, AES) if entry["crypto_only"]: diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 30140052..87d503f4 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -719,7 +719,7 @@ class TestE2E(unittest.TestCase): index = _build_validation_index(profiles) self.assertIn("present_req.bin", index) self.assertIn("size", index["present_req.bin"]["checks"]) - self.assertEqual(index["present_req.bin"]["size"], 16) + self.assertIn(16, index["present_req.bin"]["sizes"]) self.assertIn("correct_hash.bin", index) self.assertIn("crc32", index["correct_hash.bin"]["checks"]) @@ -779,8 +779,8 @@ class TestE2E(unittest.TestCase): reason = check_file_validation(path, "leading_zero_crc.bin", index) self.assertIsNone(reason) - def test_78_validation_conflict_raises(self): - """Conflicting size/crc32 from two profiles raises ValueError.""" + def test_78_validation_multi_size_accepted(self): + """Multiple valid sizes from different profiles are collected as a set.""" profiles = { "emu_a": { "type": "libretro", "files": [ @@ -793,10 +793,8 @@ class TestE2E(unittest.TestCase): ], }, } - with self.assertRaises(ValueError) as ctx: - _build_validation_index(profiles) - self.assertIn("validation conflict", str(ctx.exception)) - self.assertIn("shared.bin", str(ctx.exception)) + index = _build_validation_index(profiles) + self.assertEqual(index["shared.bin"]["sizes"], {512, 1024}) def test_79_validation_md5_pass(self): """File with correct MD5 passes validation."""