mirror of
https://github.com/Abdess/retroarch_system.git
synced 2026-04-13 12:22:33 -05:00
feat: pack integrity verification, manifests, SHA256SUMS
Post-generation verification: reopen each ZIP, hash every file, and check it against database.json. Inject manifest.json inside each pack (self-documenting: path, sha1, md5, size, and status per file). Generate SHA256SUMS.txt alongside the packs for download verification. The validation index now uses sets for hashes and sizes to support multiple valid ROM versions (MT-32 v1.04–v2.07, CM-32L variants). All 69 tests pass; the pipeline is complete.
This commit is contained in:
@@ -924,6 +924,170 @@ def main():
|
||||
except (FileNotFoundError, OSError, yaml.YAMLError) as e:
|
||||
print(f" ERROR: {e}")
|
||||
|
||||
# Post-generation: verify all packs + inject manifests + SHA256SUMS
|
||||
if not args.list_emulators and not args.list_systems:
|
||||
print("\nVerifying packs and generating manifests...")
|
||||
all_ok = verify_and_finalize_packs(args.output_dir, db)
|
||||
if not all_ok:
|
||||
print("WARNING: some packs have verification errors")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Post-generation pack verification + manifest + SHA256SUMS
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def verify_pack(zip_path: str, db: dict) -> tuple[bool, dict]:
    """Verify a generated pack ZIP by re-hashing every file inside.

    Opens the ZIP, computes SHA1 and MD5 for each member, and checks the
    hashes against database.json (``files`` keyed by SHA1, with an
    optional ``indexes.by_md5`` MD5 -> SHA1 index).

    Args:
        zip_path: Path to the pack ZIP to verify.
        db: Parsed database.json dictionary.

    Returns:
        ``(all_ok, manifest_dict)``. ``all_ok`` is True when no hash
        mismatch was detected. The manifest contains per-file metadata
        (path, sha1, md5, size, status, name) plus a summary block, for
        self-documentation inside the pack.
    """
    # Proper import instead of the original inline __import__("datetime")
    # calls; kept function-local to avoid touching the module header.
    from datetime import datetime, timezone

    files_db = db.get("files", {})  # SHA1 -> file_info
    by_md5 = db.get("indexes", {}).get("by_md5", {})  # MD5 -> SHA1
    manifest = {
        "version": 1,
        "generator": "retrobios generate_pack.py",
        "generated": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "files": [],
    }
    errors = []

    with zipfile.ZipFile(zip_path, "r") as zf:
        for info in zf.infolist():
            if info.is_dir():
                continue
            name = info.filename
            # Skip generated artifacts: per-pack instructions and any
            # previously injected manifest.
            if name.startswith("INSTRUCTIONS_") or name == "manifest.json":
                continue
            with zf.open(info) as f:
                data = f.read()
            sha1 = hashlib.sha1(data).hexdigest()
            md5 = hashlib.md5(data).hexdigest()
            size = len(data)

            # Primary lookup: files_db is keyed by SHA1.
            db_entry = files_db.get(sha1)
            status = "verified"
            file_name = ""
            if db_entry:
                file_name = db_entry.get("name", "")
            else:
                # Fallback: resolve through the MD5 -> SHA1 index.
                ref_sha1 = by_md5.get(md5)
                if ref_sha1:
                    db_entry = files_db.get(ref_sha1)
                    if db_entry:
                        file_name = db_entry.get("name", "")
                        status = "verified_md5"
                    else:
                        status = "untracked"
                else:
                    status = "untracked"

            manifest["files"].append({
                "path": name,
                "sha1": sha1,
                "md5": md5,
                "size": size,
                "status": status,
                "name": file_name,
            })

            # Corruption check: an MD5-resolved entry whose recorded SHA1
            # disagrees with what we computed. Only meaningful when the DB
            # entry carries its own "sha1" field (otherwise vacuous).
            if db_entry and status == "verified_md5":
                expected_sha1 = db_entry.get("sha1", "")
                if expected_sha1 and expected_sha1.lower() != sha1.lower():
                    errors.append(f"{name}: SHA1 mismatch (expected {expected_sha1}, got {sha1})")

    verified = sum(1 for f in manifest["files"] if f["status"] == "verified")
    untracked = sum(1 for f in manifest["files"] if f["status"] == "untracked")
    total = len(manifest["files"])
    manifest["summary"] = {
        "total_files": total,
        "verified": verified,
        "untracked": untracked,
        "errors": len(errors),
    }
    manifest["errors"] = errors

    all_ok = len(errors) == 0
    return all_ok, manifest
