diff --git a/scripts/pipeline.py b/scripts/pipeline.py index 1885c1db..4777f0dc 100644 --- a/scripts/pipeline.py +++ b/scripts/pipeline.py @@ -177,6 +177,28 @@ def main(): print("\n--- 2/9 refresh data directories: SKIPPED (--offline) ---") results["refresh_data"] = True + # Step 2a: Refresh MAME BIOS hashes + if not args.offline: + ok, _ = run( + [sys.executable, "-m", "scripts.scraper.mame_hash_scraper"], + "2a refresh MAME hashes", + ) + results["mame_hashes"] = ok + else: + print("\n--- 2a refresh MAME hashes: SKIPPED (--offline) ---") + results["mame_hashes"] = True + + # Step 2a2: Refresh FBNeo BIOS hashes + if not args.offline: + ok, _ = run( + [sys.executable, "-m", "scripts.scraper.fbneo_hash_scraper"], + "2a2 refresh FBNeo hashes", + ) + results["fbneo_hashes"] = ok + else: + print("\n--- 2a2 refresh FBNeo hashes: SKIPPED (--offline) ---") + results["fbneo_hashes"] = True + # Step 2b: Check buildbot system directory (non-blocking) if args.check_buildbot and not args.offline: ok, _ = run( diff --git a/scripts/scraper/_hash_merge.py b/scripts/scraper/_hash_merge.py index 72d2e8e0..76734e16 100644 --- a/scripts/scraper/_hash_merge.py +++ b/scripts/scraper/_hash_merge.py @@ -19,13 +19,15 @@ def merge_mame_profile( profile_path: str, hashes_path: str, write: bool = False, + add_new: bool = True, ) -> dict[str, Any]: """Merge MAME bios_zip entries from upstream hash data. Preserves system, note, required per entry. Updates contents and - source_ref from the hashes JSON. New sets get system=None, - required=True, category=bios_zip. Removed sets are flagged with - _upstream_removed=True. + source_ref from the hashes JSON. New sets are only added when + add_new=True (main profile). Entries not in the hash data are + left untouched (the scraper only covers MACHINE_IS_BIOS_ROOT sets, + not all machine ROM sets). If write=True, backs up existing profile to .old.yml before writing. 
""" @@ -42,20 +44,23 @@ def merge_mame_profile( key = _zip_name_to_set(entry['name']) existing_by_name[key] = entry - merged: list[dict] = [] - seen_sets: set[str] = set() + updated_bios: list[dict] = [] + matched_names: set[str] = set() for set_name, set_data in hashes.get('bios_sets', {}).items(): - seen_sets.add(set_name) contents = _build_contents(set_data.get('roms', [])) source_ref = _build_source_ref(set_data) if set_name in existing_by_name: + # Update existing entry: preserve manual fields, update contents entry = existing_by_name[set_name].copy() entry['contents'] = contents if source_ref: entry['source_ref'] = source_ref - else: + updated_bios.append(entry) + matched_names.add(set_name) + elif add_new: + # New BIOS set — only added to the main profile entry = { 'name': f'{set_name}.zip', 'required': True, @@ -64,16 +69,15 @@ def merge_mame_profile( 'source_ref': source_ref, 'contents': contents, } + updated_bios.append(entry) - merged.append(entry) - + # Entries not matched by the scraper stay untouched + # (computer ROMs, device ROMs, etc. — outside BIOS root set scope) for set_name, entry in existing_by_name.items(): - if set_name not in seen_sets: - removed = entry.copy() - removed['_upstream_removed'] = True - merged.append(removed) + if set_name not in matched_names: + updated_bios.append(entry) - profile['files'] = non_bios + merged + profile['files'] = non_bios + updated_bios if write: _backup_and_write(profile_path, profile) @@ -85,11 +89,13 @@ def merge_fbneo_profile( profile_path: str, hashes_path: str, write: bool = False, + add_new: bool = True, ) -> dict[str, Any]: """Merge FBNeo individual ROM entries from upstream hash data. Preserves system, required per entry. Updates crc32, size, and - source_ref. New ROMs get archive=set_name.zip, required=True. + source_ref. New ROMs are only added when add_new=True (main profile). + Entries not in the hash data are left untouched. If write=True, backs up existing profile to .old.yml before writing. 
""" @@ -107,7 +113,7 @@ def merge_fbneo_profile( existing_by_key[key] = entry merged: list[dict] = [] - seen_keys: set[tuple[str, str]] = set() + matched_keys: set[tuple[str, str]] = set() for set_name, set_data in hashes.get('bios_sets', {}).items(): archive_name = f'{set_name}.zip' @@ -116,7 +122,6 @@ def merge_fbneo_profile( for rom in set_data.get('roms', []): rom_name = rom['name'] key = (archive_name, rom_name) - seen_keys.add(key) if key in existing_by_key: entry = existing_by_key[key].copy() @@ -126,7 +131,9 @@ def merge_fbneo_profile( entry['sha1'] = rom['sha1'] if source_ref: entry['source_ref'] = source_ref - else: + merged.append(entry) + matched_keys.add(key) + elif add_new: entry = { 'name': rom_name, 'archive': archive_name, @@ -138,14 +145,12 @@ def merge_fbneo_profile( entry['sha1'] = rom['sha1'] if source_ref: entry['source_ref'] = source_ref + merged.append(entry) - merged.append(entry) - + # Entries not matched stay untouched for key, entry in existing_by_key.items(): - if key not in seen_keys: - removed = entry.copy() - removed['_upstream_removed'] = True - merged.append(removed) + if key not in matched_keys: + merged.append(entry) profile['files'] = non_archive + merged @@ -202,13 +207,17 @@ def _diff_mame( else: unchanged += 1 - removed = [s for s in existing_by_name if s not in bios_sets] + # Items in profile but not in scraper output = out of scope (not removed) + out_of_scope = len(existing_by_name) - sum( + 1 for s in existing_by_name if s in bios_sets + ) return { 'added': added, 'updated': updated, - 'removed': removed, + 'removed': [], 'unchanged': unchanged, + 'out_of_scope': out_of_scope, } @@ -247,15 +256,14 @@ def _diff_fbneo( else: unchanged += 1 - removed = [ - f"{k[0]}:{k[1]}" for k in existing_by_key if k not in seen_keys - ] + out_of_scope = sum(1 for k in existing_by_key if k not in seen_keys) return { 'added': added, 'updated': updated, - 'removed': removed, + 'removed': [], 'unchanged': unchanged, + 'out_of_scope': 
log = logging.getLogger(__name__)

REPO_URL = 'https://github.com/finalburnneo/FBNeo.git'
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
CLONE_DIR = REPO_ROOT / 'tmp' / 'fbneo'
CACHE_PATH = REPO_ROOT / 'data' / 'fbneo-hashes.json'
EMULATORS_DIR = REPO_ROOT / 'emulators'
STALE_HOURS = 24

# GitHub endpoint queried as the last-resort version source.
_TAGS_API_URL = 'https://api.github.com/repos/finalburnneo/FBNeo/tags?per_page=10'


def _is_cache_fresh() -> bool:
    """Return True when the JSON cache exists and is younger than STALE_HOURS.

    An unreadable or malformed cache is reported as stale instead of raising,
    so the caller simply re-clones.
    """
    if not CACHE_PATH.exists():
        return False
    try:
        data = json.loads(CACHE_PATH.read_text(encoding='utf-8'))
        fetched_at = datetime.fromisoformat(data['fetched_at'])
        age = datetime.now(timezone.utc) - fetched_at
    except (OSError, KeyError, TypeError, ValueError):
        # ValueError: invalid JSON (JSONDecodeError) or invalid ISO string.
        # TypeError: payload is not a dict, or the stored timestamp is naive
        # and cannot be subtracted from an aware "now".
        return False
    return age < timedelta(hours=STALE_HOURS)


def _sparse_clone() -> None:
    """Sparse clone the FBNeo repo into CLONE_DIR.

    Only src/burn/drv and src/burner/resource.h are checked out; the latter
    is read by _version_from_resource_h(). Any previous clone is removed
    first. Raises subprocess.CalledProcessError if git fails.
    """
    if CLONE_DIR.exists():
        shutil.rmtree(CLONE_DIR)

    CLONE_DIR.parent.mkdir(parents=True, exist_ok=True)

    subprocess.run(
        [
            'git', 'clone', '--depth', '1', '--filter=blob:none',
            '--sparse', REPO_URL, str(CLONE_DIR),
        ],
        check=True,
        capture_output=True,
        text=True,
    )

    subprocess.run(
        ['git', 'sparse-checkout', 'set', 'src/burn/drv', 'src/burner/resource.h'],
        cwd=CLONE_DIR,
        check=True,
        capture_output=True,
        text=True,
    )


def _version_from_github_tags() -> str:
    """Return the first 'v*' tag listed by the GitHub tags API, else 'unknown'.

    Best effort: network failures, invalid JSON and unexpected payload shapes
    all yield 'unknown'.
    """
    import urllib.error
    import urllib.request

    req = urllib.request.Request(
        _TAGS_API_URL,
        headers={'User-Agent': 'retrobios-scraper/1.0'},
    )
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            tags = json.loads(resp.read())
    except (urllib.error.URLError, OSError, ValueError):
        return 'unknown'

    if not isinstance(tags, list):
        return 'unknown'
    for tag in tags:
        name = tag.get('name') if isinstance(tag, dict) else None
        if isinstance(name, str) and name != 'latest' and name.startswith('v'):
            return name
    return 'unknown'


def _extract_version() -> tuple[str, str]:
    """Extract version tag and commit SHA from the cloned repo.

    Returns (version, commit_sha). The version is taken from, in order:
    the nearest git tag (ignoring the pseudo-tag "latest"), resource.h,
    then the GitHub tags API; 'unknown' if all three fail.
    Raises subprocess.CalledProcessError if `git rev-parse HEAD` fails.
    """
    result = subprocess.run(
        ['git', 'describe', '--tags', '--abbrev=0'],
        cwd=CLONE_DIR,
        capture_output=True,
        text=True,
    )

    # Prefer real version tags over pseudo-tags like "latest"
    version = 'unknown'
    if result.returncode == 0:
        tag = result.stdout.strip()
        if tag and tag != 'latest':
            version = tag
    # Fallback: resource.h
    if version == 'unknown':
        version = _version_from_resource_h()
    # Last resort: GitHub API
    if version == 'unknown':
        version = _version_from_github_tags()

    sha_result = subprocess.run(
        ['git', 'rev-parse', 'HEAD'],
        cwd=CLONE_DIR,
        capture_output=True,
        text=True,
        check=True,
    )
    return version, sha_result.stdout.strip()


def _version_from_resource_h() -> str:
    """Fallback: parse VER_FULL_VERSION_STR from resource.h.

    Returns the first double-quoted string on the first line mentioning
    VER_FULL_VERSION_STR, or 'unknown' when the file or string is missing.
    """
    resource_h = CLONE_DIR / 'src' / 'burner' / 'resource.h'
    if not resource_h.exists():
        return 'unknown'

    text = resource_h.read_text(encoding='utf-8', errors='replace')
    for line in text.splitlines():
        if 'VER_FULL_VERSION_STR' in line:
            parts = line.split('"')
            if len(parts) >= 2:
                return parts[1]
    return 'unknown'


def _cleanup() -> None:
    """Remove the sparse clone directory."""
    if CLONE_DIR.exists():
        shutil.rmtree(CLONE_DIR)


def fetch_and_cache(force: bool = False) -> dict[str, Any]:
    """Clone, parse, and write JSON cache. Returns the cache dict.

    When the cache is fresh and force is False, the cached JSON is returned
    without cloning. The clone directory is always removed afterwards.
    """
    if not force and _is_cache_fresh():
        log.info('cache fresh, skipping clone (use --force to override)')
        return json.loads(CACHE_PATH.read_text(encoding='utf-8'))

    try:
        log.info('sparse cloning %s', REPO_URL)
        _sparse_clone()

        log.info('extracting version')
        version, commit = _extract_version()

        log.info('parsing source tree')
        bios_sets = parse_fbneo_source_tree(str(CLONE_DIR))

        cache: dict[str, Any] = {
            'source': 'finalburnneo/FBNeo',
            'version': version,
            'commit': commit,
            'fetched_at': datetime.now(timezone.utc).isoformat(),
            'bios_sets': bios_sets,
        }

        CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
        CACHE_PATH.write_text(
            json.dumps(cache, indent=2, ensure_ascii=False) + '\n',
            encoding='utf-8',
        )
        log.info('wrote %d BIOS sets to %s', len(bios_sets), CACHE_PATH)

        return cache
    finally:
        _cleanup()


def _find_fbneo_profiles() -> list[Path]:
    """Find emulator profiles whose upstream references finalburnneo/FBNeo.

    Backup files (*.old.yml) and unreadable or non-mapping YAML are skipped.
    """
    profiles: list[Path] = []
    for path in sorted(EMULATORS_DIR.glob('*.yml')):
        if path.name.endswith('.old.yml'):
            continue
        try:
            data = yaml.safe_load(path.read_text(encoding='utf-8'))
        except (yaml.YAMLError, OSError):
            continue
        if not data or not isinstance(data, dict):
            continue
        upstream = data.get('upstream', '')
        if isinstance(upstream, str) and 'finalburnneo/fbneo' in upstream.lower():
            profiles.append(path)
    return profiles


def _format_diff(profile_name: str, diff: dict[str, Any], show_added: bool = True) -> str:
    """Format diff for a single profile.

    With show_added=False, new ROMs are summarised as a count instead of
    being listed one per line.
    """
    lines: list[str] = [f' {profile_name}:']

    added = diff.get('added', [])
    updated = diff.get('updated', [])
    oos = diff.get('out_of_scope', 0)

    if not added and not updated:
        lines.append(' no changes')
        if oos:
            lines.append(f' . {oos} out of scope')
        return '\n'.join(lines)

    if show_added:
        lines.extend(f' + {label}' for label in added)
    elif added:
        lines.append(f' + {len(added)} new ROMs available (main profile only)')
    lines.extend(f' ~ {label}' for label in updated)
    lines.append(f' = {diff.get("unchanged", 0)} unchanged')
    if oos:
        lines.append(f' . {oos} out of scope')

    return '\n'.join(lines)


def run(
    dry_run: bool = False,
    force: bool = False,
    json_output: bool = False,
) -> int:
    """Main entry point for the scraper.

    Prints a per-profile diff (or a JSON report when json_output is set,
    in which case nothing is merged). Unless dry_run is set, profiles with
    applicable changes are merged and written. Returns the exit code (0).
    """
    cache = fetch_and_cache(force=force)

    version = cache.get('version', 'unknown')
    commit = cache.get('commit', '?')[:12]
    bios_sets = cache.get('bios_sets', {})
    profiles = _find_fbneo_profiles()

    if json_output:
        result: dict[str, Any] = {
            'source': cache.get('source'),
            'version': version,
            'commit': cache.get('commit'),
            'bios_set_count': len(bios_sets),
            'profiles': {},
        }
        for path in profiles:
            diff = compute_diff(str(path), str(CACHE_PATH), mode='fbneo')
            result['profiles'][path.stem] = diff
        print(json.dumps(result, indent=2))
        return 0

    header = (
        f'fbneo-hashes: {len(bios_sets)} BIOS sets '
        f'from finalburnneo/FBNeo @ {version} ({commit})'
    )
    print(header)
    print()

    if not profiles:
        print(' no matching emulator profiles found')
        return 0

    for path in profiles:
        is_main = path.name == 'fbneo.yml'
        diff = compute_diff(str(path), str(CACHE_PATH), mode='fbneo')
        print(_format_diff(path.stem, diff, show_added=is_main))

        if dry_run:
            continue

        # New ROMs are merged into the main profile only (add_new=is_main),
        # so for other profiles "added" alone is not a reason to back up
        # and rewrite the file.
        applicable_added = diff.get('added', []) if is_main else []
        if applicable_added or diff.get('updated'):
            merge_fbneo_profile(str(path), str(CACHE_PATH), write=True, add_new=is_main)
            log.info('merged changes into %s', path.name)

    return 0


def main() -> None:
    """Parse command-line flags and exit with run()'s return code."""
    parser = argparse.ArgumentParser(
        description='Scrape FBNeo BIOS set hashes from upstream source',
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='show diff without writing changes',
    )
    parser.add_argument(
        '--force',
        action='store_true',
        help='force re-clone even if cache is fresh',
    )
    parser.add_argument(
        '--json',
        action='store_true',
        dest='json_output',
        help='output diff as JSON',
    )
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format='%(name)s: %(message)s',
    )

    sys.exit(run(
        dry_run=args.dry_run,
        force=args.force,
        json_output=args.json_output,
    ))


