# ---- file: scripts/scraper/_hash_merge.py ----
"""Merge fetched hash data into emulator YAML profiles.

Supports two strategies:
- MAME: bios_zip entries with contents lists
- FBNeo: individual ROM entries grouped by archive field
"""

from __future__ import annotations

import json
import shutil
from pathlib import Path
from typing import Any

import yaml


def merge_mame_profile(
    profile_path: str,
    hashes_path: str,
    write: bool = False,
) -> dict[str, Any]:
    """Merge MAME bios_zip entries from upstream hash data.

    Preserves system, note, required per entry. Updates contents and
    source_ref from the hashes JSON. New sets get system=None,
    required=True, category=bios_zip. Removed sets are flagged with
    _upstream_removed=True.

    If write=True, backs up existing profile to .old.yml before writing.
    """
    profile = _load_yaml(profile_path)
    hashes = _load_json(hashes_path)

    # Track upstream core version; fall back to the current value.
    profile['core_version'] = hashes.get('version', profile.get('core_version'))

    bios_zip, non_bios = _split_files(
        profile.get('files', []),
        lambda f: f.get('category') == 'bios_zip',
    )

    # Index the current bios_zip entries by MAME set name.
    existing_by_name: dict[str, dict] = {
        _zip_name_to_set(entry['name']): entry for entry in bios_zip
    }

    merged: list[dict] = []
    seen_sets: set[str] = set()

    for set_name, set_data in hashes.get('bios_sets', {}).items():
        seen_sets.add(set_name)
        contents = _build_contents(set_data.get('roms', []))
        source_ref = _build_source_ref(set_data)

        known = existing_by_name.get(set_name)
        if known is not None:
            # Existing set: refresh only the machine-derived fields.
            entry = known.copy()
            entry['contents'] = contents
            if source_ref:
                entry['source_ref'] = source_ref
        else:
            # Brand-new upstream set: placeholder metadata for curation.
            entry = {
                'name': f'{set_name}.zip',
                'required': True,
                'category': 'bios_zip',
                'system': None,
                'source_ref': source_ref,
                'contents': contents,
            }

        merged.append(entry)

    # Keep sets that vanished upstream, but flag them for review.
    for set_name, entry in existing_by_name.items():
        if set_name not in seen_sets:
            removed = entry.copy()
            removed['_upstream_removed'] = True
            merged.append(removed)

    profile['files'] = non_bios + merged

    if write:
        _backup_and_write(profile_path, profile)

    return profile


def merge_fbneo_profile(
    profile_path: str,
    hashes_path: str,
    write: bool = False,
) -> dict[str, Any]:
    """Merge FBNeo individual ROM entries from upstream hash data.

    Preserves system, required per entry. Updates crc32, size, and
    source_ref. New ROMs get archive=set_name.zip, required=True.

    If write=True, backs up existing profile to .old.yml before writing.
    """
    profile = _load_yaml(profile_path)
    hashes = _load_json(hashes_path)

    profile['core_version'] = hashes.get('version', profile.get('core_version'))

    archive_files, non_archive = _split_files(
        profile.get('files', []),
        lambda f: 'archive' in f,
    )

    # Index the current ROM entries by (archive name, rom name).
    existing_by_key: dict[tuple[str, str], dict] = {
        (entry['archive'], entry['name']): entry for entry in archive_files
    }

    merged: list[dict] = []
    seen_keys: set[tuple[str, str]] = set()

    for set_name, set_data in hashes.get('bios_sets', {}).items():
        archive_name = f'{set_name}.zip'
        source_ref = _build_source_ref(set_data)

        for rom in set_data.get('roms', []):
            rom_name = rom['name']
            key = (archive_name, rom_name)
            seen_keys.add(key)

            known = existing_by_key.get(key)
            if known is not None:
                # Existing ROM: refresh hash data, keep curated fields.
                entry = known.copy()
                entry['size'] = rom['size']
                entry['crc32'] = rom['crc32']
            else:
                # New ROM discovered upstream.
                entry = {
                    'name': rom_name,
                    'archive': archive_name,
                    'required': True,
                    'size': rom['size'],
                    'crc32': rom['crc32'],
                }

            # Optional fields are applied identically on both paths.
            if rom.get('sha1'):
                entry['sha1'] = rom['sha1']
            if source_ref:
                entry['source_ref'] = source_ref

            merged.append(entry)

    # Keep ROMs that vanished upstream, but flag them for review.
    for key, entry in existing_by_key.items():
        if key not in seen_keys:
            removed = entry.copy()
            removed['_upstream_removed'] = True
            merged.append(removed)

    profile['files'] = non_archive + merged

    if write:
        _backup_and_write(profile_path, profile)

    return profile
