feat: add MAME/FBNeo hash auto-fetch scrapers

Sparse-clone the upstream repos, parse BIOS root sets from the C source,
cache the results as JSON, and merge them into emulator profiles with a backup.
Covers macro expansion, version detection, and subset-profile protection.
This commit is contained in:
Abdessamad Derraz
2026-03-30 19:11:26 +02:00
parent 94c3ac9834
commit 75e34898ee
7 changed files with 982 additions and 100 deletions

View File

@@ -177,6 +177,28 @@ def main():
print("\n--- 2/9 refresh data directories: SKIPPED (--offline) ---")
results["refresh_data"] = True
# Step 2a: Refresh MAME BIOS hashes
if not args.offline:
ok, _ = run(
[sys.executable, "-m", "scripts.scraper.mame_hash_scraper"],
"2a refresh MAME hashes",
)
results["mame_hashes"] = ok
else:
print("\n--- 2a refresh MAME hashes: SKIPPED (--offline) ---")
results["mame_hashes"] = True
# Step 2a2: Refresh FBNeo BIOS hashes
if not args.offline:
ok, _ = run(
[sys.executable, "-m", "scripts.scraper.fbneo_hash_scraper"],
"2a2 refresh FBNeo hashes",
)
results["fbneo_hashes"] = ok
else:
print("\n--- 2a2 refresh FBNeo hashes: SKIPPED (--offline) ---")
results["fbneo_hashes"] = True
# Step 2b: Check buildbot system directory (non-blocking)
if args.check_buildbot and not args.offline:
ok, _ = run(

View File

@@ -19,13 +19,15 @@ def merge_mame_profile(
profile_path: str,
hashes_path: str,
write: bool = False,
add_new: bool = True,
) -> dict[str, Any]:
"""Merge MAME bios_zip entries from upstream hash data.
Preserves system, note, required per entry. Updates contents and
source_ref from the hashes JSON. New sets get system=None,
required=True, category=bios_zip. Removed sets are flagged with
_upstream_removed=True.
source_ref from the hashes JSON. New sets are only added when
add_new=True (main profile). Entries not in the hash data are
left untouched (the scraper only covers MACHINE_IS_BIOS_ROOT sets,
not all machine ROM sets).
If write=True, backs up existing profile to .old.yml before writing.
"""
@@ -42,20 +44,23 @@ def merge_mame_profile(
key = _zip_name_to_set(entry['name'])
existing_by_name[key] = entry
merged: list[dict] = []
seen_sets: set[str] = set()
updated_bios: list[dict] = []
matched_names: set[str] = set()
for set_name, set_data in hashes.get('bios_sets', {}).items():
seen_sets.add(set_name)
contents = _build_contents(set_data.get('roms', []))
source_ref = _build_source_ref(set_data)
if set_name in existing_by_name:
# Update existing entry: preserve manual fields, update contents
entry = existing_by_name[set_name].copy()
entry['contents'] = contents
if source_ref:
entry['source_ref'] = source_ref
else:
updated_bios.append(entry)
matched_names.add(set_name)
elif add_new:
# New BIOS set — only added to the main profile
entry = {
'name': f'{set_name}.zip',
'required': True,
@@ -64,16 +69,15 @@ def merge_mame_profile(
'source_ref': source_ref,
'contents': contents,
}
updated_bios.append(entry)
merged.append(entry)
# Entries not matched by the scraper stay untouched
# (computer ROMs, device ROMs, etc. — outside BIOS root set scope)
for set_name, entry in existing_by_name.items():
if set_name not in seen_sets:
removed = entry.copy()
removed['_upstream_removed'] = True
merged.append(removed)
if set_name not in matched_names:
updated_bios.append(entry)
profile['files'] = non_bios + merged
profile['files'] = non_bios + updated_bios
if write:
_backup_and_write(profile_path, profile)
@@ -85,11 +89,13 @@ def merge_fbneo_profile(
profile_path: str,
hashes_path: str,
write: bool = False,
add_new: bool = True,
) -> dict[str, Any]:
"""Merge FBNeo individual ROM entries from upstream hash data.
Preserves system, required per entry. Updates crc32, size, and
source_ref. New ROMs get archive=set_name.zip, required=True.
source_ref. New ROMs are only added when add_new=True (main profile).
Entries not in the hash data are left untouched.
If write=True, backs up existing profile to .old.yml before writing.
"""
@@ -107,7 +113,7 @@ def merge_fbneo_profile(
existing_by_key[key] = entry
merged: list[dict] = []
seen_keys: set[tuple[str, str]] = set()
matched_keys: set[tuple[str, str]] = set()
for set_name, set_data in hashes.get('bios_sets', {}).items():
archive_name = f'{set_name}.zip'
@@ -116,7 +122,6 @@ def merge_fbneo_profile(
for rom in set_data.get('roms', []):
rom_name = rom['name']
key = (archive_name, rom_name)
seen_keys.add(key)
if key in existing_by_key:
entry = existing_by_key[key].copy()
@@ -126,7 +131,9 @@ def merge_fbneo_profile(
entry['sha1'] = rom['sha1']
if source_ref:
entry['source_ref'] = source_ref
else:
merged.append(entry)
matched_keys.add(key)
elif add_new:
entry = {
'name': rom_name,
'archive': archive_name,
@@ -138,14 +145,12 @@ def merge_fbneo_profile(
entry['sha1'] = rom['sha1']
if source_ref:
entry['source_ref'] = source_ref
merged.append(entry)
merged.append(entry)
# Entries not matched stay untouched
for key, entry in existing_by_key.items():
if key not in seen_keys:
removed = entry.copy()
removed['_upstream_removed'] = True
merged.append(removed)
if key not in matched_keys:
merged.append(entry)
profile['files'] = non_archive + merged
@@ -202,13 +207,17 @@ def _diff_mame(
else:
unchanged += 1
removed = [s for s in existing_by_name if s not in bios_sets]
# Items in profile but not in scraper output = out of scope (not removed)
out_of_scope = len(existing_by_name) - sum(
1 for s in existing_by_name if s in bios_sets
)
return {
'added': added,
'updated': updated,
'removed': removed,
'removed': [],
'unchanged': unchanged,
'out_of_scope': out_of_scope,
}
@@ -247,15 +256,14 @@ def _diff_fbneo(
else:
unchanged += 1
removed = [
f"{k[0]}:{k[1]}" for k in existing_by_key if k not in seen_keys
]
out_of_scope = sum(1 for k in existing_by_key if k not in seen_keys)
return {
'added': added,
'updated': updated,
'removed': removed,
'removed': [],
'unchanged': unchanged,
'out_of_scope': out_of_scope,
}

View File

@@ -0,0 +1,315 @@
"""Scrape FBNeo BIOS set hashes from upstream source via sparse clone.
Does NOT inherit BaseScraper (uses git sparse clone, not URL fetch).
Parses BDF_BOARDROM drivers from src/burn/drv/ to extract CRC32/size
for all BIOS ROM sets, then optionally merges into emulator profiles.
"""
from __future__ import annotations
import argparse
import json
import logging
import shutil
import subprocess
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any
import yaml
from scripts.scraper.fbneo_parser import parse_fbneo_source_tree
from scripts.scraper._hash_merge import compute_diff, merge_fbneo_profile
log = logging.getLogger(__name__)
REPO_URL = 'https://github.com/finalburnneo/FBNeo.git'
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
CLONE_DIR = REPO_ROOT / 'tmp' / 'fbneo'
CACHE_PATH = REPO_ROOT / 'data' / 'fbneo-hashes.json'
EMULATORS_DIR = REPO_ROOT / 'emulators'
STALE_HOURS = 24
def _is_cache_fresh() -> bool:
    """Return True if the JSON cache exists and is younger than STALE_HOURS.

    Any unreadable, malformed, or naive-timestamp cache is treated as
    stale so the caller falls back to a fresh clone.
    """
    if not CACHE_PATH.exists():
        return False
    try:
        data = json.loads(CACHE_PATH.read_text(encoding='utf-8'))
        fetched_at = datetime.fromisoformat(data['fetched_at'])
        # TypeError: a naive 'fetched_at' cannot be subtracted from the
        # aware "now"; OSError: read race after the exists() check above.
        return datetime.now(timezone.utc) - fetched_at < timedelta(hours=STALE_HOURS)
    except (json.JSONDecodeError, KeyError, ValueError, TypeError, OSError):
        return False
def _sparse_clone() -> None:
    """Sparse clone FBNeo repo, checking out only src/burn/drv.

    Any stale clone directory is removed first so the checkout is always
    clean.  Both git calls run with check=True, so a failure raises
    subprocess.CalledProcessError.
    """
    if CLONE_DIR.exists():
        shutil.rmtree(CLONE_DIR)
    CLONE_DIR.parent.mkdir(parents=True, exist_ok=True)
    # --filter=blob:none + --sparse keeps the initial clone minimal; only
    # paths named in the sparse-checkout set below are materialized.
    subprocess.run(
        [
            'git', 'clone', '--depth', '1', '--filter=blob:none',
            '--sparse', REPO_URL, str(CLONE_DIR),
        ],
        check=True,
        capture_output=True,
        text=True,
    )
    # resource.h is included as a version-detection fallback
    # (used by _version_from_resource_h).
    subprocess.run(
        ['git', 'sparse-checkout', 'set', 'src/burn/drv', 'src/burner/resource.h'],
        cwd=CLONE_DIR,
        check=True,
        capture_output=True,
        text=True,
    )
def _extract_version() -> tuple[str, str]:
    """Extract version tag and commit SHA from the cloned repo.

    Returns (version, commit_sha).  Tries, in order: a real git tag
    (ignoring the 'latest' pseudo-tag), VER_FULL_VERSION_STR from
    resource.h, and finally the GitHub tags API.  Falls back to
    'unknown' when all three fail.  Raises CalledProcessError only if
    rev-parse fails (the clone has no HEAD).
    """
    result = subprocess.run(
        ['git', 'describe', '--tags', '--abbrev=0'],
        cwd=CLONE_DIR,
        capture_output=True,
        text=True,
    )
    # Prefer real version tags over pseudo-tags like "latest"
    version = 'unknown'
    if result.returncode == 0:
        tag = result.stdout.strip()
        if tag and tag != 'latest':
            version = tag
    # Fallback: resource.h
    if version == 'unknown':
        version = _version_from_resource_h()
    # Last resort: use GitHub API for latest real release tag
    if version == 'unknown':
        try:
            import urllib.error
            import urllib.request
            req = urllib.request.Request(
                'https://api.github.com/repos/finalburnneo/FBNeo/tags?per_page=10',
                headers={'User-Agent': 'retrobios-scraper/1.0'},
            )
            with urllib.request.urlopen(req, timeout=10) as resp:
                tags = json.loads(resp.read())
            for t in tags:
                if t['name'] != 'latest' and t['name'].startswith('v'):
                    version = t['name']
                    break
        # Also catch malformed API payloads (bad JSON, unexpected shape:
        # ValueError/KeyError/TypeError), not only network errors --
        # version detection must never abort the whole scrape.
        except (urllib.error.URLError, OSError, ValueError, KeyError, TypeError):
            pass
    sha_result = subprocess.run(
        ['git', 'rev-parse', 'HEAD'],
        cwd=CLONE_DIR,
        capture_output=True,
        text=True,
        check=True,
    )
    commit = sha_result.stdout.strip()
    return version, commit
def _version_from_resource_h() -> str:
    """Fallback: parse VER_FULL_VERSION_STR from resource.h."""
    header = CLONE_DIR / 'src' / 'burner' / 'resource.h'
    if not header.exists():
        return 'unknown'
    content = header.read_text(encoding='utf-8', errors='replace')
    # The version is the first quoted token on the #define line.
    for candidate in content.splitlines():
        if 'VER_FULL_VERSION_STR' not in candidate:
            continue
        pieces = candidate.split('"')
        if len(pieces) >= 2:
            return pieces[1]
    return 'unknown'
def _cleanup() -> None:
    """Remove the sparse clone directory (idempotent; safe if absent)."""
    if CLONE_DIR.exists():
        shutil.rmtree(CLONE_DIR)
def fetch_and_cache(force: bool = False) -> dict[str, Any]:
    """Clone, parse, and write JSON cache. Returns the cache dict.

    When the on-disk cache is fresh (see _is_cache_fresh) and force is
    False, the cached data is returned without touching the network.
    The sparse clone is always removed afterwards (finally), even when
    parsing raises.

    Args:
        force: re-clone and re-parse even if the cache is fresh.
    """
    if not force and _is_cache_fresh():
        log.info('cache fresh, skipping clone (use --force to override)')
        return json.loads(CACHE_PATH.read_text(encoding='utf-8'))
    try:
        log.info('sparse cloning %s', REPO_URL)
        _sparse_clone()
        log.info('extracting version')
        version, commit = _extract_version()
        log.info('parsing source tree')
        bios_sets = parse_fbneo_source_tree(str(CLONE_DIR))
        cache: dict[str, Any] = {
            'source': 'finalburnneo/FBNeo',
            'version': version,
            'commit': commit,
            'fetched_at': datetime.now(timezone.utc).isoformat(),
            'bios_sets': bios_sets,
        }
        CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
        CACHE_PATH.write_text(
            json.dumps(cache, indent=2, ensure_ascii=False) + '\n',
            encoding='utf-8',
        )
        log.info('wrote %d BIOS sets to %s', len(bios_sets), CACHE_PATH)
        return cache
    finally:
        _cleanup()
def _find_fbneo_profiles() -> list[Path]:
    """Find emulator profiles whose upstream references finalburnneo/FBNeo."""
    found: list[Path] = []
    for candidate in sorted(EMULATORS_DIR.glob('*.yml')):
        # Backup files produced by previous merges are never profiles.
        if candidate.name.endswith('.old.yml'):
            continue
        try:
            parsed = yaml.safe_load(candidate.read_text(encoding='utf-8'))
        except (yaml.YAMLError, OSError):
            continue
        if not isinstance(parsed, dict) or not parsed:
            continue
        upstream = parsed.get('upstream', '')
        is_fbneo = (
            isinstance(upstream, str)
            and 'finalburnneo/fbneo' in upstream.lower()
        )
        if is_fbneo:
            found.append(candidate)
    return found
def _format_diff(profile_name: str, diff: dict[str, Any], show_added: bool = True) -> str:
"""Format diff for a single profile."""
lines: list[str] = []
lines.append(f' {profile_name}:')
added = diff.get('added', [])
updated = diff.get('updated', [])
oos = diff.get('out_of_scope', 0)
if not added and not updated:
lines.append(' no changes')
if oos:
lines.append(f' . {oos} out of scope')
return '\n'.join(lines)
if show_added:
for label in added:
lines.append(f' + {label}')
elif added:
lines.append(f' + {len(added)} new ROMs available (main profile only)')
for label in updated:
lines.append(f' ~ {label}')
lines.append(f' = {diff["unchanged"]} unchanged')
if oos:
lines.append(f' . {oos} out of scope')
return '\n'.join(lines)
def run(
    dry_run: bool = False,
    force: bool = False,
    json_output: bool = False,
) -> int:
    """Main entry point for the scraper.

    Args:
        dry_run: report diffs without writing profiles.
        force: re-clone even if the JSON cache is fresh.
        json_output: emit a machine-readable JSON report instead of text.

    Returns:
        Process exit code (always 0; git/network failures raise).
    """
    cache = fetch_and_cache(force=force)
    version = cache.get('version', 'unknown')
    commit = cache.get('commit', '?')[:12]
    bios_sets = cache.get('bios_sets', {})
    profiles = _find_fbneo_profiles()
    if json_output:
        result: dict[str, Any] = {
            'source': cache.get('source'),
            'version': version,
            'commit': cache.get('commit'),
            'bios_set_count': len(bios_sets),
            'profiles': {},
        }
        for path in profiles:
            diff = compute_diff(str(path), str(CACHE_PATH), mode='fbneo')
            result['profiles'][path.stem] = diff
        print(json.dumps(result, indent=2))
        return 0
    header = (
        f'fbneo-hashes: {len(bios_sets)} BIOS sets '
        f'from finalburnneo/FBNeo @ {version} ({commit})'
    )
    print(header)
    print()
    if not profiles:
        print('  no matching emulator profiles found')
        return 0
    for path in profiles:
        # Only the main profile (fbneo.yml) receives newly-discovered
        # ROMs; subset profiles are update-only.
        is_main = path.name == 'fbneo.yml'
        diff = compute_diff(str(path), str(CACHE_PATH), mode='fbneo')
        print(_format_diff(path.stem, diff, show_added=is_main))
        # Additions are ignored for subset profiles (add_new=False), so
        # merging an additions-only diff there would rewrite the file
        # unchanged and churn .old.yml backups -- skip it.
        effective_added = diff['added'] if is_main else []
        if not dry_run and (effective_added or diff['updated']):
            merge_fbneo_profile(str(path), str(CACHE_PATH), write=True, add_new=is_main)
            log.info('merged changes into %s', path.name)
    return 0
def main() -> None:
    """CLI wrapper: parse arguments, configure logging, run the scraper."""
    parser = argparse.ArgumentParser(
        description='Scrape FBNeo BIOS set hashes from upstream source',
    )
    parser.add_argument(
        '--dry-run', action='store_true',
        help='show diff without writing changes',
    )
    parser.add_argument(
        '--force', action='store_true',
        help='force re-clone even if cache is fresh',
    )
    parser.add_argument(
        '--json', action='store_true', dest='json_output',
        help='output diff as JSON',
    )
    opts = parser.parse_args()
    logging.basicConfig(level=logging.INFO, format='%(name)s: %(message)s')
    exit_code = run(
        dry_run=opts.dry_run,
        force=opts.force,
        json_output=opts.json_output,
    )
    sys.exit(exit_code)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,322 @@
"""Fetch MAME BIOS hashes from mamedev/mame source and merge into profiles.
Sparse clones the MAME repo, parses the source tree for BIOS root sets,
caches results to data/mame-hashes.json, and optionally merges into
emulator profiles that reference mamedev/mame upstream.
"""
from __future__ import annotations
import argparse
import json
import logging
import shutil
import subprocess
import sys
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import yaml
from .mame_parser import parse_mame_source_tree
from ._hash_merge import compute_diff, merge_mame_profile
log = logging.getLogger(__name__)
_ROOT = Path(__file__).resolve().parents[2]
_CACHE_PATH = _ROOT / 'data' / 'mame-hashes.json'
_CLONE_DIR = _ROOT / 'tmp' / 'mame'
_EMULATORS_DIR = _ROOT / 'emulators'
_REPO_URL = 'https://github.com/mamedev/mame.git'
_STALE_HOURS = 24
# ── Cache ────────────────────────────────────────────────────────────
def _load_cache() -> dict[str, Any] | None:
    """Return the parsed JSON cache, or None if absent or corrupt."""
    try:
        # A missing file raises OSError here, so no exists() pre-check
        # is required.
        return json.loads(_CACHE_PATH.read_text(encoding='utf-8'))
    except (json.JSONDecodeError, OSError):
        return None
def _is_stale(cache: dict[str, Any] | None) -> bool:
if cache is None:
return True
fetched_at = cache.get('fetched_at')
if not fetched_at:
return True
try:
ts = datetime.fromisoformat(fetched_at)
age = datetime.now(timezone.utc) - ts
return age.total_seconds() > _STALE_HOURS * 3600
except (ValueError, TypeError):
return True
def _write_cache(data: dict[str, Any]) -> None:
    """Write the hash cache as pretty-printed JSON (UTF-8).

    Creates the data/ directory on first run.
    """
    _CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
    with open(_CACHE_PATH, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
        # End with a newline like the FBNeo scraper's cache writer,
        # keeping data/ files POSIX-clean and diffs consistent.
        f.write('\n')
    log.info('cache written to %s', _CACHE_PATH)
# ── Git operations ───────────────────────────────────────────────────
def _run_git(args: list[str], cwd: Path | None = None) -> subprocess.CompletedProcess[str]:
    """Run a git command with captured text output.

    Raises subprocess.CalledProcessError on non-zero exit (check=True).
    """
    return subprocess.run(
        ['git', *args],
        cwd=cwd,
        check=True,
        capture_output=True,
        text=True,
    )
def _sparse_clone() -> None:
    """Sparse clone mamedev/mame, checking out only src/mame and src/devices.

    Removes any previous clone first so the checkout is always clean.
    """
    if _CLONE_DIR.exists():
        shutil.rmtree(_CLONE_DIR)
    _CLONE_DIR.parent.mkdir(parents=True, exist_ok=True)
    log.info('sparse cloning mamedev/mame into %s', _CLONE_DIR)
    # --filter=blob:none with --sparse avoids downloading blobs outside
    # the sparse-checkout paths set below.
    _run_git([
        'clone',
        '--depth', '1',
        '--filter=blob:none',
        '--sparse',
        _REPO_URL,
        str(_CLONE_DIR),
    ])
    _run_git(
        ['sparse-checkout', 'set', 'src/mame', 'src/devices'],
        cwd=_CLONE_DIR,
    )
def _get_version() -> str:
    """Resolve the current MAME version via the GitHub releases API.

    version.cpp is generated at build time and absent from the repo, so
    the latest release tag is the only reliable source.  Returns
    'unknown' on any network or parse failure.
    """
    api_url = 'https://api.github.com/repos/mamedev/mame/releases/latest'
    headers = {
        'User-Agent': 'retrobios-scraper/1.0',
        'Accept': 'application/vnd.github.v3+json',
    }
    try:
        request = urllib.request.Request(api_url, headers=headers)
        with urllib.request.urlopen(request, timeout=10) as response:
            payload = json.loads(response.read())
    except (urllib.error.URLError, json.JSONDecodeError, OSError):
        return 'unknown'
    tag = payload.get('tag_name', '')
    return _parse_version_tag(tag) if tag else 'unknown'
def _parse_version_tag(tag: str) -> str:
prefix = 'mame'
raw = tag.removeprefix(prefix) if tag.startswith(prefix) else tag
if raw.isdigit() and len(raw) >= 4:
return f'{raw[0]}.{raw[1:]}'
return raw
def _get_commit() -> str:
    """Return the clone's HEAD SHA, or '' when git fails (e.g. no clone)."""
    try:
        result = _run_git(['rev-parse', 'HEAD'], cwd=_CLONE_DIR)
        return result.stdout.strip()
    except subprocess.CalledProcessError:
        return ''
def _cleanup() -> None:
    """Remove the sparse clone directory if present (idempotent)."""
    if _CLONE_DIR.exists():
        log.info('cleaning up %s', _CLONE_DIR)
        shutil.rmtree(_CLONE_DIR)
# ── Profile discovery ────────────────────────────────────────────────
def _find_mame_profiles() -> list[Path]:
    """Profiles whose upstream is current mamedev/mame (not snapshots)."""
    canonical = 'https://github.com/mamedev/mame'
    matches: list[Path] = []
    for candidate in sorted(_EMULATORS_DIR.glob('*.yml')):
        if candidate.name.endswith('.old.yml'):
            continue
        try:
            parsed = yaml.safe_load(candidate.read_text(encoding='utf-8'))
            if not isinstance(parsed, dict):
                continue
            upstream = parsed.get('upstream', '')
            # Exact match only: frozen snapshots use upstreams such as
            # "mamedev/mame/tree/mame0139" and must not be auto-merged.
            if isinstance(upstream, str) and upstream.rstrip('/') == canonical:
                matches.append(candidate)
        except (yaml.YAMLError, OSError):
            continue
    return matches
# ── Diff formatting ──────────────────────────────────────────────────
def _format_diff(
profile_path: Path,
diff: dict[str, Any],
hashes: dict[str, Any],
show_added: bool = True,
) -> list[str]:
lines: list[str] = []
name = profile_path.stem
added = diff.get('added', [])
updated = diff.get('updated', [])
removed = diff.get('removed', [])
unchanged = diff.get('unchanged', 0)
if not added and not updated and not removed:
lines.append(f' {name}:')
lines.append(' no changes')
return lines
lines.append(f' {name}:')
if show_added:
bios_sets = hashes.get('bios_sets', {})
for set_name in added:
rom_count = len(bios_sets.get(set_name, {}).get('roms', []))
source_file = bios_sets.get(set_name, {}).get('source_file', '')
source_line = bios_sets.get(set_name, {}).get('source_line', '')
ref = f'{source_file}:{source_line}' if source_file else ''
lines.append(f' + {set_name}.zip ({ref}, {rom_count} ROMs)')
elif added:
lines.append(f' + {len(added)} new sets available (main profile only)')
for set_name in updated:
lines.append(f' ~ {set_name}.zip (contents changed)')
oos = diff.get('out_of_scope', 0)
lines.append(f' = {unchanged} unchanged')
if oos:
lines.append(f' . {oos} out of scope (not BIOS root sets)')
return lines
# ── Main ─────────────────────────────────────────────────────────────
def _fetch_hashes(force: bool) -> dict[str, Any]:
    """Return BIOS hash data, from cache when fresh, else via sparse clone.

    Args:
        force: ignore a fresh cache and re-clone.

    The clone directory is always removed afterwards (finally), even if
    parsing raises.
    """
    cache = _load_cache()
    if not force and not _is_stale(cache):
        log.info('using cached data from %s', cache.get('fetched_at', ''))
        return cache  # type: ignore[return-value]
    try:
        _sparse_clone()
        bios_sets = parse_mame_source_tree(str(_CLONE_DIR))
        version = _get_version()
        commit = _get_commit()
        data: dict[str, Any] = {
            'source': 'mamedev/mame',
            'version': version,
            'commit': commit,
            'fetched_at': datetime.now(timezone.utc).isoformat(),
            'bios_sets': bios_sets,
        }
        _write_cache(data)
        return data
    finally:
        _cleanup()
def _run(args: argparse.Namespace) -> None:
    """Fetch hashes, then report and optionally merge diffs per profile.

    With --json, dumps the raw hash cache to stdout and returns.
    Otherwise prints a per-profile diff; unless --dry-run, merges
    updates into every profile and additions into the main profile
    (mame.yml) only.
    """
    hashes = _fetch_hashes(args.force)
    total_sets = len(hashes.get('bios_sets', {}))
    version = hashes.get('version', 'unknown')
    commit = hashes.get('commit', '')[:12]
    if args.json:
        json.dump(hashes, sys.stdout, indent=2, ensure_ascii=False)
        sys.stdout.write('\n')
        return
    print(f'mame-hashes: {total_sets} BIOS root sets from mamedev/mame'
          f' @ {version} ({commit})')
    print()
    profiles = _find_mame_profiles()
    if not profiles:
        print('  no profiles with mamedev/mame upstream found')
        return
    for profile_path in profiles:
        # Only the main profile may gain new sets; subset profiles are
        # update-only (add_new=False below).
        is_main = profile_path.name == 'mame.yml'
        diff = compute_diff(str(profile_path), str(_CACHE_PATH), mode='mame')
        lines = _format_diff(profile_path, diff, hashes, show_added=is_main)
        for line in lines:
            print(line)
        if not args.dry_run:
            updated = diff.get('updated', [])
            added = diff.get('added', []) if is_main else []
            if added or updated:
                merge_mame_profile(
                    str(profile_path),
                    str(_CACHE_PATH),
                    write=True,
                    add_new=is_main,
                )
                log.info('merged into %s', profile_path.name)
        print()
    if args.dry_run:
        print('(dry run, no files modified)')
def build_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser for the MAME hash scraper."""
    parser = argparse.ArgumentParser(
        prog='mame_hash_scraper',
        description='Fetch MAME BIOS hashes from source and merge into profiles.',
    )
    # All options are simple boolean flags.
    flags = (
        ('--dry-run', 'show diff only, do not modify profiles'),
        ('--json', 'output raw JSON to stdout'),
        ('--force', 're-fetch even if cache is fresh'),
    )
    for flag, help_text in flags:
        parser.add_argument(flag, action='store_true', help=help_text)
    return parser
def main() -> None:
    """CLI entry point: configure logging, parse args, run the scraper."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(levelname)s: %(message)s',
    )
    parser = build_parser()
    args = parser.parse_args()
    _run(args)
if __name__ == '__main__':
main()

View File

@@ -22,9 +22,9 @@ _MACHINE_MACROS = re.compile(
_ROM_START = re.compile(r'ROM_START\s*\(\s*(\w+)\s*\)')
_ROM_END = re.compile(r'ROM_END')
# ROM_REGION( tag, offset, size )
# ROM_REGION variants: ROM_REGION, ROM_REGION16_BE, ROM_REGION16_LE, ROM_REGION32_LE, etc.
_ROM_REGION = re.compile(
r'ROM_REGION\s*\('
r'ROM_REGION\w*\s*\('
r'\s*(0x[\da-fA-F]+|\d+)\s*,' # size
r'\s*"([^"]+)"\s*,', # tag
)
@@ -37,10 +37,16 @@ _ROM_SYSTEM_BIOS = re.compile(
r'\s*"([^"]+)"\s*\)', # description
)
# All ROM_LOAD variants: ROM_LOAD, ROMX_LOAD, ROM_LOAD16_BYTE, ROM_LOAD16_WORD, etc.
# All ROM_LOAD variants including custom BIOS macros.
# Standard: ROM_LOAD("name", offset, size, hash)
# BIOS variant: ROM_LOAD_BIOS(biosidx, "name", offset, size, hash)
# ROM_LOAD16_WORD_SWAP_BIOS(biosidx, "name", offset, size, hash)
# The key pattern: any macro containing "ROM_LOAD" or "ROMX_LOAD" in its name,
# with the first quoted string being the ROM filename.
_ROM_LOAD = re.compile(
r'(ROMX?_LOAD(?:16_BYTE|16_WORD|16_WORD_SWAP|32_BYTE|32_WORD|32_WORD_SWAP)?)\s*\('
r'\s*"([^"]+)"\s*,' # name
r'\b\w*ROMX?_LOAD\w*\s*\('
r'[^"]*' # skip any args before the filename (e.g., bios index)
r'"([^"]+)"\s*,' # name (first quoted string)
r'\s*(0x[\da-fA-F]+|\d+)\s*,' # offset
r'\s*(0x[\da-fA-F]+|\d+)\s*,', # size
)
@@ -104,9 +110,9 @@ def find_bios_root_sets(source: str, filename: str) -> dict[str, dict]:
def parse_rom_block(source: str, set_name: str) -> list[dict]:
"""Parse ROM definitions for a given set name.
Finds the ROM_START(set_name)...ROM_END block and extracts all
ROM_LOAD entries with their metadata. Skips NO_DUMP entries,
flags BAD_DUMP entries.
Finds the ROM_START(set_name)...ROM_END block, expands local
#define macros that contain ROM_LOAD/ROM_REGION calls, then
extracts all ROM entries. Skips NO_DUMP, flags BAD_DUMP.
"""
pattern = re.compile(
r'ROM_START\s*\(\s*' + re.escape(set_name) + r'\s*\)',
@@ -120,6 +126,13 @@ def parse_rom_block(source: str, set_name: str) -> list[dict]:
return []
block = source[start_match.end():end_match.start()]
# Pre-expand macros: find #define macros in the file that contain
# ROM_LOAD/ROM_REGION/ROM_SYSTEM_BIOS calls, then expand their
# invocations within the ROM block.
macros = _collect_rom_macros(source)
block = _expand_macros(block, macros, depth=5)
return _parse_rom_entries(block)
@@ -156,6 +169,68 @@ def parse_mame_source_tree(base_path: str) -> dict[str, dict]:
return results
# Regex for #define macros that span multiple lines (backslash continuation)
_DEFINE_RE = re.compile(
r'^\s*#\s*define\s+(\w+)(?:\([^)]*\))?\s*((?:.*\\\n)*.*)',
re.MULTILINE,
)
# ROM-related tokens that indicate a macro is relevant for expansion
_ROM_TOKENS = {'ROM_LOAD', 'ROMX_LOAD', 'ROM_REGION', 'ROM_SYSTEM_BIOS',
'ROM_FILL', 'ROM_COPY', 'ROM_RELOAD'}
def _collect_rom_macros(source: str) -> dict[str, str]:
    """Collect #define macros that contain ROM-related calls.

    Returns {macro_name: expanded_body} with backslash continuations joined.
    Only collects macros that contain actual ROM data (quoted filenames),
    not wrapper macros like ROM_LOAD16_WORD_SWAP_BIOS that just redirect
    to ROMX_LOAD with formal parameters.

    Args:
        source: full text of one driver source file.
    """
    macros: dict[str, str] = {}
    for m in _DEFINE_RE.finditer(source):
        name = m.group(1)
        body = m.group(2)
        # Join backslash-continued lines into a single-line body.
        body = body.replace('\\\n', ' ')
        # Only keep macros that contain ROM-related tokens.
        if not any(tok in body for tok in _ROM_TOKENS):
            continue
        # Skip wrapper macros: if the body contains ROMX_LOAD/ROM_LOAD
        # with unquoted args (formal parameters), it's a wrapper.
        # These are already recognized by the _ROM_LOAD regex directly.
        if re.search(r'ROMX?_LOAD\s*\(\s*\w+\s*,\s*\w+\s*,', body):
            continue
        macros[name] = body
    return macros
def _expand_macros(block: str, macros: dict[str, str], depth: int = 5) -> str:
"""Expand macro invocations in a ROM block.
Handles both simple macros (NEOGEO_BIOS) and parameterized ones
(NEOGEO_UNIBIOS_2_2_AND_NEWER(16)). Recurses up to `depth` levels
for nested macros.
"""
if depth <= 0 or not macros:
return block
changed = True
iterations = 0
while changed and iterations < depth:
changed = False
iterations += 1
for name, body in macros.items():
# Match macro invocation: NAME or NAME(args)
pattern = re.compile(r'\b' + re.escape(name) + r'(?:\s*\([^)]*\))?')
if pattern.search(block):
block = pattern.sub(body, block)
changed = True
return block
def _find_closing_paren(source: str, start: int) -> int:
"""Find the matching closing paren for source[start] which must be '('."""
depth = 0
@@ -218,74 +293,90 @@ def _split_macro_args(inner: str) -> list[str]:
def _parse_rom_entries(block: str) -> list[dict]:
"""Parse ROM entries from a ROM block (content between ROM_START and ROM_END)."""
"""Parse ROM entries from a ROM block (content between ROM_START and ROM_END).
Uses regex scanning over the entire block (not line-by-line) to handle
macro-expanded content where multiple statements may be on one line.
Processes matches in order of appearance to track region and BIOS context.
"""
roms: list[dict] = []
current_region = ''
bios_labels: dict[int, tuple[str, str]] = {} # index -> (label, description)
bios_labels: dict[int, tuple[str, str]] = {}
for line in block.split('\n'):
stripped = line.strip()
# Build a combined pattern that matches all interesting tokens
# and process them in order of occurrence
token_patterns = [
('region', _ROM_REGION),
('bios_label', _ROM_SYSTEM_BIOS),
('rom_load', _ROM_LOAD),
]
# Track region changes
region_match = _ROM_REGION.search(stripped)
if region_match:
current_region = region_match.group(2)
continue
# Collect all matches with their positions
events: list[tuple[int, str, re.Match]] = []
for tag, pat in token_patterns:
for m in pat.finditer(block):
events.append((m.start(), tag, m))
# Track BIOS labels
bios_match = _ROM_SYSTEM_BIOS.search(stripped)
if bios_match:
idx = int(bios_match.group(1))
bios_labels[idx] = (bios_match.group(2), bios_match.group(3))
continue
# Sort by position in block
events.sort(key=lambda e: e[0])
# ROM_LOAD variants
load_match = _ROM_LOAD.search(stripped)
if not load_match:
continue
for _pos, tag, m in events:
if tag == 'region':
current_region = m.group(2)
elif tag == 'bios_label':
idx = int(m.group(1))
bios_labels[idx] = (m.group(2), m.group(3))
elif tag == 'rom_load':
# Get the full macro call as context (find closing paren)
context_start = m.start()
# Find the opening paren of the ROM_LOAD macro
paren_pos = block.find('(', context_start)
if paren_pos != -1:
close_pos = _find_closing_paren(block, paren_pos)
context_end = close_pos + 1 if close_pos != -1 else m.end() + 200
else:
context_end = m.end() + 200
context = block[context_start:min(context_end, len(block))]
# Skip NO_DUMP
if _NO_DUMP.search(stripped):
continue
if _NO_DUMP.search(context):
continue
rom_name = load_match.group(2)
rom_size = _parse_int(load_match.group(4))
rom_name = m.group(1)
rom_size = _parse_int(m.group(3))
# Extract CRC32 and SHA1
crc_sha_match = _CRC_SHA.search(stripped)
crc32 = ''
sha1 = ''
if crc_sha_match:
crc32 = crc_sha_match.group(1).lower()
sha1 = crc_sha_match.group(2).lower()
crc_sha_match = _CRC_SHA.search(context)
crc32 = ''
sha1 = ''
if crc_sha_match:
crc32 = crc_sha_match.group(1).lower()
sha1 = crc_sha_match.group(2).lower()
bad_dump = bool(_BAD_DUMP.search(stripped))
bad_dump = bool(_BAD_DUMP.search(context))
# Check for ROM_BIOS association
bios_index = None
bios_label = ''
bios_description = ''
bios_ref = _ROM_BIOS.search(stripped)
if bios_ref:
bios_index = int(bios_ref.group(1))
if bios_index in bios_labels:
bios_label, bios_description = bios_labels[bios_index]
bios_index = None
bios_label = ''
bios_description = ''
bios_ref = _ROM_BIOS.search(context)
if bios_ref:
bios_index = int(bios_ref.group(1))
if bios_index in bios_labels:
bios_label, bios_description = bios_labels[bios_index]
entry: dict = {
'name': rom_name,
'size': rom_size,
'crc32': crc32,
'sha1': sha1,
'region': current_region,
'bad_dump': bad_dump,
}
entry: dict = {
'name': rom_name,
'size': rom_size,
'crc32': crc32,
'sha1': sha1,
'region': current_region,
'bad_dump': bad_dump,
}
if bios_index is not None:
entry['bios_index'] = bios_index
entry['bios_label'] = bios_label
entry['bios_description'] = bios_description
if bios_index is not None:
entry['bios_index'] = bios_index
entry['bios_label'] = bios_label
entry['bios_description'] = bios_description
roms.append(entry)
roms.append(entry)
return roms

View File

@@ -3608,5 +3608,125 @@ class TestE2E(unittest.TestCase):
self.assertIn("retrobat", exporters)
# ---------------------------------------------------------------
# Hash scraper: parsers + merge
# ---------------------------------------------------------------
    def test_mame_parser_finds_bios_root_sets(self):
        """Sets flagged MACHINE_IS_BIOS_ROOT are detected; plain games are not."""
        from scripts.scraper.mame_parser import find_bios_root_sets, parse_rom_block
        source = '''
ROM_START( neogeo )
ROM_REGION( 0x020000, "mainbios", 0 )
ROM_LOAD( "sp-s2.sp1", 0x00000, 0x020000, CRC(9036d879) SHA1(4f834c580f3471ce40c3210ef5e7491df38d8851) )
ROM_END
GAME( 1990, neogeo, 0, ng, neogeo, ng_state, empty_init, ROT0, "SNK", "Neo Geo", MACHINE_IS_BIOS_ROOT )
ROM_START( pacman )
ROM_REGION( 0x10000, "maincpu", 0 )
ROM_LOAD( "pacman.6e", 0x0000, 0x1000, CRC(c1e6ab10) SHA1(e87e059c5be45753f7e9f33dff851f16d6751181) )
ROM_END
GAME( 1980, pacman, 0, pacman, pacman, pacman_state, empty_init, ROT90, "Namco", "Pac-Man", 0 )
'''
        # neogeo carries MACHINE_IS_BIOS_ROOT; pacman is an ordinary game.
        sets = find_bios_root_sets(source, "neogeo.cpp")
        self.assertIn("neogeo", sets)
        self.assertNotIn("pacman", sets)
        # The block parser extracts the single ROM_LOAD with its CRC.
        roms = parse_rom_block(source, "neogeo")
        self.assertEqual(len(roms), 1)
        self.assertEqual(roms[0]["crc32"], "9036d879")
    def test_fbneo_parser_finds_bios_sets(self):
        """BDF_BOARDROM drivers are detected and their ROM info parsed."""
        from scripts.scraper.fbneo_parser import find_bios_sets, parse_rom_info
        source = '''
static struct BurnRomInfo neogeoRomDesc[] = {
{ "sp-s2.sp1", 0x020000, 0x9036d879, BRF_ESS | BRF_BIOS },
{ "", 0, 0, 0 }
};
STD_ROM_PICK(neogeo)
STD_ROM_FN(neogeo)
struct BurnDriver BurnDrvneogeo = {
"neogeo", NULL, NULL, NULL, "1990",
"Neo Geo\\0", "BIOS only", "SNK", "Neo Geo MVS",
NULL, NULL, NULL, NULL, BDF_BOARDROM, 0, 0,
0, 0, 0, NULL, neogeoRomInfo, neogeoRomName, NULL, NULL,
NULL, NULL, NULL, NULL, 0
};
'''
        # The BDF_BOARDROM flag marks the driver as a BIOS set.
        sets = find_bios_sets(source, "d_neogeo.cpp")
        self.assertIn("neogeo", sets)
        # The terminating { "", 0, 0, 0 } sentinel is not a ROM entry.
        roms = parse_rom_info(source, "neogeo")
        self.assertEqual(len(roms), 1)
        self.assertEqual(roms[0]["crc32"], "9036d879")
    def test_mame_merge_preserves_manual_fields(self):
        """Merging updates contents/source_ref/core_version but keeps
        the manually-curated system and note fields untouched."""
        import json as json_mod
        from scripts.scraper._hash_merge import merge_mame_profile
        merge_dir = os.path.join(self.root, "merge_mame")
        os.makedirs(merge_dir)
        # Profile with manual fields and a stale CRC in contents.
        profile = {
            "emulator": "Test", "type": "libretro",
            "upstream": "https://github.com/mamedev/mame",
            "core_version": "0.285",
            "files": [{
                "name": "neogeo.zip", "required": True, "category": "bios_zip",
                "system": "snk-neogeo-mvs", "note": "MVS BIOS",
                "source_ref": "old.cpp:1",
                "contents": [{"name": "sp-s2.sp1", "size": 131072, "crc32": "oldcrc"}],
            }],
        }
        profile_path = os.path.join(merge_dir, "test.yml")
        with open(profile_path, "w") as f:
            yaml.dump(profile, f, sort_keys=False)
        # Upstream hash data with a newer version and refreshed CRC.
        hashes = {
            "source": "mamedev/mame", "version": "0.286", "commit": "abc",
            "fetched_at": "2026-03-30T00:00:00Z",
            "bios_sets": {"neogeo": {
                "source_file": "neo.cpp", "source_line": 42,
                "roms": [{"name": "sp-s2.sp1", "size": 131072, "crc32": "newcrc", "sha1": "abc123"}],
            }},
        }
        hashes_path = os.path.join(merge_dir, "hashes.json")
        with open(hashes_path, "w") as f:
            json_mod.dump(hashes, f)
        result = merge_mame_profile(profile_path, hashes_path)
        neo = next(f for f in result["files"] if f["name"] == "neogeo.zip")
        # Scraped data replaces contents and source_ref...
        self.assertEqual(neo["contents"][0]["crc32"], "newcrc")
        self.assertEqual(neo["source_ref"], "neo.cpp:42")
        # ...while manual curation fields are preserved.
        self.assertEqual(neo["system"], "snk-neogeo-mvs")
        self.assertEqual(neo["note"], "MVS BIOS")
        self.assertEqual(result["core_version"], "0.286")
def test_fbneo_merge_updates_individual_roms(self):
    """FBNeo merge updates per-ROM hash data and bumps core_version.

    FBNeo profiles list individual ROM files (with an `archive` key)
    rather than whole zips; the merge must refresh the crc32 from the
    hashes JSON while keeping curated fields (system, required).
    """
    import json as json_mod
    from scripts.scraper._hash_merge import merge_fbneo_profile
    merge_dir = os.path.join(self.root, "merge_fbneo")
    os.makedirs(merge_dir)
    profile = {
        "emulator": "FBNeo", "type": "libretro",
        "upstream": "https://github.com/finalburnneo/FBNeo",
        "core_version": "v1.0.0.02",
        "files": [{"name": "sp-s2.sp1", "archive": "neogeo.zip",
                   "system": "snk-neogeo-mvs", "required": True,
                   "size": 131072, "crc32": "oldcrc"}],
    }
    profile_path = os.path.join(merge_dir, "fbneo.yml")
    # Explicit encoding keeps the fixture write platform-independent.
    with open(profile_path, "w", encoding="utf-8") as f:
        yaml.dump(profile, f, sort_keys=False)
    hashes = {
        "source": "finalburnneo/FBNeo", "version": "v1.0.0.03", "commit": "def",
        "fetched_at": "2026-03-30T00:00:00Z",
        "bios_sets": {"neogeo": {
            "source_file": "neo.cpp", "source_line": 10,
            "roms": [{"name": "sp-s2.sp1", "size": 131072, "crc32": "newcrc"}],
        }},
    }
    hashes_path = os.path.join(merge_dir, "hashes.json")
    with open(hashes_path, "w", encoding="utf-8") as f:
        json_mod.dump(hashes, f)
    result = merge_fbneo_profile(profile_path, hashes_path)
    rom = next(f for f in result["files"] if f["name"] == "sp-s2.sp1")
    # Hash data is refreshed from upstream.
    self.assertEqual(rom["crc32"], "newcrc")
    self.assertEqual(result["core_version"], "v1.0.0.03")
    # Curated fields are preserved.
    self.assertEqual(rom["system"], "snk-neogeo-mvs")
    self.assertTrue(rom["required"])
# Allow running this test module directly (outside the project's runner).
if __name__ == "__main__":
    unittest.main()

View File

@@ -210,9 +210,10 @@ class TestMameMerge(unittest.TestCase):
self.assertEqual(len(non_bios), 1)
self.assertEqual(non_bios[0]['name'], 'hiscore.dat')
def test_merge_keeps_removed_bios_set(self) -> None:
def test_merge_keeps_unmatched_bios_set(self) -> None:
"""Entries not in scraper scope stay untouched (no _upstream_removed)."""
hashes = _make_mame_hashes()
hashes['bios_sets'] = {} # neogeo removed upstream
hashes['bios_sets'] = {} # nothing from scraper
with tempfile.TemporaryDirectory() as td:
p = Path(td)
@@ -223,7 +224,8 @@ class TestMameMerge(unittest.TestCase):
bios_files = [f for f in result['files'] if f.get('category') == 'bios_zip']
self.assertEqual(len(bios_files), 1)
self.assertTrue(bios_files[0].get('_upstream_removed'))
self.assertNotIn('_upstream_removed', bios_files[0])
self.assertEqual(bios_files[0]['name'], 'neogeo.zip')
def test_merge_updates_core_version(self) -> None:
with tempfile.TemporaryDirectory() as td:
@@ -311,7 +313,8 @@ class TestFbneoMerge(unittest.TestCase):
self.assertEqual(len(non_archive), 1)
self.assertEqual(non_archive[0]['name'], 'hiscore.dat')
def test_merge_marks_removed_roms(self) -> None:
def test_merge_keeps_unmatched_roms(self) -> None:
"""Entries not in scraper scope stay untouched (no _upstream_removed)."""
hashes = _make_fbneo_hashes()
hashes['bios_sets'] = {}
@@ -324,7 +327,7 @@ class TestFbneoMerge(unittest.TestCase):
archive_files = [f for f in result['files'] if 'archive' in f]
self.assertEqual(len(archive_files), 1)
self.assertTrue(archive_files[0].get('_upstream_removed'))
self.assertNotIn('_upstream_removed', archive_files[0])
def test_merge_updates_core_version(self) -> None:
with tempfile.TemporaryDirectory() as td:
@@ -362,7 +365,8 @@ class TestDiff(unittest.TestCase):
self.assertEqual(len(diff['removed']), 0)
self.assertEqual(diff['unchanged'], 0)
def test_diff_mame_detects_removed(self) -> None:
def test_diff_mame_out_of_scope(self) -> None:
"""Items in profile but not in scraper output = out of scope, not removed."""
hashes = _make_mame_hashes()
hashes['bios_sets'] = {}
@@ -373,9 +377,9 @@ class TestDiff(unittest.TestCase):
diff = compute_diff(profile_path, hashes_path, mode='mame')
self.assertIn('neogeo', diff['removed'])
self.assertEqual(diff['removed'], [])
self.assertEqual(diff['out_of_scope'], 1)
self.assertEqual(len(diff['added']), 0)
self.assertEqual(len(diff['updated']), 0)
def test_diff_fbneo_detects_changes(self) -> None:
hashes = _make_fbneo_hashes()