Skip to content

_effective_contract_dict

Internal helper
This page documents an internal implementation helper, not a primary public API.
Source code in src/fabricops_kit/quality.py
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
def _effective_contract_dict(contract: dict | DataProductContract) -> dict:
    """Flatten a data product contract into a plain dictionary.

    The contract is first passed through ``normalize_data_product_contract``;
    the result is then assembled section by section into a new dict.

    Args:
        contract: Either a raw contract mapping or a ``DataProductContract``.

    Returns:
        A dict with the keys ``dataset``, ``source``, ``target``, ``quality``,
        ``drift``, ``governance``, ``metadata``, ``runtime``, ``refresh``,
        ``environment``, ``keys`` and ``schema`` (in that insertion order).
    """
    normalized = normalize_data_product_contract(contract)

    # Work on a shallow copy of the raw payload so lookups never touch the
    # normalized object's own mapping.
    if normalized.raw:
        raw_contract = dict(normalized.raw)
    else:
        raw_contract = {}

    result = {"dataset": dict(normalized.dataset)}

    # These sections are converted with ``asdict``; tuple order fixes key order.
    converted_sections = (
        "source",
        "target",
        "quality",
        "drift",
        "governance",
        "metadata",
        "runtime",
    )
    for section_name in converted_sections:
        result[section_name] = asdict(getattr(normalized, section_name))

    # ``refresh`` is taken only from the raw payload; missing/falsy -> empty.
    result["refresh"] = dict(raw_contract.get("refresh") or {})

    # Prefer an explicit raw ``environment``; otherwise derive one from the
    # runtime and metadata sections.
    environment = raw_contract.get("environment")
    if not environment:
        environment = {
            "name": normalized.runtime.environment,
            "metadata_schema": normalized.metadata.schema,
        }
    result["environment"] = dict(environment)

    source_section = normalized.source
    result["keys"] = {
        "business_keys": list(source_section.business_keys),
        "watermark_column": source_section.watermark_column,
        "partition_column": source_section.partition_column,
    }

    result["schema"] = {
        "required_source_columns": list(normalized.source.required_columns),
        "required_output_columns": list(normalized.target.required_columns),
    }
    return result