Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 79 additions & 25 deletions buckaroo/dataflow/dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,49 @@ def _get_summary_sd(self, df:pd.DataFrame) -> Tuple[SDType, TAny]:

_summary_sd_cache_key = (None, None)

def _scope_cache_key(self, chain):
    """Build the cache key for one scope's summary-stats entry.

    The key is ``hash_chain`` applied to ``chain`` plus an ``extra``
    string made of two further inputs to the scope df, so that a
    cache hit means "same SD-producing inputs" and not merely
    "same chain":

    - ``id(self.sampled_df)`` (``0`` when ``sampled_df`` is None):
      a ``raw_df`` swap with an unchanged chain must invalidate
      (codex P1 on #783).
    - ``post_processing_method`` (empty string when missing or
      falsy): when post-processing replaces the df entirely (e.g.
      ``hide_post`` → ``SENTINEL_DF``) the raw scope's SD must
      describe the replacement df, per the
      ``test_hide_column_config_post_processing`` invariant.

    Defined on ``DataFlow`` rather than ``CustomizableDataflow`` so
    that ``_summary_sd`` (also on ``DataFlow``) can compute the same
    key that ``_populate_sd_cache`` (on ``CustomizableDataflow``)
    uses, without a layering wart.
    """
    source_df = self.sampled_df
    source_token = 0 if source_df is None else id(source_df)
    # getattr: the attribute may not exist at this level of the
    # class hierarchy; a missing or falsy value collapses to ''.
    method_name = getattr(self, 'post_processing_method', '') or ''
    return hash_chain(chain, extra=f"{source_token}|{method_name}")

def _current_filt_chain(self):
    """Return the ``'filt'`` scope chain for the state just set.

    Prefers ``self.merged_operations`` (which reads
    ``self.cleaned[3]``) over ``self.operations``. When
    ``_summary_sd`` fires, ``self.operations`` still holds the PRIOR
    state's chain, because the parent ``_operation_result`` assigns
    ``self.cleaned = result`` BEFORE ``self.operations = result[3]``
    (see #814); the merged chain taken from the result tuple is
    already current.

    ``self.operations`` is used only as a fallback when
    ``merged_operations`` is None, i.e. for pre-cascade states such
    as an ``analysis_klasses`` change before any cascade has run.
    """
    merged = self.merged_operations
    chain = self.operations if merged is None else merged
    return split_chain_by_scope(chain)['filt']

@observe('processed_result', 'analysis_klasses')
@exception_protect('summary_sd-protector')
def _summary_sd(self, change):
Expand All @@ -264,6 +307,22 @@ def _summary_sd(self, change):
if (id(df), id(klasses)) == self._summary_sd_cache_key:
return
self._summary_sd_cache_key = (id(df), id(klasses))
# Cache short-circuit (#814): if ``summary_stats_cache`` already
# holds an entry for the current filt chain, reuse it instead of
# paying ~300ms+ to re-run the full analysis pipeline. ~half of
# state_change latency on xorq backends is this recompute.
#
# Key off ``_current_filt_chain()`` — which reads from
# ``self.cleaned[3]`` (already the new chain), NOT
# ``self.operations`` (still the prior chain at this point in
# the cascade). Keying off ``self.operations`` is the bug the
# original lookup-removal (5bc7bbfb) was guarding against.
filt_key = self._scope_cache_key(self._current_filt_chain())
cache = self.summary_stats_cache or {}
if filt_key in cache:
self.summary_sd = cache[filt_key]
self.errs = {}
return
result_summary_sd, errs = self._get_summary_sd(df)
self.summary_sd = result_summary_sd
self.errs = errs
Expand Down Expand Up @@ -438,7 +497,17 @@ def _merged_sd(self, change):
# off ``filt_sd_key != raw_sd_key`` would also fire for
# cleaning-only states, mislabelling cleaned stats as filtered
# until the deferred ``cleaned_*`` scope lands.
chains = split_chain_by_scope(self.operations)
#
# Read the chain from ``self.merged_operations`` (== freshly-set
# ``self.cleaned[3]``) rather than ``self.operations``: this
# observer fires on ``summary_sd``, which is set BEFORE the
# parent ``_operation_result`` reaches ``self.operations =
# result[3]``. ``self.operations`` is therefore the PRIOR
# state's chain at first-fire time. See #814.
ops = self.merged_operations
if ops is None:
ops = self.operations
chains = split_chain_by_scope(ops)
filter_active = chains['filt'] != chains['clean']

if self.processed_df is None:
Expand Down Expand Up @@ -500,29 +569,6 @@ def _compute_scope_df(self, scope: str):
return base
return pp_result[0] if pp_result else base