existing_by_key.items(): + if key not in seen_keys: + removed = entry.copy() + removed['_upstream_removed'] = True + merged.append(removed) + + profile['files'] = non_archive + merged + + if write: + _backup_and_write(profile_path, profile) + + return profile + + +def compute_diff( + profile_path: str, + hashes_path: str, + mode: str = 'mame', +) -> dict[str, Any]: + """Compute diff between profile and hashes without writing. + + Returns counts of added, updated, removed, and unchanged entries. + """ + profile = _load_yaml(profile_path) + hashes = _load_json(hashes_path) + + if mode == 'mame': + return _diff_mame(profile, hashes) + return _diff_fbneo(profile, hashes) + + +def _diff_mame( + profile: dict[str, Any], + hashes: dict[str, Any], +) -> dict[str, Any]: + files = profile.get('files', []) + bios_zip, _ = _split_files(files, lambda f: f.get('category') == 'bios_zip') + + existing_by_name: dict[str, dict] = {} + for entry in bios_zip: + existing_by_name[_zip_name_to_set(entry['name'])] = entry + + added: list[str] = [] + updated: list[str] = [] + unchanged = 0 + + bios_sets = hashes.get('bios_sets', {}) + for set_name, set_data in bios_sets.items(): + if set_name not in existing_by_name: + added.append(set_name) + continue + + old_entry = existing_by_name[set_name] + new_contents = _build_contents(set_data.get('roms', [])) + old_contents = old_entry.get('contents', []) + + if _contents_differ(old_contents, new_contents): + updated.append(set_name) + else: + unchanged += 1 + + removed = [s for s in existing_by_name if s not in bios_sets] + + return { + 'added': added, + 'updated': updated, + 'removed': removed, + 'unchanged': unchanged, + } + + +def _diff_fbneo( + profile: dict[str, Any], + hashes: dict[str, Any], +) -> dict[str, Any]: + files = profile.get('files', []) + archive_files, _ = _split_files(files, lambda f: 'archive' in f) + + existing_by_key: dict[tuple[str, str], dict] = {} + for entry in archive_files: + existing_by_key[(entry['archive'], 
entry['name'])] = entry + + added: list[str] = [] + updated: list[str] = [] + unchanged = 0 + + seen_keys: set[tuple[str, str]] = set() + bios_sets = hashes.get('bios_sets', {}) + + for set_name, set_data in bios_sets.items(): + archive_name = f'{set_name}.zip' + for rom in set_data.get('roms', []): + key = (archive_name, rom['name']) + seen_keys.add(key) + label = f"{archive_name}:{rom['name']}" + + if key not in existing_by_key: + added.append(label) + continue + + old = existing_by_key[key] + if old.get('crc32') != rom.get('crc32') or old.get('size') != rom.get('size'): + updated.append(label) + else: + unchanged += 1 + + removed = [ + f"{k[0]}:{k[1]}" for k in existing_by_key if k not in seen_keys + ] + + return { + 'added': added, + 'updated': updated, + 'removed': removed, + 'unchanged': unchanged, + } + + +# ── Helpers ────────────────────────────────────────────────────────── + + +def _load_yaml(path: str) -> dict[str, Any]: + with open(path, encoding='utf-8') as f: + return yaml.safe_load(f) or {} + + +def _load_json(path: str) -> dict[str, Any]: + with open(path, encoding='utf-8') as f: + return json.load(f) + + +def _split_files( + files: list[dict], + predicate: Any, +) -> tuple[list[dict], list[dict]]: + matching: list[dict] = [] + rest: list[dict] = [] + for f in files: + if predicate(f): + matching.append(f) + else: + rest.append(f) + return matching, rest + + +def _zip_name_to_set(name: str) -> str: + if name.endswith('.zip'): + return name[:-4] + return name + + +def _build_contents(roms: list[dict]) -> list[dict]: + contents: list[dict] = [] + for rom in roms: + entry: dict[str, Any] = { + 'name': rom['name'], + 'size': rom['size'], + 'crc32': rom['crc32'], + } + if rom.get('sha1'): + entry['sha1'] = rom['sha1'] + desc = rom.get('bios_description') or rom.get('bios_label') or '' + if desc: + entry['description'] = desc + if rom.get('bad_dump'): + entry['bad_dump'] = True + contents.append(entry) + return contents + + +def 
_build_source_ref(set_data: dict) -> str: + source_file = set_data.get('source_file', '') + source_line = set_data.get('source_line') + if source_file and source_line is not None: + return f'{source_file}:{source_line}' + return source_file + + +def _contents_differ(old: list[dict], new: list[dict]) -> bool: + if len(old) != len(new): + return True + old_by_name = {c['name']: c for c in old} + for entry in new: + prev = old_by_name.get(entry['name']) + if prev is None: + return True + if prev.get('crc32') != entry.get('crc32'): + return True + if prev.get('size') != entry.get('size'): + return True + if prev.get('sha1') != entry.get('sha1'): + return True + return False + + +def _backup_and_write(path: str, data: dict) -> None: + p = Path(path) + backup = p.with_suffix('.old.yml') + shutil.copy2(p, backup) + with open(p, 'w', encoding='utf-8') as f: + yaml.dump( + data, + f, + default_flow_style=False, + allow_unicode=True, + sort_keys=False, + ) diff --git a/tests/test_hash_merge.py b/tests/test_hash_merge.py new file mode 100644 index 00000000..05b9a3ca --- /dev/null +++ b/tests/test_hash_merge.py @@ -0,0 +1,419 @@ +"""Tests for the hash merge module.""" + +from __future__ import annotations + +import json +import tempfile +import unittest +from pathlib import Path + +import yaml + +from scripts.scraper._hash_merge import ( + compute_diff, + merge_fbneo_profile, + merge_mame_profile, +) + + +def _write_yaml(path: Path, data: dict) -> str: + p = str(path) + with open(p, 'w', encoding='utf-8') as f: + yaml.dump(data, f, default_flow_style=False, sort_keys=False) + return p + + +def _write_json(path: Path, data: dict) -> str: + p = str(path) + with open(p, 'w', encoding='utf-8') as f: + json.dump(data, f) + return p + + +def _make_mame_profile(**overrides: object) -> dict: + base = { + 'emulator': 'MAME', + 'core_version': '0.285', + 'files': [ + { + 'name': 'neogeo.zip', + 'required': True, + 'category': 'bios_zip', + 'system': 'snk-neogeo-mvs', + 'source_ref': 
'src/mame/neogeo/neogeo.cpp:2400', + 'contents': [ + { + 'name': 'sp-s2.sp1', + 'size': 131072, + 'crc32': 'oldcrc32', + 'description': 'Europe MVS (Ver. 2)', + }, + ], + }, + ], + } + base.update(overrides) + return base + + +def _make_mame_hashes(**overrides: object) -> dict: + base = { + 'source': 'mamedev/mame', + 'version': '0.286', + 'commit': 'abc123', + 'fetched_at': '2026-03-30T12:00:00Z', + 'bios_sets': { + 'neogeo': { + 'source_file': 'src/mame/neogeo/neogeo.cpp', + 'source_line': 2432, + 'roms': [ + { + 'name': 'sp-s2.sp1', + 'size': 131072, + 'crc32': '9036d879', + 'sha1': '4f834c55', + 'region': 'mainbios', + 'bios_label': 'euro', + 'bios_description': 'Europe MVS (Ver. 2)', + }, + ], + }, + }, + } + base.update(overrides) + return base + + +def _make_fbneo_profile(**overrides: object) -> dict: + base = { + 'emulator': 'FinalBurn Neo', + 'core_version': 'v1.0.0.02', + 'files': [ + { + 'name': 'sp-s2.sp1', + 'archive': 'neogeo.zip', + 'system': 'snk-neogeo-mvs', + 'required': True, + 'size': 131072, + 'crc32': 'oldcrc32', + 'source_ref': 'src/burn/drv/neogeo/d_neogeo.cpp:1605', + }, + { + 'name': 'hiscore.dat', + 'required': False, + }, + ], + } + base.update(overrides) + return base + + +def _make_fbneo_hashes(**overrides: object) -> dict: + base = { + 'source': 'finalburnneo/FBNeo', + 'version': 'v1.0.0.03', + 'commit': 'def456', + 'fetched_at': '2026-03-30T12:00:00Z', + 'bios_sets': { + 'neogeo': { + 'source_file': 'src/burn/drv/neogeo/d_neogeo.cpp', + 'source_line': 1604, + 'roms': [ + { + 'name': 'sp-s2.sp1', + 'size': 131072, + 'crc32': '9036d879', + 'sha1': 'aabbccdd', + }, + ], + }, + }, + } + base.update(overrides) + return base + + +class TestMameMerge(unittest.TestCase): + """Tests for merge_mame_profile.""" + + def test_merge_updates_contents(self) -> None: + with tempfile.TemporaryDirectory() as td: + p = Path(td) + profile_path = _write_yaml(p / 'mame.yml', _make_mame_profile()) + hashes_path = _write_json(p / 'hashes.json', 
_make_mame_hashes()) + + result = merge_mame_profile(profile_path, hashes_path) + + bios_files = [f for f in result['files'] if f.get('category') == 'bios_zip'] + self.assertEqual(len(bios_files), 1) + contents = bios_files[0]['contents'] + self.assertEqual(contents[0]['crc32'], '9036d879') + self.assertEqual(contents[0]['sha1'], '4f834c55') + self.assertEqual(contents[0]['description'], 'Europe MVS (Ver. 2)') + + def test_merge_preserves_manual_fields(self) -> None: + profile = _make_mame_profile() + profile['files'][0]['note'] = 'manually curated note' + profile['files'][0]['system'] = 'snk-neogeo-mvs' + profile['files'][0]['required'] = False + + with tempfile.TemporaryDirectory() as td: + p = Path(td) + profile_path = _write_yaml(p / 'mame.yml', profile) + hashes_path = _write_json(p / 'hashes.json', _make_mame_hashes()) + + result = merge_mame_profile(profile_path, hashes_path) + + entry = [f for f in result['files'] if f.get('category') == 'bios_zip'][0] + self.assertEqual(entry['note'], 'manually curated note') + self.assertEqual(entry['system'], 'snk-neogeo-mvs') + self.assertFalse(entry['required']) + + def test_merge_adds_new_bios_set(self) -> None: + hashes = _make_mame_hashes() + hashes['bios_sets']['pgm'] = { + 'source_file': 'src/mame/igs/pgm.cpp', + 'source_line': 5515, + 'roms': [ + {'name': 'pgm_t01s.rom', 'size': 2097152, 'crc32': '1a7123a0'}, + ], + } + + with tempfile.TemporaryDirectory() as td: + p = Path(td) + profile_path = _write_yaml(p / 'mame.yml', _make_mame_profile()) + hashes_path = _write_json(p / 'hashes.json', hashes) + + result = merge_mame_profile(profile_path, hashes_path) + + bios_files = [f for f in result['files'] if f.get('category') == 'bios_zip'] + names = {f['name'] for f in bios_files} + self.assertIn('pgm.zip', names) + + pgm = next(f for f in bios_files if f['name'] == 'pgm.zip') + self.assertIsNone(pgm['system']) + self.assertTrue(pgm['required']) + self.assertEqual(pgm['category'], 'bios_zip') + + def 
test_merge_preserves_non_bios_files(self) -> None: + profile = _make_mame_profile() + profile['files'].append({'name': 'hiscore.dat', 'required': False}) + + with tempfile.TemporaryDirectory() as td: + p = Path(td) + profile_path = _write_yaml(p / 'mame.yml', profile) + hashes_path = _write_json(p / 'hashes.json', _make_mame_hashes()) + + result = merge_mame_profile(profile_path, hashes_path) + + non_bios = [f for f in result['files'] if f.get('category') != 'bios_zip'] + self.assertEqual(len(non_bios), 1) + self.assertEqual(non_bios[0]['name'], 'hiscore.dat') + + def test_merge_keeps_removed_bios_set(self) -> None: + hashes = _make_mame_hashes() + hashes['bios_sets'] = {} # neogeo removed upstream + + with tempfile.TemporaryDirectory() as td: + p = Path(td) + profile_path = _write_yaml(p / 'mame.yml', _make_mame_profile()) + hashes_path = _write_json(p / 'hashes.json', hashes) + + result = merge_mame_profile(profile_path, hashes_path) + + bios_files = [f for f in result['files'] if f.get('category') == 'bios_zip'] + self.assertEqual(len(bios_files), 1) + self.assertTrue(bios_files[0].get('_upstream_removed')) + + def test_merge_updates_core_version(self) -> None: + with tempfile.TemporaryDirectory() as td: + p = Path(td) + profile_path = _write_yaml(p / 'mame.yml', _make_mame_profile()) + hashes_path = _write_json(p / 'hashes.json', _make_mame_hashes()) + + result = merge_mame_profile(profile_path, hashes_path) + + self.assertEqual(result['core_version'], '0.286') + + def test_merge_backup_created(self) -> None: + with tempfile.TemporaryDirectory() as td: + p = Path(td) + profile_path = _write_yaml(p / 'mame.yml', _make_mame_profile()) + hashes_path = _write_json(p / 'hashes.json', _make_mame_hashes()) + + merge_mame_profile(profile_path, hashes_path, write=True) + + backup = p / 'mame.old.yml' + self.assertTrue(backup.exists()) + + with open(backup, encoding='utf-8') as f: + old = yaml.safe_load(f) + self.assertEqual(old['core_version'], '0.285') + + def 
test_merge_updates_source_ref(self) -> None: + with tempfile.TemporaryDirectory() as td: + p = Path(td) + profile_path = _write_yaml(p / 'mame.yml', _make_mame_profile()) + hashes_path = _write_json(p / 'hashes.json', _make_mame_hashes()) + + result = merge_mame_profile(profile_path, hashes_path) + + entry = [f for f in result['files'] if f.get('category') == 'bios_zip'][0] + self.assertEqual(entry['source_ref'], 'src/mame/neogeo/neogeo.cpp:2432') + + +class TestFbneoMerge(unittest.TestCase): + """Tests for merge_fbneo_profile.""" + + def test_merge_updates_rom_entries(self) -> None: + with tempfile.TemporaryDirectory() as td: + p = Path(td) + profile_path = _write_yaml(p / 'fbneo.yml', _make_fbneo_profile()) + hashes_path = _write_json(p / 'hashes.json', _make_fbneo_hashes()) + + result = merge_fbneo_profile(profile_path, hashes_path) + + archive_files = [f for f in result['files'] if 'archive' in f] + self.assertEqual(len(archive_files), 1) + self.assertEqual(archive_files[0]['crc32'], '9036d879') + self.assertEqual(archive_files[0]['system'], 'snk-neogeo-mvs') + + def test_merge_adds_new_roms(self) -> None: + hashes = _make_fbneo_hashes() + hashes['bios_sets']['neogeo']['roms'].append({ + 'name': 'sp-s3.sp1', + 'size': 131072, + 'crc32': '91b64be3', + }) + + with tempfile.TemporaryDirectory() as td: + p = Path(td) + profile_path = _write_yaml(p / 'fbneo.yml', _make_fbneo_profile()) + hashes_path = _write_json(p / 'hashes.json', hashes) + + result = merge_fbneo_profile(profile_path, hashes_path) + + archive_files = [f for f in result['files'] if 'archive' in f] + self.assertEqual(len(archive_files), 2) + new_rom = next(f for f in archive_files if f['name'] == 'sp-s3.sp1') + self.assertEqual(new_rom['archive'], 'neogeo.zip') + self.assertTrue(new_rom['required']) + + def test_merge_preserves_non_archive_files(self) -> None: + with tempfile.TemporaryDirectory() as td: + p = Path(td) + profile_path = _write_yaml(p / 'fbneo.yml', _make_fbneo_profile()) + hashes_path 
= _write_json(p / 'hashes.json', _make_fbneo_hashes()) + + result = merge_fbneo_profile(profile_path, hashes_path) + + non_archive = [f for f in result['files'] if 'archive' not in f] + self.assertEqual(len(non_archive), 1) + self.assertEqual(non_archive[0]['name'], 'hiscore.dat') + + def test_merge_marks_removed_roms(self) -> None: + hashes = _make_fbneo_hashes() + hashes['bios_sets'] = {} + + with tempfile.TemporaryDirectory() as td: + p = Path(td) + profile_path = _write_yaml(p / 'fbneo.yml', _make_fbneo_profile()) + hashes_path = _write_json(p / 'hashes.json', hashes) + + result = merge_fbneo_profile(profile_path, hashes_path) + + archive_files = [f for f in result['files'] if 'archive' in f] + self.assertEqual(len(archive_files), 1) + self.assertTrue(archive_files[0].get('_upstream_removed')) + + def test_merge_updates_core_version(self) -> None: + with tempfile.TemporaryDirectory() as td: + p = Path(td) + profile_path = _write_yaml(p / 'fbneo.yml', _make_fbneo_profile()) + hashes_path = _write_json(p / 'hashes.json', _make_fbneo_hashes()) + + result = merge_fbneo_profile(profile_path, hashes_path) + + self.assertEqual(result['core_version'], 'v1.0.0.03') + + +class TestDiff(unittest.TestCase): + """Tests for compute_diff.""" + + def test_diff_mame_detects_changes(self) -> None: + hashes = _make_mame_hashes() + hashes['bios_sets']['pgm'] = { + 'source_file': 'src/mame/igs/pgm.cpp', + 'source_line': 5515, + 'roms': [ + {'name': 'pgm_t01s.rom', 'size': 2097152, 'crc32': '1a7123a0'}, + ], + } + + with tempfile.TemporaryDirectory() as td: + p = Path(td) + profile_path = _write_yaml(p / 'mame.yml', _make_mame_profile()) + hashes_path = _write_json(p / 'hashes.json', hashes) + + diff = compute_diff(profile_path, hashes_path, mode='mame') + + self.assertIn('pgm', diff['added']) + self.assertIn('neogeo', diff['updated']) + self.assertEqual(len(diff['removed']), 0) + self.assertEqual(diff['unchanged'], 0) + + def test_diff_mame_detects_removed(self) -> None: + hashes = 
_make_mame_hashes() + hashes['bios_sets'] = {} + + with tempfile.TemporaryDirectory() as td: + p = Path(td) + profile_path = _write_yaml(p / 'mame.yml', _make_mame_profile()) + hashes_path = _write_json(p / 'hashes.json', hashes) + + diff = compute_diff(profile_path, hashes_path, mode='mame') + + self.assertIn('neogeo', diff['removed']) + self.assertEqual(len(diff['added']), 0) + self.assertEqual(len(diff['updated']), 0) + + def test_diff_fbneo_detects_changes(self) -> None: + hashes = _make_fbneo_hashes() + hashes['bios_sets']['neogeo']['roms'].append({ + 'name': 'sp-s3.sp1', + 'size': 131072, + 'crc32': '91b64be3', + }) + + with tempfile.TemporaryDirectory() as td: + p = Path(td) + profile_path = _write_yaml(p / 'fbneo.yml', _make_fbneo_profile()) + hashes_path = _write_json(p / 'hashes.json', hashes) + + diff = compute_diff(profile_path, hashes_path, mode='fbneo') + + self.assertIn('neogeo.zip:sp-s3.sp1', diff['added']) + self.assertIn('neogeo.zip:sp-s2.sp1', diff['updated']) + self.assertEqual(len(diff['removed']), 0) + + def test_diff_fbneo_unchanged(self) -> None: + profile = _make_fbneo_profile() + profile['files'][0]['crc32'] = '9036d879' + profile['files'][0]['size'] = 131072 + + hashes = _make_fbneo_hashes() + + with tempfile.TemporaryDirectory() as td: + p = Path(td) + profile_path = _write_yaml(p / 'fbneo.yml', profile) + hashes_path = _write_json(p / 'hashes.json', hashes) + + diff = compute_diff(profile_path, hashes_path, mode='fbneo') + + self.assertEqual(diff['unchanged'], 1) + self.assertEqual(len(diff['added']), 0) + self.assertEqual(len(diff['updated']), 0) + + +if __name__ == '__main__': + unittest.main()