mirror of
https://github.com/Abdess/retroarch_system.git
synced 2026-04-13 12:22:33 -05:00
fix: skip file/directory path conflicts in pack generation
This commit is contained in:
@@ -198,6 +198,36 @@ def _sanitize_path(raw: str) -> str:
|
||||
return "/".join(parts)
|
||||
|
||||
|
||||
def _path_parents(dest: str) -> list[str]:
|
||||
"""Return all parent directory segments of a path."""
|
||||
parts = dest.split("/")
|
||||
return ["/".join(parts[:i]) for i in range(1, len(parts))]
|
||||
|
||||
|
||||
def _has_path_conflict(dest: str, seen_files: set[str],
|
||||
seen_parents: set[str]) -> bool:
|
||||
"""Check if dest conflicts with existing paths (file vs directory).
|
||||
|
||||
Returns True if adding dest would create an impossible extraction:
|
||||
- A parent of dest is already a file (e.g., adding X/Y when X is a file)
|
||||
- dest itself is already used as a directory (e.g., adding X when X/Y exists)
|
||||
"""
|
||||
for parent in _path_parents(dest):
|
||||
if parent in seen_files:
|
||||
return True
|
||||
if dest in seen_parents:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _register_path(dest: str, seen_files: set[str],
|
||||
seen_parents: set[str]) -> None:
|
||||
"""Track a file path and its parent directories."""
|
||||
seen_files.add(dest)
|
||||
for parent in _path_parents(dest):
|
||||
seen_parents.add(parent)
|
||||
|
||||
|
||||
def resolve_file(file_entry: dict, db: dict, bios_dir: str,
|
||||
zip_contents: dict | None = None,
|
||||
dest_hint: str = "") -> tuple[str | None, str]:
|
||||
@@ -381,6 +411,7 @@ def generate_pack(
|
||||
user_provided = []
|
||||
seen_destinations: set[str] = set()
|
||||
seen_lower: set[str] = set() # only used when case_insensitive=True
|
||||
seen_parents: set[str] = set() # parent dirs of added files (path conflict detection)
|
||||
# Per-file status: worst status wins (missing > untested > ok)
|
||||
file_status: dict[str, str] = {}
|
||||
file_reasons: dict[str, str] = {}
|
||||
@@ -435,12 +466,16 @@ def generate_pack(
|
||||
dedup_key = full_dest
|
||||
already_packed = dedup_key in seen_destinations or (case_insensitive and dedup_key.lower() in seen_lower)
|
||||
|
||||
if _has_path_conflict(full_dest, seen_destinations, seen_parents):
|
||||
continue
|
||||
|
||||
storage = file_entry.get("storage", "embedded")
|
||||
|
||||
if storage == "user_provided":
|
||||
if already_packed:
|
||||
continue
|
||||
seen_destinations.add(dedup_key)
|
||||
_register_path(dedup_key, seen_destinations, seen_parents)
|
||||
if case_insensitive:
|
||||
seen_lower.add(dedup_key.lower())
|
||||
file_status.setdefault(dedup_key, "ok")
|
||||
@@ -467,6 +502,7 @@ def generate_pack(
|
||||
else:
|
||||
zf.write(tmp_path, full_dest)
|
||||
seen_destinations.add(dedup_key)
|
||||
_register_path(dedup_key, seen_destinations, seen_parents)
|
||||
if case_insensitive:
|
||||
seen_lower.add(dedup_key.lower())
|
||||
file_status.setdefault(dedup_key, "ok")
|
||||
@@ -543,6 +579,7 @@ def generate_pack(
|
||||
if already_packed:
|
||||
continue
|
||||
seen_destinations.add(dedup_key)
|
||||
_register_path(dedup_key, seen_destinations, seen_parents)
|
||||
if case_insensitive:
|
||||
seen_lower.add(dedup_key.lower())
|
||||
|
||||
@@ -589,6 +626,9 @@ def generate_pack(
|
||||
# Skip case-insensitive duplicates (Windows/macOS FS safety)
|
||||
if full_dest.lower() in seen_lower and case_insensitive:
|
||||
continue
|
||||
# Skip file/directory path conflicts (e.g., SGB1.sfc file vs SGB1.sfc/ dir)
|
||||
if _has_path_conflict(full_dest, seen_destinations, seen_parents):
|
||||
continue
|
||||
|
||||
local_path, status = resolve_file(fe, db, bios_dir, zip_contents)
|
||||
if status in ("not_found", "external", "user_provided"):
|
||||
@@ -599,6 +639,7 @@ def generate_pack(
|
||||
else:
|
||||
zf.write(local_path, full_dest)
|
||||
seen_destinations.add(full_dest)
|
||||
_register_path(full_dest, seen_destinations, seen_parents)
|
||||
if case_insensitive:
|
||||
seen_lower.add(full_dest.lower())
|
||||
core_count += 1
|
||||
@@ -632,7 +673,10 @@ def generate_pack(
|
||||
full = f"{dd_prefix}/{rel}"
|
||||
if full in seen_destinations or full.lower() in seen_lower and case_insensitive:
|
||||
continue
|
||||
if _has_path_conflict(full, seen_destinations, seen_parents):
|
||||
continue
|
||||
seen_destinations.add(full)
|
||||
_register_path(full, seen_destinations, seen_parents)
|
||||
if case_insensitive:
|
||||
seen_lower.add(full.lower())
|
||||
zf.write(src, full)
|
||||
@@ -776,6 +820,7 @@ def generate_emulator_pack(
|
||||
missing_files = []
|
||||
seen_destinations: set[str] = set()
|
||||
seen_lower: set[str] = set()
|
||||
seen_parents: set[str] = set() # parent dirs of added files (path conflict detection)
|
||||
seen_hashes: set[str] = set() # SHA1 dedup for same file, different path
|
||||
data_dir_notices: list[str] = []
|
||||
data_registry = load_data_dir_registry(
|
||||
@@ -810,7 +855,10 @@ def generate_emulator_pack(
|
||||
full = f"{dd_dest}/{rel}" if dd_dest else rel
|
||||
if full.lower() in seen_lower:
|
||||
continue
|
||||
if _has_path_conflict(full, seen_destinations, seen_parents):
|
||||
continue
|
||||
seen_destinations.add(full)
|
||||
_register_path(full, seen_destinations, seen_parents)
|
||||
seen_lower.add(full.lower())
|
||||
zf.write(src, full)
|
||||
total_files += 1
|
||||
@@ -837,6 +885,8 @@ def generate_emulator_pack(
|
||||
|
||||
if archive_dest.lower() in seen_lower:
|
||||
continue
|
||||
if _has_path_conflict(archive_dest, seen_destinations, seen_parents):
|
||||
continue
|
||||
|
||||
archive_entry = {"name": archive_name}
|
||||
local_path, status = resolve_file(archive_entry, db, bios_dir, zip_contents)
|
||||
@@ -846,6 +896,7 @@ def generate_emulator_pack(
|
||||
else:
|
||||
zf.write(local_path, archive_dest)
|
||||
seen_destinations.add(archive_dest)
|
||||
_register_path(archive_dest, seen_destinations, seen_parents)
|
||||
seen_lower.add(archive_dest.lower())
|
||||
total_files += 1
|
||||
else:
|
||||
@@ -864,10 +915,13 @@ def generate_emulator_pack(
|
||||
|
||||
if dest.lower() in seen_lower:
|
||||
continue
|
||||
if _has_path_conflict(dest, seen_destinations, seen_parents):
|
||||
continue
|
||||
|
||||
storage = fe.get("storage", "embedded")
|
||||
if storage == "user_provided":
|
||||
seen_destinations.add(dest)
|
||||
_register_path(dest, seen_destinations, seen_parents)
|
||||
seen_lower.add(dest.lower())
|
||||
instr = fe.get("instructions", "Please provide this file manually.")
|
||||
instr_name = f"INSTRUCTIONS_{fe['name']}.txt"
|
||||
@@ -887,6 +941,7 @@ def generate_emulator_pack(
|
||||
if download_external(fe, tmp_path):
|
||||
zf.write(tmp_path, dest)
|
||||
seen_destinations.add(dest)
|
||||
_register_path(dest, seen_destinations, seen_parents)
|
||||
seen_lower.add(dest.lower())
|
||||
total_files += 1
|
||||
else:
|
||||
@@ -915,6 +970,7 @@ def generate_emulator_pack(
|
||||
else:
|
||||
zf.write(local_path, dest)
|
||||
seen_destinations.add(dest)
|
||||
_register_path(dest, seen_destinations, seen_parents)
|
||||
seen_lower.add(dest.lower())
|
||||
total_files += 1
|
||||
|
||||
|
||||
@@ -2207,6 +2207,86 @@ class TestE2E(unittest.TestCase):
|
||||
records, _ = parse_firmware_database(fragment)
|
||||
self.assertEqual(records[0]["name"], "good.bin")
|
||||
|
||||
def test_157_path_conflict_helpers(self):
|
||||
"""_has_path_conflict detects file/directory naming collisions."""
|
||||
from generate_pack import _has_path_conflict, _register_path
|
||||
|
||||
seen_files: set[str] = set()
|
||||
seen_parents: set[str] = set()
|
||||
|
||||
# Register system/SGB1.sfc as a file
|
||||
_register_path("system/SGB1.sfc", seen_files, seen_parents)
|
||||
|
||||
# Adding system/SGB1.sfc/program.rom should conflict (parent is a file)
|
||||
self.assertTrue(_has_path_conflict("system/SGB1.sfc/program.rom",
|
||||
seen_files, seen_parents))
|
||||
|
||||
# Adding system/other.bin should not conflict
|
||||
self.assertFalse(_has_path_conflict("system/other.bin",
|
||||
seen_files, seen_parents))
|
||||
|
||||
# Reverse: register a nested path first, then check flat
|
||||
seen_files2: set[str] = set()
|
||||
seen_parents2: set[str] = set()
|
||||
_register_path("system/SGB2.sfc/program.rom", seen_files2, seen_parents2)
|
||||
|
||||
# Adding system/SGB2.sfc as a file should conflict (it's a directory)
|
||||
self.assertTrue(_has_path_conflict("system/SGB2.sfc",
|
||||
seen_files2, seen_parents2))
|
||||
|
||||
# Adding system/SGB2.sfc/boot.rom should not conflict (sibling in same dir)
|
||||
self.assertFalse(_has_path_conflict("system/SGB2.sfc/boot.rom",
|
||||
seen_files2, seen_parents2))
|
||||
|
||||
def test_158_pack_skips_file_directory_conflict(self):
|
||||
"""Pack generation skips entries that conflict with existing paths."""
|
||||
from generate_pack import generate_pack
|
||||
|
||||
output_dir = os.path.join(self.root, "pack_conflict")
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
# Platform declares SGB1.sfc as a flat file
|
||||
config = {
|
||||
"platform": "ConflictTest",
|
||||
"verification_mode": "existence",
|
||||
"base_destination": "system",
|
||||
"systems": {
|
||||
"test-sys": {
|
||||
"files": [
|
||||
{"name": "present_req.bin", "destination": "present_req.bin",
|
||||
"required": True},
|
||||
],
|
||||
},
|
||||
},
|
||||
}
|
||||
with open(os.path.join(self.platforms_dir, "test_conflict.yml"), "w") as fh:
|
||||
yaml.dump(config, fh)
|
||||
|
||||
# Create an emulator profile with a nested path that conflicts
|
||||
emu = {
|
||||
"emulator": "ConflictCore",
|
||||
"type": "libretro",
|
||||
"systems": ["test-sys"],
|
||||
"files": [
|
||||
{"name": "present_req.bin/nested.rom", "required": False},
|
||||
],
|
||||
}
|
||||
with open(os.path.join(self.emulators_dir, "conflict_core.yml"), "w") as fh:
|
||||
yaml.dump(emu, fh)
|
||||
|
||||
zip_path = generate_pack(
|
||||
"test_conflict", self.platforms_dir, self.db, self.bios_dir,
|
||||
output_dir, emulators_dir=self.emulators_dir,
|
||||
)
|
||||
self.assertIsNotNone(zip_path)
|
||||
with zipfile.ZipFile(zip_path) as zf:
|
||||
names = zf.namelist()
|
||||
# Flat file should be present
|
||||
self.assertTrue(any("present_req.bin" in n and "/" + "nested" not in n
|
||||
for n in names))
|
||||
# Nested conflict should NOT be present
|
||||
self.assertFalse(any("nested.rom" in n for n in names))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user