Skip to content

profile_dataframe

Public callable

Build a lightweight profile for pandas or Spark-like DataFrames.

Source code in src/fabricops_kit/profiling.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
def _profile_pandas_column(series, name, row_count: int, sample_size: int, top_n: int) -> dict[str, Any]:
    """Build the ColumnProfile dict for a single pandas Series."""
    non_null_count = int(series.notna().sum())
    null_count = int(series.isna().sum())
    null_pct = float((null_count / row_count) * 100) if row_count else 0.0
    distinct_count = int(series.nunique(dropna=True))
    distinct_pct = float((distinct_count / row_count) * 100) if row_count else 0.0
    non_null_values = series.dropna()
    has_values = not non_null_values.empty
    # Hoisted: the dtype check was previously re-evaluated for mean/median/std.
    is_numeric = pd.api.types.is_numeric_dtype(series.dtype)
    top_series = series.value_counts(dropna=True).head(top_n)
    return asdict(
        ColumnProfile(
            column_name=name,
            data_type=str(series.dtype),
            non_null_count=non_null_count,
            null_count=null_count,
            null_pct=null_pct,
            distinct_count=distinct_count,
            distinct_pct=distinct_pct,
            sample_values=[to_jsonable(v) for v in non_null_values.head(sample_size).tolist()],
            min_value=to_jsonable(non_null_values.min()) if has_values else None,
            max_value=to_jsonable(non_null_values.max()) if has_values else None,
            mean_value=float(non_null_values.mean()) if is_numeric and has_values else None,
            median_value=float(non_null_values.median()) if is_numeric and has_values else None,
            # std needs at least two observations to be meaningful (ddof=1 default).
            std_value=float(non_null_values.std()) if is_numeric and len(non_null_values) > 1 else None,
            top_values=[{"value": to_jsonable(idx), "count": int(count)} for idx, count in top_series.items()],
            inferred_semantic_type="unknown",
        )
    )


def profile_dataframe(df, dataset_name: str = "unknown", sample_size: int = 5, top_n: int = 5, engine: str = "auto") -> dict[str, Any]:
    """Build a lightweight profile for pandas or Spark-like DataFrames.

    Args:
        df: A pandas DataFrame (or pandas-convertible object) or a Spark-like
            DataFrame, as recognized by ``detect_dataframe_engine``.
        dataset_name: Label recorded in the resulting profile.
        sample_size: Number of non-null sample values captured per column.
        top_n: Number of most-frequent values captured per column.
        engine: ``"auto"``, ``"pandas"``, or ``"spark"``; validated by
            ``validate_engine``. ``"auto"`` detects the engine from ``df``.

    Returns:
        A JSON-serializable dict describing the dataset and each column.

    Raises:
        TypeError: If ``df`` is not a supported dataframe type.
    """
    selected_engine = validate_engine(engine)
    resolved_engine = detect_dataframe_engine(df) if selected_engine == "auto" else selected_engine
    if resolved_engine == "pandas":
        pdf = df if isinstance(df, pd.DataFrame) else pd.DataFrame(df)
        row_count = int(len(pdf))
        columns = [
            _profile_pandas_column(pdf[name], name, row_count, sample_size, top_n)
            for name in pdf.columns
        ]
        # Hoisted: duplicated() previously ran twice (once for the count, once
        # for the percentage), doubling a full-table scan.
        duplicate_rows = int(pdf.duplicated().sum())
        return asdict(
            DataFrameProfile(
                dataset_name=dataset_name,
                engine=resolved_engine,
                row_count=row_count,
                column_count=len(pdf.columns),
                duplicate_row_count=duplicate_rows,
                duplicate_row_pct=float((duplicate_rows / row_count) * 100) if row_count else 0.0,
                # NOTE(review): utcnow() is naive and deprecated in 3.12+;
                # datetime.now(timezone.utc) would append "+00:00" to the
                # string — confirm downstream consumers before switching.
                generated_at=datetime.utcnow().isoformat(),
                columns=columns,
            )
        )
    # Bug fix: honor the validated engine selection. The original re-ran
    # detect_dataframe_engine(df) here, silently ignoring an explicit
    # engine="spark" argument (inconsistent with the pandas branch above).
    if resolved_engine == "spark":
        metadata_df = profile_dataframe_to_metadata(df, table_name=dataset_name)
        records = profile_metadata_to_records(metadata_df)
        return {
            "dataset_name": dataset_name,
            "engine": "spark",
            "row_count": int(records[0].get("ROW_COUNT", 0)) if records else 0,
            "column_count": len(records),
            "columns": records,
            "generated_at": datetime.utcnow().isoformat(),
        }
    raise TypeError("Unsupported dataframe type for profile_dataframe.")