def _add_spark(df, rules):
    """Append row-level data-quality annotation columns to a Spark DataFrame.

    Evaluates every supported rule against ``df`` and, for each row that
    violates a rule, appends a message to one of two ``array<string>``
    columns — ``dq_errors`` or ``dq_warnings`` — chosen per rule by
    ``_severity_bucket``. Rules whose ``rule_type`` is not in
    ``ROW_LEVEL_SUPPORTED`` are skipped.

    Args:
        df: input Spark DataFrame.
        rules: iterable of rule dicts. Keys read here: ``rule_type``,
            ``rule_id``, ``reason``, ``severity``, ``column`` or ``columns``,
            and per-type keys (``pattern``, ``accepted_values``,
            ``min_value``, ``max_value``).

    Returns:
        A new DataFrame — ``df`` plus the ``dq_errors`` and ``dq_warnings``
        columns populated with messages for violated rules.
    """
    from pyspark.sql import functions as F

    out = (
        df.withColumn("dq_errors", F.array().cast("array<string>"))
        .withColumn("dq_warnings", F.array().cast("array<string>"))
    )
    for i, rule in enumerate(rules):
        rt = rule.get("rule_type")
        if rt not in ROW_LEVEL_SUPPORTED:
            continue
        # Message format: "<rule_id>: <reason-or-type>"; rule_id defaults to a
        # positional DQ001-style identifier.
        msg = f"{rule.get('rule_id', f'DQ{i + 1:03d}')}: {rule.get('reason') or rt}"
        bucket = _severity_bucket(rule.get("severity", "critical"))

        # Uniqueness rules need a self-join against aggregated duplicate keys
        # rather than a per-row predicate, so they are handled separately.
        if rt == "unique":
            out = _flag_duplicates(df, out, [rule["column"]], bucket, msg, F)
            continue
        if rt == "unique_combination":
            cols = rule.get("columns", [])
            if cols:
                out = _flag_duplicates(df, out, cols, bucket, msg, F)
            continue

        cond = _row_condition(rule, rt, F)
        if cond is None:
            continue
        out = out.withColumn(
            bucket,
            F.when(
                cond, F.array_union(F.col(bucket), F.array(F.lit(msg)))
            ).otherwise(F.col(bucket)),
        )
    return out


def _row_condition(rule, rt, F):
    """Build the per-row violation condition for a single-column rule.

    Returns a boolean Column that is true where the row VIOLATES *rule*, or
    ``None`` when *rt* has no per-row predicate here. Every check except
    ``not_null`` treats null values as passing — null handling is the
    ``not_null`` rule's responsibility.
    """
    if rt == "not_null":
        return F.col(rule["column"]).isNull()
    if rt == "regex_check":
        col = F.col(rule["column"])
        return col.isNotNull() & ~col.rlike(rule.get("pattern", ""))
    if rt == "accepted_values":
        col = F.col(rule["column"])
        return col.isNotNull() & ~col.isin(rule.get("accepted_values", []))
    if rt == "range_check":
        col = F.col(rule["column"])
        # Start from False and OR in whichever bounds the rule provides, so a
        # rule with only min_value or only max_value still works.
        cond = F.lit(False)
        if rule.get("min_value") is not None:
            cond = cond | (col < F.lit(rule["min_value"]))
        if rule.get("max_value") is not None:
            cond = cond | (col > F.lit(rule["max_value"]))
        return col.isNotNull() & cond
    return None


def _flag_duplicates(df, out, cols, bucket, msg, F):
    """Append *msg* to *bucket* on every row whose *cols* values are duplicated.

    Builds the set of key tuples occurring more than once in *df*, left-joins
    it back onto *out* under temporary ``__dup_*`` names, marks the matched
    rows, then drops the temporary columns. Null keys are excluded up front:
    a null key can never equality-join back to its source row, so grouping
    such rows is wasted work (this also makes single- and multi-column
    uniqueness behave consistently).
    """
    grouped = df.groupBy(*cols).count().filter(F.col("count") > 1)
    for c in cols:
        grouped = grouped.filter(F.col(c).isNotNull())
    dup_keys = grouped.select(
        *[F.col(c).alias(f"__dup_{c}") for c in cols]
    ).withColumn("__dup_marker", F.lit(True))

    join_cond = [out[c] == F.col(f"__dup_{c}") for c in cols]
    joined = out.join(dup_keys, join_cond, "left")
    # Marker is True on duplicated rows and null elsewhere; isNotNull() is the
    # idiomatic (and equivalent) test — `== True` yields null for null markers,
    # which also falls through to the otherwise branch.
    flagged = joined.withColumn(
        bucket,
        F.when(
            F.col("__dup_marker").isNotNull(),
            F.array_union(F.col(bucket), F.array(F.lit(msg))),
        ).otherwise(F.col(bucket)),
    )
    return flagged.drop(*[f"__dup_{c}" for c in cols], "__dup_marker")