def _scope_cache_key(self, chain):
    """Build the cache key for one scope's summary-stats entry.

    The key is ``hash_chain`` applied to ``chain`` plus an ``extra``
    string made of two further inputs to the scope df, so that a
    cache hit means "same SD-producing inputs" and not merely
    "same chain":

    - ``id(self.sampled_df)`` (``0`` when ``sampled_df`` is None):
      a ``raw_df`` swap with an unchanged chain must invalidate
      (codex P1 on #783).
    - ``self.post_processing_method`` (empty string when falsy):
      when post-processing replaces the df entirely (e.g.
      ``hide_post`` → ``SENTINEL_DF``) the raw scope's SD must
      describe the replacement df, per the
      ``test_hide_column_config_post_processing`` invariant.

    ``analysis_klasses`` is deliberately *not* part of the key;
    that is a separate invariant (codex P2, deferred — see
    follow-up issue).
    """
    source_df = self.sampled_df
    source_token = 0 if source_df is None else id(source_df)
    method_name = self.post_processing_method or ''
    return hash_chain(chain, extra=f"{source_token}|{method_name}")

@observe('summary_sd', 'operations', 'analysis_klasses')
@exception_protect('sd-cache-protector')
def _populate_sd_cache(self, _change):
Expand All @@ -540,14 +586,22 @@ def _populate_sd_cache(self, _change):
no second pass through the analysis pipeline. Raw and clean
scopes recompute, but only on cache miss.

Reads the chain from ``self.merged_operations`` (== freshly-set
``self.cleaned[3]``) so this observer sees the new chain even
when fired through the ``summary_sd`` path that runs before
``self.operations = result[3]`` lands. See #814.

Cache stores in-process dicts (not the parquet-b64 wire form);
``_merged_sd`` reads them directly. The cache + pointer traits
are un-synced — the frontend consumes only the merged
prefixed-key ``merged_sd``.
"""
if self.processed_df is None:
return
chains = split_chain_by_scope(self.operations)
ops = self.merged_operations
if ops is None:
ops = self.operations
chains = split_chain_by_scope(ops)
keys = {scope: self._scope_cache_key(chain)
for scope, chain in chains.items()}
new_cache = dict(self.summary_stats_cache)
Expand Down
141 changes: 141 additions & 0 deletions tests/unit/dataflow/sd_cache_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
DropCol, FillNA, GroupBy, NoOp, SafeInt, Search)
from buckaroo.customizations.pd_autoclean_conf import NoCleaningConf
from buckaroo.dataflow.autocleaning import AutocleaningConfig, PandasAutocleaning
from buckaroo.dataflow.dataflow import CustomizableDataflow, StylingAnalysis
from buckaroo.dataflow.sd_cache import hash_chain, split_chain_by_scope
from buckaroo.jlisp.lisp_utils import s, sA, sQ
from buckaroo.pluggable_analysis_framework.col_analysis import ColAnalysis
Expand Down Expand Up @@ -117,3 +118,143 @@ def test_filter_flip_only_grows_filt_entry(dirty_df):
assert len(df.summary_stats_cache) == cache_size_before + 1
assert raw_before in df.summary_stats_cache
assert clean_before in df.summary_stats_cache


class _CountingDataflow(CustomizableDataflow):
    """Instrumented ``CustomizableDataflow`` for cache-hit assertions.

    Every ``_get_summary_sd`` call is logged in ``summary_sd_calls``
    as the row count of the df it received (``-1`` when ``len(df)``
    raises). Tests use the log to check that ``_summary_sd`` — which
    calls ``_get_summary_sd`` on ``processed_df`` — is served from
    the cache on a warm-cache state_change instead of recomputing.

    Raw/clean scope computations made via ``_populate_sd_cache``
    are logged here too; they are told apart by row count (raw/clean
    run on the full ``sampled_df``, the filt scope runs on the
    filtered ``processed_df``).
    """
    autocleaning_klass = PandasAutocleaning
    autoclean_conf = (_Conf, NoCleaningConf)
    analysis_klasses = [StylingAnalysis, DefaultSummaryStats]

    def __init__(self, *args, **kwargs):
        # The call log is created before delegating to the parent
        # constructor — presumably because construction can already
        # invoke ``_get_summary_sd``; confirm before reordering.
        self.summary_sd_calls = []
        super().__init__(*args, **kwargs)

    def _get_summary_sd(self, df):
        """Log the row count of ``df``, then defer to the parent."""
        try:
            row_count = len(df)
        except Exception:
            # Best-effort: a df without a usable ``len`` is still
            # recorded, under a sentinel count.
            row_count = -1
        self.summary_sd_calls.append(row_count)
        return super()._get_summary_sd(df)


def test_warm_filt_cache_skips_get_summary_sd_on_state_change(dirty_df):
"""Issue #814 regression.

A state_change that re-applies a previously-computed filter must
NOT call ``_get_summary_sd`` through ``_summary_sd`` again — the
filt scope's cached entry from the first application must be
reused.

Cycle: filter=abc → clear → filter=abc. The third state-change
must not run ``_get_summary_sd`` on any df with the filt scope's
row count (the only "new compute" the cache is supposed to skip).

