Skip to content

ADLS Gen2 Connector

ADLSGen2Connector

Bases: Connector

Connector for Azure Data Lake Storage Gen2 using abfss:// URIs.

  • Supports reading and writing Delta (default), Parquet, and CSV.
  • Path must match abfss://<container>@<account>.dfs.core.windows.net/<path>.

read

read(spark: SparkSession, source: Any, *, fmt: Optional[str] = None, schema: Optional[Any] = None, source_config: Optional[Mapping[str, Any]] = None, options: Optional[Mapping[str, Any]] = None, **kwargs: Any) -> DataFrame

Read a dataset from ADLS Gen2.

Parameters:

Name Type Description Default
spark SparkSession

Active SparkSession.

required
source Any

abfss:// location to read from.

required
fmt Optional[str]

Optional format override: delta (default), parquet, or csv.

None
schema Optional[Any]

Optional schema for structured reads.

None
source_config Optional[Mapping[str, Any]]

Unused for ADLS, accepted for interface compatibility.

None
options Optional[Mapping[str, Any]]

Additional Spark read options.

None
Source code in src/spark_fuse/io/azure_adls.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def read(
    self,
    spark: SparkSession,
    source: Any,
    *,
    fmt: Optional[str] = None,
    schema: Optional[Any] = None,
    source_config: Optional[Mapping[str, Any]] = None,
    options: Optional[Mapping[str, Any]] = None,
    **kwargs: Any,
) -> DataFrame:
    """Read a dataset from ADLS Gen2.

    Args:
        spark: Active `SparkSession`.
        source: `abfss://` location to read from.
        fmt: Optional format override: `delta` (default), `parquet`, or `csv`.
        schema: Optional schema for structured reads.
        source_config: Unused for ADLS, accepted for interface compatibility.
        options: Additional Spark read options.

    Returns:
        The loaded `DataFrame`.

    Raises:
        TypeError: If ``source`` is not a string.
        ValueError: If the path is not a valid ``abfss://`` URI or the
            resolved format is unsupported.
    """
    if not isinstance(source, str):
        raise TypeError("ADLSGen2Connector.read expects 'source' to be a string path")
    path = source
    if not self.validate_path(path):
        raise ValueError(f"Invalid ADLS Gen2 path: {path}")

    # Copy so popping "format" does not mutate the caller's mapping.
    read_opts = dict(options or {})
    # Precedence: explicit fmt argument > "format" read option > delta default.
    fmt = (fmt or read_opts.pop("format", None) or "delta").lower()
    if fmt not in {"delta", "parquet", "csv"}:
        raise ValueError(f"Unsupported format for ADLS: {fmt}")

    reader = spark.read.format(fmt)
    if schema is not None:
        reader = reader.schema(schema)
    if read_opts:
        reader = reader.options(**read_opts)
    return reader.load(path)

validate_path

validate_path(path: str) -> bool

Return True if the path is a valid ADLS Gen2 abfss:// URI.

Source code in src/spark_fuse/io/azure_adls.py
25
26
27
def validate_path(self, path: str) -> bool:
    """Return True if the path is a valid ADLS Gen2 `abfss://` URI."""
    # Anchored match against the module-level ABFSS pattern; a non-match
    # yields None, which we normalize to a plain boolean.
    return _ABFSS_RE.match(path) is not None

write

write(df: DataFrame, path: str, *, fmt: Optional[str] = None, mode: str = 'error', **options: Any) -> None

Write a dataset to ADLS Gen2.

Parameters:

Name Type Description Default
df DataFrame

DataFrame to write.

required
path str

abfss:// output location.

required
fmt Optional[str]

Optional format override: delta (default), parquet, or csv.

None
mode str

Save mode, e.g. error, overwrite, append.

'error'
**options Any

Additional Spark write options.

{}
Source code in src/spark_fuse/io/azure_adls.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def write(
    self,
    df: DataFrame,
    path: str,
    *,
    fmt: Optional[str] = None,
    mode: str = "error",
    **options: Any,
) -> None:
    """Write a dataset to ADLS Gen2.

    Args:
        df: DataFrame to write.
        path: `abfss://` output location.
        fmt: Optional format override: `delta` (default), `parquet`, or `csv`.
        mode: Save mode, e.g. `error`, `overwrite`, `append`.
        **options: Additional Spark write options.

    Raises:
        ValueError: If the path is not a valid ``abfss://`` URI or the
            resolved format is unsupported.
    """
    if not self.validate_path(path):
        raise ValueError(f"Invalid ADLS Gen2 path: {path}")

    # Precedence: explicit fmt argument > "format" entry in **options > delta
    # default. Popping keeps "format" from being passed on as a write option.
    fmt = (fmt or options.pop("format", None) or "delta").lower()
    if fmt not in {"delta", "parquet", "csv"}:
        raise ValueError(f"Unsupported format for ADLS: {fmt}")

    df.write.mode(mode).options(**options).format(fmt).save(path)