feat: core profiles, data_dirs buildbot, cross_ref fix

profiles: amiberry (new), amiarcadia, atari800, azahar, b2,
bk, blastem, bluemsx, freeintv updated with source refs,
upstream field, mode field, data_directories.

_data_dirs.yml: buildbot source for retroarch platforms,
strip_components for nested ZIPs, freeintv-overlays fixed.

cross_reference.py: data_directories-aware gap analysis,
suppresses false gaps when emulator+platform share refs.

refresh_data_dirs.py: ZIP strip_components support,
for_platforms filter, ETag freshness for buildbot.

scraper: bluemsx single ref, freeintv overlays injection.
generate_pack.py: warning on missing data directory cache.
This commit is contained in:
Abdessamad Derraz
2026-03-18 21:20:02 +01:00
parent fbb2079f9b
commit 86dbdf28e5
16 changed files with 540 additions and 156 deletions

View File

@@ -21,6 +21,7 @@ import tarfile
import tempfile
import urllib.error
import urllib.request
import zipfile
from pathlib import Path
try:
@@ -203,6 +204,81 @@ def _download_and_extract(
return file_count
def _download_and_extract_zip(
source_url: str,
local_cache: str,
exclude: list[str] | None = None,
strip_components: int = 0,
) -> int:
"""Download ZIP, extract to local_cache. Returns file count.
strip_components removes N leading path components from each entry
(like tar --strip-components). Useful when a ZIP has a single root
directory that should be flattened.
"""
exclude = exclude or []
cache_dir = Path(local_cache)
with tempfile.TemporaryDirectory() as tmpdir:
zip_path = Path(tmpdir) / "archive.zip"
log.info("downloading %s", source_url)
req = urllib.request.Request(source_url, headers={"User-Agent": USER_AGENT})
with urllib.request.urlopen(req, timeout=DOWNLOAD_TIMEOUT) as resp:
with open(zip_path, "wb") as f:
while True:
chunk = resp.read(65536)
if not chunk:
break
f.write(chunk)
extract_dir = Path(tmpdir) / "extract"
extract_dir.mkdir()
file_count = 0
with zipfile.ZipFile(zip_path) as zf:
for info in zf.infolist():
if info.is_dir():
continue
name = info.filename
if ".." in name or name.startswith("/"):
continue
# strip leading path components
parts = name.split("/")
if strip_components > 0:
if len(parts) <= strip_components:
continue
parts = parts[strip_components:]
name = "/".join(parts)
# skip excludes (check against stripped path)
top = parts[0] if parts else ""
if top in exclude:
continue
dest = extract_dir / name
dest.parent.mkdir(parents=True, exist_ok=True)
with zf.open(info) as src, open(dest, "wb") as dst:
shutil.copyfileobj(src, dst)
file_count += 1
if cache_dir.exists():
shutil.rmtree(cache_dir)
cache_dir.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(extract_dir), str(cache_dir))
return file_count
def _get_remote_etag(source_url: str) -> str | None:
"""HEAD request to get ETag or Last-Modified for freshness check."""
try:
req = urllib.request.Request(source_url, method="HEAD",
headers={"User-Agent": USER_AGENT})
with urllib.request.urlopen(req, timeout=REQUEST_TIMEOUT) as resp:
return resp.headers.get("ETag") or resp.headers.get("Last-Modified") or ""
except (urllib.error.URLError, OSError):
return None
def refresh_entry(
key: str,
entry: dict,
@@ -215,46 +291,55 @@ def refresh_entry(
Returns True if the entry was refreshed (or would be in dry-run mode).
"""
source_type = entry.get("source_type", "tarball")
version = entry.get("version", "master")
source_url = entry["source_url"].format(version=version)
source_path = entry["source_path"].format(version=version)
local_cache = entry["local_cache"]
exclude = entry.get("exclude", [])
versions = _load_versions(versions_path)
cached = versions.get(key, {})
cached_sha = cached.get("sha")
cached_tag = cached.get("sha") or cached.get("etag")
needs_refresh = force or not Path(local_cache).exists()
remote_tag: str | None = None
if not needs_refresh:
remote_sha = get_remote_sha(entry["source_url"], version)
if remote_sha is None:
if source_type == "zip":
remote_tag = _get_remote_etag(source_url)
else:
remote_tag = get_remote_sha(entry["source_url"], version)
if remote_tag is None:
log.warning("[%s] could not check remote, skipping", key)
return False
needs_refresh = remote_sha != cached_sha
else:
remote_sha = get_remote_sha(entry["source_url"], version) if not force else None
needs_refresh = remote_tag != cached_tag
if not needs_refresh:
log.info("[%s] up to date (sha: %s)", key, cached_sha[:12] if cached_sha else "?")
log.info("[%s] up to date (tag: %s)", key, (cached_tag or "?")[:12])
return False
if dry_run:
log.info("[%s] would refresh (version: %s, cached sha: %s)", key, version, cached_sha or "none")
log.info("[%s] would refresh (type: %s, cached: %s)", key, source_type, cached_tag or "none")
return True
try:
file_count = _download_and_extract(source_url, source_path, local_cache, exclude)
except (urllib.error.URLError, OSError, tarfile.TarError) as exc:
if source_type == "zip":
strip = entry.get("strip_components", 0)
file_count = _download_and_extract_zip(source_url, local_cache, exclude, strip)
else:
source_path = entry["source_path"].format(version=version)
file_count = _download_and_extract(source_url, source_path, local_cache, exclude)
except (urllib.error.URLError, OSError, tarfile.TarError, zipfile.BadZipFile) as exc:
log.warning("[%s] download failed: %s", key, exc)
return False
# update version tracking
if remote_sha is None:
remote_sha = get_remote_sha(entry["source_url"], version)
if remote_tag is None:
if source_type == "zip":
remote_tag = _get_remote_etag(source_url)
else:
remote_tag = get_remote_sha(entry["source_url"], version)
versions = _load_versions(versions_path)
versions[key] = {"sha": remote_sha or "", "version": version}
versions[key] = {"sha": remote_tag or "", "version": version}
_save_versions(versions, versions_path)
log.info("[%s] refreshed: %d files extracted to %s", key, file_count, local_cache)
@@ -267,13 +352,19 @@ def refresh_all(
force: bool = False,
dry_run: bool = False,
versions_path: str = VERSIONS_FILE,
platform: str | None = None,
) -> dict[str, bool]:
"""Refresh all entries in the registry.
If platform is set, only refresh entries whose for_platforms
includes that platform (or entries with no for_platforms restriction).
Returns a dict mapping key -> whether it was refreshed.
"""
results = {}
for key, entry in registry.items():
allowed = entry.get("for_platforms")
if platform and allowed and platform not in allowed:
continue
results[key] = refresh_entry(
key, entry, force=force, dry_run=dry_run, versions_path=versions_path,
)
@@ -285,6 +376,7 @@ def main() -> None:
parser.add_argument("--key", help="Refresh only this entry")
parser.add_argument("--force", action="store_true", help="Re-download even if up to date")
parser.add_argument("--dry-run", action="store_true", help="Preview without downloading")
parser.add_argument("--platform", help="Only refresh entries for this platform")
parser.add_argument("--registry", default=DEFAULT_REGISTRY, help="Path to _data_dirs.yml")
args = parser.parse_args()
@@ -301,7 +393,7 @@ def main() -> None:
raise SystemExit(1)
refresh_entry(args.key, registry[args.key], force=args.force, dry_run=args.dry_run)
else:
refresh_all(registry, force=args.force, dry_run=args.dry_run)
refresh_all(registry, force=args.force, dry_run=args.dry_run, platform=args.platform)
if __name__ == "__main__":