if __name__ == '__main__':
    main()
log = logging.getLogger(__name__)

# Repository root: this module lives in <root>/scripts/scraper/.
_ROOT = Path(__file__).resolve().parent.parent.parent
_CACHE_PATH = _ROOT / 'data' / 'mame-hashes.json'
_CLONE_DIR = _ROOT / 'tmp' / 'mame'
_EMULATORS_DIR = _ROOT / 'emulators'
_REPO_URL = 'https://github.com/mamedev/mame.git'
_STALE_HOURS = 24


# ── Cache ────────────────────────────────────────────────────────────


def _load_cache() -> dict[str, Any] | None:
    """Return the cached hash data, or None if missing, unreadable or not a mapping."""
    if not _CACHE_PATH.exists():
        return None
    try:
        with open(_CACHE_PATH, encoding='utf-8') as f:
            data = json.load(f)
    except (json.JSONDecodeError, OSError):
        return None
    # Callers use .get() on the result; anything but a dict is unusable.
    return data if isinstance(data, dict) else None


def _is_stale(cache: dict[str, Any] | None) -> bool:
    """Return True when the cache is absent, undated, or older than _STALE_HOURS."""
    if cache is None:
        return True
    fetched_at = cache.get('fetched_at')
    if not fetched_at:
        return True
    try:
        ts = datetime.fromisoformat(fetched_at)
        age = datetime.now(timezone.utc) - ts
    except (ValueError, TypeError):
        # Bad ISO string, non-string value, or naive timestamp.
        return True
    return age.total_seconds() > _STALE_HOURS * 3600


