feat: add diff_platform_truth function and tests

This commit is contained in:
Abdessamad Derraz
2026-03-29 13:09:08 +02:00
parent 97eb4835be
commit 2aab7420d7
2 changed files with 291 additions and 1 deletions

View File

@@ -1310,3 +1310,165 @@ def generate_platform_truth(
"cores_unprofiled": sorted(cores_unprofiled),
},
}
# -------------------------------------------------------------------
# Platform truth diffing
# -------------------------------------------------------------------
def _diff_system(truth_sys: dict, scraped_sys: dict) -> dict:
"""Compare files between truth and scraped for a single system."""
# Build truth index: name.lower() -> entry, alias.lower() -> entry
truth_index: dict[str, dict] = {}
for fe in truth_sys.get("files", []):
truth_index[fe["name"].lower()] = fe
for alias in fe.get("aliases", []):
truth_index[alias.lower()] = fe
# Build scraped index: name.lower() -> entry
scraped_index: dict[str, dict] = {}
for fe in scraped_sys.get("files", []):
scraped_index[fe["name"].lower()] = fe
missing: list[dict] = []
hash_mismatch: list[dict] = []
required_mismatch: list[dict] = []
extra_phantom: list[dict] = []
extra_unprofiled: list[dict] = []
matched_truth_names: set[str] = set()
# Compare scraped files against truth
for s_key, s_entry in scraped_index.items():
t_entry = truth_index.get(s_key)
if t_entry is None:
continue
matched_truth_names.add(t_entry["name"].lower())
# Hash comparison
for h in ("sha1", "md5", "crc32"):
t_hash = t_entry.get(h, "")
s_hash = s_entry.get(h, "")
if t_hash and s_hash and t_hash.lower() != s_hash.lower():
hash_mismatch.append({
"name": s_entry["name"],
"hash_type": h,
f"truth_{h}": t_hash,
f"scraped_{h}": s_hash,
"truth_cores": list(t_entry.get("_cores", [])),
})
break
# Required mismatch
t_req = t_entry.get("required")
s_req = s_entry.get("required")
if t_req is not None and s_req is not None and t_req != s_req:
required_mismatch.append({
"name": s_entry["name"],
"truth_required": t_req,
"scraped_required": s_req,
})
# Truth files not matched -> missing
for fe in truth_sys.get("files", []):
if fe["name"].lower() not in matched_truth_names:
missing.append({
"name": fe["name"],
"cores": list(fe.get("_cores", [])),
"source_refs": list(fe.get("_source_refs", [])),
})
# Scraped files not in truth -> extra
coverage = truth_sys.get("_coverage", {})
has_unprofiled = bool(coverage.get("cores_unprofiled"))
for s_key, s_entry in scraped_index.items():
if s_key not in truth_index:
entry = {"name": s_entry["name"]}
if has_unprofiled:
extra_unprofiled.append(entry)
else:
extra_phantom.append(entry)
result: dict = {}
if missing:
result["missing"] = missing
if hash_mismatch:
result["hash_mismatch"] = hash_mismatch
if required_mismatch:
result["required_mismatch"] = required_mismatch
if extra_phantom:
result["extra_phantom"] = extra_phantom
if extra_unprofiled:
result["extra_unprofiled"] = extra_unprofiled
return result
def _has_divergences(sys_div: dict) -> bool:
"""Check if a system divergence dict contains any actual divergences."""
return bool(sys_div)
def _update_summary(summary: dict, sys_div: dict) -> None:
"""Update summary counters from a system divergence dict."""
summary["total_missing"] += len(sys_div.get("missing", []))
summary["total_extra_phantom"] += len(sys_div.get("extra_phantom", []))
summary["total_extra_unprofiled"] += len(sys_div.get("extra_unprofiled", []))
summary["total_hash_mismatch"] += len(sys_div.get("hash_mismatch", []))
summary["total_required_mismatch"] += len(sys_div.get("required_mismatch", []))
def diff_platform_truth(truth: dict, scraped: dict) -> dict:
"""Compare truth YAML against scraped YAML, returning divergences."""
truth_systems = truth.get("systems", {})
scraped_systems = scraped.get("systems", {})
summary = {
"systems_compared": 0,
"systems_fully_covered": 0,
"systems_partially_covered": 0,
"systems_uncovered": 0,
"total_missing": 0,
"total_extra_phantom": 0,
"total_extra_unprofiled": 0,
"total_hash_mismatch": 0,
"total_required_mismatch": 0,
}
divergences: dict[str, dict] = {}
uncovered_systems: list[str] = []
all_sys_ids = sorted(set(truth_systems) | set(scraped_systems))
for sys_id in all_sys_ids:
in_truth = sys_id in truth_systems
in_scraped = sys_id in scraped_systems
if in_scraped and not in_truth:
uncovered_systems.append(sys_id)
summary["systems_uncovered"] += 1
continue
summary["systems_compared"] += 1
if in_truth and not in_scraped:
# All truth files are missing
truth_sys = truth_systems[sys_id]
sys_div = _diff_system(truth_sys, {"files": []})
else:
truth_sys = truth_systems[sys_id]
scraped_sys = scraped_systems[sys_id]
sys_div = _diff_system(truth_sys, scraped_sys)
if _has_divergences(sys_div):
divergences[sys_id] = sys_div
_update_summary(summary, sys_div)
summary["systems_partially_covered"] += 1
else:
summary["systems_fully_covered"] += 1
result: dict = {"summary": summary}
if divergences:
result["divergences"] = divergences
if uncovered_systems:
result["uncovered_systems"] = uncovered_systems
return result