lakehouse_table_write

Public callable

Write a Spark DataFrame to a Fabric lakehouse Delta table.

This writes to the lakehouse Tables/ area using the ABFSS root stored in a Housepath. Use this in the Unified/Product stage after transformations, DQ checks, and technical-column enrichment are complete.

Parameters:

    df : DataFrame, required
        Spark DataFrame to write.
    lh : Housepath, required
        Lakehouse path object returned by get_path.
    tablename : str, required
        Target table name under the lakehouse Tables/ folder.
    mode : str, default "append"
        Spark write mode. Supported values are "append", "overwrite",
        "errorifexists", and "ignore".
    partition_by : str or list[str], default None
        Column or columns used to physically partition the Delta table.
    repartition_by : int, str, list, or tuple, default None
        Optional repartitioning before write (see the sketch after this list).
    overwrite_schema : bool, default True
        Whether to set Spark Delta overwriteSchema=true before saving.
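
Since repartition_by accepts several shapes, here is a minimal sketch of each
form. The table names are hypothetical; lh_unified is a Housepath as in the
Examples section below.

>>> # int: fix the number of output partitions
>>> lakehouse_table_write(df, lh_unified, "ORDERS_N", repartition_by=200)
>>> # str: repartition by a column before writing
>>> lakehouse_table_write(df, lh_unified, "ORDERS_COL", repartition_by="p_bucket")
>>> # (count, col, ...): set both the partition count and the columns
>>> lakehouse_table_write(df, lh_unified, "ORDERS_MIX", repartition_by=(200, "p_bucket"))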

Returns:

    None
        The DataFrame is written to the target Delta table path.

Notes

Side effects:

- Persists data to OneLake Delta storage under Tables/<tablename>.
- Optional repartitioning can change output file sizing and partition layout.
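
Because the write lands under Tables/<tablename>, the table can be read back
directly as Delta. A minimal sketch, assuming an active spark session and the
lh_unified object from the Examples section:

>>> path = f"{lh_unified.root.rstrip('/')}/Tables/CLEAN_ORDERS"
>>> readback = spark.read.format("delta").load(path)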

Raises:

    ValueError
        If lh.root, tablename, or mode is invalid.
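
For example, an unsupported mode fails before any data is written; the error
message below is the one raised in the source:

>>> lakehouse_table_write(df, lh_unified, "CLEAN_ORDERS", mode="merge")
Traceback (most recent call last):
    ...
ValueError: mode must be one of append, overwrite, errorifexists, ignore.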

Examples:

>>> lh_unified = get_path("Sandbox", "Unified", config=CONFIG)
>>> lakehouse_table_write(
...     df,
...     lh_unified,
...     "CLEAN_ORDERS",
...     mode="overwrite",
...     partition_by="p_bucket",
...     repartition_by=(200, "p_bucket"),
... )
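
Incremental loads can rely on the default mode="append", which adds rows to an
existing Delta table. A minimal sketch, with df_increment as a hypothetical
batch of new rows:

>>> lakehouse_table_write(df_increment, lh_unified, "CLEAN_ORDERS")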
Source code in src/fabricops_kit/fabric_io.py
def lakehouse_table_write(
    df,
    lh,
    tablename,
    mode="append",
    partition_by=None,
    repartition_by=None,
    overwrite_schema=True,
):
    """Write a Spark DataFrame to a Fabric lakehouse Delta table.

    This writes to the lakehouse `Tables/` area using the ABFSS root stored in
    a `Housepath`. Use this in the Unified/Product stage after transformations,
    DQ checks, and technical-column enrichment are complete.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Spark DataFrame to write.
    lh : Housepath
        Lakehouse path object returned by `get_path`.
    tablename : str
        Target table name under the lakehouse `Tables/` folder.
    mode : str, default "append"
        Spark write mode. Supported values are `"append"`, `"overwrite"`,
        `"errorifexists"`, and `"ignore"`.
    partition_by : str or list[str], optional
        Column or columns used to physically partition the Delta table.
    repartition_by : int, str, list, or tuple, optional
        Optional repartitioning before write.
    overwrite_schema : bool, default True
        Whether to set Spark Delta `overwriteSchema=true` before saving.

    Returns
    -------
    None
        The DataFrame is written to the target Delta table path.

    Notes
    -----
    Side effects:
    - Persists data to OneLake Delta storage under ``Tables/<tablename>``.
    - Optional repartitioning can change output file sizing and partition
      layout.

    Raises
    ------
    ValueError
        If `lh.root`, `tablename`, or `mode` is invalid.

    Examples
    --------
    >>> lh_unified = get_path("Sandbox", "Unified", config=CONFIG)
    >>> lakehouse_table_write(
    ...     df,
    ...     lh_unified,
    ...     "CLEAN_ORDERS",
    ...     mode="overwrite",
    ...     partition_by="p_bucket",
    ...     repartition_by=(200, "p_bucket"),
    ... )
    """
    # Validate the lakehouse root and table name before touching Spark.
    if not getattr(lh, "root", None):
        raise ValueError("lh.root is required.")
    if not tablename:
        raise ValueError("tablename is required.")

    # Normalize the write mode and reject anything Spark does not support.
    normalized_mode = str(mode or "").lower().strip()
    if normalized_mode not in {"append", "overwrite", "errorifexists", "ignore"}:
        raise ValueError("mode must be one of append, overwrite, errorifexists, ignore.")

    # Build the ABFSS path to the Delta table under the lakehouse Tables/ area.
    path = f"{lh.root.rstrip('/')}/Tables/{tablename}"

    # Optionally repartition before writing: an int sets the partition count,
    # a (count, col, ...) tuple or list sets both, and a column name (or a
    # sequence of column names) repartitions by those columns.
    if repartition_by is not None:
        if isinstance(repartition_by, (list, tuple)):
            if len(repartition_by) > 0 and isinstance(repartition_by[0], int):
                df = df.repartition(repartition_by[0], *repartition_by[1:])
            else:
                df = df.repartition(*repartition_by)
        elif isinstance(repartition_by, int):
            df = df.repartition(repartition_by)
        else:
            df = df.repartition(repartition_by)

    writer = df.write.mode(normalized_mode).format("delta")

    # Apply physical partitioning of the Delta table, if requested.
    if partition_by is not None:
        if isinstance(partition_by, (list, tuple)):
            writer = writer.partitionBy(*partition_by)
        else:
            writer = writer.partitionBy(partition_by)

    # Allow schema changes on overwrite via the Delta overwriteSchema option.
    if overwrite_schema:
        writer = writer.option("overwriteSchema", "true")

    writer.save(path)