def _write_cache(data: dict[str, Any]) -> None:
    """Write the hash data to _CACHE_PATH as indented JSON."""
    _CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
    with open(_CACHE_PATH, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
        # End the file with a newline, as the FBNeo cache writer does.
        f.write('\n')
    log.info('cache written to %s', _CACHE_PATH)


# ── Git operations ───────────────────────────────────────────────────


def _run_git(args: list[str], cwd: Path | None = None) -> subprocess.CompletedProcess[str]:
    """Run `git <args>`, capturing text output.

    Raises subprocess.CalledProcessError on a non-zero exit status.
    """
    return subprocess.run(
        ['git', *args],
        cwd=cwd,
        check=True,
        capture_output=True,
        text=True,
    )


def _sparse_clone() -> None:
    """Sparse clone mamedev/mame into _CLONE_DIR with src/mame and src/devices."""
    if _CLONE_DIR.exists():
        shutil.rmtree(_CLONE_DIR)

    _CLONE_DIR.parent.mkdir(parents=True, exist_ok=True)

    log.info('sparse cloning mamedev/mame into %s', _CLONE_DIR)
    _run_git([
        'clone',
        '--depth', '1',
        '--filter=blob:none',
        '--sparse',
        _REPO_URL,
        str(_CLONE_DIR),
    ])
    _run_git(
        ['sparse-checkout', 'set', 'src/mame', 'src/devices'],
        cwd=_CLONE_DIR,
    )


def _get_version() -> str:
    """Return the latest MAME release version from the GitHub API, or 'unknown'."""
    # version.cpp is generated at build time, not in the repo.
    # Use GitHub API to get the latest release tag.
    req = urllib.request.Request(
        'https://api.github.com/repos/mamedev/mame/releases/latest',
        headers={'User-Agent': 'retrobios-scraper/1.0',
                 'Accept': 'application/vnd.github.v3+json'},
    )
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            data = json.loads(resp.read())
    except (urllib.error.URLError, json.JSONDecodeError, OSError):
        return 'unknown'

    tag = data.get('tag_name', '') if isinstance(data, dict) else ''
    if isinstance(tag, str) and tag:
        return _parse_version_tag(tag)
    return 'unknown'


def _parse_version_tag(tag: str) -> str:
    """Convert a release tag such as 'mame0286' to '0.286'.

    Tags whose remainder is not an all-digit string of at least four
    characters are returned with only the 'mame' prefix stripped.
    """
    raw = tag.removeprefix('mame')
    if raw.isdigit() and len(raw) >= 4:
        return f'{raw[0]}.{raw[1:]}'
    return raw


def _get_commit() -> str:
    """Return the HEAD commit SHA of the clone, or '' if git fails."""
    try:
        result = _run_git(['rev-parse', 'HEAD'], cwd=_CLONE_DIR)
    except subprocess.CalledProcessError:
        return ''
    return result.stdout.strip()


def _cleanup() -> None:
    """Remove the sparse clone directory."""
    if _CLONE_DIR.exists():
        log.info('cleaning up %s', _CLONE_DIR)
        shutil.rmtree(_CLONE_DIR)


# ── Profile discovery ────────────────────────────────────────────────


def _find_mame_profiles() -> list[Path]:
    """Find emulator profiles whose upstream is exactly the mamedev/mame repo URL."""
    profiles: list[Path] = []
    for path in sorted(_EMULATORS_DIR.glob('*.yml')):
        if path.name.endswith('.old.yml'):
            continue
        try:
            with open(path, encoding='utf-8') as f:
                data = yaml.safe_load(f)
        except (yaml.YAMLError, OSError):
            continue
        if not isinstance(data, dict):
            continue
        upstream = data.get('upstream', '')
        # Only match profiles tracking current MAME (not frozen snapshots
        # which have upstream like "mamedev/mame/tree/mame0139")
        if isinstance(upstream, str) and upstream.rstrip('/') == 'https://github.com/mamedev/mame':
            profiles.append(path)
    return profiles


# ── Diff formatting ──────────────────────────────────────────────────


def _format_diff(
    profile_path: Path,
    diff: dict[str, Any],
    hashes: dict[str, Any],
    show_added: bool = True,
) -> list[str]:
    """Return the display lines describing one profile's diff.

    With show_added=False, new sets are summarised as a count instead of
    being listed. The out-of-scope count is reported whenever it is
    non-zero, including when there are no changes.
    """
    added = diff.get('added', [])
    updated = diff.get('updated', [])
    removed = diff.get('removed', [])
    unchanged = diff.get('unchanged', 0)
    oos = diff.get('out_of_scope', 0)
    oos_line = f' . {oos} out of scope (not BIOS root sets)'

    lines: list[str] = [f' {profile_path.stem}:']

    if not added and not updated and not removed:
        lines.append(' no changes')
        if oos:
            lines.append(oos_line)
        return lines

    if show_added:
        bios_sets = hashes.get('bios_sets', {})
        for set_name in added:
            info = bios_sets.get(set_name, {})
            rom_count = len(info.get('roms', []))
            source_file = info.get('source_file', '')
            source_line = info.get('source_line', '')
            ref = f'{source_file}:{source_line}' if source_file else ''
            lines.append(f' + {set_name}.zip ({ref}, {rom_count} ROMs)')
    elif added:
        lines.append(f' + {len(added)} new sets available (main profile only)')

    lines.extend(f' ~ {set_name}.zip (contents changed)' for set_name in updated)

    lines.append(f' = {unchanged} unchanged')
    if oos:
        lines.append(oos_line)
    return lines


# ── Main ─────────────────────────────────────────────────────────────


def _fetch_hashes(force: bool) -> dict[str, Any]:
    """Return hash data from the cache if fresh, else clone, parse and re-cache.

    The clone directory is always removed afterwards.
    """
    cache = _load_cache()
    if cache is not None and not force and not _is_stale(cache):
        log.info('using cached data from %s', cache.get('fetched_at', ''))
        return cache

    try:
        _sparse_clone()
        bios_sets = parse_mame_source_tree(str(_CLONE_DIR))
        version = _get_version()
        commit = _get_commit()

        data: dict[str, Any] = {
            'source': 'mamedev/mame',
            'version': version,
            'commit': commit,
            'fetched_at': datetime.now(timezone.utc).isoformat(),
            'bios_sets': bios_sets,
        }
        _write_cache(data)
        return data
    finally:
        _cleanup()


def _run(args: argparse.Namespace) -> None:
    """Fetch hashes, print per-profile diffs and merge unless --dry-run.

    With --json the raw hash data is dumped to stdout and nothing is merged.
    """
    hashes = _fetch_hashes(args.force)

    total_sets = len(hashes.get('bios_sets', {}))
    version = hashes.get('version', 'unknown')
    commit = hashes.get('commit', '')[:12]

    if args.json:
        json.dump(hashes, sys.stdout, indent=2, ensure_ascii=False)
        sys.stdout.write('\n')
        return

    print(f'mame-hashes: {total_sets} BIOS root sets from mamedev/mame'
          f' @ {version} ({commit})')
    print()

    profiles = _find_mame_profiles()
    if not profiles:
        print(' no profiles with mamedev/mame upstream found')
        return

    for profile_path in profiles:
        is_main = profile_path.name == 'mame.yml'
        diff = compute_diff(str(profile_path), str(_CACHE_PATH), mode='mame')
        for line in _format_diff(profile_path, diff, hashes, show_added=is_main):
            print(line)

        if args.dry_run:
            continue

        updated = diff.get('updated', [])
        # New sets are only merged into the main profile.
        added = diff.get('added', []) if is_main else []
        if added or updated:
            merge_mame_profile(
                str(profile_path),
                str(_CACHE_PATH),
                write=True,
                add_new=is_main,
            )
            log.info('merged into %s', profile_path.name)

    print()
    if args.dry_run:
        print('(dry run, no files modified)')


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser (--dry-run, --json, --force)."""
    parser = argparse.ArgumentParser(
        prog='mame_hash_scraper',
        description='Fetch MAME BIOS hashes from source and merge into profiles.',
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='show diff only, do not modify profiles',
    )
    parser.add_argument(
        '--json',
        action='store_true',
        help='output raw JSON to stdout',
    )
    parser.add_argument(
        '--force',
        action='store_true',
        help='re-fetch even if cache is fresh',
    )
    return parser


def main() -> None:
    """Configure logging, parse arguments and run the scraper."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(levelname)s: %(message)s',
    )
    parser = build_parser()
    args = parser.parse_args()
    _run(args)


