# SPARQL Data Source Demo
The SPARQL data source executes SPARQL queries over HTTP(S) endpoints and returns the results as Spark
DataFrames. The companion notebook notebooks/demos/sparql_reader_demo.ipynb walks through a live query
against the public Wikidata service and includes a small fallback dataset so the example remains
useful when the endpoint is unavailable (for example in air-gapped environments).
## Prerequisites

- A running `SparkSession`.
- Outbound network access to your SPARQL endpoint (unless you rely on the built-in fallback rows).
- A descriptive `User-Agent` header that complies with the endpoint's usage policy. Wikidata requires contact information in the header string.
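Under the hood, a SPARQL reader issues ordinary SPARQL Protocol requests over HTTP(S). As a rough sketch of what such a `POST` request looks like, here is an equivalent built with the standard library; the endpoint, query, and contact details are placeholders, and the request is only constructed, never sent:

```python
from urllib.parse import urlencode
from urllib.request import Request

# Placeholder endpoint and contact details -- substitute your own.
endpoint = "https://query.wikidata.org/sparql"
query = "SELECT ?s WHERE { ?s ?p ?o } LIMIT 1"

request = Request(
    endpoint,
    # The SPARQL Protocol sends the query as a form-encoded POST body.
    data=urlencode({"query": query}).encode("utf-8"),
    headers={
        "User-Agent": "spark-fuse-demo/1.0 (contact@example.com)",
        "Accept": "application/sparql-results+json",
        "Content-Type": "application/x-www-form-urlencoded",
    },
    method="POST",
)
```

A header of the form `tool/version (contact)` is the shape Wikidata's policy expects.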
## Basic usage

```python
import json

from spark_fuse.io import (
    SPARQL_CONFIG_OPTION,
    SPARQL_DATA_SOURCE_NAME,
    build_sparql_config,
    register_sparql_data_source,
)

endpoint = "https://query.wikidata.org/sparql"
sample_query = """
PREFIX wd: <http://www.wikidata.org/entity/>
PREFIX wdt: <http://www.wikidata.org/prop/direct/>

SELECT ?pokemon ?pokemonLabel ?pokedexNumber WHERE {
    ?pokemon wdt:P31 wd:Q3966183 .
    ?pokemon wdt:P1685 ?pokedexNumber .
}
ORDER BY ASC(?pokedexNumber)
LIMIT 10
""".strip()

config = {
    "query": sample_query,
    "request_type": "POST",
    "include_metadata": True,
    "metadata_suffix": "__",
    "coerce_types": True,
    "headers": {"User-Agent": "spark-fuse-demo/1.0 (contact@example.com)"},
}

register_sparql_data_source(spark)
options = build_sparql_config(spark, endpoint, source_config=config)

pokemon_df = (
    spark.read.format(SPARQL_DATA_SOURCE_NAME)
    .option(SPARQL_CONFIG_OPTION, json.dumps(options))
    .load()
)

pokemon_df.printSchema()
if pokemon_df.rdd.isEmpty():
    print("Endpoint unavailable — using built-in fallback rows.")
else:
    pokemon_df.show(5, truncate=False)
```
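With `include_metadata=True`, the reader is assumed to add companion metadata columns named with the configured `metadata_suffix` (`"__"` above). The exact column names below are hypothetical, but dropping such columns afterwards reduces to a simple suffix filter:

```python
# Hypothetical column list for illustration -- actual names depend on the reader.
columns = [
    "pokemon", "pokemon__",
    "pokemonLabel", "pokemonLabel__",
    "pokedexNumber", "pokedexNumber__",
]

metadata_suffix = "__"
# Keep only the value columns, dropping anything ending in the metadata suffix.
value_columns = [c for c in columns if not c.endswith(metadata_suffix)]
# With a real DataFrame you could then run: pokemon_df.select(value_columns)
```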
## Boolean queries (ASK)

The data source also supports `ASK` queries, which return a boolean payload.

```python
ask_options = build_sparql_config(
    spark,
    {"endpoint": endpoint, "query": "ASK WHERE { wd:Q3966183 wdt:P31 wd:Q1656682 }"},
    source_config={
        "request_type": "POST",
        "headers": {"User-Agent": "spark-fuse-demo/1.0 (contact@example.com)"},
    },
)

ask_df = (
    spark.read.format(SPARQL_DATA_SOURCE_NAME)
    .option(SPARQL_CONFIG_OPTION, json.dumps(ask_options))
    .load()
)

ask_df.show()
```
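Per the SPARQL 1.1 Query Results JSON format, an `ASK` response carries a top-level `boolean` field instead of variable bindings. A minimal sketch of parsing such a payload directly, with a hard-coded response body for illustration:

```python
import json

# Example ASK response body in the SPARQL 1.1 Query Results JSON format.
ask_payload = '{"head": {}, "boolean": true}'

# The answer lives in the top-level "boolean" member.
result = json.loads(ask_payload)["boolean"]
print(result)  # True
```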
## Handling empty responses

Some environments may block outbound requests. When the reader cannot retrieve any rows, you can still test downstream transformations by supplying fallback data:

```python
if pokemon_df.rdd.isEmpty():
    fallback_rows = [
        {"pokemon": "bulbasaur", "pokemonLabel": "Bulbasaur", "pokedexNumber": "001"},
        {"pokemon": "charmander", "pokemonLabel": "Charmander", "pokedexNumber": "004"},
        {"pokemon": "squirtle", "pokemonLabel": "Squirtle", "pokedexNumber": "007"},
    ]
    pokemon_df = spark.createDataFrame(fallback_rows, schema=pokemon_df.schema)
```

The demo notebook includes this logic so you can follow along even when the live Wikidata query is not reachable.
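Note that `pokedexNumber` in the fallback rows is a zero-padded string, so a typical downstream step is converting it to an integer before ordering (in Spark you would cast the column, e.g. `col("pokedexNumber").cast("int")`). A plain-Python sketch of the same idea on the fallback rows:

```python
fallback_rows = [
    {"pokemon": "bulbasaur", "pokemonLabel": "Bulbasaur", "pokedexNumber": "001"},
    {"pokemon": "charmander", "pokemonLabel": "Charmander", "pokedexNumber": "004"},
    {"pokemon": "squirtle", "pokemonLabel": "Squirtle", "pokedexNumber": "007"},
]

# Convert the zero-padded string to int so ordering is numeric, not lexicographic.
ordered = sorted(fallback_rows, key=lambda row: int(row["pokedexNumber"]))
first = ordered[0]["pokemonLabel"]
```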