
add_audit_columns

Public callable

Add run tracking and audit columns for ingestion workflows.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| df | Any | Input pandas or Spark DataFrame. | required |
| run_id | str or None | Pipeline run identifier. If omitted, a UUID is generated. | None |
| pipeline_name | str or None | Pipeline or notebook workflow name. | None |
| environment | str or None | Environment label such as Sandbox or Production. | None |
| source_system | str or None | Upstream source system name. | None |
| source_table | str or None | Upstream table or dataset name. | None |
| source_extract_timestamp | str or None | Timestamp string representing source extract time. | None |
| notebook_name | str or None | Notebook name override. Uses Fabric runtime context when available. | None |
| loaded_by | str or None | User override. Uses Fabric runtime context when available. | None |
| watermark_column | str or None | Source column copied into _watermark_value. | None |
| bucket_column | str or None | Source column used to derive _partition_bucket and _sample_bucket (see the sketch below this table). | None |
| bucket_size | int | Partition bucket modulus. Must be one of 128, 256, 512, or 1024. | 512 |
| include_row_ingest_id | bool | Whether to add _row_ingest_id. | True |
| engine | str | Execution engine (auto, pandas, or spark). | "auto" |

Returns:

| Type | Description |
| ---- | ----------- |
| Any | DataFrame with requested audit columns. |

Raises:

| Type | Description |
| ---- | ----------- |
| ValueError | If watermark_column or bucket_column does not exist, or bucket_size is invalid. |
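
For example, an unsupported bucket_size is rejected before any audit columns are added (a minimal sketch using the pandas engine):

>>> import pandas as pd
>>> bad = pd.DataFrame({"BUSINESS_KEY": ["A1"]})
>>> add_audit_columns(bad, bucket_column="BUSINESS_KEY", bucket_size=300, engine="pandas")
Traceback (most recent call last):
    ...
ValueError: bucket_size must be one of 128, 256, 512, or 1024.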

Notes

The Fabric notebook runtime is optional: this function can be imported and used locally without Fabric installed. Inside a Fabric notebook it reads notebookutils.runtime.context when available; otherwise _notebook_name and _loaded_by fall back to local_notebook and local_user.
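
A hedged sketch of that fallback, mirroring the _get_fabric_runtime_context helper referenced in the source listing below (the helper itself is defined elsewhere in the module and is not shown on this page):

>>> def sketch_runtime_context():
...     try:
...         import notebookutils  # only importable inside a Fabric notebook
...         return dict(notebookutils.runtime.context)
...     except Exception:
...         return {}
>>> context = sketch_runtime_context()
>>> context.get("currentNotebookName", "local_notebook")  # outside Fabric
'local_notebook'
>>> context.get("userName", "local_user")
'local_user'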

Examples:

>>> import pandas as pd
>>> df = pd.DataFrame({"BUSINESS_KEY": ["A1"], "updated_at": ["2026-01-01T00:00:00Z"]})
>>> out = add_audit_columns(
...     df,
...     pipeline_name="orders_pipeline",
...     environment="Sandbox",
...     source_table="orders",
...     watermark_column="updated_at",
...     bucket_column="BUSINESS_KEY",
...     engine="pandas",
... )
>>> "_pipeline_run_id" in out.columns
True
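
Spark usage follows the same call shape. A minimal sketch assuming an active SparkSession (for example the built-in spark session in a Fabric notebook):

>>> sdf = spark.createDataFrame(
...     [("A1", "2026-01-01T00:00:00Z")], ["BUSINESS_KEY", "updated_at"]
... )
>>> out = add_audit_columns(
...     sdf,
...     pipeline_name="orders_pipeline",
...     environment="Sandbox",
...     source_table="orders",
...     watermark_column="updated_at",
...     bucket_column="BUSINESS_KEY",
...     engine="spark",
... )
>>> {"_partition_bucket", "_sample_bucket", "_watermark_value"} <= set(out.columns)
True
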
Source code in src/fabricops_kit/technical_columns.py
def add_audit_columns(
    df,
    *,
    run_id: str | None = None,
    pipeline_name: str | None = None,
    environment: str | None = None,
    source_system: str | None = None,
    source_table: str | None = None,
    source_extract_timestamp: str | None = None,
    notebook_name: str | None = None,
    loaded_by: str | None = None,
    watermark_column: str | None = None,
    bucket_column: str | None = None,
    bucket_size: int = 512,
    include_row_ingest_id: bool = True,
    engine: str = "auto",
):
    """Add run tracking and audit columns for ingestion workflows.

    Parameters
    ----------
    df : Any
        Input pandas or Spark DataFrame.
    run_id : str | None, optional
        Pipeline run identifier. If omitted, a UUID is generated.
    pipeline_name : str | None, optional
        Pipeline or notebook workflow name.
    environment : str | None, optional
        Environment label such as ``Sandbox`` or ``Production``.
    source_system : str | None, optional
        Upstream source system name.
    source_table : str | None, optional
        Upstream table or dataset name.
    source_extract_timestamp : str | None, optional
        Timestamp string representing source extract time.
    notebook_name : str | None, optional
        Notebook name override. Uses Fabric runtime context when available.
    loaded_by : str | None, optional
        User override. Uses Fabric runtime context when available.
    watermark_column : str | None, optional
        Source column copied into ``_watermark_value``.
    bucket_column : str | None, optional
        Source column used to derive ``_partition_bucket`` and ``_sample_bucket``.
    bucket_size : int, default=512
        Partition bucket modulus. Must be one of ``128``, ``256``, ``512``, ``1024``.
    include_row_ingest_id : bool, default=True
        Whether to add ``_row_ingest_id``.
    engine : str, default="auto"
        Execution engine (``auto``, ``pandas``, or ``spark``).

    Returns
    -------
    Any
        DataFrame with requested audit columns.

    Raises
    ------
    ValueError
        If `watermark_column` or `bucket_column` does not exist, or `bucket_size` is invalid.

    Notes
    -----
    Fabric notebook runtime is optional. This function imports locally without Fabric.
    In Fabric notebooks it reads ``notebookutils.runtime.context`` when available;
    otherwise it falls back to ``local_notebook`` and ``local_user``.

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame({"BUSINESS_KEY": ["A1"], "updated_at": ["2026-01-01T00:00:00Z"]})
    >>> out = add_audit_columns(
    ...     df,
    ...     pipeline_name="orders_pipeline",
    ...     environment="Sandbox",
    ...     source_table="orders",
    ...     watermark_column="updated_at",
    ...     bucket_column="BUSINESS_KEY",
    ...     engine="pandas",
    ... )
    >>> "_pipeline_run_id" in out.columns
    True
    """
    selected_engine = _resolve_engine(df, engine)
    resolved_run_id = run_id or str(uuid.uuid4())
    context = _get_fabric_runtime_context()
    resolved_notebook_name = notebook_name or context.get("currentNotebookName", "local_notebook")
    resolved_loaded_by = loaded_by or context.get("userName", "local_user")

    if watermark_column is not None:
        _assert_columns_exist(df, [watermark_column])
    if bucket_column is not None:
        _assert_columns_exist(df, [bucket_column])
        if bucket_size not in {128, 256, 512, 1024}:
            raise ValueError("bucket_size must be one of 128, 256, 512, or 1024.")

    if selected_engine == "pandas":
        out = df.copy()
        out["_pipeline_run_id"] = resolved_run_id
        if pipeline_name is not None:
            out["_pipeline_name"] = pipeline_name
        if environment is not None:
            out["_pipeline_environment"] = environment
        if source_system is not None:
            out["_source_system"] = source_system
        if source_table is not None:
            out["_source_table"] = source_table
        if source_extract_timestamp is not None:
            out["_source_extract_timestamp"] = source_extract_timestamp
        out["_record_loaded_timestamp"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
        out["_notebook_name"] = resolved_notebook_name
        out["_loaded_by"] = resolved_loaded_by
        if watermark_column is not None:
            out["_watermark_value"] = out[watermark_column]
        if bucket_column is not None:
            p_bucket, s_bucket = _bucket_values_pandas(out[bucket_column], bucket_size)
            out["_partition_bucket"] = p_bucket
            out["_sample_bucket"] = s_bucket
        if include_row_ingest_id:
            out["_row_ingest_id"] = [str(uuid.uuid4()) for _ in range(len(out))]
        return out

    from pyspark.sql import functions as F

    out = df.withColumn("_pipeline_run_id", F.lit(resolved_run_id))
    for name, value in [
        ("_pipeline_name", pipeline_name),
        ("_pipeline_environment", environment),
        ("_source_system", source_system),
        ("_source_table", source_table),
        ("_source_extract_timestamp", source_extract_timestamp),
    ]:
        if value is not None:
            out = out.withColumn(name, F.lit(value))
    out = out.withColumn("_record_loaded_timestamp", F.current_timestamp())
    out = out.withColumn("_notebook_name", F.lit(resolved_notebook_name))
    out = out.withColumn("_loaded_by", F.lit(resolved_loaded_by))
    if watermark_column is not None:
        out = out.withColumn("_watermark_value", F.col(watermark_column))
    if bucket_column is not None:
        value = F.abs(F.hash(F.col(bucket_column)))
        out = out.withColumn("_partition_bucket", F.pmod(value, F.lit(bucket_size))).withColumn("_sample_bucket", F.pmod(value, F.lit(1_000_000)))
    if include_row_ingest_id:
        out = out.withColumn("_row_ingest_id", F.expr("uuid()"))
    return out