Databricks Connector and Utilities

DatabricksDBFSConnector

Bases: Connector

Connector for Databricks DBFS paths and Unity/Hive tables.

Supports reading and writing Delta (default), Parquet, and CSV for DBFS paths.

read

read(spark: SparkSession, source: Any, *, fmt: Optional[str] = None, schema: Optional[Any] = None, source_config: Optional[Mapping[str, Any]] = None, options: Optional[Mapping[str, Any]] = None, **kwargs: Any) -> DataFrame

Read a dataset from Databricks storage or catalogs.

Parameters:

- spark (SparkSession, required): Active SparkSession.
- source (Any, required): Either a dbfs:/ location, a table identifier string, or a mapping containing table/path metadata.
- fmt (Optional[str], default None): Optional format override for DBFS reads (delta by default); not applicable to table reads (a ValueError is raised if set).
- schema (Optional[Any], default None): Optional schema to enforce for file reads. Not supported for table reads.
- source_config (Optional[Mapping[str, Any]], default None): Connector-specific settings (e.g., catalog, schema, table, path).
- options (Optional[Mapping[str, Any]], default None): Additional Spark read options.
Source code in src/spark_fuse/io/databricks.py
def read(
    self,
    spark: SparkSession,
    source: Any,
    *,
    fmt: Optional[str] = None,
    schema: Optional[Any] = None,
    source_config: Optional[Mapping[str, Any]] = None,
    options: Optional[Mapping[str, Any]] = None,
    **kwargs: Any,
) -> DataFrame:
    """Read a dataset from Databricks storage or catalogs.

    Args:
        spark: Active `SparkSession`.
        source: Either a `dbfs:/` location, a table identifier string, or a mapping containing
            table/path metadata.
        fmt: Optional format override for DBFS reads (`delta` by default). Ignored for tables.
        schema: Optional schema to enforce for file reads. Not supported for table reads.
        source_config: Connector-specific settings (e.g., `catalog`, `schema`, `table`, `path`).
        options: Additional Spark read options.
    """
    path, table = self._normalize_source(source, source_config)
    opts = dict(options or {})

    if path:
        fmt_value = (fmt or opts.pop("format", None) or "delta").lower()
        reader = spark.read
        if schema is not None:
            reader = reader.schema(schema)
        if opts:
            reader = reader.options(**opts)
        if fmt_value == "delta":
            return reader.format("delta").load(path)
        if fmt_value in {"parquet", "csv"}:
            return reader.format(fmt_value).load(path)
        raise ValueError(f"Unsupported format for Databricks: {fmt_value}")

    if schema is not None:
        raise ValueError("schema parameter is not supported when reading Databricks tables")
    fmt_option = fmt or opts.pop("format", None)
    if fmt_option:
        raise ValueError("fmt/format options are not applicable when reading Databricks tables")
    reader = spark.read
    if opts:
        reader = reader.options(**opts)
    return reader.table(table)
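
A minimal usage sketch (paths and the table name below are placeholders, and the no-argument constructor is an assumption; it is not documented on this page):

from pyspark.sql import SparkSession
from spark_fuse.io.databricks import DatabricksDBFSConnector

spark = SparkSession.builder.getOrCreate()
connector = DatabricksDBFSConnector()  # assumed no-argument constructor

# DBFS path: Delta is the default format.
events = connector.read(spark, "dbfs:/mnt/raw/events")

# DBFS path with a format override and extra Spark read options.
lookup = connector.read(
    spark,
    "dbfs:/mnt/raw/lookup_csv",
    fmt="csv",
    options={"header": "true", "inferSchema": "true"},
)

# Table identifier: fmt and schema must not be supplied here.
orders = connector.read(spark, "main.analytics.orders")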

validate_path

validate_path(path: str) -> bool

Return True if the input looks like a DBFS path or Databricks table identifier.

Source code in src/spark_fuse/io/databricks.py
def validate_path(self, path: str) -> bool:
    """Return True if the input looks like a DBFS path or Databricks table identifier."""
    if not isinstance(path, str):
        return False
    value = path.strip()
    if not value:
        return False
    return value.startswith("dbfs:/") or self._is_table_identifier(value)
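
A short sketch of what the check accepts (placeholder inputs; which bare strings qualify as table identifiers is decided by _is_table_identifier, which is not shown on this page):

from spark_fuse.io.databricks import DatabricksDBFSConnector

connector = DatabricksDBFSConnector()
connector.validate_path("dbfs:/mnt/raw/events")   # True: DBFS location
connector.validate_path("   ")                    # False: empty after stripping
connector.validate_path("main.analytics.orders")  # True only if _is_table_identifier accepts it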

write

write(df: DataFrame, path: str, *, fmt: Optional[str] = None, mode: str = 'error', **options: Any) -> None

Write a dataset to DBFS.

Parameters:

- df (DataFrame, required): DataFrame to write.
- path (str, required): Output DBFS location.
- fmt (Optional[str], default None): Optional format override: delta (default), parquet, or csv.
- mode (str, default 'error'): Save mode, e.g. error, overwrite, append. Applies to parquet and csv writes.
- **options (Any, default {}): Additional Spark write options. For Delta writes, SCD settings such as scd_mode and business_keys are also taken from these options (see the source below).
Source code in src/spark_fuse/io/databricks.py
def write(
    self,
    df: DataFrame,
    path: str,
    *,
    fmt: Optional[str] = None,
    mode: str = "error",
    **options: Any,
) -> None:
    """Write a dataset to DBFS.

    Args:
        df: DataFrame to write.
        path: Output DBFS location.
        fmt: Optional format override: `delta` (default), `parquet`, or `csv`.
        mode: Save mode, e.g. `error`, `overwrite`, `append`.
        **options: Additional Spark write options.
    """
    if not self.validate_path(path):
        raise ValueError(f"Invalid DBFS path: {path}")
    fmt = (fmt or options.pop("format", None) or "delta").lower()
    # When writing Delta, use SCD upsert helpers
    if fmt == "delta":
        # Import at call time to avoid importing Delta libs unless needed
        from ..utils.scd import SCDMode, apply_scd

        scd_mode_opt = (
            options.pop("scd_mode", None) or options.pop("scd", None) or "scd2"
        ).upper()
        if scd_mode_opt not in {"SCD1", "SCD2"}:
            raise ValueError("scd_mode must be 'SCD1' or 'SCD2'")
        scd_mode = SCDMode[scd_mode_opt]

        business_keys = options.pop("business_keys", None)
        if business_keys is None:
            raise ValueError("business_keys must be provided for SCD writes to Delta")
        if not business_keys or len(business_keys) == 0:
            raise ValueError(
                "business_keys must be a non-empty sequence for SCD writes to Delta"
            )

        tracked_columns = as_seq(options.pop("tracked_columns", None))
        dedupe_keys = as_seq(options.pop("dedupe_keys", None))
        order_by = as_seq(options.pop("order_by", None))

        # SCD2-specific options (also used by SCD1 for hash_col)
        effective_col = options.pop("effective_col", "effective_start_ts")
        expiry_col = options.pop("expiry_col", "effective_end_ts")
        current_col = options.pop("current_col", "is_current")
        version_col = options.pop("version_col", "version")
        hash_col = options.pop("hash_col", "row_hash")

        load_ts_expr_opt = options.pop("load_ts_expr", None)
        load_ts_expr = F.expr(load_ts_expr_opt) if isinstance(load_ts_expr_opt, str) else None

        null_key_policy = options.pop("null_key_policy", "error")
        create_if_not_exists = as_bool(options.pop("create_if_not_exists", True), True)

        kwargs: Dict[str, Any] = {
            "business_keys": business_keys,
            "tracked_columns": tracked_columns,
            "dedupe_keys": dedupe_keys,
            "order_by": order_by,
            "hash_col": hash_col,
            "null_key_policy": null_key_policy,
            "create_if_not_exists": create_if_not_exists,
        }

        if scd_mode == SCDMode.SCD2:
            kwargs.update(
                {
                    "effective_col": effective_col,
                    "expiry_col": expiry_col,
                    "current_col": current_col,
                    "version_col": version_col,
                    "load_ts_expr": load_ts_expr,
                }
            )

        # Use DataFrame's session
        spark = df.sparkSession
        apply_scd(
            spark,
            df,
            path,
            scd_mode=scd_mode,
            **{k: v for k, v in kwargs.items() if v is not None},
        )
    elif fmt in {"parquet", "csv"}:
        writer = df.write.mode(mode).options(**options)
        writer.format(fmt).save(path)
    else:
        raise ValueError(f"Unsupported format for Databricks: {fmt}")
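
A usage sketch covering both branches (DataFrame contents, column names, and paths are placeholders; the exact SCD behaviour is defined by spark_fuse.utils.scd.apply_scd, not here):

from pyspark.sql import SparkSession
from spark_fuse.io.databricks import DatabricksDBFSConnector

spark = SparkSession.builder.getOrCreate()
connector = DatabricksDBFSConnector()
customers_df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["customer_id", "name"])

# Delta branch: routed through the SCD helpers. business_keys is mandatory,
# and scd_mode defaults to SCD2 when omitted.
connector.write(
    customers_df,
    "dbfs:/mnt/curated/customers",
    fmt="delta",
    scd_mode="scd1",
    business_keys=["customer_id"],
)

# Parquet/CSV branch: a plain Spark writer using the given save mode and options.
connector.write(
    customers_df,
    "dbfs:/mnt/exports/customers_parquet",
    fmt="parquet",
    mode="overwrite",
    compression="snappy",
)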

databricks_submit_job

databricks_submit_job(payload: Dict[str, Any], *, host: Optional[str] = None, token: Optional[str] = None) -> Dict[str, Any]

Submit a job run to Databricks using the 2.1 Runs Submit API.

Environment variables DATABRICKS_HOST and DATABRICKS_TOKEN are used if not provided. Returns the parsed JSON response or raises for HTTP errors.

Source code in src/spark_fuse/io/databricks.py
def databricks_submit_job(
    payload: Dict[str, Any], *, host: Optional[str] = None, token: Optional[str] = None
) -> Dict[str, Any]:
    """Submit a job run to Databricks using the 2.1 Runs Submit API.

    Environment variables `DATABRICKS_HOST` and `DATABRICKS_TOKEN` are used if not provided.
    Returns the parsed JSON response or raises for HTTP errors.
    """
    host = host or os.environ.get("DATABRICKS_HOST")
    token = token or os.environ.get("DATABRICKS_TOKEN")
    if not host or not token:
        raise ValueError("DATABRICKS_HOST and DATABRICKS_TOKEN must be set to submit jobs")

    url = host.rstrip("/") + "/api/2.1/jobs/runs/submit"
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    resp = requests.post(url, headers=headers, data=json.dumps(payload), timeout=60)
    resp.raise_for_status()
    return resp.json()
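
A minimal sketch of a one-time run submission (the payload values and notebook path are placeholders; consult the Databricks Jobs 2.1 Runs Submit reference for the full payload schema):

from spark_fuse.io.databricks import databricks_submit_job

payload = {
    "run_name": "spark-fuse example run",
    "tasks": [
        {
            "task_key": "ingest",
            "notebook_task": {"notebook_path": "/Repos/example/ingest"},
            "new_cluster": {
                "spark_version": "14.3.x-scala2.12",
                "node_type_id": "Standard_DS3_v2",
                "num_workers": 2,
            },
        }
    ],
}

# host/token fall back to DATABRICKS_HOST and DATABRICKS_TOKEN when omitted.
response = databricks_submit_job(payload)
print(response.get("run_id"))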