
Fabric OneLake Connector

FabricLakehouseConnector

Bases: Connector

Connector for Microsoft Fabric Lakehouses via OneLake URIs.

Accepts either onelake://... URIs or abfss://...@onelake.dfs.fabric.microsoft.com/.... Supports Delta (default), Parquet, and CSV.
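A minimal usage sketch (assuming the class is importable from spark_fuse.io.fabric and takes no constructor arguments; the workspace and lakehouse names below are placeholders):

from pyspark.sql import SparkSession

from spark_fuse.io.fabric import FabricLakehouseConnector  # assumed import path

spark = SparkSession.builder.getOrCreate()
connector = FabricLakehouseConnector()  # assumed no-arg constructor

# Read a Delta table addressed by an abfss-on-OneLake URI (format defaults to delta).
customers = connector.read(
    spark,
    "abfss://MyWorkspace@onelake.dfs.fabric.microsoft.com/MyLakehouse.Lakehouse/Tables/customers",
)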

read

read(spark: SparkSession, source: Any, *, fmt: Optional[str] = None, schema: Optional[Any] = None, source_config: Optional[Mapping[str, Any]] = None, options: Optional[Mapping[str, Any]] = None, **kwargs: Any) -> DataFrame

Read a dataset from a Fabric OneLake-backed location.

Parameters:

    spark (SparkSession): Active SparkSession. Required.
    source (Any): OneLake or abfss-on-OneLake path. Required.
    fmt (Optional[str]): Optional format override: delta (default), parquet, or csv. Default: None.
    schema (Optional[Any]): Optional schema to enforce when reading structured data. Default: None.
    source_config (Optional[Mapping[str, Any]]): Unused for Fabric; accepted for interface compatibility. Default: None.
    options (Optional[Mapping[str, Any]]): Additional Spark read options. Default: None.
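Continuing the sketch above, a hedged example of a CSV read with an explicit schema and extra Spark read options (the onelake:// path shape and column names are illustrative; validate_path defines exactly what is accepted):

from pyspark.sql import types as T

schema = T.StructType(
    [
        T.StructField("id", T.LongType(), True),
        T.StructField("name", T.StringType(), True),
    ]
)

people = connector.read(
    spark,
    "onelake://MyWorkspace/MyLakehouse.Lakehouse/Files/raw/people.csv",
    fmt="csv",
    schema=schema,
    options={"header": "true"},
)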
Source code in src/spark_fuse/io/fabric.py
def read(
    self,
    spark: SparkSession,
    source: Any,
    *,
    fmt: Optional[str] = None,
    schema: Optional[Any] = None,
    source_config: Optional[Mapping[str, Any]] = None,
    options: Optional[Mapping[str, Any]] = None,
    **kwargs: Any,
) -> DataFrame:
    """Read a dataset from a Fabric OneLake-backed location.

    Args:
        spark: Active `SparkSession`.
        source: OneLake or abfss-on-OneLake path.
        fmt: Optional format override: `delta` (default), `parquet`, or `csv`.
        schema: Optional schema to enforce when reading structured data.
        source_config: Unused for Fabric, accepted for interface compatibility.
        options: Additional Spark read options.
    """
    if not isinstance(source, str):
        raise TypeError("FabricLakehouseConnector.read expects 'source' to be a string path")
    path = source
    if not self.validate_path(path):
        raise ValueError(f"Invalid Fabric OneLake path: {path}")
    opts = dict(options or {})
    fmt = (fmt or opts.pop("format", None) or "delta").lower()
    reader = spark.read
    if schema is not None:
        reader = reader.schema(schema)
    if opts:
        reader = reader.options(**opts)
    if fmt == "delta":
        return reader.format("delta").load(path)
    elif fmt in {"parquet", "csv"}:
        return reader.format(fmt).load(path)
    else:
        raise ValueError(f"Unsupported format for Fabric: {fmt}")

validate_path

validate_path(path: str) -> bool

Return True if the path looks like a valid OneLake URI.

Source code in src/spark_fuse/io/fabric.py
def validate_path(self, path: str) -> bool:
    """Return True if the path looks like a valid OneLake URI."""
    return bool(_ONELAKE_SCHEME.match(path) or _ONELAKE_ABFSS.match(path))
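A small illustration of the two accepted path shapes (the exact patterns are defined by the module's _ONELAKE_SCHEME and _ONELAKE_ABFSS regexes; the example paths are assumed to satisfy them):

connector = FabricLakehouseConnector()

# Expected True for OneLake-style URIs (assumed to match the module's patterns).
connector.validate_path("onelake://MyWorkspace/MyLakehouse.Lakehouse/Tables/customers")
connector.validate_path(
    "abfss://MyWorkspace@onelake.dfs.fabric.microsoft.com/MyLakehouse.Lakehouse/Tables/customers"
)

# Expected False for non-OneLake locations.
connector.validate_path("s3://some-bucket/some/key")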

write

write(df: DataFrame, path: str, *, fmt: Optional[str] = None, mode: str = 'error', **options: Any) -> None

Write a dataset to a Fabric OneLake-backed location.

Parameters:

    df (DataFrame): DataFrame to write. Required.
    path (str): Output OneLake path. Required.
    fmt (Optional[str]): Optional format override: delta (default), parquet, or csv. Default: None.
    mode (str): Save mode, e.g. error, overwrite, append. Default: 'error'.
    **options (Any): Additional Spark write options. Default: {}.
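As the source below shows, Delta writes are routed through an SCD merge (apply_scd) and recognize additional keyword options such as business_keys (required), scd_mode, tracked_columns, dedupe_keys, and order_by. A hedged sketch, continuing the examples above (paths and column names are placeholders):

# Delta write applied as an SCD2 merge keyed on a business key.
connector.write(
    customers,
    "abfss://MyWorkspace@onelake.dfs.fabric.microsoft.com/MyLakehouse.Lakehouse/Tables/dim_customer",
    fmt="delta",
    business_keys=["customer_id"],
    scd_mode="scd2",
    order_by=["load_ts"],
)

# Plain Parquet write honouring the Spark save mode and write options.
connector.write(
    customers,
    "abfss://MyWorkspace@onelake.dfs.fabric.microsoft.com/MyLakehouse.Lakehouse/Files/exports/customers_parquet",
    fmt="parquet",
    mode="overwrite",
    compression="snappy",
)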
Source code in src/spark_fuse/io/fabric.py
def write(
    self,
    df: DataFrame,
    path: str,
    *,
    fmt: Optional[str] = None,
    mode: str = "error",
    **options: Any,
) -> None:
    """Write a dataset to a Fabric OneLake-backed location.

    Args:
        df: DataFrame to write.
        path: Output OneLake path.
        fmt: Optional format override: `delta` (default), `parquet`, or `csv`.
        mode: Save mode, e.g. `error`, `overwrite`, `append`.
        **options: Additional Spark write options.
    """
    if not self.validate_path(path):
        raise ValueError(f"Invalid Fabric OneLake path: {path}")
    fmt = (fmt or options.pop("format", None) or "delta").lower()
    if fmt == "delta":
        from ..utils.scd import SCDMode, apply_scd

        scd_mode_opt = (
            options.pop("scd_mode", None) or options.pop("scd", None) or "scd2"
        ).upper()
        if scd_mode_opt not in {"SCD1", "SCD2"}:
            raise ValueError("scd_mode must be 'SCD1' or 'SCD2'")
        scd_mode = SCDMode[scd_mode_opt]

        business_keys = options.pop("business_keys", None)
        if business_keys is None:
            raise ValueError("business_keys must be provided for SCD writes to Delta")
        if not business_keys or len(business_keys) == 0:
            raise ValueError(
                "business_keys must be a non-empty sequence for SCD writes to Delta"
            )

        tracked_columns = as_seq(options.pop("tracked_columns", None))
        dedupe_keys = as_seq(options.pop("dedupe_keys", None))
        order_by = as_seq(options.pop("order_by", None))

        effective_col = options.pop("effective_col", "effective_start_ts")
        expiry_col = options.pop("expiry_col", "effective_end_ts")
        current_col = options.pop("current_col", "is_current")
        version_col = options.pop("version_col", "version")
        hash_col = options.pop("hash_col", "row_hash")

        load_ts_expr_opt = options.pop("load_ts_expr", None)
        load_ts_expr = F.expr(load_ts_expr_opt) if isinstance(load_ts_expr_opt, str) else None

        null_key_policy = options.pop("null_key_policy", "error")
        create_if_not_exists = as_bool(options.pop("create_if_not_exists", True), True)

        kwargs: dict[str, Any] = {
            "business_keys": business_keys,
            "tracked_columns": tracked_columns,
            "dedupe_keys": dedupe_keys,
            "order_by": order_by,
            "hash_col": hash_col,
            "null_key_policy": null_key_policy,
            "create_if_not_exists": create_if_not_exists,
        }
        if scd_mode == SCDMode.SCD2:
            kwargs.update(
                {
                    "effective_col": effective_col,
                    "expiry_col": expiry_col,
                    "current_col": current_col,
                    "version_col": version_col,
                    "load_ts_expr": load_ts_expr,
                }
            )

        spark = df.sparkSession
        apply_scd(
            spark,
            df,
            path,
            scd_mode=scd_mode,
            **{k: v for k, v in kwargs.items() if v is not None},
        )
    elif fmt in {"parquet", "csv"}:
        writer = df.write.mode(mode).options(**options)
        writer.format(fmt).save(path)
    else:
        raise ValueError(f"Unsupported format for Fabric: {fmt}")