_effective_contract_dict
Internal helper
This page documents an internal implementation helper, not a primary public API.
Source code in src/fabricops_kit/quality.py
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
def _effective_contract_dict(contract: dict | DataProductContract) -> dict:
    """Build the plain-dict "effective" view of a data product contract.

    The contract is first passed through ``normalize_data_product_contract``.
    The returned dictionary contains, in this order:

    * ``dataset`` - a shallow copy of the normalized dataset mapping.
    * ``source``, ``target``, ``quality``, ``drift``, ``governance``,
      ``metadata``, ``runtime`` - each normalized section converted with
      ``asdict``.
    * ``refresh`` - a copy of the raw contract's ``refresh`` entry, or an
      empty dict when it is missing or falsy.
    * ``environment`` - a copy of the raw contract's ``environment`` entry
      when it is truthy; otherwise a fallback built from the normalized
      runtime environment and metadata schema.
    * ``keys`` and ``schema`` - convenience sections derived from the
      normalized source and target sections.

    Args:
        contract: A raw contract dict or a ``DataProductContract``.

    Returns:
        A new dictionary describing the effective contract.
    """
    normalized = normalize_data_product_contract(contract)
    raw_contract = dict(normalized.raw) if normalized.raw else {}

    effective = {"dataset": dict(normalized.dataset)}
    section_names = (
        "source",
        "target",
        "quality",
        "drift",
        "governance",
        "metadata",
        "runtime",
    )
    for section_name in section_names:
        effective[section_name] = asdict(getattr(normalized, section_name))
    effective["refresh"] = dict(raw_contract.get("refresh") or {})

    # Only fall back to the normalized runtime/metadata values when the raw
    # contract does not carry a usable "environment" entry.
    environment_override = raw_contract.get("environment")
    if environment_override:
        effective["environment"] = dict(environment_override)
    else:
        effective["environment"] = {
            "name": normalized.runtime.environment,
            "metadata_schema": normalized.metadata.schema,
        }

    source_section = normalized.source
    effective["keys"] = {
        "business_keys": list(source_section.business_keys),
        "watermark_column": source_section.watermark_column,
        "partition_column": source_section.partition_column,
    }
    effective["schema"] = {
        "required_source_columns": list(source_section.required_columns),
        "required_output_columns": list(normalized.target.required_columns),
    }
    return effective
|