Before the #814 fix this failed because ``_summary_sd`` had no cache
lookup at all, so ``_get_summary_sd`` ran on every state_change. An
earlier lookup had been removed because it keyed off
``self.operations``, which still holds the PRIOR state's chain at the
moment ``_summary_sd`` fires (during ``self.cleaned = result`` —
before ``self.operations = result[3]``), so it never saw the new
chain's entry. The fix re-introduces the lookup but keys it off
``self.merged_operations`` (== the freshly set ``self.cleaned[3]``)
so the right entry is found.
"""
dfc = _CountingDataflow(dirty_df, debug=False)

# Apply filter the first time — populates filt_key_abc.
dfc.quick_command_args = {'search': ['10']}
filt_rows_first_apply = len(dfc.processed_df)
raw_rows = len(dfc.sampled_df)
assert filt_rows_first_apply < raw_rows, (
"precondition: search should have reduced rows"
)

# Clear filter — back to empty-filter chain (cache hit from init).
dfc.quick_command_args = {}

calls_before_replay = list(dfc.summary_sd_calls)

# Replay the same filter. This MUST be a cache hit in _summary_sd —
# no _get_summary_sd call on the filtered (smaller-row) df.
dfc.quick_command_args = {'search': ['10']}

new_calls = dfc.summary_sd_calls[len(calls_before_replay):]
filt_scope_calls = [n for n in new_calls if n == filt_rows_first_apply]
assert filt_scope_calls == [], (

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Mark regression test expected-fail or include fix

This assertion makes the commit fail against its parent because CustomizableDataflow._summary_sd in buckaroo/dataflow/dataflow.py still unconditionally calls _get_summary_sd on each processed_result change, so replaying the same filter will record a filtered-scope call and violate filt_scope_calls == []. Since this commit adds only tests and does not include the _summary_sd cache short-circuit (or xfail), it introduces a deterministic red test in CI.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentional TDD split — commit 889dc047 is tests-only so the regression assertions get seen red on CI (CI run on that commit confirms test_warm_filt_cache_skips_get_summary_sd_on_state_change failing); the _summary_sd cache short-circuit + the _current_filt_chain() / _scope_cache_key move land in 1fee1722 ("fix(dataflow): cache short-circuit in _summary_sd (#814)"), which is the second commit in this PR.

Project convention (global CLAUDE.md): "NEVER bundle a test with the fix that makes it pass — the test must be seen failing on CI first." xfail would defeat that — we want CI evidence that the failure mode is real, not a marker that says "we expect this to be broken".

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stale: comment is about the test-first commit (889dc04), but the PR head (1fee172) IS the fix — it adds the cache short-circuit in _summary_sd. CI is green across all matrices, confirming the regression test passes once the fix is in place.

f"warm-cache filter replay must skip _get_summary_sd for the "
f"filt scope (row-count={filt_rows_first_apply}). Saw "
f"{len(filt_scope_calls)} call(s) — _summary_sd missed the "
f"cache and recomputed. New calls in this state_change: "
f"{new_calls}."
)


def test_summary_sd_uses_new_state_chain_not_prior():
    """Guard against the cascade-ordering mislabel in ``_summary_sd``.

    This is the bug that motivated the original removal of the
    ``_summary_sd`` cache lookup (commit 5bc7bbfb): if the lookup
    keys off ``self.operations`` rather than the fresh chain in
    ``self.cleaned[3]``, a state_change returns the cache entry for
    the PRIOR state's chain, so ``summary_sd`` is labelled with the
    new state while holding the prior state's data.

    Scenario:
      1. Search 'foo' — stores SD_foo (computed on the 3 'foo'
         rows) under filt_key_FOO.
      2. Search 'bar' — must store SD_bar (computed on the single
         'bar' row) under filt_key_BAR.

    With the bug present, step 2 would look up the prior chain,
    find filt_key_FOO, assign it to ``summary_sd``, and
    ``_populate_sd_cache`` would then write it under filt_key_BAR —
    so reading filt_key_BAR back would give 3-row stats instead of
    1-row stats.
    """
    frame = pd.DataFrame({'a': [10, 20, 30, 40, 50],
                          'b': ['foo', 'bar', 'foo', 'baz', 'foo']})
    flow = _CountingDataflow(frame, debug=False)

    # Step 1: first filter, matching three rows.
    flow.quick_command_args = {'search': ['foo']}
    foo_rows = len(flow.processed_df)
    assert foo_rows == 3, (
        f"precondition: search 'foo' should match 3 rows, got {foo_rows}"
    )

    # Step 2: different filter, matching one row.
    flow.quick_command_args = {'search': ['bar']}
    bar_rows = len(flow.processed_df)
    assert bar_rows == 1, (
        f"precondition: search 'bar' should match 1 row, got {bar_rows}"
    )

    cached_filt = flow.summary_stats_cache[flow.filt_sd_key]
    assert cached_filt is not None

    # Every column that reports a ``length`` stat must agree with the
    # one-row 'bar' frame; columns without that stat are ignored.
    length_stats_seen = 0
    for col, stats in cached_filt.items():
        if 'length' not in stats:
            continue
        length_stats_seen += 1
        assert stats['length'] == bar_rows, (
            f"cached filt SD for column {col!r} reports length="
            f"{stats['length']}; expected {bar_rows} (current "
            f"'bar'-filtered df). A wrong length means _summary_sd "
            f"reused the prior state's cache entry."
        )
    assert length_stats_seen, (
        "precondition: at least one column should have a `length` stat"
    )
Loading