
_add_spark

Internal helper
This page documents an internal implementation helper, not a primary public API.
Source code in src/fabricops_kit/quality.py
def _add_spark(df, rules):
    from pyspark.sql import functions as F

    # Seed two empty string-array columns that accumulate row-level messages.
    out = (
        df.withColumn("dq_errors", F.array().cast("array<string>"))
        .withColumn("dq_warnings", F.array().cast("array<string>"))
    )
    for i, rule in enumerate(rules):
        rt = rule.get("rule_type")
        if rt not in ROW_LEVEL_SUPPORTED:
            continue
        # Message recorded against a failing row, e.g. "DQ001: <reason or rule type>".
        msg = f"{rule.get('rule_id', f'DQ{i + 1:03d}')}: {rule.get('reason') or rt}"
        # Severity decides whether the message lands in dq_errors or dq_warnings.
        bucket = _severity_bucket(rule.get("severity", "critical"))
        cond = None
        if rt == "not_null":
            cond = F.col(rule["column"]).isNull()
        elif rt == "regex_check":
            # Nulls are ignored; only non-null values that fail the pattern are flagged.
            cond = F.col(rule["column"]).isNotNull() & ~F.col(rule["column"]).rlike(rule.get("pattern", ""))
        elif rt == "accepted_values":
            cond = F.col(rule["column"]).isNotNull() & ~F.col(rule["column"]).isin(rule.get("accepted_values", []))
        elif rt == "range_check":
            # Build the violation condition from whichever bounds the rule provides.
            cond = F.lit(False)
            if rule.get("min_value") is not None:
                cond = cond | (F.col(rule["column"]) < F.lit(rule["min_value"]))
            if rule.get("max_value") is not None:
                cond = cond | (F.col(rule["column"]) > F.lit(rule["max_value"]))
            cond = F.col(rule["column"]).isNotNull() & cond
        elif rt == "unique":
            # Duplicates need a second pass over the data: collect the keys that occur
            # more than once, then left-join them back as a marker column.
            c = rule["column"]
            dup_keys = (
                df.groupBy(c)
                .count()
                .filter(F.col(c).isNotNull() & (F.col("count") > 1))
                .select(F.col(c).alias("__dup_key"))
                .withColumn("__dup_marker", F.lit(True))
            )
            joined = out.join(dup_keys, out[c] == F.col("__dup_key"), "left")
            out = joined.withColumn(
                bucket,
                F.when(
                    F.col("__dup_marker").isNotNull(),
                    F.array_union(F.col(bucket), F.array(F.lit(msg))),
                ).otherwise(F.col(bucket)),
            ).drop("__dup_key", "__dup_marker")
            continue
        elif rt == "unique_combination":
            # Same marker-join approach as "unique", but grouped on a tuple of columns.
            cols = rule.get("columns", [])
            if cols:
                dup_combo = (
                    df.groupBy(*cols)
                    .count()
                    .filter(F.col("count") > 1)
                    .select(*[F.col(c).alias(f"__dup_{c}") for c in cols])
                    .withColumn("__dup_marker", F.lit(True))
                )
                join_cond = [out[c] == F.col(f"__dup_{c}") for c in cols]
                joined = out.join(dup_combo, join_cond, "left")
                drop_cols = [f"__dup_{c}" for c in cols] + ["__dup_marker"]
                out = joined.withColumn(
                    bucket,
                    F.when(
                        F.col("__dup_marker").isNotNull(),
                        F.array_union(F.col(bucket), F.array(F.lit(msg))),
                    ).otherwise(F.col(bucket)),
                ).drop(*drop_cols)
                continue
        if cond is None:
            continue
        # Append the message to the rule's bucket for every row matching the condition.
        out = out.withColumn(
            bucket,
            F.when(cond, F.array_union(F.col(bucket), F.array(F.lit(msg)))).otherwise(F.col(bucket)),
        )
    return out
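
Below is a minimal usage sketch, not part of the library itself. It assumes a running SparkSession, that "not_null" and "range_check" appear in ROW_LEVEL_SUPPORTED, and that _severity_bucket routes "critical" messages to dq_errors and "warning" messages to dq_warnings; the sample data and rule values are illustrative only.

# Hypothetical usage sketch under the assumptions stated above.
from pyspark.sql import SparkSession

from fabricops_kit.quality import _add_spark

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("a@example.com", 5), (None, 200)],
    ["email", "qty"],
)

rules = [
    {"rule_type": "not_null", "column": "email", "severity": "critical"},
    {
        "rule_type": "range_check",
        "column": "qty",
        "min_value": 0,
        "max_value": 100,
        "severity": "warning",
    },
]

# Each row gains dq_errors / dq_warnings arrays listing the rules it violated.
_add_spark(df, rules).select("email", "qty", "dq_errors", "dq_warnings").show(truncate=False)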