Skip to content

build_lineage_from_notebook_code

Public callable

Scan, optionally enrich, and validate lineage from notebook source code.

Parameters:

Name Type Description Default
code str

Notebook source code to parse.

required
use_ai bool

Whether to invoke AI enrichment when a helper is provided.

True
ai_helper Any or None

Optional callable used to enrich deterministic lineage steps.

None

Returns:

Type Description
dict of str to Any

End-to-end lineage result with steps, validation, review status, and notes.

Source code in src/fabricops_kit/lineage.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def build_lineage_from_notebook_code(code: str, use_ai: bool = True, ai_helper: Any | None = None) -> dict[str, Any]:
    """Scan, optionally enrich, and validate lineage from notebook source code.

    Parameters
    ----------
    code : str
        Notebook source code to parse.
    use_ai : bool, default=True
        Whether to invoke AI enrichment when a helper is provided.
    ai_helper : Any or None, default=None
        Optional callable used to enrich deterministic lineage steps.

    Returns
    -------
    dict of str to Any
        End-to-end lineage result with steps, validation, review status, and notes.
    """
    steps = scan_notebook_lineage(code)
    enrichment = enrich_lineage_steps_with_ai(steps, ai_helper=ai_helper) if use_ai else {"steps": steps, "ai_used": False, "fallback_prompt": "", "notes": "AI disabled."}
    validation = validate_lineage_steps(enrichment["steps"]) if enrichment["steps"] else {"is_valid": False, "errors": ["No lineage detected."], "warnings": [], "review_required": True}
    return {"steps": enrichment["steps"], "validation": validation, "review_required": True if not validation["is_valid"] else (validation["review_required"] or not enrichment.get("ai_used", False)), "fallback_prompt": enrichment.get("fallback_prompt", ""), "ai_used": enrichment.get("ai_used", False), "notes": enrichment.get("notes", "")}