fix: audit fixes across verify, pack, security, and performance

- fix KeyError in compute_coverage (generate_readme, generate_site)
- fix comma-separated MD5 handling in generate_pack check_inside_zip
- fix _verify_file_hash to handle multi-MD5 for large files
- fix external downloads not tracked in seen_destinations/file_status
- fix tar path traversal in _is_safe_tar_member (refresh_data_dirs)
- fix predictable tmp path in download.py
- fix _sanitize_path to filter "." components
- remove blanket data_dir suppression in find_undeclared_files
- remove blanket data_dir suppression in cross_reference
- add status_counts to verify_platform return value
- add md5_composite cache for repeated ZIP hashing
Abdessamad Derraz
2026-03-19 14:04:34 +01:00
parent e1410ef4a6
commit 38d605c7d5
9 changed files with 68 additions and 45 deletions

View File

@@ -58,19 +58,28 @@ def md5sum(source: str | Path | object) -> str:
     return h.hexdigest()
 
 
+_md5_composite_cache: dict[str, str] = {}
+
+
 def md5_composite(filepath: str | Path) -> str:
     """Compute composite MD5 of a ZIP - matches Recalbox's Zip::Md5Composite().
 
     Sorts filenames alphabetically, reads each file's contents in order,
     feeds everything into a single MD5 hasher. The result is independent
-    of ZIP compression level or metadata.
+    of ZIP compression level or metadata. Results are cached per path.
     """
+    key = str(filepath)
+    cached = _md5_composite_cache.get(key)
+    if cached is not None:
+        return cached
     with zipfile.ZipFile(filepath) as zf:
         names = sorted(n for n in zf.namelist() if not n.endswith("/"))
         h = hashlib.md5()
         for name in names:
            h.update(zf.read(name))
-    return h.hexdigest()
+    result = h.hexdigest()
+    _md5_composite_cache[key] = result
+    return result
 
 
 def load_platform_config(platform_name: str, platforms_dir: str = "platforms") -> dict:
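
A minimal usage sketch of the caching behaviour (not part of the diff above), assuming md5_composite is importable from this module and sample.zip is a throwaway local archive:

    import zipfile

    # Build a throwaway archive; sample.zip is an assumed local filename.
    with zipfile.ZipFile("sample.zip", "w") as zf:
        zf.writestr("b.txt", "world")
        zf.writestr("a.txt", "hello")

    first = md5_composite("sample.zip")   # hashes a.txt then b.txt (sorted names)
    second = md5_composite("sample.zip")  # served from _md5_composite_cache, no ZIP I/O
    assert first == second

Note the cache is keyed on str(filepath), so repeated verification passes skip re-reading the archive, while a ZIP modified mid-run would return a stale digest.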

View File

@@ -93,15 +93,6 @@ def cross_reference(
     for sys_id in systems:
         platform_names.update(declared.get(sys_id, set()))
 
-    # data_directories: check if the emulator's data_dir refs are provided
-    # by ANY platform for ANY system (not limited to matching system IDs,
-    # since emulator profiles and platforms use different ID conventions)
-    all_plat_dd_refs = set()
-    for dd_set in platform_data_dirs.values():
-        all_plat_dd_refs.update(dd_set)
-    emu_dd_refs = {dd.get("ref", "") for dd in profile.get("data_directories", [])}
-    covered_dd = emu_dd_refs & all_plat_dd_refs
-
     gaps = []
     covered = []
     for f in emu_files:
@@ -117,9 +108,6 @@ def cross_reference(
             continue
 
         in_platform = fname in platform_names
-        # files covered by shared data_directories are effectively in the platform pack
-        if not in_platform and covered_dd:
-            in_platform = True
         in_repo = _find_in_repo(fname, by_name, by_name_lower)
 
         entry = {

View File

@@ -233,7 +233,8 @@ Examples:
         sys.exit(1)
 
     import tempfile
-    zip_path = os.path.join(tempfile.gettempdir(), asset["name"])
+    fd, zip_path = tempfile.mkstemp(suffix=".zip")
+    os.close(fd)
     print(f"Downloading {asset['name']} ({asset['size']:,} bytes)...")
     download_file(asset["browser_download_url"], zip_path, asset["size"])
 
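
tempfile.mkstemp() creates the file exclusively with mode 0600 under an unpredictable name, so another local user can no longer pre-create or symlink the old gettempdir()/asset-name path. A short sketch of the same pattern with cleanup (not part of the diff), where fetch is a hypothetical stand-in for download_file:

    import os
    import tempfile

    def download_to_temp(fetch, url: str) -> bytes:
        """Download url into a private temp file and return its contents.

        fetch(url, path) is a hypothetical stand-in for download_file().
        """
        fd, tmp_path = tempfile.mkstemp(suffix=".zip")  # exclusive create, 0600, random name
        os.close(fd)  # the downloader reopens the path itself
        try:
            fetch(url, tmp_path)
            with open(tmp_path, "rb") as f:
                return f.read()
        finally:
            os.unlink(tmp_path)  # never leave the payload behind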

View File

@@ -54,7 +54,8 @@ def _verify_file_hash(path: str, expected_sha1: str = "",
     hashes = compute_hashes(path)
     if expected_sha1:
         return hashes["sha1"] == expected_sha1
-    return hashes["md5"] == expected_md5
+    md5_list = [m.strip().lower() for m in expected_md5.split(",") if m.strip()]
+    return hashes["md5"].lower() in md5_list
 
 
 def fetch_large_file(name: str, dest_dir: str = ".cache/large",
@@ -94,7 +95,7 @@ def fetch_large_file(name: str, dest_dir: str = ".cache/large",
 def _sanitize_path(raw: str) -> str:
     """Strip path traversal components from a relative path."""
     raw = raw.replace("\\", "/")
-    parts = [p for p in raw.split("/") if p and p != ".."]
+    parts = [p for p in raw.split("/") if p and p not in ("..", ".")]
     return "/".join(parts)
 
@@ -299,9 +300,12 @@ def generate_pack(
                     _extract_zip_to_archive(tmp_path, full_dest, zf)
                 else:
                     zf.write(tmp_path, full_dest)
+                seen_destinations.add(dedup_key)
+                file_status.setdefault(dedup_key, "ok")
                 total_files += 1
             else:
                 missing_files.append(file_entry["name"])
+                file_status[dedup_key] = "missing"
         finally:
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
@@ -316,16 +320,26 @@ def generate_pack(
             if status == "hash_mismatch" and verification_mode != "existence":
                 zf_name = file_entry.get("zipped_file")
                 if zf_name and local_path:
-                    inner_md5 = file_entry.get("md5", "")
-                    inner_result = check_inside_zip(local_path, zf_name, inner_md5)
-                    if inner_result == "ok":
+                    inner_md5_raw = file_entry.get("md5", "")
+                    inner_md5_list = (
+                        [m.strip() for m in inner_md5_raw.split(",") if m.strip()]
+                        if inner_md5_raw else [""]
+                    )
+                    zip_ok = False
+                    last_result = "not_in_zip"
+                    for md5_candidate in inner_md5_list:
+                        last_result = check_inside_zip(local_path, zf_name, md5_candidate)
+                        if last_result == "ok":
+                            zip_ok = True
+                            break
+                    if zip_ok:
                         file_status.setdefault(dedup_key, "ok")
-                    elif inner_result == "not_in_zip":
+                    elif last_result == "not_in_zip":
                         file_status[dedup_key] = "untested"
                         file_reasons[dedup_key] = f"{zf_name} not found inside ZIP"
-                    elif inner_result == "error":
+                    elif last_result == "error":
                         file_status[dedup_key] = "untested"
-                        file_reasons[dedup_key] = f"cannot read ZIP"
+                        file_reasons[dedup_key] = "cannot read ZIP"
                     else:
                         file_status[dedup_key] = "untested"
                         file_reasons[dedup_key] = f"{zf_name} MD5 mismatch inside ZIP"
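
With "." filtered alongside "..", _sanitize_path can no longer be nudged outside the destination, and the comma-separated MD5 handling accepts whichever listed digest the local file actually matches. A quick behavioural sketch (not part of the diff), assuming the helpers above are importable:

    import hashlib

    # Path sanitising: backslashes normalised, "." and ".." components dropped.
    assert _sanitize_path("..\\..\\bios/./scph1001.bin") == "bios/scph1001.bin"
    assert _sanitize_path("./nested/../file.bin") == "nested/file.bin"

    # Multi-MD5: the expected value may be a comma-separated list; any entry matches.
    digest = hashlib.md5(b"payload").hexdigest()
    expected_md5 = f"DEADBEEF, {digest.upper()}"
    md5_list = [m.strip().lower() for m in expected_md5.split(",") if m.strip()]
    assert digest in md5_list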

View File

@@ -24,14 +24,19 @@ from verify import verify_platform
 def compute_coverage(platform_name: str, platforms_dir: str, db: dict) -> dict:
     config = load_platform_config(platform_name, platforms_dir)
     result = verify_platform(config, db)
-    present = result["ok"] + result["untested"]
-    pct = (present / result["total"] * 100) if result["total"] > 0 else 0
+    sc = result.get("status_counts", {})
+    ok = sc.get("ok", 0)
+    untested = sc.get("untested", 0)
+    missing = sc.get("missing", 0)
+    total = result["total_files"]
+    present = ok + untested
+    pct = (present / total * 100) if total > 0 else 0
     return {
         "platform": config.get("platform", platform_name),
-        "total": result["total"],
-        "verified": result["ok"],
-        "untested": result["untested"],
-        "missing": result["missing"],
+        "total": total,
+        "verified": ok,
+        "untested": untested,
+        "missing": missing,
         "present": present,
         "percentage": pct,
         "mode": config.get("verification_mode", "existence"),

View File

@@ -68,14 +68,19 @@ def _status_icon(pct: float) -> str:
 def compute_coverage(platform_name: str, platforms_dir: str, db: dict) -> dict:
     config = load_platform_config(platform_name, platforms_dir)
     result = verify_platform(config, db)
-    present = result["ok"] + result["untested"]
-    pct = (present / result["total"] * 100) if result["total"] > 0 else 0
+    sc = result.get("status_counts", {})
+    ok = sc.get("ok", 0)
+    untested = sc.get("untested", 0)
+    missing = sc.get("missing", 0)
+    total = result["total_files"]
+    present = ok + untested
+    pct = (present / total * 100) if total > 0 else 0
     return {
         "platform": config.get("platform", platform_name),
-        "total": result["total"],
-        "verified": result["ok"],
-        "untested": result["untested"],
-        "missing": result["missing"],
+        "total": total,
+        "verified": ok,
+        "untested": untested,
+        "missing": missing,
         "present": present,
         "percentage": pct,
         "mode": config.get("verification_mode", "existence"),

View File

@@ -120,7 +120,8 @@ def _is_safe_tar_member(member: tarfile.TarInfo, dest: Path) -> bool:
     if member.name.startswith("/") or ".." in member.name.split("/"):
         return False
     resolved = (dest / member.name).resolve()
-    if not str(resolved).startswith(str(dest.resolve())):
+    dest_str = str(dest.resolve()) + os.sep
+    if not str(resolved).startswith(dest_str) and str(resolved) != str(dest.resolve()):
         return False
     return True
 
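
Appending os.sep before the prefix test closes the sibling-directory bypass: with dest resolved to /data/packs, a member resolving under /data/packs-evil/ used to pass the bare startswith() check, while the added equality test keeps a member resolving exactly to dest itself valid. A tiny illustration of the string-prefix difference (hypothetical POSIX paths, not part of the diff):

    import os

    dest = "/data/packs"              # hypothetical resolved extraction root
    evil = "/data/packs-evil/x.bin"   # resolves outside dest, e.g. via a symlinked member

    assert evil.startswith(dest)               # old check: accepted, traversal possible
    assert not evil.startswith(dest + os.sep)  # new check: rejected
    # dest itself fails the prefix test too, which is why the equality branch was added.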

View File

@@ -228,10 +228,6 @@ def find_undeclared_files(
         if not emu_systems & platform_systems:
             continue
 
-        # Skip if emulator's data_directories cover the files
-        emu_dd = {dd.get("ref", "") for dd in profile.get("data_directories", [])}
-        covered_by_dd = bool(emu_dd & declared_dd)
-
         for f in profile.get("files", []):
             fname = f.get("name", "")
             if not fname or fname in seen:
@@ -241,8 +237,6 @@
                 continue
             if fname in declared_names:
                 continue
-            if covered_by_dd:
-                continue
 
             in_repo = fname in by_name or fname.rsplit("/", 1)[-1] in by_name
             seen.add(fname)
@@ -381,6 +375,11 @@ def verify_platform(
     for s in file_severity.values():
         counts[s] = counts.get(s, 0) + 1
 
+    # Count by file status (ok/untested/missing)
+    status_counts: dict[str, int] = {}
+    for s in file_status.values():
+        status_counts[s] = status_counts.get(s, 0) + 1
+
     # Cross-reference undeclared files
     undeclared = find_undeclared_files(config, emulators_dir, db, emu_profiles)
     exclusions = find_exclusion_notes(config, emulators_dir, emu_profiles)
@@ -390,6 +389,7 @@ def verify_platform(
         "verification_mode": mode,
         "total_files": len(file_status),
         "severity_counts": counts,
+        "status_counts": status_counts,
         "undeclared_files": undeclared,
         "exclusion_notes": exclusions,
         "details": details,

View File

@@ -471,13 +471,13 @@ class TestE2E(unittest.TestCase):
         profiles = load_emulator_profiles(self.emulators_dir)
         self.assertNotIn("test_alias", profiles)
 
-    def test_43_cross_ref_data_dir_suppresses_gaps(self):
+    def test_43_cross_ref_data_dir_does_not_suppress_files(self):
         config = load_platform_config("test_md5", self.platforms_dir)
         profiles = load_emulator_profiles(self.emulators_dir)
         undeclared = find_undeclared_files(config, self.emulators_dir, self.db, profiles)
         names = {u["name"] for u in undeclared}
-        # dd_covered.bin from TestEmuDD should NOT appear (data_dir match)
-        self.assertNotIn("dd_covered.bin", names)
+        # dd_covered.bin is a file entry, not data_dir content — still undeclared
+        self.assertIn("dd_covered.bin", names)
 
     def test_44_cross_ref_skips_launchers(self):
         config = load_platform_config("test_existence", self.platforms_dir)