chore: lint and format entire codebase

Run ruff check --fix: remove unused imports (F401), fix f-strings
without placeholders (F541), remove unused variables (F841), fix
duplicate dict key (F601).

Run isort --profile black: normalize import ordering across all files.

Run ruff format: apply consistent formatting (black-compatible) to
all 58 Python files.

3 intentional E402 violations remain (imports deliberately placed after
require_yaml() so they only execute once yaml is available).
This commit is contained in:
Abdessamad Derraz
2026-04-01 13:17:55 +02:00
parent a2d30557e4
commit 0a272dc4e9
56 changed files with 5115 additions and 2679 deletions

View File

@@ -57,7 +57,9 @@ def _load_versions(versions_path: str = VERSIONS_FILE) -> dict[str, dict]:
return json.load(f)
def _save_versions(versions: dict[str, dict], versions_path: str = VERSIONS_FILE) -> None:
def _save_versions(
versions: dict[str, dict], versions_path: str = VERSIONS_FILE
) -> None:
path = Path(versions_path)
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w") as f:
@@ -66,10 +68,13 @@ def _save_versions(versions: dict[str, dict], versions_path: str = VERSIONS_FILE
def _api_request(url: str) -> dict:
req = urllib.request.Request(url, headers={
"User-Agent": USER_AGENT,
"Accept": "application/json",
})
req = urllib.request.Request(
url,
headers={
"User-Agent": USER_AGENT,
"Accept": "application/json",
},
)
token = os.environ.get("GITHUB_TOKEN") or os.environ.get("GH_TOKEN")
if token and "github" in url:
req.add_header("Authorization", f"token {token}")
@@ -111,7 +116,9 @@ def get_remote_sha(source_url: str, version: str) -> str | None:
data = _api_request(url)
return data["commit"]["id"]
except (urllib.error.URLError, KeyError, OSError) as exc:
log.warning("failed to fetch remote SHA for %s/%s@%s: %s", owner, repo, version, exc)
log.warning(
"failed to fetch remote SHA for %s/%s@%s: %s", owner, repo, version, exc
)
return None
@@ -167,7 +174,7 @@ def _download_and_extract(
if not member.name.startswith(prefix) and member.name != source_path:
continue
rel = member.name[len(prefix):]
rel = member.name[len(prefix) :]
if not rel:
continue
@@ -285,8 +292,9 @@ def _download_and_extract_zip(
def _get_remote_etag(source_url: str) -> str | None:
"""HEAD request to get ETag or Last-Modified for freshness check."""
try:
req = urllib.request.Request(source_url, method="HEAD",
headers={"User-Agent": USER_AGENT})
req = urllib.request.Request(
source_url, method="HEAD", headers={"User-Agent": USER_AGENT}
)
with urllib.request.urlopen(req, timeout=REQUEST_TIMEOUT) as resp:
return resp.headers.get("ETag") or resp.headers.get("Last-Modified") or ""
except (urllib.error.URLError, OSError):
@@ -333,17 +341,31 @@ def refresh_entry(
return False
if dry_run:
log.info("[%s] would refresh (type: %s, cached: %s)", key, source_type, cached_tag or "none")
log.info(
"[%s] would refresh (type: %s, cached: %s)",
key,
source_type,
cached_tag or "none",
)
return True
try:
if source_type == "zip":
strip = entry.get("strip_components", 0)
file_count = _download_and_extract_zip(source_url, local_cache, exclude, strip)
file_count = _download_and_extract_zip(
source_url, local_cache, exclude, strip
)
else:
source_path = entry["source_path"].format(version=version)
file_count = _download_and_extract(source_url, source_path, local_cache, exclude)
except (urllib.error.URLError, OSError, tarfile.TarError, zipfile.BadZipFile) as exc:
file_count = _download_and_extract(
source_url, source_path, local_cache, exclude
)
except (
urllib.error.URLError,
OSError,
tarfile.TarError,
zipfile.BadZipFile,
) as exc:
log.warning("[%s] download failed: %s", key, exc)
return False
@@ -380,18 +402,30 @@ def refresh_all(
if platform and allowed and platform not in allowed:
continue
results[key] = refresh_entry(
key, entry, force=force, dry_run=dry_run, versions_path=versions_path,
key,
entry,
force=force,
dry_run=dry_run,
versions_path=versions_path,
)
return results
def main() -> None:
parser = argparse.ArgumentParser(description="Refresh cached data directories from upstream")
parser = argparse.ArgumentParser(
description="Refresh cached data directories from upstream"
)
parser.add_argument("--key", help="Refresh only this entry")
parser.add_argument("--force", action="store_true", help="Re-download even if up to date")
parser.add_argument("--dry-run", action="store_true", help="Preview without downloading")
parser.add_argument(
"--force", action="store_true", help="Re-download even if up to date"
)
parser.add_argument(
"--dry-run", action="store_true", help="Preview without downloading"
)
parser.add_argument("--platform", help="Only refresh entries for this platform")
parser.add_argument("--registry", default=DEFAULT_REGISTRY, help="Path to _data_dirs.yml")
parser.add_argument(
"--registry", default=DEFAULT_REGISTRY, help="Path to _data_dirs.yml"
)
args = parser.parse_args()
logging.basicConfig(
@@ -405,9 +439,13 @@ def main() -> None:
if args.key not in registry:
log.error("unknown key: %s (available: %s)", args.key, ", ".join(registry))
raise SystemExit(1)
refresh_entry(args.key, registry[args.key], force=args.force, dry_run=args.dry_run)
refresh_entry(
args.key, registry[args.key], force=args.force, dry_run=args.dry_run
)
else:
refresh_all(registry, force=args.force, dry_run=args.dry_run, platform=args.platform)
refresh_all(
registry, force=args.force, dry_run=args.dry_run, platform=args.platform
)
if __name__ == "__main__":