def _build_pandas_partition_snapshot(df, *, dataset_name: str, table_name: str, partition_column: str, business_keys: list[str], watermark_column: str | None, run_id: str | None) -> list[dict]:
    """Summarize a pandas DataFrame into one snapshot record per partition.

    For each distinct value of ``partition_column`` (null partitions included via
    ``dropna=False``), emit a dict with row/key counts, optional watermark bounds,
    and content hashes derived from the deduplicated business-key tuples.

    Args:
        df: Source DataFrame; must contain ``partition_column``, every column in
            ``business_keys``, and ``watermark_column`` when that is set.
        dataset_name: Logical dataset label stamped on every record.
        table_name: Physical table label stamped on every record.
        partition_column: Column whose values define the partitions.
        business_keys: Columns whose stringified tuples identify a logical row.
        watermark_column: Optional column whose min/max is captured per partition.
        run_id: Optional pipeline run identifier, passed through verbatim.

    Returns:
        Snapshot dicts sorted by the string form of the partition value, so the
        output order is deterministic regardless of groupby ordering.
    """
    # One timestamp for the whole snapshot so all rows of a run agree.
    snapshot_ts = datetime.now(timezone.utc).isoformat()
    records: list[dict] = []

    # dropna=False keeps rows whose partition value is null as their own group.
    for part_value, part_df in df.groupby(partition_column, dropna=False):
        n_rows = int(part_df.shape[0])

        # Stringify the business-key columns, join each row with "||", then
        # dedupe + sort. sorted(set(...)) is equivalent to drop_duplicates()
        # followed by sorted(): the sort makes first-occurrence order moot.
        stringified = part_df[business_keys].astype(str)
        joined_keys = [
            "||".join(tup)
            for tup in stringified.itertuples(index=False, name=None)
        ]
        unique_keys = sorted(set(joined_keys))
        key_hash = _hash("##".join(unique_keys))
        key_count = int(len(unique_keys))

        # Watermark bounds are only computed when a watermark column is given.
        hi_mark = lo_mark = None
        if watermark_column:
            hi_mark = to_jsonable(part_df[watermark_column].max())
            lo_mark = to_jsonable(part_df[watermark_column].min())

        part_hash = _build_partition_hash(part_value, n_rows, key_count, hi_mark, lo_mark, key_hash)

        records.append(
            {
                "dataset_name": str(dataset_name),
                "table_name": str(table_name),
                "run_id": run_id,
                "engine": "pandas",
                "generated_at": snapshot_ts,
                "partition_column": str(partition_column),
                "partition_value": to_jsonable(part_value),
                "row_count": n_rows,
                "business_key_count": key_count,
                "max_watermark": hi_mark,
                "min_watermark": lo_mark,
                "partition_hash": part_hash,
                "business_key_hash": key_hash,
            }
        )

    # Deterministic ordering for downstream diffing/persistence.
    return sorted(records, key=lambda rec: str(rec["partition_value"]))