IO Core

Connector

Bases: ABC

Abstract base class for IO connectors.

Connector implementations must define a class attribute name and implement validate_path, read, and write.

read abstractmethod

read(spark: SparkSession, source: Any, *, fmt: Optional[str] = None, schema: Optional[Any] = None, source_config: Optional[Mapping[str, Any]] = None, options: Optional[Mapping[str, Any]] = None, **kwargs: Any) -> DataFrame

Load a dataset from the given source and return a Spark DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| spark | SparkSession | Active SparkSession. | required |
| source | Any | Canonical identifier for the data source (e.g. filesystem path, database table name, REST endpoint, etc.). | required |
| fmt | Optional[str] | Optional format hint (e.g. delta, csv) for connectors that support multiple serialization formats. | None |
| schema | Optional[Any] | Optional explicit schema to enforce. Connectors may ignore this parameter if schema enforcement is not supported. | None |
| source_config | Optional[Mapping[str, Any]] | Connector-specific configuration describing how to read from source (e.g. pagination settings for REST APIs, credentials, or authentication hints). | None |
| options | Optional[Mapping[str, Any]] | Additional connector options. This can be used for Spark read options or arbitrary connector-specific parameters. | None |
| **kwargs | Any | Future extension point for connector-specific keyword arguments. | {} |

Returns:

| Type | Description |
| --- | --- |
| pyspark.sql.DataFrame | The loaded dataset. |

Source code in src/spark_fuse/io/base.py
@abstractmethod
def read(
    self,
    spark: SparkSession,
    source: Any,
    *,
    fmt: Optional[str] = None,
    schema: Optional[Any] = None,
    source_config: Optional[Mapping[str, Any]] = None,
    options: Optional[Mapping[str, Any]] = None,
    **kwargs: Any,
) -> DataFrame:
    """Load a dataset from the given source and return a Spark DataFrame.

    Args:
        spark: Active ``SparkSession``.
        source: Canonical identifier for the data source (e.g. filesystem path,
            database table name, REST endpoint, etc.).
        fmt: Optional format hint (e.g. ``delta``, ``csv``) for connectors that
            support multiple serialization formats.
        schema: Optional explicit schema to enforce. Connectors may ignore this
            parameter if schema enforcement is not supported.
        source_config: Connector-specific configuration describing how to read
            from ``source`` (e.g. pagination settings for REST APIs, credentials,
            or authentication hints).
        options: Additional connector options. This can be used for Spark read
            options or arbitrary connector-specific parameters.
        **kwargs: Future extension point for connector-specific keyword arguments.

    Returns:
        pyspark.sql.DataFrame: The loaded dataset.
    """

validate_path abstractmethod

validate_path(path: str) -> bool

Return True if the given path/URI is supported by this connector.

Source code in src/spark_fuse/io/base.py
@abstractmethod
def validate_path(self, path: str) -> bool:
    """Return True if the given path/URI is supported by this connector."""

write abstractmethod

write(df: DataFrame, path: str, *, fmt: Optional[str] = None, mode: str = 'error', **options: Any) -> None

Write a dataset to the given path using the connector.

Source code in src/spark_fuse/io/base.py
@abstractmethod
def write(
    self,
    df: DataFrame,
    path: str,
    *,
    fmt: Optional[str] = None,
    mode: str = "error",
    **options: Any,
) -> None:
    """Write a dataset to the given path using the connector."""

connector_for_path

connector_for_path(path: str) -> Optional[Connector]

Return the first connector instance whose validate_path accepts the given path.

Iterates through registered connectors and instantiates each to check path support. Returns None if no connector validates the path.

Source code in src/spark_fuse/io/registry.py
def connector_for_path(path: str) -> Optional[Connector]:
    """Return the first connector instance that `validate_path` for the given path.

    Iterates through registered connectors and instantiates each to check path support.
    Returns `None` if no connector validates the path.
    """
    for cls in _REGISTRY.values():
        inst = cls()
        try:
            if inst.validate_path(path):
                return inst
        except Exception:
            continue
    return None
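
A short usage sketch, assuming at least one registered connector claims the path; the path itself is illustrative.

from pyspark.sql import SparkSession

from spark_fuse.io.registry import connector_for_path

spark = SparkSession.builder.getOrCreate()

path = "/data/raw/events.json"  # illustrative path
connector = connector_for_path(path)
if connector is None:
    raise ValueError(f"No registered connector supports {path!r}")
df = connector.read(spark, path)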

get_connector

get_connector(name: str) -> Optional[Type[Connector]]

Return a registered connector class by name, if present.

Source code in src/spark_fuse/io/registry.py
def get_connector(name: str) -> Optional[Type[Connector]]:
    """Return a registered connector class by name, if present."""
    return _REGISTRY.get(name)

list_connectors

list_connectors() -> Iterable[str]

List available connector names in sorted order.

Source code in src/spark_fuse/io/registry.py
def list_connectors() -> Iterable[str]:
    """List available connector names in sorted order."""
    return sorted(_REGISTRY.keys())
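
A quick inspection sketch; the names printed depend on which connector modules have been imported in the current session.

from spark_fuse.io.registry import get_connector, list_connectors

for name in list_connectors():
    print(name, "->", get_connector(name))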

register_connector

register_connector(connector_cls: Type[Connector]) -> Type[Connector]

Class decorator to register a connector by its name attribute.

Source code in src/spark_fuse/io/registry.py
def register_connector(connector_cls: Type[Connector]) -> Type[Connector]:
    """Class decorator to register a connector by its `name` attribute."""
    name = getattr(connector_cls, "name", None)
    if not name:
        raise ValueError("Connector class must define a non-empty 'name' attribute")
    _REGISTRY[name] = connector_cls
    return connector_cls
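
Registration is typically applied as a decorator at class definition time. The sketch below registers the hypothetical JsonFileConnector from the Connector section above; the name "jsonfile" is an assumption, and a missing or empty name raises ValueError.

from spark_fuse.io.base import Connector
from spark_fuse.io.registry import get_connector, list_connectors, register_connector


@register_connector
class JsonFileConnector(Connector):
    # Full validate_path / read / write implementations as sketched in the
    # Connector section above; elided here to focus on registration.
    name = "jsonfile"  # assumed; register_connector requires a non-empty name
    ...


assert "jsonfile" in list_connectors()
assert get_connector("jsonfile") is JsonFileConnector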