Skip to content

_build_pandas_partition_snapshot

Internal helper
This page documents an internal implementation helper, not a primary public API.
Source code in src/fabricops_kit/drift.py
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
def _build_pandas_partition_snapshot(df, *, dataset_name: str, table_name: str, partition_column: str, business_keys: list[str], watermark_column: str | None, run_id: str | None) -> list[dict]:
    """Summarise a pandas DataFrame as one snapshot record per partition value.

    The frame is grouped on ``partition_column`` (null partition values are
    kept as their own group). For every group the record carries the row
    count, the number of distinct business-key combinations, a hash over
    those combinations, optional watermark bounds, and a combined partition
    hash produced by ``_build_partition_hash``.

    Args:
        df: DataFrame to summarise.
        dataset_name: Stored on every record, coerced with ``str``.
        table_name: Stored on every record, coerced with ``str``.
        partition_column: Column to group by.
        business_keys: Columns whose stringified values identify a row.
        watermark_column: Column whose max/min are recorded per partition;
            when falsy both watermark fields are ``None``.
        run_id: Stored unchanged on every record.

    Returns:
        One dict per partition, ordered by ``str(partition_value)``.
    """
    # Captured once so that every record of this snapshot shares the same timestamp.
    snapshot_time = datetime.now(timezone.utc).isoformat()

    def _join_key_parts(key_row) -> str:
        # Collapse one row of (already stringified) key columns into a single token.
        return "||".join(key_row.values.tolist())

    snapshot: list[dict] = []
    for part_value, part_df in df.groupby(partition_column, dropna=False):
        n_rows = int(part_df.shape[0])

        # Distinct key combinations, compared on their string form.
        distinct_keys = part_df[business_keys].astype(str).drop_duplicates()
        key_tokens = distinct_keys.apply(_join_key_parts, axis=1).tolist()
        # Sorting makes the hash independent of the row order inside the partition.
        key_tokens.sort()
        keys_digest = _hash("##".join(key_tokens))
        n_keys = len(key_tokens)

        if watermark_column:
            watermark_values = part_df[watermark_column]
            high_mark = to_jsonable(watermark_values.max())
            low_mark = to_jsonable(watermark_values.min())
        else:
            high_mark = None
            low_mark = None

        part_digest = _build_partition_hash(part_value, n_rows, n_keys, high_mark, low_mark, keys_digest)

        record = {
            "dataset_name": str(dataset_name),
            "table_name": str(table_name),
            "run_id": run_id,
            "engine": "pandas",
            "generated_at": snapshot_time,
            "partition_column": str(partition_column),
            "partition_value": to_jsonable(part_value),
            "row_count": n_rows,
            "business_key_count": n_keys,
            "max_watermark": high_mark,
            "min_watermark": low_mark,
            "partition_hash": part_digest,
            "business_key_hash": keys_digest,
        }
        snapshot.append(record)

    # Order by the string form of the partition value so mixed-type values still sort.
    snapshot.sort(key=lambda rec: str(rec["partition_value"]))
    return snapshot