Source code for yaml_provenance._dict

"""
DictWithProvenance — a dictionary subclass with provenance tracking.
"""

import copy

from loguru import logger

from ._config import get_config
from ._exceptions import CategoryConflictError
from ._provenance import Provenance
from ._wrapper import wrapper_with_provenance_factory


class DictWithProvenance(dict):
    """
    A dictionary subclass that tracks provenance for all nested values.

    Features:

    - Recursively transforms leaf values into provenance-aware objects
    - Extends ``__setitem__`` to preserve provenance history
    - Optionally enforces category hierarchy when configured
    - Extends ``update`` to preserve provenance history

    Parameters
    ----------
    dictionary : dict
        The dictionary to wrap with provenance.
    provenance : dict
        Provenance data with matching structure to ``dictionary``.
    config : ProvenanceConfig or None
        Configuration. If ``None``, uses the module-level default.
    """

    def __init__(self, dictionary, provenance, config=None):
        super().__init__(dictionary)
        self._config = config or get_config()
        # History tracking is switched off while the initial values are being
        # wrapped, so ``put_provenance`` performs plain assignments.
        self.custom_setitem = False
        self.put_provenance(provenance)
        self.custom_setitem = True

    def put_provenance(self, provenance):
        """
        Recursively transforms every value into its WithProvenance object with
        corresponding provenance from the ``provenance`` dict (1-to-1 mapping).

        Parameters
        ----------
        provenance : dict
            Provenance dict with same keys as ``self``. Missing keys fall back
            to an empty dict/list for containers and ``None`` for leaves.
        """
        # Imported here rather than at module level — presumably to avoid a
        # circular import with ``_list``; confirm before moving it.
        from ._list import ListWithProvenance

        # Only values of existing keys are replaced, so the dict size never
        # changes while iterating.
        for key, val in self.items():
            if isinstance(val, dict):
                self[key] = DictWithProvenance(
                    val, provenance.get(key, {}), config=self._config
                )
            elif isinstance(val, list):
                self[key] = ListWithProvenance(
                    val, provenance.get(key, []), config=self._config
                )
            elif hasattr(val, "provenance"):
                # Already provenance-aware: append to its existing history.
                self[key].provenance.extend(provenance.get(key, {}))
            else:
                self[key] = wrapper_with_provenance_factory(
                    val, provenance.get(key, None)
                )

    def set_provenance(self, provenance):
        """
        Recursively sets the same ``provenance`` on all nested values.

        Parameters
        ----------
        provenance : any
            New provenance value to set. A non-list value is wrapped into a
            one-element list first.
        """
        # Local import — see the note in ``put_provenance``.
        from ._list import ListWithProvenance

        if not isinstance(provenance, list):
            provenance = [provenance]

        for key, val in self.items():
            if isinstance(val, dict):
                self[key] = DictWithProvenance(val, {}, config=self._config)
                self[key].set_provenance(provenance)
            elif isinstance(val, list):
                self[key] = ListWithProvenance(val, [], config=self._config)
                self[key].set_provenance(provenance)
            elif hasattr(val, "provenance"):
                self[key].provenance.extend(provenance)
            else:
                self[key] = wrapper_with_provenance_factory(val, provenance)

    def get_provenance(self, index=-1):
        """
        Returns a dictionary of provenance information with matching structure.

        Parameters
        ----------
        index : int
            Index into the provenance history. Default: ``-1`` (last/current).

        Returns
        -------
        dict
            Provenance dictionary. Values without a ``provenance`` attribute
            map to ``None``.
        """
        # Local import — see the note in ``put_provenance``.
        from ._list import ListWithProvenance

        containers = (DictWithProvenance, ListWithProvenance)
        provenance_dict = {}
        for key, val in self.items():
            if isinstance(val, containers):
                provenance_dict[key] = val.get_provenance(index=index)
            elif hasattr(val, "provenance"):
                provenance_dict[key] = val.provenance[index]
            else:
                provenance_dict[key] = None
        return provenance_dict

    def _has_real_hierarchy(self):
        """Check if the config has a non-trivial category hierarchy."""
        return len(self._config.category_hierarchy) > 1

    def _tracks_history_for(self, key):
        """Tell whether assigning to ``key`` must extend an existing history."""
        return (
            key in self
            and not isinstance(self[key], (dict, list))
            and hasattr(self[key], "provenance")
            # ``custom_setitem`` may not exist yet if an item is set before
            # ``__init__`` has run, hence the ``hasattr`` guard.
            and hasattr(self, "custom_setitem")
            and self.custom_setitem
        )

    def _copy_if_tracking(self, value):
        """Deep-copy ``value`` when ``track_history`` is on, else return it."""
        return copy.deepcopy(value) if self._config.track_history else value

    @staticmethod
    def _attach_provenance(value, provenance):
        """Return ``value`` (wrapped if necessary) carrying ``provenance``."""
        try:
            value.provenance = provenance
        except AttributeError:
            # Plain objects such as ``int``, ``str`` or ``None`` cannot take
            # new attributes, so wrap them first. Assumes the factory result
            # accepts assignment to ``provenance``, as the leaves handled
            # elsewhere in this class do.
            value = wrapper_with_provenance_factory(value, None)
            value.provenance = provenance
        return value

    @staticmethod
    def _conflict_error(key, old_val, val, category, provenance):
        """Build the error raised for a same-level value conflict."""
        return CategoryConflictError(
            f"Key '{key}' exists at the same hierarchical level "
            f"('{category}') with different values "
            f"('{old_val}':'{val}').",
            key=key,
            old_val=old_val,
            new_val=val,
            category=category,
            # The history object is extended in place, so "old" and "new"
            # provenance are the same object at this point.
            old_provenance=provenance,
            new_provenance=provenance,
        )

    def _resolve_conflict(self, key, old_val, val, category, provenance):
        """Pick the value to store when both values share one category."""
        config = self._config
        if config.conflict_resolver is not None:
            action = config.conflict_resolver(
                key, old_val, val, provenance, provenance
            )
            if action == "raise":
                raise self._conflict_error(
                    key, old_val, val, category, provenance
                )
            if action == "keep_old":
                return self._copy_if_tracking(old_val)
            if action == "keep_new":
                return self._copy_if_tracking(val)
            # Any other answer stores the new value as-is (no copy).
            return val

        if config.on_conflict == "raise":
            raise self._conflict_error(key, old_val, val, category, provenance)
        if config.on_conflict == "warn":
            logger.warning(
                f"Key '{key}' conflict at level '{category}': "
                f"'{old_val}' -> '{val}'"
            )
        return self._copy_if_tracking(val)

    def _resolve_by_hierarchy(
        self, key, old_val, val, old_category, new_category, provenance
    ):
        """Pick the value to store according to the category hierarchy."""
        config = self._config
        hierarchy = config.category_hierarchy
        old_idx = hierarchy.index(old_category)
        new_idx = hierarchy.index(new_category)

        if old_idx == new_idx and old_val != val:
            return self._resolve_conflict(
                key, old_val, val, old_category, provenance
            )

        if old_idx < new_idx or old_val is None:
            # New category is higher — allow overwrite.
            return self._copy_if_tracking(val)

        # Old category is higher (or same category with equal values): keep
        # the old value and record that the assignment was reverted.
        kept = self._copy_if_tracking(old_val)
        provenance.extend_and_modified_by(
            Provenance(
                {"category": old_category},
                track_history=config.track_history,
            ),
            "dict.__setitem__->reverted_by_hierarchy",
        )
        logger.trace(
            f"Value {val} won't be assigned to key {key}, because "
            f"the old value {old_val} comes from a category higher "
            f"in the hierarchy ({old_val}:{old_category} > "
            f"{val}:{new_category})"
        )
        return kept

    def __setitem__(self, key, val):
        """
        Extended ``__setitem__`` that preserves provenance history.

        When ``key`` already holds a provenance-aware leaf, the history of the
        old value is extended with the provenance of ``val`` and attached to
        whatever value ends up stored. When a category hierarchy is configured
        (more than just ``[None]``), also enforces category-based conflict
        resolution and hierarchy ordering. A ``val`` that cannot carry a
        ``provenance`` attribute is wrapped before the history is attached.

        Raises
        ------
        CategoryConflictError
            If values at the same hierarchy level conflict and either the
            configured ``conflict_resolver`` answers ``"raise"`` or, without a
            resolver, ``on_conflict="raise"``.
        """
        if not self._tracks_history_for(key):
            super().__setitem__(key, val)
            return

        old_val = self[key]
        history = old_val.provenance
        val_has_provenance = hasattr(val, "provenance")

        # Capture categories BEFORE extending the history (extend mutates).
        # An empty history is treated like a falsy last entry.
        last_old = history[-1] if history else None
        old_category = last_old.get("category", None) if last_old else "backend"

        new_category = None
        if val_has_provenance and val.provenance and val.provenance[-1]:
            new_category = val.provenance[-1].get("category", None)

        # ``history`` is the old value's own provenance object; it is extended
        # in place and handed over to the stored value below.
        if val_has_provenance:
            history.extend_and_modified_by(val.provenance, "dict.__setitem__")

        val_new = val
        if self._has_real_hierarchy():
            hierarchy = self._config.category_hierarchy
            # Categories unknown to the hierarchy bypass the ordering rules
            # and the new value is stored uncopied.
            if old_category in hierarchy and new_category in hierarchy:
                val_new = self._resolve_by_hierarchy(
                    key, old_val, val, old_category, new_category, history
                )
        else:
            # Simple mode: just extend provenance, no hierarchy checks.
            val_new = self._copy_if_tracking(val)

        super().__setitem__(key, self._attach_provenance(val_new, history))

    def super_setitem(self, key, val):
        """
        Call the original ``dict.__setitem__`` without provenance tracking.
        """
        super().__setitem__(key, val)

    def update(self, dictionary, *args, **kwargs):
        """
        Extends ``dict.update`` to preserve provenance history.

        Unlike ``__setitem__``, no category hierarchy checks are applied and
        the incoming values are stored without copying.

        Parameters
        ----------
        dictionary : dict or iterable of key/value pairs
            Items to update from.
        **kwargs
            Additional items, handled exactly like those in ``dictionary``.

        Raises
        ------
        TypeError
            If extra positional arguments are given (as ``dict.update`` does);
            raised before any provenance is modified.
        """
        # Normalise every accepted input form once, so keyword items get the
        # same history handling as items of ``dictionary`` and invalid calls
        # fail before any history is touched.
        incoming = dict(dictionary, *args, **kwargs)

        histories = {}
        for key, val in incoming.items():
            if self._tracks_history_for(key):
                history = self[key].provenance
                if hasattr(val, "provenance"):
                    history.extend_and_modified_by(val.provenance, "dict.update")
                histories[key] = history

        super().update(incoming)

        # Hand each extended history over to the value that replaced the old
        # one; values unable to carry it are wrapped and stored again.
        for key, history in histories.items():
            super().__setitem__(
                key, self._attach_provenance(self[key], history)
            )