Skip to content

build_lineage_record_from_steps

Public callable

Create metadata-ready lineage records from validated lineage steps.

Parameters:

Name Type Description Default
dataset_name str

Dataset identifier associated with the lineage record set.

required
lineage_steps list of dict

Validated lineage step dictionaries.

required
run_id str or None

Optional run identifier for traceability.

None
notebook_name str or None

Optional Fabric notebook name.

None
workspace_name str or None

Optional workspace display name.

None
created_by str or None

Optional creator identity string.

None

Returns:

Type Description
list of dict

Lineage rows with step numbers and creation timestamp.

Raises:

Type Description
ValueError

If lineage steps fail schema validation.

Source code in src/fabricops_kit/lineage.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def build_lineage_record_from_steps(dataset_name: str, lineage_steps: list[dict], run_id: str | None = None, notebook_name: str | None = None, workspace_name: str | None = None, created_by: str | None = None) -> list[dict]:
    """Create metadata-ready lineage records from validated lineage steps.

    Parameters
    ----------
    dataset_name : str
        Dataset identifier associated with the lineage record set.
    lineage_steps : list of dict
        Validated lineage step dictionaries.
    run_id : str or None, default=None
        Optional run identifier for traceability.
    notebook_name : str or None, default=None
        Optional Fabric notebook name.
    workspace_name : str or None, default=None
        Optional workspace display name.
    created_by : str or None, default=None
        Optional creator identity string.

    Returns
    -------
    list of dict
        Lineage rows with step numbers and creation timestamp.

    Raises
    ------
    ValueError
        If lineage steps fail schema validation.
    """
    v = validate_lineage_steps(lineage_steps)
    if not v["is_valid"]:
        raise ValueError(f"Invalid lineage_steps: {v['errors']}")
    ts = datetime.now(timezone.utc).isoformat()
    return [{"dataset_name": dataset_name, "step_number": i, **s, "run_id": run_id, "notebook_name": notebook_name, "workspace_name": workspace_name, "created_by": created_by, "created_ts": ts} for i, s in enumerate(lineage_steps, 1)]