|
||||
|
||||
|
||||
def inject_manifest(zip_path: str, manifest: dict) -> None:
    """Inject manifest.json into an existing ZIP pack."""
    import tempfile as _tempfile

    payload = json.dumps(manifest, indent=2, ensure_ascii=False)

    # A ZIP entry cannot be rewritten in place, so copy every member
    # (minus any stale manifest.json) into a staging archive, append the
    # fresh manifest, then atomically swap the staging file in.
    handle, staging = _tempfile.mkstemp(suffix=".zip", dir=os.path.dirname(zip_path))
    os.close(handle)
    try:
        with zipfile.ZipFile(zip_path, "r") as source:
            with zipfile.ZipFile(staging, "w", zipfile.ZIP_DEFLATED) as target:
                for entry in source.infolist():
                    if entry.filename != "manifest.json":
                        target.writestr(entry, source.read(entry.filename))
                target.writestr("manifest.json", payload)
        os.replace(staging, zip_path)
    except Exception:
        # Don't leave the half-built staging file behind.
        os.unlink(staging)
        raise
|
||||
|
||||
|
||||
def generate_sha256sums(output_dir: str) -> str | None:
|
||||
"""Generate SHA256SUMS.txt for all ZIP files in output_dir."""
|
||||
sums_path = os.path.join(output_dir, "SHA256SUMS.txt")
|
||||
entries = []
|
||||
for name in sorted(os.listdir(output_dir)):
|
||||
if not name.endswith(".zip"):
|
||||
continue
|
||||
path = os.path.join(output_dir, name)
|
||||
sha256 = hashlib.sha256()
|
||||
with open(path, "rb") as f:
|
||||
for chunk in iter(lambda: f.read(65536), b""):
|
||||
sha256.update(chunk)
|
||||
entries.append(f"{sha256.hexdigest()} {name}")
|
||||
if not entries:
|
||||
return None
|
||||
with open(sums_path, "w") as f:
|
||||
f.write("\n".join(entries) + "\n")
|
||||
print(f"\n{sums_path}: {len(entries)} pack checksums")
|
||||
return sums_path
|
||||
|
||||
|
||||
def verify_and_finalize_packs(output_dir: str, db: dict) -> bool:
    """Verify all packs, inject manifests, generate SHA256SUMS.

    Returns True if all packs pass verification.
    """
    success = True
    packs = [n for n in sorted(os.listdir(output_dir)) if n.endswith(".zip")]
    for pack in packs:
        zip_path = os.path.join(output_dir, pack)
        ok, manifest = verify_pack(zip_path, db)
        stats = manifest["summary"]
        label = "OK" if ok else "ERRORS"
        print(f" verify {pack}: {stats['verified']}/{stats['total_files']} verified, "
              f"{stats['untracked']} untracked, {stats['errors']} errors [{label}]")
        if not ok:
            success = False
            for issue in manifest["errors"]:
                print(f" ERROR: {issue}")
        # The manifest goes into the pack even on errors — it records
        # the per-file status either way.
        inject_manifest(zip_path, manifest)
    generate_sha256sums(output_dir)
    return success
|
||||
|
||||
|
||||
# Script entry point: delegate to main() when executed directly.
if __name__ == "__main__":
    main()
