mirror of
https://github.com/Abdess/retroarch_system.git
synced 2026-04-13 12:22:33 -05:00
feat: add --from-md5 lookup and pack builder
This commit is contained in:
@@ -16,6 +16,7 @@ import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import tempfile
|
||||
import urllib.request
|
||||
@@ -48,6 +49,96 @@ DEFAULT_OUTPUT_DIR = "dist"
|
||||
DEFAULT_BIOS_DIR = "bios"
|
||||
MAX_ENTRY_SIZE = 512 * 1024 * 1024 # 512MB
|
||||
|
||||
_HEX_RE = re.compile(r"\b([0-9a-fA-F]{8,40})\b")
|
||||
|
||||
|
||||
def _detect_hash_type(h: str) -> str:
|
||||
n = len(h)
|
||||
if n == 40:
|
||||
return "sha1"
|
||||
if n == 32:
|
||||
return "md5"
|
||||
if n == 8:
|
||||
return "crc32"
|
||||
return "md5"
|
||||
|
||||
|
||||
def parse_hash_input(raw: str) -> list[tuple[str, str]]:
|
||||
"""Parse comma-separated hash string into (type, hash) tuples."""
|
||||
results: list[tuple[str, str]] = []
|
||||
for part in raw.split(","):
|
||||
part = part.strip().lower()
|
||||
if not part:
|
||||
continue
|
||||
m = _HEX_RE.search(part)
|
||||
if m:
|
||||
h = m.group(1)
|
||||
results.append((_detect_hash_type(h), h))
|
||||
return results
|
||||
|
||||
|
||||
def parse_hash_file(path: str) -> list[tuple[str, str]]:
|
||||
"""Parse hash file (one per line, comments with #, mixed formats)."""
|
||||
results: list[tuple[str, str]] = []
|
||||
with open(path) as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
m = _HEX_RE.search(line.lower())
|
||||
if m:
|
||||
h = m.group(1)
|
||||
results.append((_detect_hash_type(h), h))
|
||||
return results
|
||||
|
||||
|
||||
def lookup_hashes(
|
||||
hashes: list[tuple[str, str]],
|
||||
db: dict,
|
||||
bios_dir: str,
|
||||
emulators_dir: str,
|
||||
platforms_dir: str,
|
||||
) -> None:
|
||||
"""Print diagnostic info for each hash."""
|
||||
files_db = db.get("files", {})
|
||||
by_md5 = db.get("indexes", {}).get("by_md5", {})
|
||||
by_crc32 = db.get("indexes", {}).get("by_crc32", {})
|
||||
|
||||
for hash_type, hash_val in hashes:
|
||||
sha1 = None
|
||||
if hash_type == "sha1" and hash_val in files_db:
|
||||
sha1 = hash_val
|
||||
elif hash_type == "md5":
|
||||
sha1 = by_md5.get(hash_val)
|
||||
elif hash_type == "crc32":
|
||||
sha1 = by_crc32.get(hash_val)
|
||||
|
||||
if not sha1 or sha1 not in files_db:
|
||||
print(f"\n{hash_type.upper()}: {hash_val}")
|
||||
print(" NOT FOUND in database")
|
||||
continue
|
||||
|
||||
entry = files_db[sha1]
|
||||
name = entry.get("name", "?")
|
||||
md5 = entry.get("md5", "?")
|
||||
paths = entry.get("paths", [])
|
||||
aliases = entry.get("aliases", [])
|
||||
|
||||
print(f"\n{hash_type.upper()}: {hash_val}")
|
||||
print(f" SHA1: {sha1}")
|
||||
print(f" MD5: {md5}")
|
||||
print(f" Name: {name}")
|
||||
if paths:
|
||||
print(f" Path: {paths[0]}")
|
||||
if aliases:
|
||||
print(f" Aliases: {aliases}")
|
||||
|
||||
primary = os.path.join(bios_dir, paths[0]) if paths else None
|
||||
if primary and os.path.exists(primary):
|
||||
print(" In repo: YES")
|
||||
else:
|
||||
print(" In repo: NO")
|
||||
|
||||
|
||||
def _find_candidate_satisfying_both(
|
||||
file_entry: dict,
|
||||
@@ -966,6 +1057,128 @@ def generate_split_packs(
|
||||
return results
|
||||
|
||||
|
||||
def generate_md5_pack(
|
||||
hashes: list[tuple[str, str]],
|
||||
db: dict,
|
||||
bios_dir: str,
|
||||
output_dir: str,
|
||||
zip_contents: dict | None = None,
|
||||
platform_name: str | None = None,
|
||||
platforms_dir: str | None = None,
|
||||
emulator_name: str | None = None,
|
||||
emulators_dir: str | None = None,
|
||||
standalone: bool = False,
|
||||
) -> str | None:
|
||||
"""Build a pack from an explicit list of hashes with layout context."""
|
||||
files_db = db.get("files", {})
|
||||
by_md5 = db.get("indexes", {}).get("by_md5", {})
|
||||
by_crc32 = db.get("indexes", {}).get("by_crc32", {})
|
||||
if zip_contents is None:
|
||||
zip_contents = {}
|
||||
|
||||
plat_file_index: dict[str, dict] = {}
|
||||
base_dest = ""
|
||||
plat_display = "Custom"
|
||||
if platform_name and platforms_dir:
|
||||
config = load_platform_config(platform_name, platforms_dir)
|
||||
base_dest = config.get("base_destination", "")
|
||||
plat_display = config.get("platform", platform_name)
|
||||
for _sys_id, system in config.get("systems", {}).items():
|
||||
for fe in system.get("files", []):
|
||||
plat_file_index[fe.get("name", "").lower()] = fe
|
||||
|
||||
emu_pack_structure = None
|
||||
emu_display = ""
|
||||
if emulator_name and emulators_dir:
|
||||
profiles = load_emulator_profiles(emulators_dir, skip_aliases=False)
|
||||
if emulator_name in profiles:
|
||||
profile = profiles[emulator_name]
|
||||
emu_display = profile.get("emulator", emulator_name)
|
||||
emu_pack_structure = profile.get("pack_structure")
|
||||
for fe in profile.get("files", []):
|
||||
plat_file_index[fe.get("name", "").lower()] = fe
|
||||
for alias in fe.get("aliases", []):
|
||||
plat_file_index[alias.lower()] = fe
|
||||
|
||||
context_name = plat_display if platform_name else (emu_display or "Custom")
|
||||
zip_name = f"{context_name.replace(' ', '_')}_Custom_BIOS_Pack.zip"
|
||||
zip_path = os.path.join(output_dir, zip_name)
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
packed: list[tuple[str, str]] = []
|
||||
not_in_repo: list[tuple[str, str]] = []
|
||||
not_in_db: list[str] = []
|
||||
seen: set[str] = set()
|
||||
|
||||
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
|
||||
for hash_type, hash_val in hashes:
|
||||
sha1 = None
|
||||
if hash_type == "sha1" and hash_val in files_db:
|
||||
sha1 = hash_val
|
||||
elif hash_type == "md5":
|
||||
sha1 = by_md5.get(hash_val)
|
||||
elif hash_type == "crc32":
|
||||
sha1 = by_crc32.get(hash_val)
|
||||
|
||||
if not sha1 or sha1 not in files_db:
|
||||
not_in_db.append(hash_val)
|
||||
continue
|
||||
|
||||
entry = files_db[sha1]
|
||||
name = entry.get("name", "")
|
||||
aliases = entry.get("aliases", [])
|
||||
paths = entry.get("paths", [])
|
||||
|
||||
dest = name
|
||||
matched_fe = None
|
||||
for lookup_name in [name] + aliases:
|
||||
if lookup_name.lower() in plat_file_index:
|
||||
matched_fe = plat_file_index[lookup_name.lower()]
|
||||
break
|
||||
|
||||
if matched_fe:
|
||||
if emulator_name and emu_pack_structure is not None:
|
||||
dest = _resolve_destination(matched_fe, emu_pack_structure, standalone)
|
||||
else:
|
||||
dest = matched_fe.get("destination", matched_fe.get("name", name))
|
||||
elif paths:
|
||||
dest = paths[0]
|
||||
|
||||
if base_dest and not dest.startswith(base_dest):
|
||||
full_dest = f"{base_dest}/{dest}"
|
||||
else:
|
||||
full_dest = dest
|
||||
|
||||
if full_dest in seen:
|
||||
continue
|
||||
seen.add(full_dest)
|
||||
|
||||
fe_for_resolve = {"name": name, "sha1": sha1, "md5": entry.get("md5", "")}
|
||||
local_path, status = resolve_file(fe_for_resolve, db, bios_dir, zip_contents)
|
||||
|
||||
if status == "not_found" or not local_path:
|
||||
not_in_repo.append((name, hash_val))
|
||||
continue
|
||||
|
||||
zf.write(local_path, full_dest)
|
||||
packed.append((name, hash_val))
|
||||
|
||||
total = len(hashes)
|
||||
print(f"\nPacked {len(packed)}/{total} requested files")
|
||||
for name, h in packed:
|
||||
print(f" PACKED: {name} ({h[:16]}...)")
|
||||
for name, h in not_in_repo:
|
||||
print(f" NOT IN REPO: {name} ({h[:16]}...)")
|
||||
for h in not_in_db:
|
||||
print(f" NOT IN DB: {h}")
|
||||
|
||||
if not packed:
|
||||
if os.path.exists(zip_path):
|
||||
os.unlink(zip_path)
|
||||
return None
|
||||
return zip_path
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Generate platform BIOS ZIP packs")
|
||||
parser.add_argument("--platform", "-p", help="Platform name (e.g., retroarch)")
|
||||
@@ -997,6 +1210,10 @@ def main():
|
||||
help="Grouping for --split (default: system)")
|
||||
parser.add_argument("--target", "-t", help="Hardware target (e.g., switch, rpi4)")
|
||||
parser.add_argument("--list-targets", action="store_true", help="List available targets for the platform")
|
||||
parser.add_argument("--from-md5",
|
||||
help="Hash(es) to look up or pack (comma-separated)")
|
||||
parser.add_argument("--from-md5-file",
|
||||
help="File with hashes (one per line)")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.list:
|
||||
@@ -1031,6 +1248,16 @@ def main():
|
||||
has_all = args.all
|
||||
has_emulator = bool(args.emulator)
|
||||
has_system = bool(args.system)
|
||||
has_from_md5 = bool(args.from_md5 or getattr(args, 'from_md5_file', None))
|
||||
|
||||
if args.from_md5 and getattr(args, 'from_md5_file', None):
|
||||
parser.error("--from-md5 and --from-md5-file are mutually exclusive")
|
||||
if has_from_md5 and has_all:
|
||||
parser.error("--from-md5 requires --platform or --emulator, not --all")
|
||||
if has_from_md5 and has_system:
|
||||
parser.error("--from-md5 and --system are mutually exclusive")
|
||||
if has_from_md5 and args.split:
|
||||
parser.error("--split and --from-md5 are mutually exclusive")
|
||||
|
||||
# --platform/--all and --system can combine (system filters within platform)
|
||||
# --emulator is exclusive with everything else
|
||||
@@ -1038,8 +1265,8 @@ def main():
|
||||
parser.error("--emulator is mutually exclusive with --platform, --all, and --system")
|
||||
if has_platform and has_all:
|
||||
parser.error("--platform and --all are mutually exclusive")
|
||||
if not (has_platform or has_all or has_emulator or has_system):
|
||||
parser.error("Specify --platform, --all, --emulator, or --system")
|
||||
if not (has_platform or has_all or has_emulator or has_system or has_from_md5):
|
||||
parser.error("Specify --platform, --all, --emulator, --system, or --from-md5")
|
||||
if args.standalone and not (has_emulator or (has_system and not has_platform and not has_all)):
|
||||
parser.error("--standalone requires --emulator or --system (without --platform)")
|
||||
if args.split and not (has_platform or has_all):
|
||||
@@ -1053,6 +1280,32 @@ def main():
|
||||
if args.target and has_emulator:
|
||||
parser.error("--target is incompatible with --emulator")
|
||||
|
||||
# Hash lookup / pack mode
|
||||
if has_from_md5:
|
||||
if args.from_md5:
|
||||
hashes = parse_hash_input(args.from_md5)
|
||||
else:
|
||||
hashes = parse_hash_file(args.from_md5_file)
|
||||
if not hashes:
|
||||
print("No valid hashes found in input", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
db = load_database(args.db)
|
||||
if not has_platform and not has_emulator:
|
||||
lookup_hashes(hashes, db, args.bios_dir, args.emulators_dir,
|
||||
args.platforms_dir)
|
||||
return
|
||||
zip_contents = build_zip_contents_index(db)
|
||||
result = generate_md5_pack(
|
||||
hashes=hashes, db=db, bios_dir=args.bios_dir,
|
||||
output_dir=args.output_dir, zip_contents=zip_contents,
|
||||
platform_name=args.platform, platforms_dir=args.platforms_dir,
|
||||
emulator_name=args.emulator, emulators_dir=args.emulators_dir,
|
||||
standalone=getattr(args, "standalone", False),
|
||||
)
|
||||
if not result:
|
||||
sys.exit(1)
|
||||
return
|
||||
|
||||
db = load_database(args.db)
|
||||
zip_contents = build_zip_contents_index(db)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user