
Change Tracking Utilities

ChangeTrackingMode

Bases: str, Enum

Accepted values for change_tracking_mode.

  • CURRENT_ONLY – keep exactly one current row per key (Type 1 semantics).
  • TRACK_HISTORY – create new versions + close previous ones (Type 2 semantics).
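
A minimal sketch of how the modes resolve, assuming the member values are the lowercase strings listed above:

from spark_fuse.utils.change_tracking import ChangeTrackingMode

# The enum also subclasses str, so members compare equal to their string values.
assert ChangeTrackingMode.CURRENT_ONLY == "current_only"
assert ChangeTrackingMode("track_history") is ChangeTrackingMode.TRACK_HISTORY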

ChangeTrackingWriteBuilder

ChangeTrackingWriteBuilder(df: DataFrame)

Thin wrapper that applies change-tracking semantics via df.write.change_tracking.
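
A minimal sketch of constructing the builder directly; in practice it is usually obtained through the df.write.change_tracking accessor described below (events_df is a hypothetical DataFrame):

from spark_fuse.utils.change_tracking import ChangeTrackingWriteBuilder

builder = ChangeTrackingWriteBuilder(events_df)  # wraps the DataFrame and its SparkSession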

Source code in src/spark_fuse/utils/change_tracking.py
def __init__(self, df: DataFrame):
    self._df = df
    self._spark = df.sparkSession
    self._options: Dict[str, Any] = {}

table

table(name: str, verbose: bool = False, **options: Any) -> None

Write to target table with change tracking semantics.

Parameters:

  • name (str, required) – Target table name or Delta path.
  • verbose (bool, default False) – When True, log operational details at INFO level.
  • **options (Any, default {}) – Additional change tracking options.
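
A hedged usage sketch via the accessor installed on import; the target table, key column, and option values are hypothetical:

events_df.write.change_tracking.table(
    "catalog.schema.customers",                           # hypothetical target table
    change_tracking_mode="current_only",
    change_tracking_options={"business_keys": ["customer_id"]},
    verbose=True,
)
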
Source code in src/spark_fuse/utils/change_tracking.py
def table(self, name: str, verbose: bool = False, **options: Any) -> None:
    """Write to target table with change tracking semantics.

    Args:
        name: Target table name or Delta path.
        verbose: When True, log operational details at INFO level.
        **options: Additional change tracking options.
    """
    if options:
        self.options(**options)
    # Use a copy so that downstream mutations do not affect stored state.
    payload = dict(self._options)
    try:
        apply_change_tracking_from_options(
            spark=self._spark, source_df=self._df, target=name, options=payload, verbose=verbose
        )
    finally:
        self.clear()

apply_change_tracking

apply_change_tracking(spark: SparkSession, source_df: DataFrame, target: str, *, change_tracking_mode: Union[ChangeTrackingMode, str, int], verbose: bool = False, **kwargs: Any) -> None

Unified entry point for change-tracking writes.

change_tracking_mode accepts:

  • :class:ChangeTrackingMode
  • "current_only" / "track_history"
  • 1 / 2 (handy when passing options via strings)

Parameters:

  • spark (SparkSession, required) – SparkSession used to read/write Delta tables.
  • source_df (DataFrame, required) – DataFrame containing incoming records.
  • target (str, required) – Unity Catalog table name or Delta path.
  • change_tracking_mode (Union[ChangeTrackingMode, str, int], required) – The change tracking mode to use.
  • verbose (bool, default False) – When True, log operational details at INFO level.
  • **kwargs (Any, default {}) – Additional arguments passed to the underlying upsert function.
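
A minimal sketch of a direct call; the table name and keys are hypothetical, and the extra keyword arguments are forwarded to the underlying upsert function:

from spark_fuse.utils.change_tracking import ChangeTrackingMode, apply_change_tracking

apply_change_tracking(
    spark,
    source_df=events_df,
    target="catalog.schema.customers",              # hypothetical Unity Catalog table
    change_tracking_mode=ChangeTrackingMode.TRACK_HISTORY,
    business_keys=["customer_id"],                  # forwarded to track_history_upsert
    order_by=["event_ts"],
    verbose=True,
)
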
Source code in src/spark_fuse/utils/change_tracking.py
def apply_change_tracking(
    spark: SparkSession,
    source_df: DataFrame,
    target: str,
    *,
    change_tracking_mode: Union[ChangeTrackingMode, str, int],
    verbose: bool = False,
    **kwargs: Any,
) -> None:
    """Unified entry point for change-tracking writes.

    ``change_tracking_mode`` accepts:
      - :class:`ChangeTrackingMode`
      - ``"current_only"`` / ``"track_history"``
      - ``1`` / ``2`` (handy when passing options via strings)

    Args:
        spark: SparkSession used to read/write Delta tables.
        source_df: DataFrame containing incoming records.
        target: Unity Catalog table name or Delta path.
        change_tracking_mode: The change tracking mode to use.
        verbose: When ``True``, log operational details at INFO level.
        **kwargs: Additional arguments passed to the underlying upsert function.
    """
    resolved = _resolve_mode(change_tracking_mode)
    if resolved == ChangeTrackingMode.CURRENT_ONLY:
        return current_only_upsert(spark, source_df, target, verbose=verbose, **kwargs)
    else:
        return track_history_upsert(spark, source_df, target, verbose=verbose, **kwargs)

apply_change_tracking_from_options

apply_change_tracking_from_options(spark: SparkSession, source_df: DataFrame, target: str, *, options: Mapping[str, Any], verbose: bool = False) -> None

Route the write based on df.write-style options.

Used by :class:ChangeTrackingWriteBuilder and data-frame accessors.

Expected keys
  • change_tracking_mode: accepts 1/2 or current_only/track_history.
  • Strategy options via current_only_options/track_history_options or the generic change_tracking_options key.

Parameters:

  • spark (SparkSession, required) – SparkSession used to read/write Delta tables.
  • source_df (DataFrame, required) – DataFrame containing incoming records.
  • target (str, required) – Unity Catalog table name or Delta path.
  • options (Mapping[str, Any], required) – Options mapping containing change_tracking_mode and optional kwargs.
  • verbose (bool, default False) – When True, log operational details at INFO level.
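
A minimal sketch of the options mapping this helper expects; the target and strategy options are hypothetical:

from spark_fuse.utils.change_tracking import apply_change_tracking_from_options

options = {
    "change_tracking_mode": 2,  # or "track_history"
    "track_history_options": {"business_keys": ["customer_id"], "order_by": ["event_ts"]},
}
apply_change_tracking_from_options(
    spark,
    source_df=events_df,
    target="catalog.schema.customers",
    options=options,
    verbose=True,
)
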
Source code in src/spark_fuse/utils/change_tracking.py
def apply_change_tracking_from_options(
    spark: SparkSession,
    source_df: DataFrame,
    target: str,
    *,
    options: Mapping[str, Any],
    verbose: bool = False,
) -> None:
    """Route the write based on ``df.write``-style options.

    Used by :class:`ChangeTrackingWriteBuilder` and data-frame accessors.

    Expected keys:
      - ``change_tracking_mode``: accepts ``1``/``2`` or ``current_only``/``track_history``.
      - Strategy options via ``current_only_options``/``track_history_options`` or the generic
        ``change_tracking_options`` key.

    Args:
        spark: SparkSession used to read/write Delta tables.
        source_df: DataFrame containing incoming records.
        target: Unity Catalog table name or Delta path.
        options: Options mapping containing change_tracking_mode and optional kwargs.
        verbose: When True, log operational details at INFO level.
    """
    change_tracking_mode, change_tracking_kwargs = _extract_tracking_kwargs_from_options(options)
    # Extract verbose from kwargs if present (for backwards compatibility with options dict)
    verbose = change_tracking_kwargs.pop("verbose", verbose)
    if change_tracking_mode == ChangeTrackingMode.CURRENT_ONLY:
        current_only_upsert(spark, source_df, target, verbose=verbose, **change_tracking_kwargs)
    else:
        track_history_upsert(spark, source_df, target, verbose=verbose, **change_tracking_kwargs)

change_tracking_writer

change_tracking_writer(df: DataFrame) -> ChangeTrackingWriteBuilder

Return a builder that mirrors DataFrameWriter but routes to change-tracking helpers.

Source code in src/spark_fuse/utils/change_tracking.py
def change_tracking_writer(df: DataFrame) -> ChangeTrackingWriteBuilder:
    """Return a builder that mirrors ``DataFrameWriter`` but routes to change-tracking helpers."""

    return ChangeTrackingWriteBuilder(df)
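
A brief sketch that is equivalent to using the df.write.change_tracking accessor; the target and options are hypothetical:

from spark_fuse.utils.change_tracking import change_tracking_writer

change_tracking_writer(events_df).table(
    "catalog.schema.customers",
    change_tracking_mode="current_only",
    change_tracking_options={"business_keys": ["customer_id"]},
)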

current_only_upsert

current_only_upsert(spark: SparkSession, source_df: DataFrame, target: str, *, business_keys: Sequence[str], tracked_columns: Optional[Iterable[str]] = None, dedupe_keys: Optional[Sequence[str]] = None, order_by: Optional[Sequence[str]] = None, hash_col: str = 'row_hash', null_key_policy: str = 'error', create_if_not_exists: bool = True, allow_schema_evolution: bool = False, verbose: bool = False) -> None

Implement :class:ChangeTrackingMode.CURRENT_ONLY.

Keeps exactly one active row per business_keys combination by:

  1. De-duplicating within the incoming batch using dedupe_keys/order_by.
  2. Hashing the tracked columns to detect changes.
  3. Running a Delta MERGE that updates only when the row changed and inserts when missing.

When allow_schema_evolution is True, Delta auto-merge is temporarily enabled so that new source columns are added to the target on demand.

Parameters:

  • spark (SparkSession, required) – SparkSession used to read/write Delta tables.
  • source_df (DataFrame, required) – DataFrame containing incoming records.
  • target (str, required) – Unity Catalog table name or Delta path.
  • business_keys (Sequence[str], required) – Columns that uniquely identify an entity (merge condition).
  • tracked_columns (Optional[Iterable[str]], default None) – Columns whose changes trigger an update. Defaults to all non-key columns.
  • dedupe_keys (Optional[Sequence[str]], default None) – Columns used to de-duplicate input before merge. Defaults to business_keys.
  • order_by (Optional[Sequence[str]], default None) – Columns used to choose the most recent record per dedupe_keys.
  • hash_col (str, default 'row_hash') – Name of the hash column used to detect row changes.
  • null_key_policy (str, default 'error') – Policy for null business keys. Either "error" or "drop".
  • create_if_not_exists (bool, default True) – When True, create the target table if it does not exist.
  • allow_schema_evolution (bool, default False) – When True, enable Delta schema evolution for new columns.
  • verbose (bool, default False) – When True, log operational details at INFO level.
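
A hedged usage sketch of a Type 1 load; the session, sample data, and table name are hypothetical:

from pyspark.sql import SparkSession
from spark_fuse.utils.change_tracking import current_only_upsert

spark = SparkSession.builder.getOrCreate()
incoming = spark.createDataFrame(
    [(1, "alice@example.com", "2024-01-02")],
    ["customer_id", "email", "updated_at"],
)
current_only_upsert(
    spark,
    incoming,
    "catalog.schema.customers",        # hypothetical Delta table
    business_keys=["customer_id"],
    tracked_columns=["email"],         # only email changes trigger an update
    order_by=["updated_at"],           # keep the latest record per key
    verbose=True,
)
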
Source code in src/spark_fuse/utils/change_tracking.py
def current_only_upsert(
    spark: SparkSession,
    source_df: DataFrame,
    target: str,
    *,
    business_keys: Sequence[str],
    tracked_columns: Optional[Iterable[str]] = None,
    dedupe_keys: Optional[Sequence[str]] = None,
    order_by: Optional[Sequence[str]] = None,
    hash_col: str = "row_hash",
    null_key_policy: str = "error",  # "error" | "drop"
    create_if_not_exists: bool = True,
    allow_schema_evolution: bool = False,
    verbose: bool = False,
) -> None:
    """Implement :class:`ChangeTrackingMode.CURRENT_ONLY`.

    Keeps exactly one active row per ``business_keys`` combination by:

    1. De-duplicating within the incoming batch using ``dedupe_keys``/``order_by``.
    2. Hashing the tracked columns to detect changes.
    3. Running a Delta ``MERGE`` that updates only when the row changed and inserts when missing.

    When ``allow_schema_evolution`` is ``True`` we temporarily enable Delta auto-merge so new source
    columns are added to the target on demand.

    Args:
        spark: SparkSession used to read/write Delta tables.
        source_df: DataFrame containing incoming records.
        target: Unity Catalog table name or Delta path.
        business_keys: Columns that uniquely identify an entity (merge condition).
        tracked_columns: Columns whose changes trigger an update. Defaults to all non-key columns.
        dedupe_keys: Columns used to de-duplicate input before merge. Defaults to ``business_keys``.
        order_by: Columns used to choose the most recent record per ``dedupe_keys``.
        hash_col: Name of the hash column used to detect row changes.
        null_key_policy: Policy for null business keys. Either ``"error"`` or ``"drop"``.
        create_if_not_exists: When ``True``, create the target table if it does not exist.
        allow_schema_evolution: When ``True``, enable Delta schema evolution for new columns.
        verbose: When ``True``, log operational details at INFO level.
    """
    _configure_verbose(verbose)

    if not business_keys:
        raise ValueError("business_keys must be a non-empty sequence")

    src_cols_set = set(source_df.columns)
    missing_keys = [k for k in business_keys if k not in src_cols_set]
    if missing_keys:
        raise ValueError(f"source_df missing business_keys: {missing_keys}")

    if tracked_columns is None:
        tracked_columns = [c for c in source_df.columns if c not in set(business_keys)]
    else:
        missing_tracked = [c for c in tracked_columns if c not in src_cols_set]
        if missing_tracked:
            raise ValueError(f"tracked_columns not in source_df: {missing_tracked}")

    # Null key policy
    key_cond = None
    for k in business_keys:
        key_cond = F.col(k).isNotNull() if key_cond is None else (key_cond & F.col(k).isNotNull())
    if null_key_policy == "drop":
        source_df = source_df.where(key_cond)
    elif null_key_policy == "error":
        null_cnt = source_df.where(~key_cond).limit(1).count()
        if null_cnt:
            raise ValueError(
                "Null business key encountered in source_df; set null_key_policy='drop' to drop them."
            )
    else:
        raise ValueError("null_key_policy must be 'error' or 'drop'")

    # De-dup
    if dedupe_keys is None:
        dedupe_keys = list(business_keys)

    if order_by and len(order_by) > 0:
        w = Window.partitionBy(*[F.col(k) for k in dedupe_keys]).orderBy(
            *[F.col(c).desc_nulls_last() for c in order_by]
        )
        source_df = (
            source_df.withColumn("__rn", F.row_number().over(w))
            .where(F.col("__rn") == 1)
            .drop("__rn")
        )
    else:
        source_df = source_df.dropDuplicates(list(dedupe_keys))

    # Log configuration summary
    _LOGGER.info(
        "current_only_upsert: target=%s, business_keys=%s, tracked_columns=%s, hash_col=%s",
        target,
        business_keys,
        list(tracked_columns),
        hash_col,
    )

    # Count source rows after dedup
    source_row_count = source_df.count()
    _LOGGER.info("Source rows after deduplication: %d", source_row_count)

    # Hash tracked columns
    hash_expr_inputs = [_coalesce_cast_to_string(F.col(c)) for c in tracked_columns]
    row_hash_expr = F.sha2(F.concat_ws(UNIT_SEPARATOR, *hash_expr_inputs), 256)
    src_hashed = source_df.withColumn(hash_col, row_hash_expr)

    # Create target if missing
    target_exists = True
    try:
        _delta_table(spark, target)
    except Exception:
        target_exists = False

    if not target_exists:
        if not create_if_not_exists:
            raise ValueError(f"Target '{target}' does not exist and create_if_not_exists=False")
        _LOGGER.info("Target '%s' does not exist, creating new table", target)
        _write_append(src_hashed, target, merge_schema=allow_schema_evolution)
        return

    _LOGGER.info("Target '%s' exists, executing merge", target)

    # MERGE: update when changed, insert when new
    dt = _delta_table(spark, target)

    cond_keys_sql = " AND ".join([f"t.`{k}` <=> s.`{k}`" for k in business_keys])

    target_cols = set(_read_target_df(spark, target).columns)
    # Ensure target has hash column (add ephemeral compare if absent)
    if hash_col in target_cols:
        change_cond_sql = f"NOT (t.`{hash_col}` <=> s.`{hash_col}`)"
    else:
        change_cond_sql = (
            " OR ".join([f"NOT (t.`{c}` <=> s.`{c}`)" for c in tracked_columns]) or "false"
        )

    # Build column maps (exclude technical track-history columns if they exist in target)
    src_cols = src_hashed.columns
    # If target has history fields, do not attempt to write them during current-only merges
    history_fields = {"effective_start_ts", "effective_end_ts", "is_current", "version"}
    write_cols = [c for c in src_cols if c not in history_fields]

    set_map = {c: F.col(f"s.`{c}`") for c in write_cols}
    insert_map = {c: F.col(f"s.`{c}`") for c in write_cols}

    with _temporarily_enable_automerge(spark, allow_schema_evolution):
        (
            dt.alias("t")
            .merge(
                src_hashed.alias("s"),
                cond_keys_sql,
            )
            .whenMatchedUpdate(
                condition=F.expr(change_cond_sql),
                set=set_map,
            )
            .whenNotMatchedInsert(values=insert_map)
            .execute()
        )

    _LOGGER.info("Merge executed on '%s'", target)

enable_change_tracking_accessors

enable_change_tracking_accessors(*, force: bool = False) -> None

Attach .change_tracking helpers to DataFrame/DataFrameWriter/DataStreamWriter.

Once enabled (this module runs it on import), df.change_tracking, df.write.change_tracking, and df.writeStream.change_tracking are available. Batch accessors expose :class:ChangeTrackingWriteBuilder; the streaming accessor exposes :class:~spark_fuse.utils.change_tracking_streaming.StreamingChangeTrackingBuilder.

Parameters:

  • force (bool, default False) – When True, re-installs the accessors even if they already exist.
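
A minimal sketch; the module already installs the accessors on import, so calling with force=True is only needed to re-install them (events_df is a hypothetical DataFrame):

from spark_fuse.utils.change_tracking import enable_change_tracking_accessors

enable_change_tracking_accessors(force=True)

builder = events_df.change_tracking             # ChangeTrackingWriteBuilder
writer_builder = events_df.write.change_tracking  # the same accessor via DataFrameWriter
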
Source code in src/spark_fuse/utils/change_tracking.py
def enable_change_tracking_accessors(*, force: bool = False) -> None:
    """Attach ``.change_tracking`` helpers to ``DataFrame``/``DataFrameWriter``/``DataStreamWriter``.

    Once enabled (this module runs it on import), ``df.change_tracking``,
    ``df.write.change_tracking``, and ``df.writeStream.change_tracking`` are available.
    Batch accessors expose :class:`ChangeTrackingWriteBuilder`; the streaming accessor exposes
    :class:`~spark_fuse.utils.change_tracking_streaming.StreamingChangeTrackingBuilder`.

    Args:
        force: When ``True`` re-installs the accessors even if they already exist.
    """

    df_attrs = getattr(DataFrame, "__dict__", {})
    dfw_attrs = getattr(DataFrameWriter, "__dict__", {})

    def _df_change_tracking(self: DataFrame) -> ChangeTrackingWriteBuilder:
        return change_tracking_writer(self)

    def _dfw_change_tracking(self: DataFrameWriter) -> ChangeTrackingWriteBuilder:
        return change_tracking_writer(self._df)

    if force or "change_tracking" not in df_attrs:
        DataFrame.change_tracking = property(_df_change_tracking)  # type: ignore[attr-defined]
    if force or "change_tracking" not in dfw_attrs:
        DataFrameWriter.change_tracking = property(_dfw_change_tracking)  # type: ignore[attr-defined]

    try:
        from pyspark.sql.streaming.readwriter import DataStreamWriter

        from .change_tracking_streaming import streaming_change_tracking_writer

        dsw_attrs = getattr(DataStreamWriter, "__dict__", {})

        def _dsw_change_tracking(self: DataStreamWriter):  # type: ignore[type-arg]
            return streaming_change_tracking_writer(self._df)

        if force or "change_tracking" not in dsw_attrs:
            DataStreamWriter.change_tracking = property(_dsw_change_tracking)  # type: ignore[attr-defined]
    except ImportError:
        pass

track_history_upsert

track_history_upsert(spark: SparkSession, source_df: DataFrame, target: str, *, business_keys: Sequence[str], tracked_columns: Optional[Iterable[str]] = None, dedupe_keys: Optional[Sequence[str]] = None, order_by: Optional[Sequence[str]] = None, effective_col: str = 'effective_start_ts', expiry_col: str = 'effective_end_ts', current_col: str = 'is_current', version_col: str = 'version', hash_col: str = 'row_hash', load_ts_expr: Optional[Union[str, Column]] = None, default_expiry_value: Optional[Union[str, Column]] = None, null_key_policy: str = 'error', create_if_not_exists: bool = True, allow_schema_evolution: bool = False, verbose: bool = False) -> None

Implement :class:ChangeTrackingMode.TRACK_HISTORY.

Two-step algorithm:

  1. Close currently active target rows when the incoming record for the same key has a different row hash.
  2. Insert new active rows for new keys or changed keys, with an incremented version.

Parameters:

  • spark (SparkSession, required) – SparkSession used to read/write Delta tables.
  • source_df (DataFrame, required) – DataFrame containing incoming records. Can include duplicates; these are de-duplicated by dedupe_keys, keeping the latest row by order_by.
  • target (str, required) – Unity Catalog table name (for example, catalog.schema.table) or Delta path (for example, dbfs:/path).
  • business_keys (Sequence[str], required) – Columns that uniquely identify an entity (merge condition).
  • tracked_columns (Optional[Iterable[str]], default None) – Columns whose changes trigger a new version. Defaults to all non-key, non-metadata columns.
  • dedupe_keys (Optional[Sequence[str]], default None) – Columns used to de-duplicate input before merge. Defaults to business_keys.
  • order_by (Optional[Sequence[str]], default None) – Columns used to choose the most recent record per dedupe_keys. Highest values win.
  • effective_col (str, default 'effective_start_ts') – Name of the effective-start timestamp column in the target dataset.
  • expiry_col (str, default 'effective_end_ts') – Name of the effective-end timestamp column in the target dataset.
  • current_col (str, default 'is_current') – Name of the boolean "is current" column in the target dataset.
  • version_col (str, default 'version') – Name of the version column in the target dataset.
  • hash_col (str, default 'row_hash') – Name of the hash column used to detect row changes.
  • load_ts_expr (Optional[Union[str, Column]], default None) – PySpark Column or SQL expression string that provides the effective-start timestamp used for effective_start_ts (e.g., "current_timestamp()" or "to_timestamp('2020-01-01 00:00:00')"). Defaults to current_timestamp().
  • default_expiry_value (Optional[Union[str, Column]], default None) – Value to use for expiry_col on open/current rows. Accepts a PySpark Column or a string that will be cast to timestamp (e.g., "9999-12-31"). Defaults to None (i.e., NULL).
  • null_key_policy (str, default 'error') – Policy for null business keys in source_df. Either "error" (default) or "drop".
  • create_if_not_exists (bool, default True) – When True (default), create the target table if it does not exist.
  • allow_schema_evolution (bool, default False) – When True, append operations use Delta schema evolution so new columns added to the source DataFrame are automatically added to the target table. Only affects write paths (initial bootstrap + inserts).
  • verbose (bool, default False) – When True, log operational details at INFO level.
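
A hedged usage sketch of a Type 2 load; the session, sample data, and table name are hypothetical:

from pyspark.sql import SparkSession
from spark_fuse.utils.change_tracking import track_history_upsert

spark = SparkSession.builder.getOrCreate()
incoming = spark.createDataFrame(
    [(1, "alice@example.com", "2024-01-02 00:00:00")],
    ["customer_id", "email", "updated_at"],
)
track_history_upsert(
    spark,
    incoming,
    "catalog.schema.customers_history",   # hypothetical Delta table
    business_keys=["customer_id"],
    order_by=["updated_at"],               # latest record wins per key
    default_expiry_value="9999-12-31",     # open rows carry a far-future expiry
    verbose=True,
)
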
Source code in src/spark_fuse/utils/change_tracking.py
def track_history_upsert(
    spark: SparkSession,
    source_df: DataFrame,
    target: str,
    *,
    business_keys: Sequence[str],
    tracked_columns: Optional[Iterable[str]] = None,
    dedupe_keys: Optional[Sequence[str]] = None,
    order_by: Optional[Sequence[str]] = None,
    effective_col: str = "effective_start_ts",
    expiry_col: str = "effective_end_ts",
    current_col: str = "is_current",
    version_col: str = "version",
    hash_col: str = "row_hash",
    load_ts_expr: Optional[Union[str, F.Column]] = None,
    default_expiry_value: Optional[Union[str, F.Column]] = None,
    null_key_policy: str = "error",  # "error" | "drop"
    create_if_not_exists: bool = True,
    allow_schema_evolution: bool = False,
    verbose: bool = False,
) -> None:
    """Implement :class:`ChangeTrackingMode.TRACK_HISTORY`.

    Two-step algorithm:
      1) Close currently active target rows when incoming record for same key has a different row hash.
      2) Insert new active rows for new keys or changed keys with incremented version.

    Args:
        spark: SparkSession used to read/write Delta tables.
        source_df: DataFrame containing incoming records. Can include duplicates; these are
            de-duplicated by `dedupe_keys`, keeping the latest row by `order_by`.
        target: Unity Catalog table name (for example, ``catalog.schema.table``) or Delta path
            (for example, ``dbfs:/path``).
        business_keys: Columns that uniquely identify an entity (merge condition).
        tracked_columns: Columns whose changes trigger a new version. Defaults to all non-key,
            non-metadata columns.
        dedupe_keys: Columns used to de-duplicate input before merge. Defaults to ``business_keys``.
        order_by: Columns used to choose the most recent record per ``dedupe_keys``. Highest values
            win.
        effective_col: Name of the effective-start timestamp column in the target dataset.
        expiry_col: Name of the effective-end timestamp column in the target dataset.
        current_col: Name of the boolean "is current" column in the target dataset.
        version_col: Name of the version column in the target dataset.
        hash_col: Name of the hash column used to detect row changes.
        load_ts_expr: PySpark Column or SQL expression string that provides the effective-start
            Timestamp to use for `effective_start_ts`. Accepts a PySpark Column or a SQL
            expression string (e.g., "current_timestamp()" or "to_timestamp('2020-01-01 00:00:00')").
            Defaults to `current_timestamp()`.
        default_expiry_value: Value to use for `expiry_col` on open/current rows. Accepts a
            PySpark Column or a string that will be cast to timestamp (e.g., ``"9999-12-31"``).
            Defaults to ``None`` (i.e., ``NULL``).
        null_key_policy: Policy for null business keys in ``source_df``. Either ``"error"`` (default)
            or ``"drop"``.
        create_if_not_exists: When ``True`` (default), create the target table if it does not exist.
        allow_schema_evolution: When ``True``, append operations use Delta schema evolution so new
            columns added to the source DataFrame are automatically added to the target table. Only
            affects write paths (initial bootstrap + inserts).
        verbose: When ``True``, log operational details at INFO level.
    """
    _configure_verbose(verbose)

    if not business_keys:
        raise ValueError("business_keys must be a non-empty sequence")

    # Normalize/validate columns
    src_cols_set = set(source_df.columns)

    missing_keys = [k for k in business_keys if k not in src_cols_set]
    if missing_keys:
        raise ValueError(f"source_df missing business_keys: {missing_keys}")

    tracking_meta = {effective_col, expiry_col, current_col, version_col, hash_col}
    if tracked_columns is None:
        tracked_columns = [
            c for c in source_df.columns if c not in set(business_keys) | tracking_meta
        ]
    else:
        missing_tracked = [c for c in tracked_columns if c not in src_cols_set]
        if missing_tracked:
            raise ValueError(f"tracked_columns not in source_df: {missing_tracked}")

    # Drop or error on null keys in source
    key_cond = None
    for k in business_keys:
        key_cond = F.col(k).isNotNull() if key_cond is None else (key_cond & F.col(k).isNotNull())
    if null_key_policy == "drop":
        source_df = source_df.where(key_cond)
    elif null_key_policy == "error":
        null_cnt = source_df.where(~key_cond).limit(1).count()
        if null_cnt:
            raise ValueError(
                "Null business key encountered in source_df; set null_key_policy='drop' to drop them."
            )
    else:
        raise ValueError("null_key_policy must be 'error' or 'drop'")

    # Partition incoming data by dedupe_keys so we can process one row per key in each pass.
    if dedupe_keys is None:
        dedupe_keys = list(business_keys)

    if order_by:
        w = Window.partitionBy(*[F.col(k) for k in dedupe_keys]).orderBy(
            *[F.col(c).desc_nulls_last() for c in order_by]
        )
        source_df = source_df.withColumn(CHANGE_TRACKING_SEQUENCE_COL, F.row_number().over(w))
    else:
        source_df = source_df.dropDuplicates(list(dedupe_keys)).withColumn(
            CHANGE_TRACKING_SEQUENCE_COL, F.lit(1)
        )

    # Log configuration summary
    _LOGGER.info(
        "track_history_upsert: target=%s, business_keys=%s, tracked_columns=%s, "
        "effective_col=%s, expiry_col=%s, current_col=%s, version_col=%s, hash_col=%s",
        target,
        business_keys,
        list(tracked_columns),
        effective_col,
        expiry_col,
        current_col,
        version_col,
        hash_col,
    )

    # Compute deterministic row hash over tracked columns
    hash_expr_inputs = [_coalesce_cast_to_string(F.col(c)) for c in tracked_columns]
    row_hash_expr = F.sha2(
        F.concat_ws(UNIT_SEPARATOR, *hash_expr_inputs), 256
    )  # unit separator as delimiter
    source_hashed = source_df.withColumn(hash_col, row_hash_expr)

    # Accept either a Column or a SQL string for load timestamp
    if load_ts_expr is None:
        ts_col = F.current_timestamp()
    elif isinstance(load_ts_expr, str):
        ts_col = F.expr(load_ts_expr)
    else:
        ts_col = load_ts_expr

    # Resolve the open-row expiry value
    if default_expiry_value is None:
        open_expiry = F.lit(None).cast("timestamp")
    elif isinstance(default_expiry_value, str):
        open_expiry = F.lit(default_expiry_value).cast("timestamp")
    else:
        open_expiry = default_expiry_value

    # Determine merge condition and columns used for writing.
    cond_keys_sql = " AND ".join([f"t.`{k}` <=> s.`{k}`" for k in business_keys])
    # Does target exist?
    target_exists = True
    try:
        _delta_table(spark, target)
    except Exception:
        target_exists = False

    _LOGGER.info("Target '%s' %s", target, "exists" if target_exists else "does not exist")

    # Split into per-rank batches (rank 1 == latest). Process from oldest -> newest.
    should_cache = bool(order_by)
    if should_cache:
        source_hashed = source_hashed.cache()

    max_seq_val = source_hashed.agg(
        F.max(F.col(CHANGE_TRACKING_SEQUENCE_COL)).alias("__max_seq")
    ).collect()[0]["__max_seq"]

    if max_seq_val is None:
        if should_cache:
            source_hashed.unpersist()
        return

    # Log source row count and batch count
    _LOGGER.info("Processing %d batches (max_seq=%d)", int(max_seq_val), int(max_seq_val))

    create_flag = create_if_not_exists
    for seq in range(int(max_seq_val), 0, -1):
        _LOGGER.info("Processing batch %d/%d", int(max_seq_val) - seq + 1, int(max_seq_val))
        batch = source_hashed.where(F.col(CHANGE_TRACKING_SEQUENCE_COL) == seq).drop(
            CHANGE_TRACKING_SEQUENCE_COL
        )
        target_exists = _track_history_process_batch(
            spark,
            batch,
            target,
            business_keys=business_keys,
            tracked_columns=tracked_columns,
            effective_col=effective_col,
            expiry_col=expiry_col,
            current_col=current_col,
            version_col=version_col,
            hash_col=hash_col,
            ts_col=ts_col,
            open_expiry=open_expiry,
            cond_keys_sql=cond_keys_sql,
            create_if_not_exists=create_flag,
            target_exists=target_exists,
            allow_schema_evolution=allow_schema_evolution,
            order_by=order_by,
            verbose=verbose,
        )
        create_flag = False

    if should_cache:
        source_hashed.unpersist()