def _spark_rule(df: Any, rule: dict[str, Any], row_count: int) -> tuple[int, int, Any, str]:
from pyspark.sql import functions as F
rtype = rule["rule_type"]
if rtype == "not_null":
c = rule["column"]
return df.filter(F.col(c).isNull()).count(), row_count, {"not_null": True}, f"Column '{c}' contains nulls"
if rtype == "unique":
c = rule["column"]
failed = df.groupBy(c).count().filter(F.col("count") > 1).agg(F.sum("count").alias("failed")).collect()[0]["failed"] or 0
return int(failed), row_count, {"unique": True}, f"Column '{c}' has duplicates"
if rtype == "unique_combination":
cols = rule["columns"]
failed = df.groupBy(*cols).count().filter(F.col("count") > 1).agg(F.sum("count").alias("failed")).collect()[0]["failed"] or 0
return int(failed), row_count, {"unique_combination": cols}, f"Combination {cols} has duplicates"
if rtype == "accepted_values":
c = rule["column"]
vals = rule.get("accepted_values", [])
failed = df.filter(F.col(c).isNotNull() & ~F.col(c).isin(vals)).count()
return failed, row_count, {"accepted_values": vals}, f"Column '{c}' has unexpected values"
if rtype == "range_check":
c = rule["column"]
min_v, max_v = rule.get("min_value"), rule.get("max_value")
cond = F.lit(False)
if min_v is not None:
cond = cond | (F.col(c) < F.lit(min_v))
if max_v is not None:
cond = cond | (F.col(c) > F.lit(max_v))
failed = df.filter(F.col(c).isNotNull() & cond).count()
return failed, row_count, {"min_value": min_v, "max_value": max_v}, f"Column '{c}' is out of range"
if rtype == "regex_check":
c = rule["column"]
pattern = rule["pattern"]
failed = df.filter(F.col(c).isNotNull() & ~F.col(c).rlike(pattern)).count()
return failed, row_count, {"pattern": pattern}, f"Column '{c}' failed regex check"
if rtype == "row_count_min":
min_count = int(rule["min_count"])
return (1 if row_count < min_count else 0), row_count, {"min_count": min_count}, "Row count below minimum"
if rtype == "row_count_between":
min_count = int(rule["min_count"])
max_count = int(rule["max_count"])
return (1 if row_count < min_count or row_count > max_count else 0), row_count, {"min_count": min_count, "max_count": max_count}, "Row count outside expected range"
if rtype == "freshness_check":
c = rule["column"]
max_age = int(rule["max_age_days"])
max_ts = df.select(F.max(F.col(c)).alias("max_ts")).collect()[0]["max_ts"]
if max_ts is None:
return 1, row_count, {"max_age_days": max_age}, "No valid timestamps found for freshness check"
now_utc = datetime.now(timezone.utc)
if max_ts.tzinfo is None:
max_ts = max_ts.replace(tzinfo=timezone.utc)
age_days = (now_utc - max_ts).total_seconds() / 86400
return (1 if age_days > max_age else 0), row_count, {"max_age_days": max_age}, "Data is stale"
raise ValueError("Unsupported rule type")