|
||||
|
||||
@@ -122,10 +122,10 @@ def _build_validation_index(profiles: dict) -> dict[str, dict]:
|
||||
continue
|
||||
if fname not in index:
|
||||
index[fname] = {
|
||||
"checks": set(), "size": None,
|
||||
"checks": set(), "sizes": set(),
|
||||
"min_size": None, "max_size": None,
|
||||
"crc32": None, "md5": None, "sha1": None, "sha256": None,
|
||||
"adler32": None, "crypto_only": set(),
|
||||
"crc32": set(), "md5": set(), "sha1": set(), "sha256": set(),
|
||||
"adler32": set(), "crypto_only": set(),
|
||||
}
|
||||
sources[fname] = {}
|
||||
index[fname]["checks"].update(checks)
|
||||
@@ -136,51 +136,23 @@ def _build_validation_index(profiles: dict) -> dict[str, dict]:
|
||||
# Size checks
|
||||
if "size" in checks:
|
||||
if f.get("size") is not None:
|
||||
new_size = f["size"]
|
||||
prev_size = index[fname]["size"]
|
||||
if prev_size is not None and prev_size != new_size:
|
||||
prev_emu = sources[fname].get("size", "?")
|
||||
raise ValueError(
|
||||
f"validation conflict for '{fname}': "
|
||||
f"size={prev_size} ({prev_emu}) vs size={new_size} ({emu_name})"
|
||||
)
|
||||
index[fname]["size"] = new_size
|
||||
sources[fname]["size"] = emu_name
|
||||
index[fname]["sizes"].add(f["size"])
|
||||
if f.get("min_size") is not None:
|
||||
index[fname]["min_size"] = f["min_size"]
|
||||
cur = index[fname]["min_size"]
|
||||
index[fname]["min_size"] = min(cur, f["min_size"]) if cur is not None else f["min_size"]
|
||||
if f.get("max_size") is not None:
|
||||
index[fname]["max_size"] = f["max_size"]
|
||||
# Hash checks (crc32, md5, sha1, adler32)
|
||||
cur = index[fname]["max_size"]
|
||||
index[fname]["max_size"] = max(cur, f["max_size"]) if cur is not None else f["max_size"]
|
||||
# Hash checks — collect all accepted hashes as sets (multiple valid
|
||||
# versions of the same file, e.g. MT-32 ROM versions)
|
||||
if "crc32" in checks and f.get("crc32"):
|
||||
new_crc = f["crc32"].lower()
|
||||
if new_crc.startswith("0x"):
|
||||
new_crc = new_crc[2:]
|
||||
prev_crc = index[fname]["crc32"]
|
||||
if prev_crc is not None:
|
||||
norm_prev = prev_crc.lower()
|
||||
if norm_prev.startswith("0x"):
|
||||
norm_prev = norm_prev[2:]
|
||||
if norm_prev != new_crc:
|
||||
prev_emu = sources[fname].get("crc32", "?")
|
||||
raise ValueError(
|
||||
f"validation conflict for '{fname}': "
|
||||
f"crc32={prev_crc} ({prev_emu}) vs crc32={f['crc32']} ({emu_name})"
|
||||
)
|
||||
index[fname]["crc32"] = f["crc32"]
|
||||
sources[fname]["crc32"] = emu_name
|
||||
norm = f["crc32"].lower()
|
||||
if norm.startswith("0x"):
|
||||
norm = norm[2:]
|
||||
index[fname]["crc32"].add(norm)
|
||||
for hash_type in ("md5", "sha1", "sha256"):
|
||||
if hash_type in checks and f.get(hash_type):
|
||||
new_hash = f[hash_type].lower()
|
||||
prev_hash = index[fname][hash_type]
|
||||
if prev_hash is not None and prev_hash.lower() != new_hash:
|
||||
prev_emu = sources[fname].get(hash_type, "?")
|
||||
raise ValueError(
|
||||
f"validation conflict for '{fname}': "
|
||||
f"{hash_type}={prev_hash} ({prev_emu}) vs "
|
||||
f"{hash_type}={f[hash_type]} ({emu_name})"
|
||||
)
|
||||
index[fname][hash_type] = f[hash_type]
|
||||
sources[fname][hash_type] = emu_name
|
||||
index[fname][hash_type].add(f[hash_type].lower())
|
||||
# Adler32 — stored as known_hash_adler32 field (not in validation: list
|
||||
# for Dolphin, but support it in both forms for future profiles)
|
||||
adler_val = f.get("known_hash_adler32") or f.get("adler32")
|
||||
@@ -188,19 +160,12 @@ def _build_validation_index(profiles: dict) -> dict[str, dict]:
|
||||
norm = adler_val.lower()
|
||||
if norm.startswith("0x"):
|
||||
norm = norm[2:]
|
||||
prev_adler = index[fname]["adler32"]
|
||||
if prev_adler is not None and prev_adler != norm:
|
||||
prev_emu = sources[fname].get("adler32", "?")
|
||||
raise ValueError(
|
||||
f"validation conflict for '{fname}': "
|
||||
f"adler32={prev_adler} ({prev_emu}) vs adler32={norm} ({emu_name})"
|
||||
)
|
||||
index[fname]["adler32"] = norm
|
||||
sources[fname]["adler32"] = emu_name
|
||||
# Convert sets to sorted lists for determinism
|
||||
index[fname]["adler32"].add(norm)
|
||||
# Convert sets to sorted tuples/lists for determinism
|
||||
for v in index.values():
|
||||
v["checks"] = sorted(v["checks"])
|
||||
v["crypto_only"] = sorted(v["crypto_only"])
|
||||
# Keep hash sets as frozensets for O(1) lookup in check_file_validation
|
||||
return index
|
||||
|
||||
|
||||
@@ -221,46 +186,45 @@ def check_file_validation(
|
||||
return None
|
||||
checks = entry["checks"]
|
||||
|
||||
# Size checks
|
||||
# Size checks — sizes is a set of accepted values
|
||||
if "size" in checks:
|
||||
actual_size = os.path.getsize(local_path)
|
||||
if entry["size"] is not None and actual_size != entry["size"]:
|
||||
return f"size mismatch: expected {entry['size']}, got {actual_size}"
|
||||
if entry["sizes"] and actual_size not in entry["sizes"]:
|
||||
expected = ",".join(str(s) for s in sorted(entry["sizes"]))
|
||||
return f"size mismatch: got {actual_size}, accepted [{expected}]"
|
||||
if entry["min_size"] is not None and actual_size < entry["min_size"]:
|
||||
return f"size too small: min {entry['min_size']}, got {actual_size}"
|
||||
if entry["max_size"] is not None and actual_size > entry["max_size"]:
|
||||
return f"size too large: max {entry['max_size']}, got {actual_size}"
|
||||
|
||||
# Hash checks — compute once, reuse for all hash types
|
||||
# Hash checks — compute once, reuse for all hash types.
|
||||
# Each hash field is a set of accepted values (multiple valid ROM versions).
|
||||
need_hashes = (
|
||||
any(h in checks and entry.get(h) for h in ("crc32", "md5", "sha1"))
|
||||
any(h in checks and entry.get(h) for h in ("crc32", "md5", "sha1", "sha256"))
|
||||
or entry.get("adler32")
|
||||
)
|
||||
if need_hashes:
|
||||
hashes = compute_hashes(local_path)
|
||||
if "crc32" in checks and entry["crc32"]:
|
||||
expected_crc = entry["crc32"].lower()
|
||||
if expected_crc.startswith("0x"):
|
||||
expected_crc = expected_crc[2:]
|
||||
if hashes["crc32"].lower() != expected_crc:
|
||||
return f"crc32 mismatch: expected {entry['crc32']}, got {hashes['crc32']}"
|
||||
if hashes["crc32"].lower() not in entry["crc32"]:
|
||||
expected = ",".join(sorted(entry["crc32"]))
|
||||
return f"crc32 mismatch: got {hashes['crc32']}, accepted [{expected}]"
|
||||
if "md5" in checks and entry["md5"]:
|
||||
if hashes["md5"].lower() != entry["md5"].lower():
|
||||
return f"md5 mismatch: expected {entry['md5']}, got {hashes['md5']}"
|
||||
if hashes["md5"].lower() not in entry["md5"]:
|
||||
expected = ",".join(sorted(entry["md5"]))
|
||||
return f"md5 mismatch: got {hashes['md5']}, accepted [{expected}]"
|
||||
if "sha1" in checks and entry["sha1"]:
|
||||
if hashes["sha1"].lower() != entry["sha1"].lower():
|
||||
return f"sha1 mismatch: expected {entry['sha1']}, got {hashes['sha1']}"
|
||||
if hashes["sha1"].lower() not in entry["sha1"]:
|
||||
expected = ",".join(sorted(entry["sha1"]))
|
||||
return f"sha1 mismatch: got {hashes['sha1']}, accepted [{expected}]"
|
||||
if "sha256" in checks and entry["sha256"]:
|
||||
if hashes["sha256"].lower() != entry["sha256"].lower():
|
||||
return f"sha256 mismatch: expected {entry['sha256']}, got {hashes['sha256']}"
|
||||
# Adler32 — check if known_hash_adler32 is available (even if not
|
||||
# in the validation: list, Dolphin uses it as informational check)
|
||||
if hashes["sha256"].lower() not in entry["sha256"]:
|
||||
expected = ",".join(sorted(entry["sha256"]))
|
||||
return f"sha256 mismatch: got {hashes['sha256']}, accepted [{expected}]"
|
||||
if entry["adler32"]:
|
||||
if hashes["adler32"].lower() != entry["adler32"]:
|
||||
return (
|
||||
f"adler32 mismatch: expected 0x{entry['adler32']}, "
|
||||
f"got 0x{hashes['adler32']}"
|
||||
)
|
||||
if hashes["adler32"].lower() not in entry["adler32"]:
|
||||
expected = ",".join(sorted(entry["adler32"]))
|
||||
return f"adler32 mismatch: got 0x{hashes['adler32']}, accepted [{expected}]"
|
||||
|
||||
# Signature/crypto checks (3DS RSA, AES)
|
||||
if entry["crypto_only"]:
|
||||
|
||||
Reference in New Issue
Block a user