Skip to content

scan_notebook_lineage

Public callable

Extract deterministic lineage steps from notebook code using AST parsing.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `code` | `str` | Python notebook source code to analyze. | required |

Returns:

| Type | Description |
| --- | --- |
| list of dict of str to Any | Ordered lineage step dictionaries inferred from read, transform, and write calls. |

Source code in src/fabricops_kit/lineage.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def scan_notebook_lineage(code: str) -> list[dict[str, Any]]:
    """Extract deterministic lineage steps from notebook code using AST parsing.

    Two statement shapes are inspected:

    * ``name = call(...)`` assignments, classified as a read (helper listed in
      ``READ_HELPERS`` or one of ``read_csv`` / ``read_parquet`` /
      ``read_excel``) or, when ``_flatten_chain`` reports operations, as a
      transform.
    * bare ``call(...)`` expression statements whose callee is listed in
      ``WRITE_HELPERS`` and that have at least one positional argument.

    Parameters
    ----------
    code : str
        Python notebook source code to analyze.

    Returns
    -------
    list of dict of str to Any
        Lineage step dictionaries inferred from read, transform, and write
        calls, ordered by source position (line, then column).

    Raises
    ------
    SyntaxError
        Propagated from ``ast.parse`` when ``code`` is not valid Python.
    """
    tree = ast.parse(code)

    # ast.walk yields nodes in no specified order (breadth-first in CPython),
    # so statements nested inside if/for/with/def bodies would be reported
    # after every later top-level statement. Sorting the candidate statements
    # by source position keeps the returned steps in reading order; for flat
    # notebooks this is the same order ast.walk already produced.
    statements = sorted(
        (n for n in ast.walk(tree) if isinstance(n, (ast.Assign, ast.Expr))),
        key=lambda n: (n.lineno, n.col_offset),
    )

    steps: list[dict[str, Any]] = []
    for node in statements:
        call = node.value
        if not isinstance(call, ast.Call):
            continue

        if isinstance(node, ast.Assign):
            if not node.targets:
                continue
            # Only the first assignment target is considered.
            lhs = _name(node.targets[0])
            if not lhs:
                continue

            cname = _call_name(call)
            if cname in READ_HELPERS:
                steps.append(
                    _step(
                        cname,
                        lhs,
                        f"read via {cname}",
                        READ_HELPERS[cname],
                        "dataframe",
                        "high",
                        node.lineno,
                        ["read"],
                    )
                )
                continue
            if cname in {"read_csv", "read_parquet", "read_excel"}:
                steps.append(
                    _step(
                        cname,
                        lhs,
                        f"read via pandas.{cname}",
                        "file",
                        "dataframe",
                        "high",
                        node.lineno,
                        ["read"],
                    )
                )
                continue

            src, ops = _flatten_chain(call)
            if ops:
                # Without a resolvable base the step is still emitted, but
                # with lowered confidence and an explanatory note.
                steps.append(
                    _step(
                        src or "unknown",
                        lhs,
                        " -> ".join(ops),
                        "dataframe" if src else "unknown",
                        "dataframe",
                        "high" if src else "medium",
                        node.lineno,
                        ops,
                        "" if src else "base dataframe could not be inferred",
                    )
                )
            continue

        # ast.Expr: a bare call statement, only relevant for write helpers.
        cname = _call_name(call)
        if cname in WRITE_HELPERS and call.args:
            src = _name(call.args[0]) or "unknown"
            known = src != "unknown"
            steps.append(
                _step(
                    src,
                    _resolve_write_target(cname, call),
                    f"write via {cname}",
                    "dataframe" if known else "unknown",
                    WRITE_HELPERS[cname],
                    "high" if known else "medium",
                    node.lineno,
                    ["write"],
                )
            )
    return steps