def profile_dataframe(
    df,
    dataset_name: str = "unknown",
    sample_size: int = 5,
    top_n: int = 5,
    engine: str = "auto",
) -> dict[str, Any]:
    """Build a lightweight, JSON-serializable profile for a DataFrame.

    Args:
        df: A pandas DataFrame (or pandas-convertible object), or a
            Spark-like DataFrame recognized by ``detect_dataframe_engine``.
        dataset_name: Label recorded in the resulting profile.
        sample_size: Number of non-null example values captured per column.
        top_n: Number of most-frequent values captured per column.
        engine: "auto" to sniff the object's engine, or an explicit engine
            name accepted by ``validate_engine``.

    Returns:
        A dict describing the dataset (row/column/duplicate counts) and
        per-column statistics.

    Raises:
        TypeError: If the dataframe type is not supported.
    """
    selected_engine = validate_engine(engine)
    # Resolve "auto" by sniffing the object; otherwise trust the caller.
    resolved_engine = detect_dataframe_engine(df) if selected_engine == "auto" else selected_engine

    if resolved_engine == "pandas":
        pdf = df if isinstance(df, pd.DataFrame) else pd.DataFrame(df)
        row_count = int(len(pdf))
        columns = []
        for name in pdf.columns:
            series = pdf[name]
            non_null_values = series.dropna()
            non_null_count = int(series.notna().sum())
            null_count = int(series.isna().sum())
            null_pct = float((null_count / row_count) * 100) if row_count else 0.0
            distinct_count = int(series.nunique(dropna=True))
            distinct_pct = float((distinct_count / row_count) * 100) if row_count else 0.0
            has_values = not non_null_values.empty
            # Hoisted: the original evaluated this dtype check three times
            # (for mean, median, and std) on every column.
            is_numeric = pd.api.types.is_numeric_dtype(series.dtype)
            min_value = to_jsonable(non_null_values.min()) if has_values else None
            max_value = to_jsonable(non_null_values.max()) if has_values else None
            sample_values = [to_jsonable(v) for v in non_null_values.head(sample_size).tolist()]
            top_series = series.value_counts(dropna=True).head(top_n)
            top_values = [
                {"value": to_jsonable(idx), "count": int(count)}
                for idx, count in top_series.items()
            ]
            columns.append(
                asdict(
                    ColumnProfile(
                        column_name=name,
                        data_type=str(series.dtype),
                        non_null_count=non_null_count,
                        null_count=null_count,
                        null_pct=null_pct,
                        distinct_count=distinct_count,
                        distinct_pct=distinct_pct,
                        sample_values=sample_values,
                        min_value=min_value,
                        max_value=max_value,
                        mean_value=float(non_null_values.mean()) if is_numeric and has_values else None,
                        median_value=float(non_null_values.median()) if is_numeric and has_values else None,
                        # std (sample, ddof=1) needs at least two observations.
                        std_value=float(non_null_values.std()) if is_numeric and len(non_null_values) > 1 else None,
                        top_values=top_values,
                        inferred_semantic_type="unknown",
                    )
                )
            )
        # Hoisted: the original ran the full duplicated() scan twice, once
        # for the count and once for the percentage.
        duplicate_row_count = int(pdf.duplicated().sum())
        return asdict(
            DataFrameProfile(
                dataset_name=dataset_name,
                engine=resolved_engine,
                row_count=row_count,
                column_count=len(pdf.columns),
                duplicate_row_count=duplicate_row_count,
                duplicate_row_pct=float((duplicate_row_count / row_count) * 100) if row_count else 0.0,
                columns=columns,
                # NOTE(review): datetime.utcnow() is deprecated since 3.12;
                # kept to preserve the existing naive-ISO output format.
                generated_at=datetime.utcnow().isoformat(),
            )
        )

    # BUG FIX: the original re-ran detect_dataframe_engine(df) here, which
    # silently ignored an explicitly requested engine="spark" whenever
    # detection disagreed. Branch on the already-resolved engine instead,
    # mirroring the pandas branch above.
    if resolved_engine == "spark":
        metadata_df = profile_dataframe_to_metadata(df, table_name=dataset_name)
        records = profile_metadata_to_records(metadata_df)
        return {
            "dataset_name": dataset_name,
            "engine": "spark",
            # Row count is carried on every metadata record; take the first.
            "row_count": int(records[0].get("ROW_COUNT", 0)) if records else 0,
            "column_count": len(records),
            "columns": records,
            "generated_at": datetime.utcnow().isoformat(),
        }

    raise TypeError("Unsupported dataframe type for profile_dataframe.")