def _pandas_rule(df: pd.DataFrame, rule: dict[str, Any], row_count: int) -> tuple[int, int, Any, str]:
    """Evaluate a single data-quality rule against a pandas DataFrame.

    Args:
        df: Frame to check.
        rule: Rule definition. ``rule["rule_type"]`` selects the check; the
            other keys read depend on the type (``column``, ``columns``,
            ``accepted_values``, ``min_value``/``max_value``, ``pattern``,
            ``min_count``/``max_count``, ``max_age_days``).
        row_count: Row count echoed back as the second element of the result
            and compared against the bounds of the ``row_count_*`` rules. It
            is used as given and never recomputed from ``df``.

    Returns:
        ``(failed, row_count, expected, message)``. ``failed`` is the number
        of offending rows for column-level rules and 0/1 for the row-count
        and freshness rules. ``expected`` describes the expectation that was
        checked. ``message`` is the failure text and is returned whether or
        not the rule actually failed.

    Raises:
        ValueError: If the rule type is not supported, or a ``range_check``
            defines neither ``min_value`` nor ``max_value``.
        KeyError: If a required rule key or the referenced column is missing.
    """
    rtype = rule["rule_type"]

    if rtype == "not_null":
        c = rule["column"]
        return int(df[c].isna().sum()), row_count, {"not_null": True}, f"Column '{c}' contains nulls"

    if rtype == "unique":
        c = rule["column"]
        # keep=False flags every member of a duplicate group, not just the
        # repeats. pandas also treats repeated missing values as duplicates of
        # each other, so two or more nulls are counted as failures here.
        # NOTE(review): the rules below skip nulls; confirm this difference
        # is intended.
        return int(df[c].duplicated(keep=False).sum()), row_count, {"unique": True}, f"Column '{c}' has duplicates"

    if rtype == "unique_combination":
        cols = rule["columns"]
        return (
            int(df.duplicated(subset=cols, keep=False).sum()),
            row_count,
            {"unique_combination": cols},
            f"Combination {cols} has duplicates",
        )

    if rtype == "accepted_values":
        c = rule["column"]
        vals = rule.get("accepted_values", [])
        # Nulls are not treated as unexpected values.
        mask = df[c].notna() & ~df[c].isin(vals)
        return int(mask.sum()), row_count, {"accepted_values": vals}, f"Column '{c}' has unexpected values"

    if rtype == "range_check":
        c = rule["column"]
        min_v, max_v = rule.get("min_value"), rule.get("max_value")
        if min_v is None and max_v is None:
            raise ValueError("range_check requires at least one of min_value or max_value")
        s = df[c]
        present = s.notna()  # nulls never count as out of range
        out_of_range = pd.Series(False, index=df.index)
        if min_v is not None:
            out_of_range |= present & (s < min_v)
        if max_v is not None:
            out_of_range |= present & (s > max_v)
        return (
            int(out_of_range.sum()),
            row_count,
            {"min_value": min_v, "max_value": max_v},
            f"Column '{c}' is out of range",
        )

    if rtype == "regex_check":
        c = rule["column"]
        pattern = rule["pattern"]
        # Nulls are dropped; everything else is compared in its str() form.
        non_null = df[c].dropna().astype(str)
        # NOTE(review): Series.str.match anchors only at the start of the
        # string (re.match semantics); trailing characters are accepted
        # unless the pattern itself ends with "$".
        failed = int((~non_null.str.match(pattern, na=False)).sum())
        return failed, row_count, {"pattern": pattern}, f"Column '{c}' failed regex check"

    if rtype == "row_count_min":
        min_count = int(rule["min_count"])
        return (1 if row_count < min_count else 0), row_count, {"min_count": min_count}, "Row count below minimum"

    if rtype == "row_count_between":
        min_count = int(rule["min_count"])
        max_count = int(rule["max_count"])
        fail = row_count < min_count or row_count > max_count
        return (
            1 if fail else 0,
            row_count,
            {"min_count": min_count, "max_count": max_count},
            "Row count outside expected range",
        )

    if rtype == "freshness_check":
        c = rule["column"]
        max_age = int(rule["max_age_days"])
        # Unparseable values become NaT and are dropped; utc=True makes the
        # result timezone-aware so it can be subtracted from an aware "now".
        s = pd.to_datetime(df[c], errors="coerce", utc=True).dropna()
        if s.empty:
            return 1, row_count, {"max_age_days": max_age}, "No valid timestamps found for freshness check"
        # Only the newest timestamp decides freshness.
        max_ts = s.max()
        age_days = (pd.Timestamp.now(tz="UTC") - max_ts).total_seconds() / 86400
        return (1 if age_days > max_age else 0), row_count, {"max_age_days": max_age}, "Data is stale"

    # Name the offending type so a misconfigured rule can be found quickly.
    raise ValueError(f"Unsupported rule type: {rtype!r}")