def check_profile_drift(current_profile: dict, baseline_profile: dict | None = None, policy: dict | None = None) -> dict:
"""Compare profile metrics against a baseline profile and drift thresholds.
Parameters
----------
current_profile : Any
Value used by this callable.
baseline_profile : Any
Value used by this callable.
policy : Any
Value used by this callable.
Returns
-------
dict
Structured output produced by this callable.
"""
active = {
"max_row_count_change_percent": 50,
"max_null_percent_change_points": 20,
"max_distinct_percent_change_points": 30,
"fail_on_missing_column": True,
**(policy or {}),
}
if baseline_profile is None:
return {"status": "no_baseline", "can_continue": True, "checks": [], "message": "No baseline profile provided."}
checks = []
blocking = False
b_row = float(baseline_profile.get("row_count") or 0)
c_row = float(current_profile.get("row_count") or 0)
row_delta_pct = 0.0 if b_row == 0 else abs(c_row - b_row) / b_row * 100.0
row_ok = row_delta_pct <= float(active["max_row_count_change_percent"])
checks.append({"check": "row_count_change_percent", "passed": row_ok, "value": row_delta_pct, "threshold": active["max_row_count_change_percent"]})
blocking = blocking or (not row_ok)
b_cols = {c.get("column_name"): c for c in baseline_profile.get("columns", [])}
c_cols = {c.get("column_name"): c for c in current_profile.get("columns", [])}
for col in sorted(set(b_cols) - set(c_cols)):
passed = not bool(active["fail_on_missing_column"])
checks.append({"check": "missing_column", "column": col, "passed": passed})
blocking = blocking or (not passed)
for col in sorted(set(b_cols).intersection(c_cols)):
b = b_cols[col]
c = c_cols[col]
if "null_pct" in b and "null_pct" in c:
delta = abs(float(c.get("null_pct") or 0) - float(b.get("null_pct") or 0))
passed = delta <= float(active["max_null_percent_change_points"])
checks.append({"check": "null_percent_change_points", "column": col, "passed": passed, "value": delta, "threshold": active["max_null_percent_change_points"]})
blocking = blocking or (not passed)
if "distinct_pct" in b and "distinct_pct" in c:
delta = abs(float(c.get("distinct_pct") or 0) - float(b.get("distinct_pct") or 0))
passed = delta <= float(active["max_distinct_percent_change_points"])
checks.append({"check": "distinct_percent_change_points", "column": col, "passed": passed, "value": delta, "threshold": active["max_distinct_percent_change_points"]})
blocking = blocking or (not passed)
if b.get("min_value") != c.get("min_value"):
checks.append({"check": "min_changed", "column": col, "passed": True, "baseline": b.get("min_value"), "current": c.get("min_value")})
if b.get("max_value") != c.get("max_value"):
checks.append({"check": "max_changed", "column": col, "passed": True, "baseline": b.get("max_value"), "current": c.get("max_value")})
return {"status": "failed" if blocking else "passed", "can_continue": not blocking, "checks": checks, "message": "Profile drift check completed."}