def profile_dataframe_to_metadata(
    df,
    table_name: str,
    *,
    exclude_columns: list[str] | set[str] | None = None,
    run_timestamp_timezone: str = "Asia/Singapore",
):
    """Profile a Spark/Fabric DataFrame into metadata-compatible metadata rows.

    Parameters
    ----------
    df :
        Spark DataFrame to profile (Fabric-first; a Spark runtime is assumed).
    table_name : str
        Logical table name stamped into the ``TABLE_NAME`` column of every
        output row.
    exclude_columns : list[str] | set[str] | None, optional
        Additional columns to skip; forwarded to ``get_profiled_columns``.
    run_timestamp_timezone : str, optional
        Timezone for ``RUN_TIMESTAMP`` (converted from UTC via
        ``from_utc_timestamp``).

    Returns
    -------
    pyspark.sql.DataFrame
        One row per profiled column with columns: TABLE_NAME, RUN_TIMESTAMP,
        COLUMN_NAME, DATA_TYPE, ROW_COUNT, NULL_COUNT, NULL_PERCENT,
        DISTINCT_COUNT, DISTINCT_PERCENT, MIN_VALUE, MAX_VALUE.

    Raises
    ------
    ValueError
        If no eligible non-technical columns remain after exclusions.

    Notes
    -----
    This function is Fabric-first and expects a Spark DataFrame runtime.
    It produces standardized metadata facts for logging, governance review,
    schema/drift comparison, and AI-assisted DQ hint generation.

    Examples
    --------
    >>> profile_df = profile_dataframe_to_metadata(df, "orders_clean")
    >>> # lakehouse_table_write(profile_df, lh_out, "METADATA_DEX_UNIFIED", mode="append")
    """
    from functools import reduce
    from pyspark.sql import functions as F

    eligible_columns = get_profiled_columns(df, exclude_columns=exclude_columns)
    if not eligible_columns:
        raise ValueError("No eligible non-technical columns found for metadata profiling.")

    dtype_map = dict(df.dtypes)
    row_count = int(df.count())

    # Build one wide aggregation so Spark scans the data a single time.
    # Track which columns actually received MIN/MAX expressions so we never
    # need to probe agg_df.columns per column below (also avoids a potential
    # false positive if a real column were named e.g. "X_MIN").
    agg_exprs = []
    min_max_columns: set[str] = set()
    for column_name in eligible_columns:
        agg_exprs.append(F.sum(F.col(column_name).isNull().cast("int")).alias(f"{column_name}_NULL_COUNT"))
        agg_exprs.append(F.countDistinct(F.col(column_name)).alias(f"{column_name}_DISTINCT_COUNT"))
        if is_min_max_supported_type(dtype_map[column_name]):
            min_max_columns.add(column_name)
            agg_exprs.append(F.min(F.col(column_name)).alias(f"{column_name}_MIN"))
            agg_exprs.append(F.max(F.col(column_name)).alias(f"{column_name}_MAX"))
    agg_df = df.agg(*agg_exprs)

    # Guard against division by zero on an empty DataFrame.
    denominator = F.lit(row_count if row_count > 0 else 1).cast("double")

    def _percent(count_col):
        # Share the percent formula: nulls treated as 0, rounded to 3 decimals.
        return F.round((F.coalesce(count_col, F.lit(0)).cast("double") / denominator) * 100, 3)

    rows = []
    for column_name in eligible_columns:
        if column_name in min_max_columns:
            min_value = F.col(f"{column_name}_MIN").cast("string")
            max_value = F.col(f"{column_name}_MAX").cast("string")
        else:
            # MIN/MAX not computed for this dtype; emit typed string nulls so
            # every per-column row shares an identical schema.
            min_value = F.lit(None).cast("string")
            max_value = F.lit(None).cast("string")
        rows.append(
            agg_df.select(
                F.lit(table_name).alias("TABLE_NAME"),
                F.from_utc_timestamp(F.current_timestamp(), run_timestamp_timezone).alias("RUN_TIMESTAMP"),
                F.lit(column_name).alias("COLUMN_NAME"),
                F.lit(dtype_map[column_name]).alias("DATA_TYPE"),
                F.lit(row_count).alias("ROW_COUNT"),
                F.coalesce(F.col(f"{column_name}_NULL_COUNT"), F.lit(0)).cast("long").alias("NULL_COUNT"),
                _percent(F.col(f"{column_name}_NULL_COUNT")).alias("NULL_PERCENT"),
                F.coalesce(F.col(f"{column_name}_DISTINCT_COUNT"), F.lit(0)).cast("long").alias("DISTINCT_COUNT"),
                _percent(F.col(f"{column_name}_DISTINCT_COUNT")).alias("DISTINCT_PERCENT"),
                min_value.alias("MIN_VALUE"),
                max_value.alias("MAX_VALUE"),
            )
        )

    # Stack the per-column rows; unionByName aligns columns by name.
    return reduce(lambda acc, nxt: acc.unionByName(nxt), rows)