
Change Tracking Demo

The change-tracking helpers (current_only_upsert, track_history_upsert, apply_change_tracking) coordinate Delta MERGE operations so you can either keep a single current record per business key or maintain the full history of changes. They also plug into DataFrame.write.change_tracking for fluent option parsing.

from pyspark.sql import Row
from spark_fuse.spark import create_session
from spark_fuse.utils.change_tracking import (
    ChangeTrackingMode,
    apply_change_tracking,
    current_only_upsert,
    track_history_upsert,
)

spark = create_session(app_name="spark-fuse-change-tracking-demo")

Current-only merges

current_only_upsert keeps a single current row per business key, overwriting previous values when a change arrives.

initial = spark.createDataFrame(
    [Row(id=1, val="a", ts=1), Row(id=1, val="b", ts=2), Row(id=2, val="x", ts=5)]
)

target_path = "/tmp/current_only_demo"
current_only_upsert(
    spark,
    initial,
    target_path,
    business_keys=["id"],
    tracked_columns=["val"],
    order_by=["ts"],
)

updates = spark.createDataFrame(
    [Row(id=1, val="c", ts=3), Row(id=3, val="z", ts=1)]
)
current_only_upsert(
    spark,
    updates,
    target_path,
    business_keys=["id"],
    tracked_columns=["val"],
    order_by=["ts"],
)
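The merge behaviour of the two batches above can be sketched in plain Python (no Spark or Delta required). This is an illustrative model of the semantics, not the library's implementation: one current row per business key, with per-batch deduplication keeping the highest order_by value.

```python
# Illustrative sketch of current-only (Type 1) merge semantics.
# Within a batch, the row with the highest order_by value ("ts") wins;
# the merge then overwrites matched keys and inserts new ones.

def current_only(target, batch):
    latest = {}
    for row in batch:
        key = row["id"]
        if key not in latest or row["ts"] > latest[key]["ts"]:
            latest[key] = row   # per-batch dedup by the order_by column
    target.update(latest)       # MERGE: update matched keys, insert new ones
    return target

table = {}
current_only(table, [
    {"id": 1, "val": "a", "ts": 1},
    {"id": 1, "val": "b", "ts": 2},
    {"id": 2, "val": "x", "ts": 5},
])
current_only(table, [{"id": 1, "val": "c", "ts": 3}, {"id": 3, "val": "z", "ts": 1}])
# table now holds one current row per id: 1 -> "c", 2 -> "x", 3 -> "z"
```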

If future batches introduce new attributes, enable Delta schema evolution:

current_only_upsert(
    spark,
    spark.createDataFrame([Row(id=1, val="d", color="blue", ts=4)]),
    target_path,
    business_keys=["id"],
    tracked_columns=["val", "color"],
    order_by=["ts"],
    allow_schema_evolution=True,
)
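Conceptually, schema evolution widens the target to the union of target and source columns, with existing rows reading NULL for the new columns. A minimal sketch of that idea (illustrative only, not Delta's implementation):

```python
# Sketch of schema evolution: new source columns join the target schema,
# and pre-existing rows carry None (NULL) for them.
target = [{"id": 1, "val": "c"}, {"id": 2, "val": "x"}]
incoming = [{"id": 1, "val": "d", "color": "blue"}]

# Union of columns, preserving first-seen order.
all_cols = list(dict.fromkeys(k for row in target + incoming for k in row))
evolved = [{c: row.get(c) for c in all_cols} for row in target]
# Existing rows now expose color=None; the merge then upserts the new row.
```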

Track history

track_history_upsert closes the active row for a key (sets is_current=false and writes the expiry timestamp) before inserting a new version. When a batch contains multiple rows for the same key, the helper processes them chronologically so every intermediate change is preserved.

history_target = "/tmp/track_history_demo"

source = spark.createDataFrame(
    [Row(id=1, val="a", ts=1), Row(id=1, val="b", ts=2), Row(id=2, val="x", ts=5)]
)

track_history_upsert(
    spark,
    source,
    history_target,
    business_keys=["id"],
    tracked_columns=["val"],
    order_by=["ts"],
    load_ts_expr="to_timestamp('2024-01-01 00:00:00')",
)

changes = spark.createDataFrame([Row(id=1, val="c", ts=3), Row(id=3, val="z", ts=1)])
track_history_upsert(
    spark,
    changes,
    history_target,
    business_keys=["id"],
    tracked_columns=["val"],
    order_by=["ts"],
    load_ts_expr="to_timestamp('2024-01-02 00:00:00')",
)
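The close-then-insert flow can likewise be sketched in plain Python. This is an illustrative model of the Type 2 semantics described above (not the library's code): each change closes the key's open row and appends a new version, and rows within a batch are processed in order_by order so intermediate changes survive.

```python
# Illustrative sketch of track-history (Type 2) semantics: close the open
# row for the key (is_current=False, expiry = load timestamp), then append
# a new version. Batch rows are processed chronologically by "ts".

def track_history(history, batch, load_ts):
    for row in sorted(batch, key=lambda r: r["ts"]):
        for existing in history:
            if existing["id"] == row["id"] and existing["is_current"]:
                existing["is_current"] = False
                existing["effective_end_ts"] = load_ts  # close the open row
        version = 1 + sum(1 for e in history if e["id"] == row["id"])
        history.append({**row, "is_current": True,
                        "effective_end_ts": None, "version": version})
    return history

history = []
track_history(history, [{"id": 1, "val": "a", "ts": 1},
                        {"id": 1, "val": "b", "ts": 2}], "2024-01-01")
track_history(history, [{"id": 1, "val": "c", "ts": 3}], "2024-01-02")
# id=1 now has three versions; only val="c" remains current
```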

When new attributes appear later, schema evolution keeps the Delta table aligned:

track_history_upsert(
    spark,
    spark.createDataFrame([Row(id=1, val="d", color="blue", ts=4)]),
    history_target,
    business_keys=["id"],
    tracked_columns=["val", "color"],
    order_by=["ts"],
    load_ts_expr="current_timestamp()",
    allow_schema_evolution=True,
)

Default expiry value

By default, open/current rows have effective_end_ts = NULL. Pass default_expiry_value to use a sentinel timestamp instead — handy for range filters and avoiding COALESCE:

track_history_upsert(
    spark,
    spark.createDataFrame([Row(id=1, val="a")]),
    "/tmp/track_history_sentinel_demo",
    business_keys=["id"],
    tracked_columns=["val"],
    load_ts_expr="to_timestamp('2024-01-01 00:00:00')",
    default_expiry_value="9999-12-31",
)

default_expiry_value accepts a string (cast to timestamp) or a PySpark Column. Closed rows always receive the load timestamp regardless of this setting.
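The benefit for range filters can be seen in a small plain-Python sketch (illustrative only): with NULL expiries a point-in-time query needs a COALESCE-style None check, while a far-future sentinel turns it into a plain range predicate.

```python
# Point-in-time lookup with NULL expiries vs. a sentinel expiry.
from datetime import datetime

SENTINEL = datetime(9999, 12, 31)

rows_null = [
    {"val": "a", "start": datetime(2024, 1, 1), "end": datetime(2024, 1, 2)},
    {"val": "b", "start": datetime(2024, 1, 2), "end": None},  # open row
]
rows_sentinel = [{**r, "end": r["end"] or SENTINEL} for r in rows_null]

as_of = datetime(2024, 1, 3)
# NULL style: the open row must be special-cased (COALESCE-like).
active_null = [r for r in rows_null
               if r["start"] <= as_of < (r["end"] or SENTINEL)]
# Sentinel style: a simple range comparison, no None handling.
active_sentinel = [r for r in rows_sentinel if r["start"] <= as_of < r["end"]]
```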

Unified dispatcher and writer sugar

Use apply_change_tracking to switch between strategies with a shared signature:

dispatcher_target = "/tmp/apply_change_tracking_demo"
apply_change_tracking(
    spark,
    spark.createDataFrame([Row(id=1, val="a"), Row(id=2, val="b")]),
    dispatcher_target,
    change_tracking_mode=ChangeTrackingMode.TRACK_HISTORY,
    business_keys=["id"],
    tracked_columns=["val"],
    load_ts_expr="current_timestamp()",
)
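Conceptually, a dispatcher like this routes shared arguments to the strategy matching the mode, and a string-valued enum lets callers pass either the enum member or its plain string value. A sketch of the pattern (not spark_fuse's actual code):

```python
# Sketch of the mode-dispatch pattern: one entry point, a strategy per mode.
from enum import Enum

class Mode(str, Enum):
    CURRENT_ONLY = "current_only"
    TRACK_HISTORY = "track_history"

def apply_mode(mode, **kwargs):
    strategies = {
        Mode.CURRENT_ONLY: lambda **kw: ("current_only", kw),
        Mode.TRACK_HISTORY: lambda **kw: ("track_history", kw),
    }
    # Mode(...) coerces both "track_history" and Mode.TRACK_HISTORY.
    return strategies[Mode(mode)](**kwargs)

result = apply_mode("track_history", business_keys=["id"])
```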

For an even more fluent API, call df.write.change_tracking:

spark.range(2).toDF("id").write.change_tracking.options(
    change_tracking_mode="track_history",
    change_tracking_options={
        "business_keys": ["id"],
        "tracked_columns": ["id"],
        "load_ts_expr": "current_timestamp()",
    },
).table("catalog.schema.dim_id")

Verbose logging

All change-tracking functions accept a verbose: bool = False parameter. When enabled, operational details are logged at INFO level:

current_only_upsert(
    spark,
    source_df,
    target_path,
    business_keys=["id"],
    tracked_columns=["val"],
    order_by=["ts"],
    verbose=True,
)

Example output:

spark_fuse.utils.change_tracking - current_only_upsert: target=/tmp/target, business_keys=['id'], tracked_columns=['val'], hash_col=row_hash
spark_fuse.utils.change_tracking - Source rows after deduplication: 1000
spark_fuse.utils.change_tracking - Target '/tmp/target' exists, executing merge
spark_fuse.utils.change_tracking - Merge executed on '/tmp/target'

For track_history_upsert, you'll see batch progress:

spark_fuse.utils.change_tracking - track_history_upsert: target=/tmp/target, business_keys=['id'], tracked_columns=['val'], effective_col=effective_start_ts, expiry_col=effective_end_ts, current_col=is_current, version_col=version, hash_col=row_hash
spark_fuse.utils.change_tracking - Target '/tmp/target' exists
spark_fuse.utils.change_tracking - Processing 3 batches (max_seq=3)
spark_fuse.utils.change_tracking - Processing batch 1/3
spark_fuse.utils.change_tracking - Running merge on existing target '/tmp/target'
spark_fuse.utils.change_tracking - Rows to insert: 50
spark_fuse.utils.change_tracking - Processing batch 2/3
...

The verbose parameter is also available on the fluent writer:

spark.range(2).toDF("id").write.change_tracking.options(
    change_tracking_mode="current_only",
    change_tracking_options={"business_keys": ["id"]},
).table("catalog.schema.dim_id", verbose=True)

Streaming with foreachBatch

Structured Streaming DataFrames cannot be written directly with the batch change-tracking helpers. Instead, use df.writeStream.change_tracking: the streaming accessor delegates each micro-batch to the same batch helpers via foreachBatch. All idempotency guarantees (row-hash deduplication for history mode, MERGE conditions for current-only mode) therefore apply per micro-batch, giving effectively exactly-once semantics against Delta sinks.

A checkpoint location is required for fault tolerance:

from spark_fuse.utils import enable_change_tracking_accessors

enable_change_tracking_accessors()

# Current-only (Type 1) — keep one current row per key
query = (
    spark.readStream.format("delta").load("/mnt/bronze/events")
    .writeStream.change_tracking
    .options(
        change_tracking_mode="current_only",
        business_keys=["event_id"],
    )
    .toTable("/mnt/silver/events", checkpoint="/mnt/checkpoints/events")
)
query.awaitTermination()

# Track history (Type 2) — create new versions, close previous ones
query = (
    spark.readStream.format("delta").load("/mnt/bronze/customers")
    .writeStream.change_tracking
    .options(
        change_tracking_mode="track_history",
        business_keys=["customer_id"],
    )
    .toTable("/mnt/silver/customers", checkpoint="/mnt/checkpoints/customers")
)
query.awaitTermination()

For batch-style one-shot runs (e.g. in a scheduled job), pass trigger={"availableNow": True}:

query = (
    spark.readStream.format("delta").load("/mnt/bronze/orders")
    .writeStream.change_tracking
    .options(change_tracking_mode="current_only", business_keys=["order_id"])
    .toTable(
        "/mnt/silver/orders",
        checkpoint="/mnt/checkpoints/orders",
        trigger={"availableNow": True},
    )
)
query.awaitTermination()

You can also construct a StreamingChangeTrackingWriter directly and pass it to foreachBatch for full control over the streaming query:

from spark_fuse.utils import StreamingChangeTrackingWriter

writer = StreamingChangeTrackingWriter(
    target="/mnt/silver/orders",
    options={"change_tracking_mode": "current_only", "business_keys": ["order_id"]},
)

(
    spark.readStream.format("delta").load("/mnt/bronze/orders")
    .writeStream
    .option("checkpointLocation", "/mnt/checkpoints/orders")
    .outputMode("update")
    .trigger(availableNow=True)
    .foreachBatch(writer)
    .start()
    .awaitTermination()
)
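How a foreachBatch callback object behaves can be modelled without Spark. This sketch is illustrative, not StreamingChangeTrackingWriter's implementation: Spark invokes the callable with (batch_df, batch_id) for each micro-batch, and the object forwards the batch to the configured batch helper.

```python
# Illustrative callback object in the shape foreachBatch expects.
class BatchCallback:
    def __init__(self, target, options, apply_fn):
        self.target = target
        self.options = options
        self.apply_fn = apply_fn  # stands in for the batch merge helper
        self.seen = []

    def __call__(self, batch_df, batch_id):
        self.seen.append(batch_id)
        self.apply_fn(batch_df, self.target, **self.options)

calls = []
cb = BatchCallback("/tmp/t", {"business_keys": ["id"]},
                   lambda df, target, **opts: calls.append((df, target, opts)))
cb(["row1"], 0)   # Spark would pass a DataFrame; a list stands in here
cb(["row2"], 1)
```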

Declarative pipelines integration

spark_fuse provides decorator helpers for Spark Declarative Pipelines (pyspark.pipelines, available in PySpark >= 4.0). The decorators register pipeline tables and flows that apply change tracking merge semantics inside foreachBatch.

Two-step pattern — explicit table + flow

Use create_change_tracking_table to declare the streaming target, then @change_tracking_flow to register the data flow:

from spark_fuse.utils import change_tracking_flow, create_change_tracking_table

create_change_tracking_table("catalog.schema.customers")

@change_tracking_flow(
    target="catalog.schema.customers",
    change_tracking_mode="current_only",
    business_keys=["customer_id"],
)
def customers_flow():
    return spark.readStream.table("bronze.customers")

# Track history (Type 2)
create_change_tracking_table("catalog.schema.customer_history")

@change_tracking_flow(
    target="catalog.schema.customer_history",
    change_tracking_mode="track_history",
    business_keys=["customer_id"],
    tracked_columns=["name", "email", "tier"],
)
def customer_history_flow():
    return spark.readStream.table("bronze.customers")

Single-decorator pattern

@change_tracking_table combines table creation and flow registration in one decorator:

from spark_fuse.utils import change_tracking_table

@change_tracking_table(
    target="catalog.schema.orders",
    change_tracking_mode="track_history",
    business_keys=["order_id"],
    comment="SCD Type 2 order history",
    table_properties={"delta.enableChangeDataFeed": "true"},
)
def orders():
    return spark.readStream.table("bronze.orders")

Both helpers require PySpark >= 4.0 with pyspark.pipelines present. Calling them in environments without SDP raises an ImportError with a clear message.
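A typical guard for this kind of optional dependency (an illustrative pattern, not spark_fuse's exact code) probes for the module before doing any work and raises a descriptive ImportError:

```python
# Optional-dependency guard: fail fast with an actionable message.
import importlib.util

def require_module(name, hint):
    if importlib.util.find_spec(name) is None:
        raise ImportError(f"{name} is required: {hint}")

require_module("json", "part of the standard library")  # present: no error

try:
    require_module("definitely_not_installed_module", "install PySpark >= 4.0")
except ImportError as exc:
    message = str(exc)
```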

Notebook walkthrough