if __name__ == '__main__':
    main()
_ROM_LOAD = re.compile( - r'(ROMX?_LOAD(?:16_BYTE|16_WORD|16_WORD_SWAP|32_BYTE|32_WORD|32_WORD_SWAP)?)\s*\(' - r'\s*"([^"]+)"\s*,' # name + r'\b\w*ROMX?_LOAD\w*\s*\(' + r'[^"]*' # skip any args before the filename (e.g., bios index) + r'"([^"]+)"\s*,' # name (first quoted string) r'\s*(0x[\da-fA-F]+|\d+)\s*,' # offset r'\s*(0x[\da-fA-F]+|\d+)\s*,', # size ) @@ -104,9 +110,9 @@ def find_bios_root_sets(source: str, filename: str) -> dict[str, dict]: def parse_rom_block(source: str, set_name: str) -> list[dict]: """Parse ROM definitions for a given set name. - Finds the ROM_START(set_name)...ROM_END block and extracts all - ROM_LOAD entries with their metadata. Skips NO_DUMP entries, - flags BAD_DUMP entries. + Finds the ROM_START(set_name)...ROM_END block, expands local + #define macros that contain ROM_LOAD/ROM_REGION calls, then + extracts all ROM entries. Skips NO_DUMP, flags BAD_DUMP. """ pattern = re.compile( r'ROM_START\s*\(\s*' + re.escape(set_name) + r'\s*\)', @@ -120,6 +126,13 @@ def parse_rom_block(source: str, set_name: str) -> list[dict]: return [] block = source[start_match.end():end_match.start()] + + # Pre-expand macros: find #define macros in the file that contain + # ROM_LOAD/ROM_REGION/ROM_SYSTEM_BIOS calls, then expand their + # invocations within the ROM block. 
# Regex for #define macros that span multiple lines (backslash continuation).
# Group 1 is the macro name; group 2 is the raw body including continuations.
_DEFINE_RE = re.compile(
    r'^\s*#\s*define\s+(\w+)(?:\([^)]*\))?\s*((?:.*\\\n)*.*)',
    re.MULTILINE,
)

# ROM-related tokens that indicate a macro is relevant for expansion
_ROM_TOKENS = {'ROM_LOAD', 'ROMX_LOAD', 'ROM_REGION', 'ROM_SYSTEM_BIOS',
               'ROM_FILL', 'ROM_COPY', 'ROM_RELOAD'}

# A ROM_LOAD/ROMX_LOAD call whose first two arguments are bare identifiers
# (formal parameters) marks a wrapper macro rather than real ROM data.
_WRAPPER_CALL_RE = re.compile(r'ROMX?_LOAD\s*\(\s*\w+\s*,\s*\w+\s*,')


def _collect_rom_macros(source: str) -> dict[str, str]:
    """Collect #define macros that contain ROM-related calls.

    Returns {macro_name: body} with backslash continuations joined into a
    single line. Macros without any _ROM_TOKENS are ignored, as are wrapper
    macros (e.g. ROM_LOAD16_WORD_SWAP_BIOS) that only forward their formal
    parameters to ROM_LOAD/ROMX_LOAD. If a name is defined more than once,
    the last qualifying definition wins.
    """
    macros: dict[str, str] = {}
    for match in _DEFINE_RE.finditer(source):
        name = match.group(1)
        # Join backslash-continued lines
        body = match.group(2).replace('\\\n', ' ')
        if not any(token in body for token in _ROM_TOKENS):
            continue
        if _WRAPPER_CALL_RE.search(body):
            continue
        macros[name] = body
    return macros


def _expand_macros(block: str, macros: dict[str, str], depth: int = 5) -> str:
    """Expand macro invocations in a ROM block.

    Each occurrence of a macro name, written either bare (NEOGEO_BIOS) or
    with an argument list (NEOGEO_UNIBIOS_2_2_AND_NEWER(16)), is replaced by
    the macro body. Arguments are discarded; no parameter substitution is
    performed. Up to `depth` passes are made so macros nested inside other
    macro bodies are expanded too; expansion stops early once a pass
    changes nothing. Returns `block` unchanged when depth <= 0 or no macros
    are given.
    """
    if depth <= 0 or not macros:
        return block

    # Compile once. The trailing \b keeps FOO from matching inside FOO_BAR.
    compiled = [
        (re.compile(r'\b' + re.escape(name) + r'\b(?:\s*\([^)]*\))?'), body)
        for name, body in macros.items()
    ]

    for _ in range(depth):
        changed = False
        for pattern, body in compiled:
            # A callable replacement inserts the body literally; passing the
            # string itself would make re interpret backslashes in it.
            block, count = pattern.subn(lambda _m, _body=body: _body, block)
            if count:
                changed = True
        if not changed:
            break

    return block


def _parse_rom_entries(block: str) -> list[dict]:
    """Parse ROM entries from a ROM block (content between ROM_START and ROM_END).

    Scans the whole block with regexes (not line by line) so that
    macro-expanded content with several statements on one line is handled.
    Matches are processed in order of appearance to track the current region
    and the BIOS labels declared so far. Entries marked NO_DUMP are skipped;
    BAD_DUMP entries are kept and flagged.
    """
    roms: list[dict] = []
    current_region = ''
    bios_labels: dict[int, tuple[str, str]] = {}  # index -> (label, description)

    token_patterns = [
        ('region', _ROM_REGION),
        ('bios_label', _ROM_SYSTEM_BIOS),
        ('rom_load', _ROM_LOAD),
    ]

    # Gather the matches of every pattern, then order them by position.
    events: list[tuple[int, str, re.Match]] = [
        (m.start(), tag, m)
        for tag, pat in token_patterns
        for m in pat.finditer(block)
    ]
    events.sort(key=lambda e: e[0])

    for _pos, tag, m in events:
        if tag == 'region':
            current_region = m.group(2)
            continue
        if tag == 'bios_label':
            bios_labels[int(m.group(1))] = (m.group(2), m.group(3))
            continue

        # tag == 'rom_load': take the full macro call (up to its closing
        # paren) as context; fall back to a 200-character window.
        context_start = m.start()
        paren_pos = block.find('(', context_start)
        context_end = m.end() + 200
        if paren_pos != -1:
            close_pos = _find_closing_paren(block, paren_pos)
            if close_pos != -1:
                context_end = close_pos + 1
        context = block[context_start:min(context_end, len(block))]

        if _NO_DUMP.search(context):
            continue

        crc32 = ''
        sha1 = ''
        crc_sha_match = _CRC_SHA.search(context)
        if crc_sha_match:
            crc32 = crc_sha_match.group(1).lower()
            sha1 = crc_sha_match.group(2).lower()

        entry: dict = {
            'name': m.group(1),
            'size': _parse_int(m.group(3)),
            'crc32': crc32,
            'sha1': sha1,
            'region': current_region,
            'bad_dump': bool(_BAD_DUMP.search(context)),
        }

        # Attach BIOS info when the call carries a ROM_BIOS(n) reference;
        # label and description stay empty if n has not been declared yet.
        bios_ref = _ROM_BIOS.search(context)
        if bios_ref:
            bios_index = int(bios_ref.group(1))
            bios_label, bios_description = bios_labels.get(bios_index, ('', ''))
            entry['bios_index'] = bios_index
            entry['bios_label'] = bios_label
            entry['bios_description'] = bios_description

        roms.append(entry)

    return roms
--------------------------------------------------------------- + + def test_mame_parser_finds_bios_root_sets(self): + from scripts.scraper.mame_parser import find_bios_root_sets, parse_rom_block + source = ''' +ROM_START( neogeo ) + ROM_REGION( 0x020000, "mainbios", 0 ) + ROM_LOAD( "sp-s2.sp1", 0x00000, 0x020000, CRC(9036d879) SHA1(4f834c580f3471ce40c3210ef5e7491df38d8851) ) +ROM_END +GAME( 1990, neogeo, 0, ng, neogeo, ng_state, empty_init, ROT0, "SNK", "Neo Geo", MACHINE_IS_BIOS_ROOT ) +ROM_START( pacman ) + ROM_REGION( 0x10000, "maincpu", 0 ) + ROM_LOAD( "pacman.6e", 0x0000, 0x1000, CRC(c1e6ab10) SHA1(e87e059c5be45753f7e9f33dff851f16d6751181) ) +ROM_END +GAME( 1980, pacman, 0, pacman, pacman, pacman_state, empty_init, ROT90, "Namco", "Pac-Man", 0 ) +''' + sets = find_bios_root_sets(source, "neogeo.cpp") + self.assertIn("neogeo", sets) + self.assertNotIn("pacman", sets) + roms = parse_rom_block(source, "neogeo") + self.assertEqual(len(roms), 1) + self.assertEqual(roms[0]["crc32"], "9036d879") + + def test_fbneo_parser_finds_bios_sets(self): + from scripts.scraper.fbneo_parser import find_bios_sets, parse_rom_info + source = ''' +static struct BurnRomInfo neogeoRomDesc[] = { + { "sp-s2.sp1", 0x020000, 0x9036d879, BRF_ESS | BRF_BIOS }, + { "", 0, 0, 0 } +}; +STD_ROM_PICK(neogeo) +STD_ROM_FN(neogeo) +struct BurnDriver BurnDrvneogeo = { + "neogeo", NULL, NULL, NULL, "1990", + "Neo Geo\\0", "BIOS only", "SNK", "Neo Geo MVS", + NULL, NULL, NULL, NULL, BDF_BOARDROM, 0, 0, + 0, 0, 0, NULL, neogeoRomInfo, neogeoRomName, NULL, NULL, + NULL, NULL, NULL, NULL, 0 +}; +''' + sets = find_bios_sets(source, "d_neogeo.cpp") + self.assertIn("neogeo", sets) + roms = parse_rom_info(source, "neogeo") + self.assertEqual(len(roms), 1) + self.assertEqual(roms[0]["crc32"], "9036d879") + + def test_mame_merge_preserves_manual_fields(self): + import json as json_mod + from scripts.scraper._hash_merge import merge_mame_profile + merge_dir = os.path.join(self.root, "merge_mame") + 
os.makedirs(merge_dir) + profile = { + "emulator": "Test", "type": "libretro", + "upstream": "https://github.com/mamedev/mame", + "core_version": "0.285", + "files": [{ + "name": "neogeo.zip", "required": True, "category": "bios_zip", + "system": "snk-neogeo-mvs", "note": "MVS BIOS", + "source_ref": "old.cpp:1", + "contents": [{"name": "sp-s2.sp1", "size": 131072, "crc32": "oldcrc"}], + }], + } + profile_path = os.path.join(merge_dir, "test.yml") + with open(profile_path, "w") as f: + yaml.dump(profile, f, sort_keys=False) + hashes = { + "source": "mamedev/mame", "version": "0.286", "commit": "abc", + "fetched_at": "2026-03-30T00:00:00Z", + "bios_sets": {"neogeo": { + "source_file": "neo.cpp", "source_line": 42, + "roms": [{"name": "sp-s2.sp1", "size": 131072, "crc32": "newcrc", "sha1": "abc123"}], + }}, + } + hashes_path = os.path.join(merge_dir, "hashes.json") + with open(hashes_path, "w") as f: + json_mod.dump(hashes, f) + result = merge_mame_profile(profile_path, hashes_path) + neo = next(f for f in result["files"] if f["name"] == "neogeo.zip") + self.assertEqual(neo["contents"][0]["crc32"], "newcrc") + self.assertEqual(neo["system"], "snk-neogeo-mvs") + self.assertEqual(neo["note"], "MVS BIOS") + self.assertEqual(neo["source_ref"], "neo.cpp:42") + self.assertEqual(result["core_version"], "0.286") + + def test_fbneo_merge_updates_individual_roms(self): + import json as json_mod + from scripts.scraper._hash_merge import merge_fbneo_profile + merge_dir = os.path.join(self.root, "merge_fbneo") + os.makedirs(merge_dir) + profile = { + "emulator": "FBNeo", "type": "libretro", + "upstream": "https://github.com/finalburnneo/FBNeo", + "core_version": "v1.0.0.02", + "files": [{"name": "sp-s2.sp1", "archive": "neogeo.zip", + "system": "snk-neogeo-mvs", "required": True, + "size": 131072, "crc32": "oldcrc"}], + } + profile_path = os.path.join(merge_dir, "fbneo.yml") + with open(profile_path, "w") as f: + yaml.dump(profile, f, sort_keys=False) + hashes = { + "source": 
"finalburnneo/FBNeo", "version": "v1.0.0.03", "commit": "def", + "fetched_at": "2026-03-30T00:00:00Z", + "bios_sets": {"neogeo": { + "source_file": "neo.cpp", "source_line": 10, + "roms": [{"name": "sp-s2.sp1", "size": 131072, "crc32": "newcrc"}], + }}, + } + hashes_path = os.path.join(merge_dir, "hashes.json") + with open(hashes_path, "w") as f: + json_mod.dump(hashes, f) + result = merge_fbneo_profile(profile_path, hashes_path) + rom = next(f for f in result["files"] if f["name"] == "sp-s2.sp1") + self.assertEqual(rom["crc32"], "newcrc") + self.assertEqual(rom["system"], "snk-neogeo-mvs") + self.assertEqual(result["core_version"], "v1.0.0.03") + + if __name__ == "__main__": unittest.main() diff --git a/tests/test_hash_merge.py b/tests/test_hash_merge.py index 05b9a3ca..fbd1c434 100644 --- a/tests/test_hash_merge.py +++ b/tests/test_hash_merge.py @@ -210,9 +210,10 @@ class TestMameMerge(unittest.TestCase): self.assertEqual(len(non_bios), 1) self.assertEqual(non_bios[0]['name'], 'hiscore.dat') - def test_merge_keeps_removed_bios_set(self) -> None: + def test_merge_keeps_unmatched_bios_set(self) -> None: + """Entries not in scraper scope stay untouched (no _upstream_removed).""" hashes = _make_mame_hashes() - hashes['bios_sets'] = {} # neogeo removed upstream + hashes['bios_sets'] = {} # nothing from scraper with tempfile.TemporaryDirectory() as td: p = Path(td) @@ -223,7 +224,8 @@ class TestMameMerge(unittest.TestCase): bios_files = [f for f in result['files'] if f.get('category') == 'bios_zip'] self.assertEqual(len(bios_files), 1) - self.assertTrue(bios_files[0].get('_upstream_removed')) + self.assertNotIn('_upstream_removed', bios_files[0]) + self.assertEqual(bios_files[0]['name'], 'neogeo.zip') def test_merge_updates_core_version(self) -> None: with tempfile.TemporaryDirectory() as td: @@ -311,7 +313,8 @@ class TestFbneoMerge(unittest.TestCase): self.assertEqual(len(non_archive), 1) self.assertEqual(non_archive[0]['name'], 'hiscore.dat') - def 
test_merge_marks_removed_roms(self) -> None: + def test_merge_keeps_unmatched_roms(self) -> None: + """Entries not in scraper scope stay untouched (no _upstream_removed).""" hashes = _make_fbneo_hashes() hashes['bios_sets'] = {} @@ -324,7 +327,7 @@ class TestFbneoMerge(unittest.TestCase): archive_files = [f for f in result['files'] if 'archive' in f] self.assertEqual(len(archive_files), 1) - self.assertTrue(archive_files[0].get('_upstream_removed')) + self.assertNotIn('_upstream_removed', archive_files[0]) def test_merge_updates_core_version(self) -> None: with tempfile.TemporaryDirectory() as td: @@ -362,7 +365,8 @@ class TestDiff(unittest.TestCase): self.assertEqual(len(diff['removed']), 0) self.assertEqual(diff['unchanged'], 0) - def test_diff_mame_detects_removed(self) -> None: + def test_diff_mame_out_of_scope(self) -> None: + """Items in profile but not in scraper output = out of scope, not removed.""" hashes = _make_mame_hashes() hashes['bios_sets'] = {} @@ -373,9 +377,9 @@ class TestDiff(unittest.TestCase): diff = compute_diff(profile_path, hashes_path, mode='mame') - self.assertIn('neogeo', diff['removed']) + self.assertEqual(diff['removed'], []) + self.assertEqual(diff['out_of_scope'], 1) self.assertEqual(len(diff['added']), 0) - self.assertEqual(len(diff['updated']), 0) def test_diff_fbneo_detects_changes(self) -> None: hashes = _make_fbneo_hashes()