SPARQL Data Source

Use build_sparql_config with spark.read.format("spark-fuse-sparql") to run SPARQL queries against HTTP(S) endpoints, apply retry/backoff policies, and map result bindings (including optional metadata) into Spark DataFrames.

Build the options payload consumed by the SPARQL data source.
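
Configuration can come from the source mapping, source_config, options, or keyword arguments; later sources override earlier ones. A minimal sketch of that precedence, assuming an active SparkSession named spark and a placeholder endpoint URL:

from spark_fuse.io.sparql import build_sparql_config

cfg = build_sparql_config(
    spark,
    "https://example.org/sparql",           # endpoint given as a plain string
    source_config={"request_timeout": 10},  # lowest precedence
    options={"request_timeout": 20},        # overrides source_config
    request_timeout=40,                     # keyword arguments win
    queries=["SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10"],
)
assert cfg["timeout"] == 40.0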

Source code in src/spark_fuse/io/sparql.py
def build_sparql_config(
    spark: SparkSession,
    source: Any,
    *,
    source_config: Optional[Mapping[str, Any]] = None,
    options: Optional[Mapping[str, Any]] = None,
    headers: Optional[Mapping[str, str]] = None,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Build the options payload consumed by the SPARQL data source."""

    config: Dict[str, Any] = {}
    for mapping in (source_config, options, kwargs):
        if mapping:
            config.update(mapping)

    endpoint: Optional[str] = None
    queries: List[str] = []

    if isinstance(source, Mapping):
        endpoint = source.get("endpoint") or source.get("url") or source.get("path")
        if "query" in source:
            queries.append(str(source["query"]))
        if "queries" in source:
            queries.extend([str(q) for q in _as_sequence(source["queries"])])
    elif isinstance(source, str):
        endpoint = source
    elif source is not None:
        raise TypeError("SPARQL source must be a string endpoint or configuration mapping")

    endpoint = endpoint or config.get("endpoint") or config.get("url")
    if not isinstance(endpoint, str) or not _validate_endpoint(endpoint):
        raise ValueError("SPARQL reader requires an HTTP(S) endpoint URL")

    if "query" in config:
        queries.append(str(config["query"]))
    if "queries" in config:
        queries.extend([str(q) for q in _as_sequence(config["queries"])])

    queries = [query.strip() for query in queries if isinstance(query, str) and query.strip()]
    if not queries:
        raise ValueError("SPARQL reader requires at least one query to execute")

    params = config.get("params")
    if isinstance(params, Mapping):
        base_params: Mapping[str, Any] = params
    elif params is None:
        base_params = {}
    else:
        raise TypeError("SPARQL params configuration must be a mapping if provided")

    request_type = str(config.get("request_type", "POST")).upper()
    if request_type not in {"GET", "POST"}:
        raise ValueError("SPARQL request_type must be either 'GET' or 'POST'")

    payload_mode = str(config.get("payload_mode", "form")).lower()
    if payload_mode not in {"form", "json", "raw"}:
        raise ValueError("payload_mode must be one of {'form', 'json', 'raw'}")

    query_param = str(config.get("query_param", "query"))
    request_timeout = float(config.get("request_timeout", 30.0))
    max_retries = int(config.get("max_retries", 3))
    backoff_factor = float(config.get("retry_backoff_factor", 0.5))

    include_metadata = bool(config.get("include_metadata", False))
    metadata_suffix = str(config.get("metadata_suffix", "__"))
    coerce_types = bool(config.get("coerce_types", True))

    base_headers: Dict[str, str] = {"Accept": _DEFAULT_ACCEPT}
    if payload_mode == "raw":
        base_headers.setdefault("Content-Type", "application/sparql-query")
    for header_map in (config.get("headers"), headers):
        if isinstance(header_map, Mapping):
            base_headers.update({str(k): str(v) for k, v in header_map.items()})

    auth_value = config.get("auth")
    auth = None
    if isinstance(auth_value, Sequence) and len(auth_value) == 2:
        auth = [str(auth_value[0]), str(auth_value[1])]

    parallelism = max(int(config.get("parallelism", len(queries) or 1)), 1)

    payload_config = {
        "endpoint": endpoint,
        "queries": queries,
        "params": dict(base_params),
        "headers": base_headers,
        "auth": auth,
        "request_type": request_type,
        "payload_mode": payload_mode,
        "query_param": query_param,
        "include_metadata": include_metadata,
        "metadata_suffix": metadata_suffix,
        "coerce_types": coerce_types,
        "timeout": request_timeout,
        "max_retries": max_retries,
        "backoff_factor": backoff_factor,
        "parallelism": parallelism,
    }

    return payload_config
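
To consume the payload, serialize it to JSON and pass it to the reader under the data source's config option. A minimal end-to-end sketch, assuming SPARQL_CONFIG_OPTION (the option key read in __init__ below) is importable from the module and that the endpoint and query are placeholders:

import json

from spark_fuse.io.sparql import SPARQL_CONFIG_OPTION, build_sparql_config

config = build_sparql_config(
    spark,
    {
        "endpoint": "https://example.org/sparql",
        "query": "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 100",
    },
    include_metadata=True,  # emit per-binding metadata columns (default suffix "__")
)

df = (
    spark.read.format("spark-fuse-sparql")
    .option(SPARQL_CONFIG_OPTION, json.dumps(config))
    .load()
)
df.show()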

Bases: DataSource

Source code in src/spark_fuse/io/sparql.py
def __init__(self, options: Mapping[str, str]) -> None:
    super().__init__(options)
    raw_config = options.get(SPARQL_CONFIG_OPTION)
    if not raw_config:
        raise ValueError("SPARQL data source requires the config option to be provided")
    self._config = _SPARQLResolvedConfig.from_dict(json.loads(raw_config))
    schema_json = options.get(SPARQL_SCHEMA_OPTION)
    self._user_schema = StructType.fromJson(json.loads(schema_json)) if schema_json else None
    self._schema_cache: Optional[StructType] = None
    self._partitions: Optional[List[_SPARQLInputPartition]] = None
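
Because __init__ also honors SPARQL_SCHEMA_OPTION, a user-supplied schema can bypass inference. A sketch, assuming SPARQL_SCHEMA_OPTION is importable from the same module and reusing the config payload from the earlier example; StructType.json() produces exactly the JSON that StructType.fromJson expects back:

import json

from pyspark.sql.types import StringType, StructField, StructType

from spark_fuse.io.sparql import SPARQL_CONFIG_OPTION, SPARQL_SCHEMA_OPTION

# Fixed schema for the three bound variables in the example query.
schema = StructType(
    [
        StructField("s", StringType()),
        StructField("p", StringType()),
        StructField("o", StringType()),
    ]
)

df = (
    spark.read.format("spark-fuse-sparql")
    .option(SPARQL_CONFIG_OPTION, json.dumps(config))
    .option(SPARQL_SCHEMA_OPTION, schema.json())
    .load()
)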