From 5bb9f23878d29772188e5ce9c7ccdd22d1f336b7 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Thu, 2 Jul 2026 12:12:43 +0200 Subject: [PATCH] refactor(plan): collapse Needs typed maps to generic attrCode dispatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Data-structure refactor of preload needs — no semantic change. Before: `plan.Needs` held 13 typed key maps (`Ledgers`, `Boundaries`, `Volumes`, `References`, `Metadata`, `Transactions`, `SinkConfigs`, `NumscriptVersions`, `NumscriptContents`, `PreparedQueries`, `LedgerMetadata`, `Indexes`, `IdempotencyKeys`). Adding an attribute cache meant adding a field on Needs, a case in `Merge()`, a length in `AttributeKeysCount()`, a bit in `coverage_bits`, and a copy of the resolver body under a new K generic. `resolveAttributePreload[K, T]` was instantiated once per attribute type. After: `Needs.Attributes map[byte]*AttributeSet` keyed by `dal.SubAttrX` bytes with a `Keys map[string]struct{}` set of canonical bytes. Admission and the mirror worker use `p.Add(dal.SubAttrX, key.Bytes())` instead of per-type map assignments. Resolver dispatch lives in a new `attribute_resolvers.go` file — one `protoAttrResolver[T]` entry per attribute cache holds the cache/loader/store triple; the builder iterates over the registration table. Two side effects that fall out for free: - Adding a new attribute cache is a one-line entry in `buildAttrResolvers` instead of edits across five files. - Preload declarations no longer scale with the number of typed maps; the resolver is one code path for all attribute caches. Wire compatibility with pre-refactor plans is preserved: `CacheGuaranteed` → `Declare`, `CacheNeedsTouch` → `Touch`, `CacheMiss + bloom/Pebble-absent` → `Declare`, `CacheMiss + Pebble-load-hit` → `Value(v)`. Cache primitives (`AttributeCache.Get` gen0→gen1 fallback, `AttributeCache.Del` strict Gen0 contract) and the FSM Preload switch (`Declare` / `Touch` / `Value` dispatch) stay at `release/v3.0` semantics. Also closes a few pre-existing invariant #6 gaps surfaced by the audit: `DeleteLedger` now preloads the boundary attribute, `DeleteMetadata` / `DeleteLedgerMetadata` / `DeletePreparedQuery` / `DropIndex` / `RemoveMetadataFieldType` / `RemoveEventsSink` / `MirrorIngest.DeletedMetadata` declare coverage for their strict-Del target. These sites all reach `AttributeCache.Del` which requires Gen0 to hold the entry; the missing declarations would have silently no-oped the FSM read and desynced nodes on any concurrent-rotation path. --- internal/application/admission/admission.go | 170 +++++++----- .../application/admission/admission_test.go | 118 +++----- .../admission/wrapper_dispatch_test.go | 95 +++---- internal/application/mirror/worker.go | 44 +-- internal/infra/plan/attribute_resolvers.go | 188 +++++++++++++ internal/infra/plan/builder.go | 261 +++++------------- internal/infra/plan/builder_test.go | 138 ++++++++- internal/infra/plan/coverage_bits.go | 51 +--- internal/infra/plan/coverage_bits_test.go | 36 +-- internal/infra/plan/needs.go | 178 ++++++------ internal/infra/plan/resolve.go | 98 +++---- 11 files changed, 753 insertions(+), 624 deletions(-) create mode 100644 internal/infra/plan/attribute_resolvers.go diff --git a/internal/application/admission/admission.go b/internal/application/admission/admission.go index f155d3a05b..d94c9c489e 100644 --- a/internal/application/admission/admission.go +++ b/internal/application/admission/admission.go @@ -360,12 +360,8 @@ func (a *Admission) Admit(ctx context.Context, req *servicepb.ApplyRequest) ([]* ctx, preloadSpan := tracer.Start(ctx, "admission.preload", trace.WithAttributes( - attribute.Int("preload.ledgers", len(needs.Ledgers)), - attribute.Int("preload.boundaries", len(needs.Boundaries)), - attribute.Int("preload.volumes", len(needs.Volumes)), + attribute.Int("preload.attributes_total", needs.AttributeKeysCount()), attribute.Int("preload.idempotency_keys", len(needs.IdempotencyKeys)), - attribute.Int("preload.references", len(needs.References)), - attribute.Int("preload.metadata", len(needs.Metadata)), )) // Build the per-order WriteOperation slice. Each operation carries @@ -783,19 +779,19 @@ func wrapSystemScoped(order *raftcmdpb.Order, ss *raftcmdpb.SystemScopedOrder) { // (admission contract violation — need never declared) stays distinct // and propagates loud through `ErrStorageOperation{Cause: covErr}`. func addVolumeNeed(p *plan.Needs, ledgerName string, account, asset string) { - p.Volumes[domain.VolumeKey{ + p.Add(dal.SubAttrVolume, domain.VolumeKey{ AccountKey: domain.AccountKey{LedgerName: ledgerName, Account: account}, Asset: asset, - }] = struct{}{} + }.Bytes()) } // addTransactionTargetNeeds preloads the TransactionState entry for a // TargetTransaction so the FSM can read it from cache. func addTransactionTargetNeeds(p *plan.Needs, ledgerName string, txID uint64) { - p.Transactions[domain.TransactionKey{ + p.Add(dal.SubAttrTransaction, domain.TransactionKey{ LedgerName: ledgerName, ID: txID, - }] = struct{}{} + }.Bytes()) } // extractLedgerScopedNeeds populates the preload Needs for a ledger-scoped @@ -805,16 +801,23 @@ func extractLedgerScopedNeeds(p *plan.Needs, ls *raftcmdpb.LedgerScopedOrder) { ledgerName := ls.GetLedger() ledgerKey := domain.LedgerKey{Name: ledgerName} + ledgerBytes := ledgerKey.Bytes() + switch payload := ls.GetPayload().(type) { case *raftcmdpb.LedgerScopedOrder_CreateLedger: - p.Ledgers[ledgerKey] = struct{}{} + p.Add(dal.SubAttrLedger, ledgerBytes) case *raftcmdpb.LedgerScopedOrder_DeleteLedger: - p.Ledgers[ledgerKey] = struct{}{} + p.Add(dal.SubAttrLedger, ledgerBytes) + // LogPayload_DeleteLedger cascades into + // Derived.Boundaries.Delete → WriteSet.Merge → KeyStore.Delete + // → strict AttributeCache.Del. Preload the boundary attribute + // so Gen0 holds the entry at apply time (invariant #6). + p.Add(dal.SubAttrBoundary, ledgerBytes) case *raftcmdpb.LedgerScopedOrder_PromoteLedger: - p.Ledgers[ledgerKey] = struct{}{} + p.Add(dal.SubAttrLedger, ledgerBytes) case *raftcmdpb.LedgerScopedOrder_MirrorIngest: - p.Ledgers[ledgerKey] = struct{}{} - p.Boundaries[ledgerKey] = struct{}{} + p.Add(dal.SubAttrLedger, ledgerBytes) + p.Add(dal.SubAttrBoundary, ledgerBytes) mi := payload.MirrorIngest @@ -833,10 +836,10 @@ func extractLedgerScopedNeeds(p *plan.Needs, ls *raftcmdpb.LedgerScopedOrder) { if ct := mi.GetEntry().GetCreatedTransaction(); ct != nil { for account, mm := range ct.GetAccountMetadata() { for key := range mm.GetValues() { - p.Metadata[domain.MetadataKey{ + p.Add(dal.SubAttrMetadata, domain.MetadataKey{ AccountKey: domain.AccountKey{LedgerName: ledgerName, Account: account}, Key: key, - }] = struct{}{} + }.Bytes()) } } } @@ -845,80 +848,82 @@ func extractLedgerScopedNeeds(p *plan.Needs, ls *raftcmdpb.LedgerScopedOrder) { switch target := sm.GetTarget().GetTarget().(type) { case *commonpb.Target_Account: for key := range sm.GetMetadata() { - p.Metadata[domain.MetadataKey{ + p.Add(dal.SubAttrMetadata, domain.MetadataKey{ AccountKey: domain.AccountKey{LedgerName: ledgerName, Account: target.Account.GetAddr()}, Key: key, - }] = struct{}{} + }.Bytes()) } case *commonpb.Target_TransactionId: - p.Transactions[domain.TransactionKey{ - LedgerName: ledgerName, - ID: target.TransactionId, - }] = struct{}{} + addTransactionTargetNeeds(p, ledgerName, target.TransactionId) } } if dm := mi.GetEntry().GetDeletedMetadata(); dm != nil { switch target := dm.GetTarget().GetTarget().(type) { case *commonpb.Target_Account: - p.Metadata[domain.MetadataKey{ + // Mirror-ingested v2 DELETE_METADATA log applies via + // processMirrorDeletedMetadata → AccountMetadata.Delete + // → strict AttributeCache.Del. Declare coverage so Gen0 + // holds the entry at apply (invariant #6). + p.Add(dal.SubAttrMetadata, domain.MetadataKey{ AccountKey: domain.AccountKey{LedgerName: ledgerName, Account: target.Account.GetAddr()}, Key: dm.GetKey(), - }] = struct{}{} + }.Bytes()) case *commonpb.Target_TransactionId: - p.Transactions[domain.TransactionKey{ - LedgerName: ledgerName, - ID: target.TransactionId, - }] = struct{}{} + // Transaction metadata lives inside the TransactionState + // map — no strict-Del path, no extra coverage needed. + addTransactionTargetNeeds(p, ledgerName, target.TransactionId) } } if rt := mi.GetEntry().GetRevertedTransaction(); rt != nil { - p.Transactions[domain.TransactionKey{ - LedgerName: ledgerName, - ID: rt.GetRevertedTransactionId(), - }] = struct{}{} + addTransactionTargetNeeds(p, ledgerName, rt.GetRevertedTransactionId()) } case *raftcmdpb.LedgerScopedOrder_CreatePreparedQuery: - p.Ledgers[ledgerKey] = struct{}{} - p.PreparedQueries[domain.PreparedQueryKey{LedgerName: ledgerName, Name: payload.CreatePreparedQuery.GetQuery().GetName()}] = struct{}{} + p.Add(dal.SubAttrLedger, ledgerBytes) + p.Add(dal.SubAttrPreparedQuery, domain.PreparedQueryKey{LedgerName: ledgerName, Name: payload.CreatePreparedQuery.GetQuery().GetName()}.Bytes()) case *raftcmdpb.LedgerScopedOrder_UpdatePreparedQuery: - p.Ledgers[ledgerKey] = struct{}{} - p.PreparedQueries[domain.PreparedQueryKey{LedgerName: ledgerName, Name: payload.UpdatePreparedQuery.GetName()}] = struct{}{} + p.Add(dal.SubAttrLedger, ledgerBytes) + p.Add(dal.SubAttrPreparedQuery, domain.PreparedQueryKey{LedgerName: ledgerName, Name: payload.UpdatePreparedQuery.GetName()}.Bytes()) case *raftcmdpb.LedgerScopedOrder_DeletePreparedQuery: - p.Ledgers[ledgerKey] = struct{}{} - p.PreparedQueries[domain.PreparedQueryKey{LedgerName: ledgerName, Name: payload.DeletePreparedQuery.GetName()}] = struct{}{} + p.Add(dal.SubAttrLedger, ledgerBytes) + // processDeletePreparedQuery calls PreparedQueries.Delete → + // strict AttributeCache.Del. Declare coverage so Gen0 holds + // the entry at apply (invariant #6). + p.Add(dal.SubAttrPreparedQuery, domain.PreparedQueryKey{LedgerName: ledgerName, Name: payload.DeletePreparedQuery.GetName()}.Bytes()) case *raftcmdpb.LedgerScopedOrder_SaveNumscript: - p.Ledgers[ledgerKey] = struct{}{} - p.NumscriptVersions[domain.NumscriptVersionKey{LedgerName: ledgerName, Name: payload.SaveNumscript.GetName()}] = struct{}{} + p.Add(dal.SubAttrLedger, ledgerBytes) + p.Add(dal.SubAttrNumscriptVersion, domain.NumscriptVersionKey{LedgerName: ledgerName, Name: payload.SaveNumscript.GetName()}.Bytes()) // For semver saves, preload the specific version content for immutability check. version := payload.SaveNumscript.GetVersion() if version != "" && version != "latest" { - p.NumscriptContents[domain.NumscriptEntryKey{LedgerName: ledgerName, Name: payload.SaveNumscript.GetName(), Version: version}] = struct{}{} + p.Add(dal.SubAttrNumscriptContent, domain.NumscriptEntryKey{LedgerName: ledgerName, Name: payload.SaveNumscript.GetName(), Version: version}.Bytes()) } case *raftcmdpb.LedgerScopedOrder_DeleteNumscript: - p.Ledgers[ledgerKey] = struct{}{} - p.NumscriptVersions[domain.NumscriptVersionKey{LedgerName: ledgerName, Name: payload.DeleteNumscript.GetName()}] = struct{}{} + p.Add(dal.SubAttrLedger, ledgerBytes) + p.Add(dal.SubAttrNumscriptVersion, domain.NumscriptVersionKey{LedgerName: ledgerName, Name: payload.DeleteNumscript.GetName()}.Bytes()) case *raftcmdpb.LedgerScopedOrder_SaveLedgerMetadata: - p.Ledgers[ledgerKey] = struct{}{} + p.Add(dal.SubAttrLedger, ledgerBytes) for key := range payload.SaveLedgerMetadata.GetMetadata() { - p.LedgerMetadata[domain.LedgerMetadataKey{LedgerName: ledgerName, Key: key}] = struct{}{} + p.Add(dal.SubAttrLedgerMetadata, domain.LedgerMetadataKey{LedgerName: ledgerName, Key: key}.Bytes()) } case *raftcmdpb.LedgerScopedOrder_DeleteLedgerMetadata: - p.Ledgers[ledgerKey] = struct{}{} - p.LedgerMetadata[domain.LedgerMetadataKey{LedgerName: ledgerName, Key: payload.DeleteLedgerMetadata.GetKey()}] = struct{}{} + p.Add(dal.SubAttrLedger, ledgerBytes) + // Delete's apply calls strict-Del at KeyStore level (invariant #6). + // Declare coverage so Gen0 holds the entry at apply. + p.Add(dal.SubAttrLedgerMetadata, domain.LedgerMetadataKey{LedgerName: ledgerName, Key: payload.DeleteLedgerMetadata.GetKey()}.Bytes()) case *raftcmdpb.LedgerScopedOrder_Apply: - p.Boundaries[ledgerKey] = struct{}{} - p.Ledgers[ledgerKey] = struct{}{} + p.Add(dal.SubAttrBoundary, ledgerBytes) + p.Add(dal.SubAttrLedger, ledgerBytes) switch applyData := payload.Apply.GetData().(type) { case *raftcmdpb.LedgerApplyOrder_CreateTransaction: if applyData.CreateTransaction.GetReference() != "" { - p.References[domain.TransactionReferenceKey{ + p.Add(dal.SubAttrReference, domain.TransactionReferenceKey{ LedgerName: ledgerName, Reference: applyData.CreateTransaction.GetReference(), - }] = struct{}{} + }.Bytes()) } // Caller-supplied account metadata always preloads here, @@ -929,10 +934,10 @@ func extractLedgerScopedNeeds(p *plan.Needs, ls *raftcmdpb.LedgerScopedOrder) { // themselves are discovered later by the script pass. for account, mm := range applyData.CreateTransaction.GetAccountMetadata() { for key := range mm.GetValues() { - p.Metadata[domain.MetadataKey{ + p.Add(dal.SubAttrMetadata, domain.MetadataKey{ AccountKey: domain.AccountKey{LedgerName: ledgerName, Account: account}, Key: key, - }] = struct{}{} + }.Bytes()) } } @@ -952,10 +957,7 @@ func extractLedgerScopedNeeds(p *plan.Needs, ls *raftcmdpb.LedgerScopedOrder) { } case *raftcmdpb.LedgerApplyOrder_RevertTransaction: - p.Transactions[domain.TransactionKey{ - LedgerName: ledgerName, - ID: applyData.RevertTransaction.GetTransactionId(), - }] = struct{}{} + addTransactionTargetNeeds(p, ledgerName, applyData.RevertTransaction.GetTransactionId()) for _, posting := range applyData.RevertTransaction.GetOriginalPostings() { addVolumeNeed(p, ledgerName, posting.GetDestination(), posting.GetAsset()) @@ -965,10 +967,10 @@ func extractLedgerScopedNeeds(p *plan.Needs, ls *raftcmdpb.LedgerScopedOrder) { case *raftcmdpb.LedgerApplyOrder_AddMetadata: if target, ok := applyData.AddMetadata.GetTarget().GetTarget().(*commonpb.Target_Account); ok { for key := range applyData.AddMetadata.GetMetadata() { - p.Metadata[domain.MetadataKey{ + p.Add(dal.SubAttrMetadata, domain.MetadataKey{ AccountKey: domain.AccountKey{LedgerName: ledgerName, Account: target.Account.GetAddr()}, Key: key, - }] = struct{}{} + }.Bytes()) } } @@ -978,42 +980,54 @@ func extractLedgerScopedNeeds(p *plan.Needs, ls *raftcmdpb.LedgerScopedOrder) { case *raftcmdpb.LedgerApplyOrder_DeleteMetadata: if target, ok := applyData.DeleteMetadata.GetTarget().GetTarget().(*commonpb.Target_Account); ok { - p.Metadata[domain.MetadataKey{ + // Account-metadata Delete's apply routes through + // KeyStore.Delete → strict AttributeCache.Del (requires Gen0 + // to hold the entry — invariant #6). Declare coverage. + p.Add(dal.SubAttrMetadata, domain.MetadataKey{ AccountKey: domain.AccountKey{LedgerName: ledgerName, Account: target.Account.GetAddr()}, Key: applyData.DeleteMetadata.GetKey(), - }] = struct{}{} + }.Bytes()) } if tx, ok := applyData.DeleteMetadata.GetTarget().GetTarget().(*commonpb.Target_TransactionId); ok { + // Transaction metadata lives inside the transaction state + // (a TransactionState.Metadata map, not a separate cache + // attribute), so strict-Del does not apply. addTransactionTargetNeeds(p, ledgerName, tx.TransactionId) } case *raftcmdpb.LedgerApplyOrder_CreateIndex: // processCreateIndex consults the registry to short-circuit on // READY duplicates — preload the matching entry. - p.Indexes[domain.IndexKey{LedgerName: ledgerName, Canonical: indexes.Canonical(applyData.CreateIndex.GetId())}] = struct{}{} + p.Add(dal.SubAttrIndex, domain.IndexKey{LedgerName: ledgerName, Canonical: indexes.Canonical(applyData.CreateIndex.GetId())}.Bytes()) case *raftcmdpb.LedgerApplyOrder_DropIndex: - // processDropIndex calls DeleteIndex unconditionally, but - // preloading keeps the FSM read side consistent with invariant 3. - p.Indexes[domain.IndexKey{LedgerName: ledgerName, Canonical: indexes.Canonical(applyData.DropIndex.GetId())}] = struct{}{} + // processDropIndex calls DeleteIndex unconditionally. + // indexes.Remove → w.Delete → strict AttributeCache.Del. + // Declare coverage so Gen0 holds the entry at apply. + p.Add(dal.SubAttrIndex, domain.IndexKey{ + LedgerName: ledgerName, + Canonical: indexes.Canonical(applyData.DropIndex.GetId()), + }.Bytes()) case *raftcmdpb.LedgerApplyOrder_SetMetadataFieldType: // Schema changes touch the matching metadata index entry to // flip it back to BUILDING; preload so processSetMetadataFieldType // finds the current state. - p.Indexes[domain.IndexKey{ + p.Add(dal.SubAttrIndex, domain.IndexKey{ LedgerName: ledgerName, Canonical: indexes.Canonical(indexes.MetadataID(applyData.SetMetadataFieldType.GetTargetType(), applyData.SetMetadataFieldType.GetKey())), - }] = struct{}{} + }.Bytes()) case *raftcmdpb.LedgerApplyOrder_RemoveMetadataFieldType: // Removing a schema field cascades into dropping the index; // processRemoveMetadataFieldType probes the registry first. - p.Indexes[domain.IndexKey{ + // The cascade Find→indexes.Remove reaches strict Del on hit — + // declare coverage. + p.Add(dal.SubAttrIndex, domain.IndexKey{ LedgerName: ledgerName, Canonical: indexes.Canonical(indexes.MetadataID(applyData.RemoveMetadataFieldType.GetTargetType(), applyData.RemoveMetadataFieldType.GetKey())), - }] = struct{}{} + }.Bytes()) } default: // Loud failure for an unmapped ledger-scoped payload. The processor @@ -1040,9 +1054,12 @@ func extractLedgerScopedNeeds(p *plan.Needs, ls *raftcmdpb.LedgerScopedOrder) { func extractSystemScopedNeeds(p *plan.Needs, ss *raftcmdpb.SystemScopedOrder) { switch payload := ss.GetPayload().(type) { case *raftcmdpb.SystemScopedOrder_AddEventsSink: - p.SinkConfigs[domain.SinkConfigKey{Name: payload.AddEventsSink.GetConfig().GetName()}] = struct{}{} + p.Add(dal.SubAttrSinkConfig, domain.SinkConfigKey{Name: payload.AddEventsSink.GetConfig().GetName()}.Bytes()) case *raftcmdpb.SystemScopedOrder_RemoveEventsSink: - p.SinkConfigs[domain.SinkConfigKey{Name: payload.RemoveEventsSink.GetName()}] = struct{}{} + // LogPayload_RemovedEventsSink cascades into + // Derived.SinkConfigs.Delete → WriteSet.Merge → KeyStore.Delete + // → strict AttributeCache.Del. Declare coverage. + p.Add(dal.SubAttrSinkConfig, domain.SinkConfigKey{Name: payload.RemoveEventsSink.GetName()}.Bytes()) // Explicit no-op cases: every other system-scoped variant intentionally // touches no cache attribute. Listed individually (not lumped into @@ -1187,8 +1204,9 @@ func (a *Admission) resolveScriptsAndEnrichNeeds(ctx context.Context, orders []* } for key := range discovered.WrittenMetadata { - p.Metadata[key] = struct{}{} - orderNeeds.Metadata[key] = struct{}{} + canonical := key.Bytes() + p.Add(dal.SubAttrMetadata, canonical) + orderNeeds.Add(dal.SubAttrMetadata, canonical) } } @@ -1201,9 +1219,9 @@ func (a *Admission) resolveScriptsAndEnrichNeeds(ctx context.Context, orders []* LedgerName: ledgerName, Name: ref.GetName(), Version: resolvedVersion, - } - p.NumscriptContents[contentKey] = struct{}{} - orderNeeds.NumscriptContents[contentKey] = struct{}{} + }.Bytes() + p.Add(dal.SubAttrNumscriptContent, contentKey) + orderNeeds.Add(dal.SubAttrNumscriptContent, contentKey) } } diff --git a/internal/application/admission/admission_test.go b/internal/application/admission/admission_test.go index fe4574c188..92dd84c748 100644 --- a/internal/application/admission/admission_test.go +++ b/internal/application/admission/admission_test.go @@ -219,10 +219,9 @@ func TestExtractNeededVolumes(t *testing.T) { needs, _, err := admission.extractPreloadNeeds(context.Background(), orders) require.NoError(t, err) - volumes := needs.Volumes // Should have 2 volume keys: both source (world) and destination (user:alice) are preloaded - require.Len(t, volumes, 2) + require.Equal(t, 2, needs.Count(dal.SubAttrVolume)) worldKey := domain.VolumeKey{ AccountKey: domain.AccountKey{LedgerName: "test-ledger", Account: "world"}, @@ -233,10 +232,8 @@ func TestExtractNeededVolumes(t *testing.T) { Asset: "USD", } - _, hasWorld := volumes[worldKey] - _, hasAlice := volumes[aliceKey] - require.True(t, hasWorld, "should have world volume key") - require.True(t, hasAlice, "should have alice volume key") + require.True(t, needs.Has(dal.SubAttrVolume, worldKey.Bytes()), "should have world volume key") + require.True(t, needs.Has(dal.SubAttrVolume, aliceKey.Bytes()), "should have alice volume key") }) t.Run("extracts volumes for revert transaction", func(t *testing.T) { @@ -275,10 +272,9 @@ func TestExtractNeededVolumes(t *testing.T) { needs, _, err := admission.extractPreloadNeeds(context.Background(), orders) require.NoError(t, err) - volumes := needs.Volumes // Should have 2 volume keys: both the new source (alice) and new destination (world) are preloaded - require.Len(t, volumes, 2) + require.Equal(t, 2, needs.Count(dal.SubAttrVolume)) aliceKey := domain.VolumeKey{ AccountKey: domain.AccountKey{LedgerName: "test-ledger", Account: "user:alice"}, @@ -289,10 +285,8 @@ func TestExtractNeededVolumes(t *testing.T) { Asset: "USD", } - _, hasAlice := volumes[aliceKey] - _, hasWorld := volumes[worldKey] - require.True(t, hasAlice, "should have alice volume key (source in revert)") - require.True(t, hasWorld, "should have world volume key (destination in revert)") + require.True(t, needs.Has(dal.SubAttrVolume, aliceKey.Bytes()), "should have alice volume key (source in revert)") + require.True(t, needs.Has(dal.SubAttrVolume, worldKey.Bytes()), "should have world volume key (destination in revert)") }) t.Run("extracts volumes for multiple postings in revert", func(t *testing.T) { @@ -334,11 +328,10 @@ func TestExtractNeededVolumes(t *testing.T) { needs, _, err := admission.extractPreloadNeeds(context.Background(), orders) require.NoError(t, err) - volumes := needs.Volumes // Should have 3 volume keys: alice, bob (original destinations become sources in revert) // AND world (original source becomes destination in revert) - all volumes preloaded - require.Len(t, volumes, 3) + require.Equal(t, 3, needs.Count(dal.SubAttrVolume)) aliceKey := domain.VolumeKey{ AccountKey: domain.AccountKey{LedgerName: "test-ledger", Account: "user:alice"}, @@ -353,13 +346,9 @@ func TestExtractNeededVolumes(t *testing.T) { Asset: "USD", } - _, hasAlice := volumes[aliceKey] - _, hasBob := volumes[bobKey] - _, hasWorld := volumes[worldKey] - - require.True(t, hasAlice) - require.True(t, hasBob) - require.True(t, hasWorld) + require.True(t, needs.Has(dal.SubAttrVolume, aliceKey.Bytes())) + require.True(t, needs.Has(dal.SubAttrVolume, bobKey.Bytes())) + require.True(t, needs.Has(dal.SubAttrVolume, worldKey.Bytes())) }) t.Run("preloads transaction state when add_metadata target uses id", func(t *testing.T) { @@ -393,9 +382,8 @@ func TestExtractNeededVolumes(t *testing.T) { needs, _, err := admission.extractPreloadNeeds(context.Background(), orders) require.NoError(t, err) - _, hasTx := needs.Transactions[domain.TransactionKey{LedgerName: "test-ledger", ID: 7}] - require.True(t, hasTx, "transaction key should be preloaded when id is used") - require.Empty(t, needs.References, "reference key should not be preloaded when id is used") + require.True(t, needs.Has(dal.SubAttrTransaction, domain.TransactionKey{LedgerName: "test-ledger", ID: 7}.Bytes()), "transaction key should be preloaded when id is used") + require.Zero(t, needs.Count(dal.SubAttrReference), "reference key should not be preloaded when id is used") }) } @@ -536,10 +524,9 @@ func TestExtractNeededVolumes_Force(t *testing.T) { needs, _, err := admission.extractPreloadNeeds(context.Background(), orders) require.NoError(t, err) - volumes := needs.Volumes // Should have 2 volume keys: both source and destination are always preloaded - require.Len(t, volumes, 2, "force=true should still extract all volumes") + require.Equal(t, 2, needs.Count(dal.SubAttrVolume), "force=true should still extract all volumes") aliceKey := domain.VolumeKey{ AccountKey: domain.AccountKey{LedgerName: "test-ledger", Account: "users:alice"}, @@ -550,10 +537,8 @@ func TestExtractNeededVolumes_Force(t *testing.T) { Asset: "USD", } - _, hasAlice := volumes[aliceKey] - _, hasBob := volumes[bobKey] - require.True(t, hasAlice, "should have source volume") - require.True(t, hasBob, "should have destination volume") + require.True(t, needs.Has(dal.SubAttrVolume, aliceKey.Bytes()), "should have source volume") + require.True(t, needs.Has(dal.SubAttrVolume, bobKey.Bytes()), "should have destination volume") }) t.Run("extracts volumes when force is false for create transaction", func(t *testing.T) { @@ -589,10 +574,9 @@ func TestExtractNeededVolumes_Force(t *testing.T) { needs, _, err := admission.extractPreloadNeeds(context.Background(), orders) require.NoError(t, err) - volumes := needs.Volumes // Should have 2 volume keys: both source and destination are always preloaded - require.Len(t, volumes, 2, "force=false should extract all volumes") + require.Equal(t, 2, needs.Count(dal.SubAttrVolume), "force=false should extract all volumes") aliceKey := domain.VolumeKey{ AccountKey: domain.AccountKey{LedgerName: "test-ledger", Account: "users:alice"}, @@ -603,10 +587,8 @@ func TestExtractNeededVolumes_Force(t *testing.T) { Asset: "USD", } - _, hasAlice := volumes[aliceKey] - _, hasBob := volumes[bobKey] - require.True(t, hasAlice) - require.True(t, hasBob) + require.True(t, needs.Has(dal.SubAttrVolume, aliceKey.Bytes())) + require.True(t, needs.Has(dal.SubAttrVolume, bobKey.Bytes())) }) t.Run("mixed orders: all volumes extracted regardless of force flag", func(t *testing.T) { @@ -667,10 +649,9 @@ func TestExtractNeededVolumes_Force(t *testing.T) { needs, _, err := admission.extractPreloadNeeds(context.Background(), orders) require.NoError(t, err) - volumes := needs.Volumes // Should have 4 volume keys: source+dest from both orders - require.Len(t, volumes, 4) + require.Equal(t, 4, needs.Count(dal.SubAttrVolume)) // Verify force=true volumes ARE present forceSourceKey := domain.VolumeKey{ @@ -681,10 +662,8 @@ func TestExtractNeededVolumes_Force(t *testing.T) { AccountKey: domain.AccountKey{LedgerName: "test-ledger", Account: "users:force_dest"}, Asset: "USD", } - _, hasForceSource := volumes[forceSourceKey] - _, hasForceDest := volumes[forceDestKey] - require.True(t, hasForceSource, "force=true order should have source volumes extracted") - require.True(t, hasForceDest, "force=true order should have destination volumes extracted") + require.True(t, needs.Has(dal.SubAttrVolume, forceSourceKey.Bytes()), "force=true order should have source volumes extracted") + require.True(t, needs.Has(dal.SubAttrVolume, forceDestKey.Bytes()), "force=true order should have destination volumes extracted") // Verify force=false volumes are present normalSourceKey := domain.VolumeKey{ @@ -695,10 +674,8 @@ func TestExtractNeededVolumes_Force(t *testing.T) { AccountKey: domain.AccountKey{LedgerName: "test-ledger", Account: "users:normal_dest"}, Asset: "EUR", } - _, hasNormalSource := volumes[normalSourceKey] - _, hasNormalDest := volumes[normalDestKey] - require.True(t, hasNormalSource, "force=false order should have source volumes extracted") - require.True(t, hasNormalDest, "force=false order should have destination volumes extracted") + require.True(t, needs.Has(dal.SubAttrVolume, normalSourceKey.Bytes()), "force=false order should have source volumes extracted") + require.True(t, needs.Has(dal.SubAttrVolume, normalDestKey.Bytes()), "force=false order should have destination volumes extracted") }) t.Run("force on revert still extracts volumes", func(t *testing.T) { @@ -736,10 +713,9 @@ func TestExtractNeededVolumes_Force(t *testing.T) { needs, _, err := admission.extractPreloadNeeds(context.Background(), orders) require.NoError(t, err) - volumes := needs.Volumes // Revert reverses postings: alice->world. Both source (alice) and destination (world) preloaded. - require.Len(t, volumes, 2, "revert with force=true should still extract all volumes") + require.Equal(t, 2, needs.Count(dal.SubAttrVolume), "revert with force=true should still extract all volumes") aliceKey := domain.VolumeKey{ AccountKey: domain.AccountKey{LedgerName: "test-ledger", Account: "user:alice"}, @@ -750,10 +726,8 @@ func TestExtractNeededVolumes_Force(t *testing.T) { Asset: "USD", } - _, hasAlice := volumes[aliceKey] - _, hasWorld := volumes[worldKey] - require.True(t, hasAlice, "should have alice volume key (source in revert)") - require.True(t, hasWorld, "should have world volume key (destination in revert)") + require.True(t, needs.Has(dal.SubAttrVolume, aliceKey.Bytes()), "should have alice volume key (source in revert)") + require.True(t, needs.Has(dal.SubAttrVolume, worldKey.Bytes()), "should have world volume key (destination in revert)") }) } @@ -839,8 +813,7 @@ func TestExtractPreloadNeeds_AccountMetadata_ScriptBacked(t *testing.T) { AccountKey: domain.AccountKey{LedgerName: testLedgerName, Account: "users:alice"}, Key: "vip", } - _, ok := needs.Metadata[key] - require.True(t, ok, + require.True(t, needs.Has(dal.SubAttrMetadata, key.Bytes()), "caller-supplied accountMetadata key must reach the preload set so the FSM can capture previousAccountMetadata", ) }) @@ -988,10 +961,9 @@ func TestExtractNeededVolumes_Numscript(t *testing.T) { needs, perOrderNeeds, err := admission.extractPreloadNeeds(context.Background(), orders) require.NoError(t, err) require.NoError(t, admission.resolveScriptsAndEnrichNeeds(context.Background(), orders, overlay, needs, perOrderNeeds)) - volumes := needs.Volumes // Both source and destination volumes are preloaded from numscript - require.Len(t, volumes, 2, "numscript emulation should discover all volumes") + require.Equal(t, 2, needs.Count(dal.SubAttrVolume), "numscript emulation should discover all volumes") aliceKey := domain.VolumeKey{ AccountKey: domain.AccountKey{LedgerName: "test-ledger", Account: "users:alice"}, @@ -1002,11 +974,8 @@ func TestExtractNeededVolumes_Numscript(t *testing.T) { Asset: "USD/2", } - _, hasAlice := volumes[aliceKey] - _, hasBob := volumes[bobKey] - - require.True(t, hasAlice, "should discover source account from numscript") - require.True(t, hasBob, "should preload destination account from numscript") + require.True(t, needs.Has(dal.SubAttrVolume, aliceKey.Bytes()), "should discover source account from numscript") + require.True(t, needs.Has(dal.SubAttrVolume, bobKey.Bytes()), "should preload destination account from numscript") }) t.Run("extracts numscript volumes even when force is true", func(t *testing.T) { @@ -1044,10 +1013,9 @@ func TestExtractNeededVolumes_Numscript(t *testing.T) { needs, perOrderNeeds, err := admission.extractPreloadNeeds(context.Background(), orders) require.NoError(t, err) require.NoError(t, admission.resolveScriptsAndEnrichNeeds(context.Background(), orders, overlay, needs, perOrderNeeds)) - volumes := needs.Volumes // Force=true no longer skips volume extraction - all volumes are preloaded - require.Len(t, volumes, 2, "force=true should still extract numscript volumes") + require.Equal(t, 2, needs.Count(dal.SubAttrVolume), "force=true should still extract numscript volumes") aliceKey := domain.VolumeKey{ AccountKey: domain.AccountKey{LedgerName: "test-ledger", Account: "users:alice"}, @@ -1058,10 +1026,8 @@ func TestExtractNeededVolumes_Numscript(t *testing.T) { Asset: "USD/2", } - _, hasAlice := volumes[aliceKey] - _, hasBob := volumes[bobKey] - require.True(t, hasAlice, "should have source volume from numscript") - require.True(t, hasBob, "should have destination volume from numscript") + require.True(t, needs.Has(dal.SubAttrVolume, aliceKey.Bytes()), "should have source volume from numscript") + require.True(t, needs.Has(dal.SubAttrVolume, bobKey.Bytes()), "should have destination volume from numscript") }) t.Run("discovers volumes from numscript reference vars", func(t *testing.T) { @@ -1110,11 +1076,10 @@ func TestExtractNeededVolumes_Numscript(t *testing.T) { require.NoError(t, err) require.NoError(t, admission.resolveScriptsAndEnrichNeeds(context.Background(), orders, overlay, needs, perOrderNeeds)) - _, hasAlice := needs.Volumes[domain.VolumeKey{ + require.True(t, needs.Has(dal.SubAttrVolume, domain.VolumeKey{ AccountKey: domain.AccountKey{LedgerName: "test-ledger", Account: "users:alice"}, Asset: "USD/2", - }] - require.True(t, hasAlice, "should discover destination account from reference vars") + }.Bytes()), "should discover destination account from reference vars") ref := orders[0].GetLedgerScoped().GetApply().GetCreateTransaction().GetNumscriptReference() require.Equal(t, "v1", ref.GetVersion()) @@ -1161,7 +1126,7 @@ func TestExtractNeededVolumes_Numscript(t *testing.T) { var parseErr *domain.ErrNumscriptParse require.ErrorAs(t, err, &parseErr) - require.Empty(t, needs.Volumes) + require.Zero(t, needs.Count(dal.SubAttrVolume)) }) t.Run("rejects numscript emulation failures during dependency discovery", func(t *testing.T) { @@ -1212,7 +1177,7 @@ func TestExtractNeededVolumes_Numscript(t *testing.T) { var runtimeErr *domain.ErrNumscriptRuntime require.ErrorAs(t, err, &runtimeErr) - require.Empty(t, needs.Volumes) + require.Zero(t, needs.Count(dal.SubAttrVolume)) }) t.Run("falls back to postings when script has explicit postings", func(t *testing.T) { @@ -1251,10 +1216,9 @@ func TestExtractNeededVolumes_Numscript(t *testing.T) { needs, _, err := admission.extractPreloadNeeds(context.Background(), orders) require.NoError(t, err) - volumes := needs.Volumes // Should use explicit postings, not numscript emulation; both source and destination preloaded - require.Len(t, volumes, 2) + require.Equal(t, 2, needs.Count(dal.SubAttrVolume)) bankKey := domain.VolumeKey{ AccountKey: domain.AccountKey{LedgerName: "test-ledger", Account: "bank"}, @@ -1265,10 +1229,8 @@ func TestExtractNeededVolumes_Numscript(t *testing.T) { Asset: "EUR", } - _, hasBank := volumes[bankKey] - _, hasMerchant := volumes[merchantKey] - require.True(t, hasBank, "should use explicit posting source") - require.True(t, hasMerchant, "should use explicit posting destination") + require.True(t, needs.Has(dal.SubAttrVolume, bankKey.Bytes()), "should use explicit posting source") + require.True(t, needs.Has(dal.SubAttrVolume, merchantKey.Bytes()), "should use explicit posting destination") }) } diff --git a/internal/application/admission/wrapper_dispatch_test.go b/internal/application/admission/wrapper_dispatch_test.go index 257bbd5d96..b4def11166 100644 --- a/internal/application/admission/wrapper_dispatch_test.go +++ b/internal/application/admission/wrapper_dispatch_test.go @@ -9,6 +9,7 @@ import ( "github.com/formancehq/ledger/v3/internal/infra/plan" "github.com/formancehq/ledger/v3/internal/proto/commonpb" "github.com/formancehq/ledger/v3/internal/proto/raftcmdpb" + "github.com/formancehq/ledger/v3/internal/storage/dal" ) const wrapperTestLedger = "books" @@ -36,7 +37,7 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { Payload: &raftcmdpb.LedgerScopedOrder_CreateLedger{CreateLedger: &raftcmdpb.CreateLedgerOrder{}}, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.Ledgers, ledgerKey) + require.True(t, n.Has(dal.SubAttrLedger, ledgerKey.Bytes())) }, }, { @@ -46,7 +47,7 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { Payload: &raftcmdpb.LedgerScopedOrder_DeleteLedger{DeleteLedger: &raftcmdpb.DeleteLedgerOrder{}}, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.Ledgers, ledgerKey) + require.True(t, n.Has(dal.SubAttrLedger, ledgerKey.Bytes())) }, }, { @@ -56,7 +57,7 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { Payload: &raftcmdpb.LedgerScopedOrder_PromoteLedger{PromoteLedger: &raftcmdpb.PromoteLedgerOrder{}}, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.Ledgers, ledgerKey) + require.True(t, n.Has(dal.SubAttrLedger, ledgerKey.Bytes())) }, }, { @@ -81,20 +82,20 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }}, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.Ledgers, ledgerKey) - require.Contains(t, n.Boundaries, ledgerKey) - require.Contains(t, n.Volumes, domain.VolumeKey{ + require.True(t, n.Has(dal.SubAttrLedger, ledgerKey.Bytes())) + require.True(t, n.Has(dal.SubAttrBoundary, ledgerKey.Bytes())) + require.True(t, n.Has(dal.SubAttrVolume, domain.VolumeKey{ AccountKey: domain.AccountKey{LedgerName: wrapperTestLedger, Account: "world"}, Asset: "USD", - }) - require.Contains(t, n.Volumes, domain.VolumeKey{ + }.Bytes())) + require.True(t, n.Has(dal.SubAttrVolume, domain.VolumeKey{ AccountKey: domain.AccountKey{LedgerName: wrapperTestLedger, Account: "user:alice"}, Asset: "USD", - }) - require.Contains(t, n.Metadata, domain.MetadataKey{ + }.Bytes())) + require.True(t, n.Has(dal.SubAttrMetadata, domain.MetadataKey{ AccountKey: domain.AccountKey{LedgerName: wrapperTestLedger, Account: "user:alice"}, Key: "tag", - }) + }.Bytes())) }, }, { @@ -117,10 +118,10 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }}, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.Metadata, domain.MetadataKey{ + require.True(t, n.Has(dal.SubAttrMetadata, domain.MetadataKey{ AccountKey: domain.AccountKey{LedgerName: wrapperTestLedger, Account: "user:bob"}, Key: "score", - }) + }.Bytes())) }, }, { @@ -138,7 +139,7 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }}, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.Transactions, domain.TransactionKey{LedgerName: wrapperTestLedger, ID: 7}) + require.True(t, n.Has(dal.SubAttrTransaction, domain.TransactionKey{LedgerName: wrapperTestLedger, ID: 7}.Bytes())) }, }, { @@ -159,10 +160,10 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }}, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.Metadata, domain.MetadataKey{ + require.True(t, n.Has(dal.SubAttrMetadata, domain.MetadataKey{ AccountKey: domain.AccountKey{LedgerName: wrapperTestLedger, Account: "user:carol"}, Key: "score", - }) + }.Bytes())) }, }, { @@ -180,7 +181,7 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }}, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.Transactions, domain.TransactionKey{LedgerName: wrapperTestLedger, ID: 9}) + require.True(t, n.Has(dal.SubAttrTransaction, domain.TransactionKey{LedgerName: wrapperTestLedger, ID: 9}.Bytes())) }, }, { @@ -201,11 +202,11 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }}, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.Transactions, domain.TransactionKey{LedgerName: wrapperTestLedger, ID: 11}) - require.Contains(t, n.Volumes, domain.VolumeKey{ + require.True(t, n.Has(dal.SubAttrTransaction, domain.TransactionKey{LedgerName: wrapperTestLedger, ID: 11}.Bytes())) + require.True(t, n.Has(dal.SubAttrVolume, domain.VolumeKey{ AccountKey: domain.AccountKey{LedgerName: wrapperTestLedger, Account: "user:alice"}, Asset: "USD", - }) + }.Bytes())) }, }, { @@ -219,8 +220,8 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.Ledgers, ledgerKey) - require.Contains(t, n.PreparedQueries, domain.PreparedQueryKey{LedgerName: wrapperTestLedger, Name: "q1"}) + require.True(t, n.Has(dal.SubAttrLedger, ledgerKey.Bytes())) + require.True(t, n.Has(dal.SubAttrPreparedQuery, domain.PreparedQueryKey{LedgerName: wrapperTestLedger, Name: "q1"}.Bytes())) }, }, { @@ -232,7 +233,7 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.PreparedQueries, domain.PreparedQueryKey{LedgerName: wrapperTestLedger, Name: "q1"}) + require.True(t, n.Has(dal.SubAttrPreparedQuery, domain.PreparedQueryKey{LedgerName: wrapperTestLedger, Name: "q1"}.Bytes())) }, }, { @@ -244,7 +245,7 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.PreparedQueries, domain.PreparedQueryKey{LedgerName: wrapperTestLedger, Name: "q1"}) + require.True(t, n.Has(dal.SubAttrPreparedQuery, domain.PreparedQueryKey{LedgerName: wrapperTestLedger, Name: "q1"}.Bytes())) }, }, { @@ -256,9 +257,9 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.NumscriptVersions, domain.NumscriptVersionKey{LedgerName: wrapperTestLedger, Name: "tx"}) + require.True(t, n.Has(dal.SubAttrNumscriptVersion, domain.NumscriptVersionKey{LedgerName: wrapperTestLedger, Name: "tx"}.Bytes())) // latest does not preload a specific version content - require.Empty(t, n.NumscriptContents) + require.Zero(t, n.Count(dal.SubAttrNumscriptContent)) }, }, { @@ -271,11 +272,11 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }, assert: func(t *testing.T, n *plan.Needs) { // semver saves preload the specific (name, version) for immutability check - require.Contains(t, n.NumscriptContents, domain.NumscriptEntryKey{ + require.True(t, n.Has(dal.SubAttrNumscriptContent, domain.NumscriptEntryKey{ LedgerName: wrapperTestLedger, Name: "tx", Version: "1.2.3", - }) + }.Bytes())) }, }, { @@ -287,7 +288,7 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.NumscriptVersions, domain.NumscriptVersionKey{LedgerName: wrapperTestLedger, Name: "tx"}) + require.True(t, n.Has(dal.SubAttrNumscriptVersion, domain.NumscriptVersionKey{LedgerName: wrapperTestLedger, Name: "tx"}.Bytes())) }, }, { @@ -303,7 +304,7 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.LedgerMetadata, domain.LedgerMetadataKey{LedgerName: wrapperTestLedger, Key: "owner"}) + require.True(t, n.Has(dal.SubAttrLedgerMetadata, domain.LedgerMetadataKey{LedgerName: wrapperTestLedger, Key: "owner"}.Bytes())) }, }, { @@ -315,7 +316,7 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.LedgerMetadata, domain.LedgerMetadataKey{LedgerName: wrapperTestLedger, Key: "owner"}) + require.True(t, n.Has(dal.SubAttrLedgerMetadata, domain.LedgerMetadataKey{LedgerName: wrapperTestLedger, Key: "owner"}.Bytes())) }, }, { @@ -336,11 +337,11 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.References, domain.TransactionReferenceKey{LedgerName: wrapperTestLedger, Reference: "order-42"}) - require.Contains(t, n.Volumes, domain.VolumeKey{ + require.True(t, n.Has(dal.SubAttrReference, domain.TransactionReferenceKey{LedgerName: wrapperTestLedger, Reference: "order-42"}.Bytes())) + require.True(t, n.Has(dal.SubAttrVolume, domain.VolumeKey{ AccountKey: domain.AccountKey{LedgerName: wrapperTestLedger, Account: "user:dan"}, Asset: "EUR", - }) + }.Bytes())) }, }, { @@ -361,11 +362,11 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.Transactions, domain.TransactionKey{LedgerName: wrapperTestLedger, ID: 13}) - require.Contains(t, n.Volumes, domain.VolumeKey{ + require.True(t, n.Has(dal.SubAttrTransaction, domain.TransactionKey{LedgerName: wrapperTestLedger, ID: 13}.Bytes())) + require.True(t, n.Has(dal.SubAttrVolume, domain.VolumeKey{ AccountKey: domain.AccountKey{LedgerName: wrapperTestLedger, Account: "user:eve"}, Asset: "USD", - }) + }.Bytes())) }, }, { @@ -388,10 +389,10 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.Metadata, domain.MetadataKey{ + require.True(t, n.Has(dal.SubAttrMetadata, domain.MetadataKey{ AccountKey: domain.AccountKey{LedgerName: wrapperTestLedger, Account: "user:fran"}, Key: "badge", - }) + }.Bytes())) }, }, { @@ -409,7 +410,7 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.Transactions, domain.TransactionKey{LedgerName: wrapperTestLedger, ID: 17}) + require.True(t, n.Has(dal.SubAttrTransaction, domain.TransactionKey{LedgerName: wrapperTestLedger, ID: 17}.Bytes())) }, }, { @@ -430,10 +431,10 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.Metadata, domain.MetadataKey{ + require.True(t, n.Has(dal.SubAttrMetadata, domain.MetadataKey{ AccountKey: domain.AccountKey{LedgerName: wrapperTestLedger, Account: "user:gina"}, Key: "badge", - }) + }.Bytes())) }, }, { @@ -452,7 +453,7 @@ func TestExtractLedgerScopedNeeds_CoversEveryPayloadVariant(t *testing.T) { }, }, assert: func(t *testing.T, n *plan.Needs) { - require.Contains(t, n.Transactions, domain.TransactionKey{LedgerName: wrapperTestLedger, ID: 19}) + require.True(t, n.Has(dal.SubAttrTransaction, domain.TransactionKey{LedgerName: wrapperTestLedger, ID: 19}.Bytes())) }, }, } @@ -484,7 +485,7 @@ func TestExtractSystemScopedNeeds_OnlySinkConfigsContribute(t *testing.T) { Config: &commonpb.SinkConfig{Name: "kafka-main"}, }}, }) - require.Contains(t, needs.SinkConfigs, domain.SinkConfigKey{Name: "kafka-main"}) + require.True(t, needs.Has(dal.SubAttrSinkConfig, domain.SinkConfigKey{Name: "kafka-main"}.Bytes())) }) t.Run("remove_events_sink", func(t *testing.T) { @@ -495,7 +496,7 @@ func TestExtractSystemScopedNeeds_OnlySinkConfigsContribute(t *testing.T) { Name: "kafka-main", }}, }) - require.Contains(t, needs.SinkConfigs, domain.SinkConfigKey{Name: "kafka-main"}) + require.True(t, needs.Has(dal.SubAttrSinkConfig, domain.SinkConfigKey{Name: "kafka-main"}.Bytes())) }) noOpVariants := []struct { @@ -524,8 +525,8 @@ func TestExtractSystemScopedNeeds_OnlySinkConfigsContribute(t *testing.T) { needs := plan.NewNeeds() extractSystemScopedNeeds(needs, tc.payload) - require.Empty(t, needs.SinkConfigs, "%s must not contribute sink configs", tc.name) - require.Empty(t, needs.Ledgers, "%s must not contribute a ledger", tc.name) + require.Zero(t, needs.Count(dal.SubAttrSinkConfig), "%s must not contribute sink configs", tc.name) + require.Zero(t, needs.Count(dal.SubAttrLedger), "%s must not contribute a ledger", tc.name) }) } } diff --git a/internal/application/mirror/worker.go b/internal/application/mirror/worker.go index 848b1e9df2..0f7558744e 100644 --- a/internal/application/mirror/worker.go +++ b/internal/application/mirror/worker.go @@ -366,7 +366,7 @@ func (w *Worker) processBatch(ctx context.Context) (bool, error) { // One WriteOperation per Order + one for the cursor TU. The cursor // TU reads Registry.Ledgers[w.ledgerName] in applyMirrorSyncUpdate. tuNeeds := plan.NewNeeds() - tuNeeds.Ledgers[domain.LedgerKey{Name: w.ledgerName}] = struct{}{} + tuNeeds.Add(dal.SubAttrLedger, domain.LedgerKey{Name: w.ledgerName}.Bytes()) operations := make([]plan.WriteOperation, 0, len(orders)+1) for i := range orders { @@ -519,7 +519,7 @@ func (w *Worker) reportError(ctx context.Context, message string) { // emit no audit entry and the error would never reach the store. // One WriteOperation for the error TU with its ledger needs declared. needs := plan.NewNeeds() - needs.Ledgers[domain.LedgerKey{Name: w.ledgerName}] = struct{}{} + needs.Add(dal.SubAttrLedger, domain.LedgerKey{Name: w.ledgerName}.Bytes()) operations := []plan.WriteOperation{{ Needs: needs, @@ -585,12 +585,22 @@ func (w *Worker) extractMirrorNeeds(cmd *raftcmdpb.Proposal) (*plan.Needs, []*pl aggregate := plan.NewNeeds() perOrder := make([]*plan.Needs, len(cmd.GetOrders())) - ledgerKey := domain.LedgerKey{Name: w.ledgerName} + ledgerBytes := domain.LedgerKey{Name: w.ledgerName}.Bytes() + + addAccountMetadata := func(p *plan.Needs, account, key string) { + p.Add(dal.SubAttrMetadata, domain.MetadataKey{ + AccountKey: domain.AccountKey{LedgerName: w.ledgerName, Account: account}, + Key: key, + }.Bytes()) + } + addTx := func(p *plan.Needs, txID uint64) { + p.Add(dal.SubAttrTransaction, domain.TransactionKey{LedgerName: w.ledgerName, ID: txID}.Bytes()) + } for orderIdx, order := range cmd.GetOrders() { p := plan.NewNeeds() - p.Ledgers[ledgerKey] = struct{}{} - p.Boundaries[ledgerKey] = struct{}{} + p.Add(dal.SubAttrLedger, ledgerBytes) + p.Add(dal.SubAttrBoundary, ledgerBytes) mi := order.GetLedgerScoped().GetMirrorIngest() if mi == nil { @@ -612,7 +622,7 @@ func (w *Worker) extractMirrorNeeds(cmd *raftcmdpb.Proposal) (*plan.Needs, []*pl {AccountKey: domain.AccountKey{LedgerName: w.ledgerName, Account: posting.GetSource()}, Asset: posting.GetAsset()}, {AccountKey: domain.AccountKey{LedgerName: w.ledgerName, Account: posting.GetDestination()}, Asset: posting.GetAsset()}, } { - p.Volumes[volKey] = struct{}{} + p.Add(dal.SubAttrVolume, volKey.Bytes()) } } @@ -620,10 +630,7 @@ func (w *Worker) extractMirrorNeeds(cmd *raftcmdpb.Proposal) (*plan.Needs, []*pl if ct := mi.GetEntry().GetCreatedTransaction(); ct != nil { for account, mm := range ct.GetAccountMetadata() { for key := range mm.GetValues() { - p.Metadata[domain.MetadataKey{ - AccountKey: domain.AccountKey{LedgerName: w.ledgerName, Account: account}, - Key: key, - }] = struct{}{} + addAccountMetadata(p, account, key) } } } @@ -632,30 +639,29 @@ func (w *Worker) extractMirrorNeeds(cmd *raftcmdpb.Proposal) (*plan.Needs, []*pl switch target := sm.GetTarget().GetTarget().(type) { case *commonpb.Target_Account: for key := range sm.GetMetadata() { - p.Metadata[domain.MetadataKey{ - AccountKey: domain.AccountKey{LedgerName: w.ledgerName, Account: target.Account.GetAddr()}, - Key: key, - }] = struct{}{} + addAccountMetadata(p, target.Account.GetAddr(), key) } case *commonpb.Target_TransactionId: - p.Transactions[domain.TransactionKey{LedgerName: w.ledgerName, ID: target.TransactionId}] = struct{}{} + addTx(p, target.TransactionId) } } if dm := mi.GetEntry().GetDeletedMetadata(); dm != nil { switch target := dm.GetTarget().GetTarget().(type) { case *commonpb.Target_Account: - p.Metadata[domain.MetadataKey{ + // Same strict-Del coverage as the admission-side + // MirrorIngest.DeletedMetadata path (see admission.go). + p.Add(dal.SubAttrMetadata, domain.MetadataKey{ AccountKey: domain.AccountKey{LedgerName: w.ledgerName, Account: target.Account.GetAddr()}, Key: dm.GetKey(), - }] = struct{}{} + }.Bytes()) case *commonpb.Target_TransactionId: - p.Transactions[domain.TransactionKey{LedgerName: w.ledgerName, ID: target.TransactionId}] = struct{}{} + addTx(p, target.TransactionId) } } if rt := mi.GetEntry().GetRevertedTransaction(); rt != nil { - p.Transactions[domain.TransactionKey{LedgerName: w.ledgerName, ID: rt.GetRevertedTransactionId()}] = struct{}{} + addTx(p, rt.GetRevertedTransactionId()) } perOrder[orderIdx] = p diff --git a/internal/infra/plan/attribute_resolvers.go b/internal/infra/plan/attribute_resolvers.go new file mode 100644 index 0000000000..ea78c8ca80 --- /dev/null +++ b/internal/infra/plan/attribute_resolvers.go @@ -0,0 +1,188 @@ +package plan + +import ( + logging "github.com/formancehq/go-libs/v5/pkg/observe/log" + + "github.com/formancehq/ledger/v3/internal/infra/attributes" + "github.com/formancehq/ledger/v3/internal/infra/bloom" + "github.com/formancehq/ledger/v3/internal/infra/cache" + "github.com/formancehq/ledger/v3/internal/infra/preload" + "github.com/formancehq/ledger/v3/internal/proto/commonpb" + "github.com/formancehq/ledger/v3/internal/proto/raftcmdpb" + "github.com/formancehq/ledger/v3/internal/storage/dal" +) + +// attrResolver is the non-generic view over one attribute type's +// resolve pipeline. Each attribute cache carries a different generic +// value type T, so the concrete implementation is +// protoAttrResolver[T]; the interface exists so the builder can hold +// them uniformly in a map keyed by attrCode. +type attrResolver interface { + // Resolve walks the keys set through resolveAttributePreload for + // this attribute's cache/loader/store bindings. Returns the batch + // of AttributePlan entries and the tracker keys the caller must + // associate with Loader() for the CleanupToken. + Resolve( + keys map[string]struct{}, + nextIndex, boundary, cacheEpoch uint64, + store dal.PebbleGetter, + logger logging.Logger, + ) (*resolveResult, error) + + // Loader returns the AttributeLoader tied to this resolver so the + // builder can register it in the CleanupToken. Non-generic so the + // registration is uniform across attribute types. + Loader() preload.LoaderOps +} + +// protoAttrResolver binds one attribute cache/loader/store triple for +// the resolve pipeline. One instance per dal.SubAttrX code lives in +// Builder.resolvers, populated at NewBuilder. +// +// bloom is a closure rather than a captured pointer: bloom.FilterSet is +// swappable across the ledger lifetime and the Builder.bloomFilter +// accessor takes a fresh snapshot per call to keep readiness and the +// filter pointer coherent (see #317). Storing a stale pointer here +// would defeat that. +type protoAttrResolver[T interface { + MarshalVT() ([]byte, error) +}] struct { + attrCode byte + typeName string + cache *cache.AttributeCache[T] + loader *preload.AttributeLoader[T] + getValue func(reader dal.PebbleGetter, canonicalKey []byte) (T, error) + bloom func() *bloom.Filter +} + +func (r *protoAttrResolver[T]) Resolve( + keys map[string]struct{}, + nextIndex, boundary, cacheEpoch uint64, + store dal.PebbleGetter, + logger logging.Logger, +) (*resolveResult, error) { + return resolveAttributePreload[T]( + keys, nextIndex, boundary, cacheEpoch, + r.cache, r.loader, r.getValue, store, + r.attrCode, nil, r.bloom(), logger, r.typeName, + ) +} + +func (r *protoAttrResolver[T]) Loader() preload.LoaderOps { return r.loader } + +// buildAttrResolvers returns the full registration set keyed by +// dal.SubAttrX for every attribute cache. Adding a new attribute +// cache means adding one entry here — no changes needed in Needs, +// admission call sites, coverage_bits, or the builder loop. +func buildAttrResolvers( + c *cache.Cache, + attrs *attributes.Attributes, + loaders *preload.Loaders, + bloomLookup func(sub byte) *bloom.Filter, +) map[byte]attrResolver { + // bloomLookup takes a snapshot per call (see Builder.bloomFilter); + // wrap it in per-attrCode closures so each resolver captures its own + // attribute code without threading it through the Resolve signature. + filter := func(sub byte) func() *bloom.Filter { + return func() *bloom.Filter { return bloomLookup(sub) } + } + + return map[byte]attrResolver{ + dal.SubAttrLedger: &protoAttrResolver[*commonpb.LedgerInfo]{ + attrCode: dal.SubAttrLedger, + typeName: "ledgers", + cache: c.Ledgers, + loader: loaders.Ledgers, + getValue: attrs.Ledger.Get, + bloom: filter(dal.SubAttrLedger), + }, + dal.SubAttrBoundary: &protoAttrResolver[*raftcmdpb.LedgerBoundaries]{ + attrCode: dal.SubAttrBoundary, + typeName: "boundaries", + cache: c.Boundaries, + loader: loaders.Boundaries, + getValue: attrs.Boundary.Get, + bloom: filter(dal.SubAttrBoundary), + }, + dal.SubAttrVolume: &protoAttrResolver[*raftcmdpb.VolumePair]{ + attrCode: dal.SubAttrVolume, + typeName: "volumes", + cache: c.Volumes, + loader: loaders.Volumes, + getValue: attrs.Volume.Get, + bloom: filter(dal.SubAttrVolume), + }, + dal.SubAttrReference: &protoAttrResolver[*commonpb.TransactionReferenceValue]{ + attrCode: dal.SubAttrReference, + typeName: "references", + cache: c.References, + loader: loaders.References, + getValue: attrs.References.Get, + bloom: filter(dal.SubAttrReference), + }, + dal.SubAttrSinkConfig: &protoAttrResolver[*commonpb.SinkConfig]{ + attrCode: dal.SubAttrSinkConfig, + typeName: "sink_configs", + cache: c.SinkConfigs, + loader: loaders.SinkConfigs, + getValue: attrs.SinkConfig.Get, + bloom: filter(dal.SubAttrSinkConfig), + }, + dal.SubAttrNumscriptVersion: &protoAttrResolver[*commonpb.NumscriptVersionValue]{ + attrCode: dal.SubAttrNumscriptVersion, + typeName: "numscript_versions", + cache: c.NumscriptVersions, + loader: loaders.NumscriptVersions, + getValue: attrs.NumscriptVersion.Get, + bloom: filter(dal.SubAttrNumscriptVersion), + }, + dal.SubAttrNumscriptContent: &protoAttrResolver[*commonpb.NumscriptInfo]{ + attrCode: dal.SubAttrNumscriptContent, + typeName: "numscript_contents", + cache: c.NumscriptContents, + loader: loaders.NumscriptContents, + getValue: attrs.NumscriptContent.Get, + bloom: filter(dal.SubAttrNumscriptContent), + }, + dal.SubAttrTransaction: &protoAttrResolver[*commonpb.TransactionState]{ + attrCode: dal.SubAttrTransaction, + typeName: "transactions", + cache: c.Transactions, + loader: loaders.Transactions, + getValue: attrs.Transaction.Get, + bloom: filter(dal.SubAttrTransaction), + }, + dal.SubAttrMetadata: &protoAttrResolver[*commonpb.MetadataValue]{ + attrCode: dal.SubAttrMetadata, + typeName: "metadata", + cache: c.AccountMetadata, + loader: loaders.AccountMetadata, + getValue: attrs.Metadata.Get, + bloom: filter(dal.SubAttrMetadata), + }, + dal.SubAttrPreparedQuery: &protoAttrResolver[*commonpb.PreparedQuery]{ + attrCode: dal.SubAttrPreparedQuery, + typeName: "prepared_queries", + cache: c.PreparedQueries, + loader: loaders.PreparedQueries, + getValue: attrs.PreparedQuery.Get, + bloom: filter(dal.SubAttrPreparedQuery), + }, + dal.SubAttrLedgerMetadata: &protoAttrResolver[*commonpb.MetadataValue]{ + attrCode: dal.SubAttrLedgerMetadata, + typeName: "ledger_metadata", + cache: c.LedgerMetadata, + loader: loaders.LedgerMetadata, + getValue: attrs.LedgerMetadata.Get, + bloom: filter(dal.SubAttrLedgerMetadata), + }, + dal.SubAttrIndex: &protoAttrResolver[*commonpb.Index]{ + attrCode: dal.SubAttrIndex, + typeName: "indexes", + cache: c.Indexes, + loader: loaders.Indexes, + getValue: attrs.Index.Get, + bloom: filter(dal.SubAttrIndex), + }, + } +} diff --git a/internal/infra/plan/builder.go b/internal/infra/plan/builder.go index 21d6873120..e4dfddb100 100644 --- a/internal/infra/plan/builder.go +++ b/internal/infra/plan/builder.go @@ -36,6 +36,13 @@ type Builder struct { bloomFilters *bloom.FilterSet logger logging.Logger + // resolvers[attrCode] holds the per-attribute-cache resolve pipeline + // (cache, loader, getValue, bloom, typeName). Populated once at + // NewBuilder; buildPreloadsAt iterates over needs.Attributes and + // dispatches through this map. Adding a new attribute cache is a + // single-line change in buildAttrResolvers. + resolvers map[byte]attrResolver + // maxPlanSize is the cap on the number of AttributePlan entries an // ExecutionPlan may carry. Build returns domain.ErrExecutionPlanTooLarge // past this threshold so admission rejects pathological payloads up @@ -112,7 +119,7 @@ func (g *ProposalGuard) ReleaseAll() { // prediction. maxPlanSize caps the number of AttributePlan entries an // ExecutionPlan may carry (0 = unlimited); see Builder.maxPlanSize. func NewBuilder(tracker *node.IndexTracker, c *cache.Cache, attrs *attributes.Attributes, store *dal.Store, bloomFilters *bloom.FilterSet, logger logging.Logger, maxPlanSize int) *Builder { - return &Builder{ + b := &Builder{ tracker: tracker, cache: c, attrs: attrs, @@ -122,6 +129,9 @@ func NewBuilder(tracker *node.IndexTracker, c *cache.Cache, attrs *attributes.At logger: logger, maxPlanSize: maxPlanSize, } + b.resolvers = buildAttrResolvers(c, attrs, b.loaders, b.bloomFilter) + + return b } // Loaders returns the shared preload.Loaders instance, allowing callers to @@ -347,72 +357,71 @@ func (p *Builder) buildPreloadsAt(nextIndex uint64, snap cache.ConfigSnapshot, n }).Tracef("Builder: buildPreloadsAt") } - // Each goroutine writes to a distinct results[slot]. Bump this constant - // whenever a new launch() call is added: with the Indexes attribute - // (PR #453) the count went from 12 to 13, and `results[12]` would - // otherwise panic with an out-of-range index when every category fires. - const maxTypes = 13 - results := make([]buildResult, maxTypes) + // Pre-count active attribute caches so results[] can be + // fixed-size: each goroutine writes to a distinct index without + // any synchronization on the slice header. + activeAttrCodes := make([]byte, 0, len(needs.Attributes)) + for attrCode, set := range needs.Attributes { + if len(set.Keys) == 0 { + continue + } - var wg sync.WaitGroup + activeAttrCodes = append(activeAttrCodes, attrCode) + } + + // Pre-validate every attrCode has a resolver BEFORE spawning any + // goroutine. Bailing mid-loop after a `launch` call would leave + // in-flight goroutines writing to results[] on a slice the caller + // no longer sees AND leak their `loader.LoadOrWait` inflight-map + // entries (no CleanupToken assembled → the loader keeps them + // pinned forever). Validating first makes the error path leak-free. + for _, attrCode := range activeAttrCodes { + if _, ok := p.resolvers[attrCode]; !ok { + // Admission emitted a preload for a cache the builder + // wasn't told about (new attrCode landed in dal.SubAttr* + // without a matching entry in buildAttrResolvers). + // Silent no-op would leave the FSM without seeded values + // on the apply side, violating invariant #6. + assert.Unreachable("plan builder: no resolver registered for attrCode — extend buildAttrResolvers", map[string]any{ + "attrCode": attrCode, + "keys": len(needs.Attributes[attrCode].Keys), + }) + + return nil, nil, fmt.Errorf("plan builder: no resolver for attrCode 0x%x", attrCode) + } + } - slot := 0 + slotCount := len(activeAttrCodes) + if len(needs.IdempotencyKeys) > 0 { + slotCount++ + } + + results := make([]buildResult, slotCount) - // launch spawns a resolve goroutine that writes to results[slot]. + var wg sync.WaitGroup + nextSlot := 0 launch := func(fn func(i int)) { - i := slot - slot++ + i := nextSlot + nextSlot++ wg.Go(func() { fn(i) }) } - if len(needs.Ledgers) > 0 { - launch(func(i int) { - var r *resolveResult - r, results[i].err = resolveAttributePreload( - needs.Ledgers, nextIndex, boundary, snap.Epoch, - p.cache.Ledgers, p.loaders.Ledgers, - p.attrs.Ledger.Get, p.store, - dal.SubAttrLedger, nil, - p.bloomFilter(dal.SubAttrLedger), - p.logger, "ledgers", - ) - results[i].resolve = r - results[i].loader = p.loaders.Ledgers - }) - } - - if len(needs.Boundaries) > 0 { - launch(func(i int) { - var r *resolveResult - r, results[i].err = resolveAttributePreload( - needs.Boundaries, nextIndex, boundary, snap.Epoch, - p.cache.Boundaries, p.loaders.Boundaries, - p.attrs.Boundary.Get, p.store, - dal.SubAttrBoundary, nil, - p.bloomFilter(dal.SubAttrBoundary), - p.logger, "boundaries", - ) - results[i].resolve = r - results[i].loader = p.loaders.Boundaries - }) - } + for _, attrCode := range activeAttrCodes { + set := needs.Attributes[attrCode] + resolver := p.resolvers[attrCode] // pre-validated non-nil above - if len(needs.Volumes) > 0 { launch(func(i int) { - var r *resolveResult - r, results[i].err = resolveAttributePreload( - needs.Volumes, nextIndex, boundary, snap.Epoch, - p.cache.Volumes, p.loaders.Volumes, - p.attrs.Volume.Get, p.store, - dal.SubAttrVolume, nil, - p.bloomFilter(dal.SubAttrVolume), - p.logger, "volumes", + r, err := resolver.Resolve( + set.Keys, + nextIndex, boundary, snap.Epoch, + p.store, p.logger, ) results[i].resolve = r - results[i].loader = p.loaders.Volumes + results[i].err = err + results[i].loader = resolver.Loader() }) } @@ -446,150 +455,6 @@ func (p *Builder) buildPreloadsAt(nextIndex uint64, snap cache.ConfigSnapshot, n }) } - if len(needs.References) > 0 { - launch(func(i int) { - var r *resolveResult - r, results[i].err = resolveAttributePreload( - needs.References, nextIndex, boundary, snap.Epoch, - p.cache.References, p.loaders.References, - p.attrs.References.Get, p.store, - dal.SubAttrReference, nil, - p.bloomFilter(dal.SubAttrReference), - p.logger, "references", - ) - results[i].resolve = r - results[i].loader = p.loaders.References - }) - } - - if len(needs.SinkConfigs) > 0 { - launch(func(i int) { - var r *resolveResult - r, results[i].err = resolveAttributePreload( - needs.SinkConfigs, nextIndex, boundary, snap.Epoch, - p.cache.SinkConfigs, p.loaders.SinkConfigs, - p.attrs.SinkConfig.Get, p.store, - dal.SubAttrSinkConfig, nil, - p.bloomFilter(dal.SubAttrSinkConfig), - p.logger, "sink_configs", - ) - results[i].resolve = r - results[i].loader = p.loaders.SinkConfigs - }) - } - - if len(needs.NumscriptVersions) > 0 { - launch(func(i int) { - var r *resolveResult - r, results[i].err = resolveAttributePreload( - needs.NumscriptVersions, nextIndex, boundary, snap.Epoch, - p.cache.NumscriptVersions, p.loaders.NumscriptVersions, - p.attrs.NumscriptVersion.Get, p.store, - dal.SubAttrNumscriptVersion, nil, - p.bloomFilter(dal.SubAttrNumscriptVersion), - p.logger, "numscript_versions", - ) - results[i].resolve = r - results[i].loader = p.loaders.NumscriptVersions - }) - } - - if len(needs.NumscriptContents) > 0 { - launch(func(i int) { - var r *resolveResult - r, results[i].err = resolveAttributePreload( - needs.NumscriptContents, nextIndex, boundary, snap.Epoch, - p.cache.NumscriptContents, p.loaders.NumscriptContents, - p.attrs.NumscriptContent.Get, p.store, - dal.SubAttrNumscriptContent, nil, - p.bloomFilter(dal.SubAttrNumscriptContent), - p.logger, "numscript_contents", - ) - results[i].resolve = r - results[i].loader = p.loaders.NumscriptContents - }) - } - - if len(needs.Transactions) > 0 { - launch(func(i int) { - var r *resolveResult - r, results[i].err = resolveAttributePreload( - needs.Transactions, nextIndex, boundary, snap.Epoch, - p.cache.Transactions, p.loaders.Transactions, - p.attrs.Transaction.Get, p.store, - dal.SubAttrTransaction, nil, - p.bloomFilter(dal.SubAttrTransaction), - p.logger, "transactions", - ) - results[i].resolve = r - results[i].loader = p.loaders.Transactions - }) - } - - if len(needs.Metadata) > 0 { - launch(func(i int) { - var r *resolveResult - r, results[i].err = resolveAttributePreload( - needs.Metadata, nextIndex, boundary, snap.Epoch, - p.cache.AccountMetadata, p.loaders.AccountMetadata, - p.attrs.Metadata.Get, p.store, - dal.SubAttrMetadata, nil, - p.bloomFilter(dal.SubAttrMetadata), - p.logger, "metadata", - ) - results[i].resolve = r - results[i].loader = p.loaders.AccountMetadata - }) - } - - if len(needs.PreparedQueries) > 0 { - launch(func(i int) { - var r *resolveResult - r, results[i].err = resolveAttributePreload( - needs.PreparedQueries, nextIndex, boundary, snap.Epoch, - p.cache.PreparedQueries, p.loaders.PreparedQueries, - p.attrs.PreparedQuery.Get, p.store, - dal.SubAttrPreparedQuery, nil, - p.bloomFilter(dal.SubAttrPreparedQuery), - p.logger, "prepared_queries", - ) - results[i].resolve = r - results[i].loader = p.loaders.PreparedQueries - }) - } - - if len(needs.LedgerMetadata) > 0 { - launch(func(i int) { - var r *resolveResult - r, results[i].err = resolveAttributePreload( - needs.LedgerMetadata, nextIndex, boundary, snap.Epoch, - p.cache.LedgerMetadata, p.loaders.LedgerMetadata, - p.attrs.LedgerMetadata.Get, p.store, - dal.SubAttrLedgerMetadata, nil, - p.bloomFilter(dal.SubAttrLedgerMetadata), - p.logger, "ledger_metadata", - ) - results[i].resolve = r - results[i].loader = p.loaders.LedgerMetadata - }) - } - - if len(needs.Indexes) > 0 { - launch(func(i int) { - var r *resolveResult - r, results[i].err = resolveAttributePreload( - needs.Indexes, nextIndex, boundary, snap.Epoch, - p.cache.Indexes, p.loaders.Indexes, - p.attrs.Index.Get, p.store, - dal.SubAttrIndex, nil, - p.bloomFilter(dal.SubAttrIndex), - p.logger, "indexes", - ) - results[i].resolve = r - results[i].loader = p.loaders.Indexes - }) - } - wg.Wait() // Build preload.CleanupToken and merge results, returning first error encountered. @@ -599,7 +464,7 @@ func (p *Builder) buildPreloadsAt(nextIndex uint64, snap cache.ConfigSnapshot, n CacheEpoch: snap.Epoch, } - for i := range slot { + for i := range results { // Always promote any tracker entries into the cleanup token, // even when the slot returned an error. resolveAttributePreload // can populate the tracker for keys that loaded successfully diff --git a/internal/infra/plan/builder_test.go b/internal/infra/plan/builder_test.go index f79b4c1a8a..5759ee3ca0 100644 --- a/internal/infra/plan/builder_test.go +++ b/internal/infra/plan/builder_test.go @@ -301,7 +301,7 @@ func TestBuildPreloads_DeclaresAbsentNonZeroKey(t *testing.T) { refKey := domain.TransactionReferenceKey{LedgerName: "test", Reference: "fresh-ref"} needs := NewNeeds() - needs.References[refKey] = struct{}{} + needs.Add(dal.SubAttrReference, refKey.Bytes()) build, err := p.Build([]WriteOperation{{Needs: needs}}) require.NoError(t, err) @@ -357,7 +357,7 @@ func TestBuildPreloads_RejectsCacheHorizonExceeded(t *testing.T) { refKey := domain.TransactionReferenceKey{LedgerName: "test", Reference: "ref"} needs := NewNeeds() - needs.References[refKey] = struct{}{} + needs.Add(dal.SubAttrReference, refKey.Bytes()) build, buildErr := p.Build([]WriteOperation{{Needs: needs}}) defer build.ReleaseLoaders() @@ -366,3 +366,137 @@ func TestBuildPreloads_RejectsCacheHorizonExceeded(t *testing.T) { require.ErrorIs(t, buildErr, ErrCacheHorizonExceeded, "reject must surface ErrCacheHorizonExceeded so the gRPC adapter maps to codes.Unavailable") } + +// TestBuildPreloads_EmitsDeclareOnCacheGuaranteed pins the CacheGuaranteed +// branch: when admission's CheckCache verdict says the key is in Gen0 and +// still will be at apply, the resolver emits Declare (coverage-only, no +// Pebble read). +func TestBuildPreloads_EmitsDeclareOnCacheGuaranteed(t *testing.T) { + t.Parallel() + + ctx := logging.TestingContext() + logger := logging.FromContext(ctx) + meter := noop.NewMeterProvider().Meter("test") + + store, err := dal.NewStore(t.TempDir(), logger, meter, dal.DefaultConfig()) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + + attrs := attributes.New() + c, err := cache.New(1000, meter) + require.NoError(t, err) + + metaKey := domain.MetadataKey{ + AccountKey: domain.AccountKey{LedgerName: "test", Account: "alice"}, + Key: "label", + } + id, _ := attributes.MakeKey(metaKey.Bytes()) + c.AccountMetadata.Put(id, attributes.Entry[*commonpb.MetadataValue]{ + Data: &commonpb.MetadataValue{}, + }) + + tracker := node.NewIndexTracker(1) + p := NewBuilder(tracker, c, attrs, store, nil, logger, 0) + + needs := NewNeeds() + needs.Add(dal.SubAttrMetadata, metaKey.Bytes()) + + build, err := p.Build([]WriteOperation{{Needs: needs}}) + require.NoError(t, err) + defer build.ReleaseLoaders() + + require.Len(t, build.ExecutionPlan.GetAttributes(), 1) + plan := build.ExecutionPlan.GetAttributes()[0] + + _, isDeclare := plan.GetIntent().(*raftcmdpb.AttributePlan_Declare) + require.True(t, isDeclare, "CacheGuaranteed must emit Declare — cache already has the value, no seed needed") + require.Equal(t, uint32(dal.SubAttrMetadata), plan.GetAttrCode()) + require.Equal(t, id[:], plan.GetId().GetId()) +} + +// TestBuildPreloads_EmitsDeclareOnMissingKey confirms the CacheMiss + +// Pebble-absent branch: nothing to seed, so the resolver emits Declare +// (coverage-only). +func TestBuildPreloads_EmitsDeclareOnMissingKey(t *testing.T) { + t.Parallel() + + ctx := logging.TestingContext() + logger := logging.FromContext(ctx) + meter := noop.NewMeterProvider().Meter("test") + + store, err := dal.NewStore(t.TempDir(), logger, meter, dal.DefaultConfig()) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + + attrs := attributes.New() + c, err := cache.New(1000, meter) + require.NoError(t, err) + + metaKey := domain.MetadataKey{ + AccountKey: domain.AccountKey{LedgerName: "test", Account: "alice"}, + Key: "label", + } + + tracker := node.NewIndexTracker(1) + p := NewBuilder(tracker, c, attrs, store, nil, logger, 0) + + needs := NewNeeds() + needs.Add(dal.SubAttrMetadata, metaKey.Bytes()) + + build, err := p.Build([]WriteOperation{{Needs: needs}}) + require.NoError(t, err) + defer build.ReleaseLoaders() + + require.Len(t, build.ExecutionPlan.GetAttributes(), 1) + plan := build.ExecutionPlan.GetAttributes()[0] + + _, isDeclare := plan.GetIntent().(*raftcmdpb.AttributePlan_Declare) + require.True(t, isDeclare, "CacheMiss + Pebble-absent must emit Declare (coverage-only)") +} + +// TestBuildPreloads_EmitsDeclareOnBloomShortcut confirms the +// bloom-shortcut path: when admission's bloom filter says "definitely +// not", the resolver skips the Pebble read and emits Declare directly. +func TestBuildPreloads_EmitsDeclareOnBloomShortcut(t *testing.T) { + t.Parallel() + + ctx := logging.TestingContext() + logger := logging.FromContext(ctx) + meter := noop.NewMeterProvider().Meter("test") + + store, err := dal.NewStore(t.TempDir(), logger, meter, dal.DefaultConfig()) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + + attrs := attributes.New() + c, err := cache.New(1000, meter) + require.NoError(t, err) + + bfs := bloom.NewFilterSet(&commonpb.ClusterConfig{ + BloomMetadata: &commonpb.BloomTypeConfig{ExpectedKeys: 1024, FpRate: 0.001}, + }, meter) + require.NotNil(t, bfs) + bfs.SetReady(true) + require.True(t, bfs.IsReady()) + + metaKey := domain.MetadataKey{ + AccountKey: domain.AccountKey{LedgerName: "test", Account: "alice"}, + Key: "label", + } + + tracker := node.NewIndexTracker(1) + p := NewBuilder(tracker, c, attrs, store, bfs, logger, 0) + + needs := NewNeeds() + needs.Add(dal.SubAttrMetadata, metaKey.Bytes()) + + build, err := p.Build([]WriteOperation{{Needs: needs}}) + require.NoError(t, err) + defer build.ReleaseLoaders() + + require.Len(t, build.ExecutionPlan.GetAttributes(), 1) + plan := build.ExecutionPlan.GetAttributes()[0] + + _, isDeclare := plan.GetIntent().(*raftcmdpb.AttributePlan_Declare) + require.True(t, isDeclare, "bloom-shortcut must emit Declare — the fast path bypasses Pebble entirely") +} diff --git a/internal/infra/plan/coverage_bits.go b/internal/infra/plan/coverage_bits.go index dac0af0a37..8ca745b0c6 100644 --- a/internal/infra/plan/coverage_bits.go +++ b/internal/infra/plan/coverage_bits.go @@ -3,7 +3,6 @@ package plan import ( "github.com/formancehq/ledger/v3/internal/infra/attributes" "github.com/formancehq/ledger/v3/internal/proto/raftcmdpb" - "github.com/formancehq/ledger/v3/internal/storage/dal" ) // planLookupKey indexes the proposal's AttributePlan slice. Multiple plans @@ -93,51 +92,9 @@ func setIDInBitset(bits []byte, indexByPlan map[planLookupKey]uint32, needs *Nee bits[idx/8] |= 1 << (idx % 8) } - for k := range needs.Ledgers { - mark(k.Bytes(), dal.SubAttrLedger) - } - - for k := range needs.Boundaries { - mark(k.Bytes(), dal.SubAttrBoundary) - } - - for k := range needs.Volumes { - mark(k.Bytes(), dal.SubAttrVolume) - } - - for k := range needs.References { - mark(k.Bytes(), dal.SubAttrReference) - } - - for k := range needs.Metadata { - mark(k.Bytes(), dal.SubAttrMetadata) - } - - for k := range needs.Transactions { - mark(k.Bytes(), dal.SubAttrTransaction) - } - - for k := range needs.SinkConfigs { - mark(k.Bytes(), dal.SubAttrSinkConfig) - } - - for k := range needs.NumscriptVersions { - mark(k.Bytes(), dal.SubAttrNumscriptVersion) - } - - for k := range needs.NumscriptContents { - mark(k.Bytes(), dal.SubAttrNumscriptContent) - } - - for k := range needs.PreparedQueries { - mark(k.Bytes(), dal.SubAttrPreparedQuery) - } - - for k := range needs.LedgerMetadata { - mark(k.Bytes(), dal.SubAttrLedgerMetadata) - } - - for k := range needs.Indexes { - mark(k.Bytes(), dal.SubAttrIndex) + for attrCode, set := range needs.Attributes { + for canonical := range set.Keys { + mark([]byte(canonical), attrCode) + } } } diff --git a/internal/infra/plan/coverage_bits_test.go b/internal/infra/plan/coverage_bits_test.go index 1fbf322312..2d20a1f0c7 100644 --- a/internal/infra/plan/coverage_bits_test.go +++ b/internal/infra/plan/coverage_bits_test.go @@ -39,8 +39,8 @@ func TestBitsForNeeds_SameCanonicalDifferentAttrCode(t *testing.T) { } needs := NewNeeds() - needs.Ledgers[domain.LedgerKey{Name: ledgerName}] = struct{}{} - needs.Boundaries[domain.LedgerKey{Name: ledgerName}] = struct{}{} + needs.Add(dal.SubAttrLedger, domain.LedgerKey{Name: ledgerName}.Bytes()) + needs.Add(dal.SubAttrBoundary, domain.LedgerKey{Name: ledgerName}.Bytes()) require.Equal(t, []byte{0b11}, bitsForNeeds(needs, plans), "both Ledger (bit 0) and Boundary (bit 1) plans must be flagged even though they share a U128") @@ -64,70 +64,70 @@ func TestBitsForNeeds_CoversEveryNeedsKind(t *testing.T) { { attrCode: dal.SubAttrLedger, canonical: domain.LedgerKey{Name: "L"}.Bytes(), - add: func(n *Needs) { n.Ledgers[domain.LedgerKey{Name: "L"}] = struct{}{} }, + add: func(n *Needs) { n.Add(dal.SubAttrLedger, domain.LedgerKey{Name: "L"}.Bytes()) }, }, { attrCode: dal.SubAttrBoundary, canonical: domain.LedgerKey{Name: "L"}.Bytes(), - add: func(n *Needs) { n.Boundaries[domain.LedgerKey{Name: "L"}] = struct{}{} }, + add: func(n *Needs) { n.Add(dal.SubAttrBoundary, domain.LedgerKey{Name: "L"}.Bytes()) }, }, { attrCode: dal.SubAttrVolume, canonical: domain.VolumeKey{AccountKey: domain.AccountKey{LedgerName: "L", Account: "a"}, Asset: "USD"}.Bytes(), add: func(n *Needs) { - n.Volumes[domain.VolumeKey{AccountKey: domain.AccountKey{LedgerName: "L", Account: "a"}, Asset: "USD"}] = struct{}{} + n.Add(dal.SubAttrVolume, domain.VolumeKey{AccountKey: domain.AccountKey{LedgerName: "L", Account: "a"}, Asset: "USD"}.Bytes()) }, }, { attrCode: dal.SubAttrReference, canonical: domain.TransactionReferenceKey{LedgerName: "L", Reference: "ref"}.Bytes(), add: func(n *Needs) { - n.References[domain.TransactionReferenceKey{LedgerName: "L", Reference: "ref"}] = struct{}{} + n.Add(dal.SubAttrReference, domain.TransactionReferenceKey{LedgerName: "L", Reference: "ref"}.Bytes()) }, }, { attrCode: dal.SubAttrMetadata, canonical: domain.MetadataKey{AccountKey: domain.AccountKey{LedgerName: "L", Account: "a"}, Key: "k"}.Bytes(), add: func(n *Needs) { - n.Metadata[domain.MetadataKey{AccountKey: domain.AccountKey{LedgerName: "L", Account: "a"}, Key: "k"}] = struct{}{} + n.Add(dal.SubAttrMetadata, domain.MetadataKey{AccountKey: domain.AccountKey{LedgerName: "L", Account: "a"}, Key: "k"}.Bytes()) }, }, { attrCode: dal.SubAttrTransaction, canonical: domain.TransactionKey{LedgerName: "L", ID: 1}.Bytes(), - add: func(n *Needs) { n.Transactions[domain.TransactionKey{LedgerName: "L", ID: 1}] = struct{}{} }, + add: func(n *Needs) { n.Add(dal.SubAttrTransaction, domain.TransactionKey{LedgerName: "L", ID: 1}.Bytes()) }, }, { attrCode: dal.SubAttrSinkConfig, canonical: domain.SinkConfigKey{Name: "s"}.Bytes(), - add: func(n *Needs) { n.SinkConfigs[domain.SinkConfigKey{Name: "s"}] = struct{}{} }, + add: func(n *Needs) { n.Add(dal.SubAttrSinkConfig, domain.SinkConfigKey{Name: "s"}.Bytes()) }, }, { attrCode: dal.SubAttrNumscriptVersion, canonical: domain.NumscriptVersionKey{LedgerName: "L", Name: "n"}.Bytes(), add: func(n *Needs) { - n.NumscriptVersions[domain.NumscriptVersionKey{LedgerName: "L", Name: "n"}] = struct{}{} + n.Add(dal.SubAttrNumscriptVersion, domain.NumscriptVersionKey{LedgerName: "L", Name: "n"}.Bytes()) }, }, { attrCode: dal.SubAttrNumscriptContent, canonical: domain.NumscriptEntryKey{LedgerName: "L", Name: "n", Version: "v"}.Bytes(), add: func(n *Needs) { - n.NumscriptContents[domain.NumscriptEntryKey{LedgerName: "L", Name: "n", Version: "v"}] = struct{}{} + n.Add(dal.SubAttrNumscriptContent, domain.NumscriptEntryKey{LedgerName: "L", Name: "n", Version: "v"}.Bytes()) }, }, { attrCode: dal.SubAttrPreparedQuery, canonical: domain.PreparedQueryKey{LedgerName: "L", Name: "q"}.Bytes(), add: func(n *Needs) { - n.PreparedQueries[domain.PreparedQueryKey{LedgerName: "L", Name: "q"}] = struct{}{} + n.Add(dal.SubAttrPreparedQuery, domain.PreparedQueryKey{LedgerName: "L", Name: "q"}.Bytes()) }, }, { attrCode: dal.SubAttrLedgerMetadata, canonical: domain.LedgerMetadataKey{LedgerName: "L", Key: "k"}.Bytes(), add: func(n *Needs) { - n.LedgerMetadata[domain.LedgerMetadataKey{LedgerName: "L", Key: "k"}] = struct{}{} + n.Add(dal.SubAttrLedgerMetadata, domain.LedgerMetadataKey{LedgerName: "L", Key: "k"}.Bytes()) }, }, } @@ -178,7 +178,7 @@ func TestBitsForNeeds_TracksPlanPosition(t *testing.T) { } needs := NewNeeds() - needs.Ledgers[domain.LedgerKey{Name: ledgerName}] = struct{}{} + needs.Add(dal.SubAttrLedger, domain.LedgerKey{Name: ledgerName}.Bytes()) // First plan order: ledger at index 0 → bit 0. require.Equal(t, []byte{0b01}, bitsForNeeds(needs, []*raftcmdpb.AttributePlan{ledgerPlan, padding})) @@ -221,14 +221,14 @@ func TestApplyBits_SharesPlanIndexAcrossOperations(t *testing.T) { } needsA := NewNeeds() - needsA.Ledgers[domain.LedgerKey{Name: ledgerA}] = struct{}{} + needsA.Add(dal.SubAttrLedger, domain.LedgerKey{Name: ledgerA}.Bytes()) needsB := NewNeeds() - needsB.Ledgers[domain.LedgerKey{Name: ledgerB}] = struct{}{} + needsB.Add(dal.SubAttrLedger, domain.LedgerKey{Name: ledgerB}.Bytes()) needsAB := NewNeeds() - needsAB.Ledgers[domain.LedgerKey{Name: ledgerA}] = struct{}{} - needsAB.Ledgers[domain.LedgerKey{Name: ledgerB}] = struct{}{} + needsAB.Add(dal.SubAttrLedger, domain.LedgerKey{Name: ledgerA}.Bytes()) + needsAB.Add(dal.SubAttrLedger, domain.LedgerKey{Name: ledgerB}.Bytes()) var ( gotA []byte diff --git a/internal/infra/plan/needs.go b/internal/infra/plan/needs.go index f7daae4207..8a0683d742 100644 --- a/internal/infra/plan/needs.go +++ b/internal/infra/plan/needs.go @@ -4,119 +4,127 @@ import ( "github.com/formancehq/ledger/v3/internal/domain" ) -// Needs describes the preload requirements for a command. -// All attribute types use map[K]struct{} and are resolved via attrs.*.Get. -type Needs struct { - Ledgers map[domain.LedgerKey]struct{} - Boundaries map[domain.LedgerKey]struct{} - Volumes map[domain.VolumeKey]struct{} - IdempotencyKeys map[domain.IdempotencyKey]struct{} - References map[domain.TransactionReferenceKey]struct{} - Metadata map[domain.MetadataKey]struct{} - Transactions map[domain.TransactionKey]struct{} - SinkConfigs map[domain.SinkConfigKey]struct{} - NumscriptVersions map[domain.NumscriptVersionKey]struct{} - NumscriptContents map[domain.NumscriptEntryKey]struct{} - PreparedQueries map[domain.PreparedQueryKey]struct{} - LedgerMetadata map[domain.LedgerMetadataKey]struct{} - Indexes map[domain.IndexKey]struct{} +// AttributeSet is the per-attribute-code preload requirement: the set of +// canonical key bytes admission wants covered. The resolver emits Value +// (Pebble seed) or Declare (coverage-only) plans for these based on the +// CheckCache verdict. +// +// Keyed by string(canonical) — Go requires comparable map keys and +// []byte is not comparable. The string is a memory-cheap view (no copy +// semantics apply here since keys arrive already-allocated from each +// domain key's Bytes() method). +type AttributeSet struct { + Keys map[string]struct{} } -// TotalKeys returns the total number of keys across all need types, -// including IdempotencyKeys. Used by admission metrics that account for -// every key the preload pipeline handles. -func (n *Needs) TotalKeys() int { - return n.AttributeKeysCount() + len(n.IdempotencyKeys) +// Needs describes the preload requirements for a command. +// +// Attributes[attrCode] holds the per-cache preload requirement, where +// attrCode is a dal.SubAttr* byte constant. A missing entry means "no +// preload for that cache in this proposal". This shape collapses what +// used to be 13 typed key maps into a single generic dispatch keyed by +// attribute code — the same code the FSM uses to route through +// AttributePlan.attr_code. +// +// IdempotencyKeys stay separate: they do not live in the attribute +// cache (they have a dedicated IdempotencyStore), so the resolver +// pipeline treats them via its own load path and they carry no +// bloom-filter / rotation semantics. +type Needs struct { + Attributes map[byte]*AttributeSet + IdempotencyKeys map[domain.IdempotencyKey]struct{} } -// AttributeKeysCount returns the total number of cache-attribute keys — -// i.e. every key that consults the in-memory cache at apply time. -// Idempotency keys live in the IdempotencyStore (not the cache), so -// they are excluded: a proposal with idempotency keys only does not -// need the cache-epoch revalidation that the slow path performs. The -// runner uses this count to gate runWithoutPreload, so idempotency-only -// proposals (maintenance, signing, chapter schedule) take the fast -// path and avoid spurious ErrStaleProposal on cluster-config resets. -func (n *Needs) AttributeKeysCount() int { - return len(n.Ledgers) + len(n.Boundaries) + len(n.Volumes) + - len(n.References) + len(n.Metadata) + len(n.Transactions) + - len(n.SinkConfigs) + len(n.NumscriptVersions) + - len(n.NumscriptContents) + len(n.PreparedQueries) + - len(n.LedgerMetadata) + len(n.Indexes) +// Add records `canonical` under attrCode's Keys set. Idempotent. +// Callers pass the result of the domain key's Bytes() method. +func (n *Needs) Add(attrCode byte, canonical []byte) { + n.set(attrCode).Keys[string(canonical)] = struct{}{} } -// Merge unions every key set from src into dst. Used by admission to roll -// per-order Needs into a single proposal-wide Needs while keeping the -// per-order slice available for coverage_bits computation. -func (n *Needs) Merge(src *Needs) { - for k := range src.Ledgers { - n.Ledgers[k] = struct{}{} +// Has reports whether `canonical` is in attrCode's Keys set. +// Primarily a test helper (production reads iterate via AttributeSet +// directly). +func (n *Needs) Has(attrCode byte, canonical []byte) bool { + set, ok := n.Attributes[attrCode] + if !ok { + return false } - for k := range src.Boundaries { - n.Boundaries[k] = struct{}{} - } + _, ok = set.Keys[string(canonical)] - for k := range src.Volumes { - n.Volumes[k] = struct{}{} - } + return ok +} - for k := range src.IdempotencyKeys { - n.IdempotencyKeys[k] = struct{}{} +// Count returns the number of Keys declared for attrCode (0 if the +// attrCode has no entry). Test helper. +func (n *Needs) Count(attrCode byte) int { + set, ok := n.Attributes[attrCode] + if !ok { + return 0 } - for k := range src.References { - n.References[k] = struct{}{} - } + return len(set.Keys) +} - for k := range src.Metadata { - n.Metadata[k] = struct{}{} +func (n *Needs) set(attrCode byte) *AttributeSet { + if n.Attributes == nil { + n.Attributes = make(map[byte]*AttributeSet) } - for k := range src.Transactions { - n.Transactions[k] = struct{}{} + set, ok := n.Attributes[attrCode] + if !ok { + set = &AttributeSet{ + Keys: make(map[string]struct{}), + } + n.Attributes[attrCode] = set } - for k := range src.SinkConfigs { - n.SinkConfigs[k] = struct{}{} - } + return set +} - for k := range src.NumscriptVersions { - n.NumscriptVersions[k] = struct{}{} - } +// TotalKeys returns the total number of keys across all attribute +// caches AND idempotency keys — every key the preload pipeline handles. +func (n *Needs) TotalKeys() int { + return n.AttributeKeysCount() + len(n.IdempotencyKeys) +} - for k := range src.NumscriptContents { - n.NumscriptContents[k] = struct{}{} +// AttributeKeysCount returns the total number of cache-attribute keys. +// Idempotency keys are excluded: a proposal with idempotency keys only +// does not need the cache-epoch revalidation that the slow path +// performs. The runner uses this count to gate runWithoutPreload, so +// idempotency-only proposals (maintenance, signing, chapter schedule) +// take the fast path and avoid spurious ErrStaleProposal on +// cluster-config resets. +func (n *Needs) AttributeKeysCount() int { + total := 0 + for _, set := range n.Attributes { + total += len(set.Keys) } - for k := range src.PreparedQueries { - n.PreparedQueries[k] = struct{}{} - } + return total +} - for k := range src.LedgerMetadata { - n.LedgerMetadata[k] = struct{}{} +// Merge unions every key set from src into dst. Used by admission to +// roll per-order Needs into a single proposal-wide Needs while keeping +// the per-order slice available for coverage_bits computation. +func (n *Needs) Merge(src *Needs) { + for attrCode, set := range src.Attributes { + dst := n.set(attrCode) + for k := range set.Keys { + dst.Keys[k] = struct{}{} + } } - for k := range src.Indexes { - n.Indexes[k] = struct{}{} + for k := range src.IdempotencyKeys { + n.IdempotencyKeys[k] = struct{}{} } } -// NewNeeds creates a Needs with all maps initialized. +// NewNeeds creates a Needs with initialized maps. Per-attribute sets +// are created lazily on first Add so an empty Needs stays cheap. func NewNeeds() *Needs { return &Needs{ - Ledgers: make(map[domain.LedgerKey]struct{}), - Boundaries: make(map[domain.LedgerKey]struct{}), - Volumes: make(map[domain.VolumeKey]struct{}), - IdempotencyKeys: make(map[domain.IdempotencyKey]struct{}), - References: make(map[domain.TransactionReferenceKey]struct{}), - Metadata: make(map[domain.MetadataKey]struct{}), - Transactions: make(map[domain.TransactionKey]struct{}), - SinkConfigs: make(map[domain.SinkConfigKey]struct{}), - NumscriptVersions: make(map[domain.NumscriptVersionKey]struct{}), - NumscriptContents: make(map[domain.NumscriptEntryKey]struct{}), - PreparedQueries: make(map[domain.PreparedQueryKey]struct{}), - LedgerMetadata: make(map[domain.LedgerMetadataKey]struct{}), - Indexes: make(map[domain.IndexKey]struct{}), + Attributes: make(map[byte]*AttributeSet), + IdempotencyKeys: make(map[domain.IdempotencyKey]struct{}), } } diff --git a/internal/infra/plan/resolve.go b/internal/infra/plan/resolve.go index dfebafe987..0de1eaded4 100644 --- a/internal/infra/plan/resolve.go +++ b/internal/infra/plan/resolve.go @@ -23,11 +23,11 @@ const resolveParallelism = 16 // resolveResult holds the output of a resolve call: a flat list of // per-attribute plans plus the tracker keys. Each AttributePlan carries -// exactly one intent — Preload (value loaded from Pebble), Touch -// (Gen1->Gen0 promotion), or Declare (key already in Gen0 at build time, -// pure coverage declaration). The FSM-side preload.View consumes the -// whole list as its coverage set so reads on declared keys never trip a -// false-positive "not preloaded" miss. +// exactly one intent — Value (loaded from Pebble), Touch (Gen1→Gen0 +// promotion at apply), or Declare (already in Gen0 or verified absent — +// coverage only). The FSM-side preload.View consumes the whole list as +// its coverage set so reads on declared keys never trip a false-positive +// "not preloaded" miss. // // Idempotency keys live on the parallel idempotencyKeys slice — they are // NOT cache attributes (the FSM applies them to a dedicated store) so @@ -39,10 +39,8 @@ type resolveResult struct { } // declarePlan returns an AttributePlan whose intent is Declare: the key -// was already in Gen0 (or was a bloom-confirmed absent miss the proposer -// still wants the View to admit). No FSM-side cache mutation. attr_code -// lives on the plan envelope, AttributeID.Tag carries the xxh3 collision -// tag. +// is either already in Gen0 or verified absent, so no FSM-side cache +// mutation is required. func declarePlan(id attributes.U128, attrCode byte, tag uint64) *raftcmdpb.AttributePlan { return &raftcmdpb.AttributePlan{ Id: &raftcmdpb.AttributeID{Id: id[:], Tag: tag}, @@ -52,7 +50,7 @@ func declarePlan(id attributes.U128, attrCode byte, tag uint64) *raftcmdpb.Attri } // touchPlan returns an AttributePlan whose intent is Touch: the FSM must -// promote this key from Gen1 to Gen0. +// promote this key from Gen1 to Gen0 before order handlers run. func touchPlan(id attributes.U128, attrCode byte, tag uint64) *raftcmdpb.AttributePlan { return &raftcmdpb.AttributePlan{ Id: &raftcmdpb.AttributeID{Id: id[:], Tag: tag}, @@ -72,31 +70,27 @@ func preloadPlan(attrID *raftcmdpb.AttributeID, attrCode byte, value *raftcmdpb. } } -// resolveAttributePreload resolves a standard attribute type (loaded via attrs.*.Get). -// For each key it emits one AttributePlan: declare (already Gen0, absent in -// Pebble, or bloom-confirmed absent), touch (Gen1->Gen0 promotion), or -// preload (value loaded from store). Keys are resolved with bounded -// parallelism to amortize I/O latency. +// resolveAttributePreload resolves one attribute cache for the plan +// pipeline. Keys are passed as canonical byte strings (see AttributeSet +// on Needs) — no K generic; the (attrCode, canonical) pair is the whole +// identity. // -// The Preload variant carries a vtproto-marshaled raw_value blob keyed by -// attrCode — the FSM unmarshals it through the same attrCode dispatch. +// For each key, resolveAttributePreload emits ONE AttributePlan based on +// admission's CheckCache verdict: // -// Absent keys uniformly produce a Declare plan: the coverage gate admits -// the read, the cache stays empty, and the FSM-side `Scope.GetX` returns -// `domain.ErrNotFound`, which each reader interprets per attribute (the -// canonical pattern is `errors.Is(err, ErrNotFound) → "doesn't exist"`, -// see `GetPreparedQuery` / `GetNumscriptLatestVersion`). The cache is -// never seeded with a typed-nil placeholder — that pattern existed for -// Volume (EN-1378, dropped) and for the Numscript / PreparedQuery -// attributes whose readers relied on it implicitly (now ported to -// explicit ErrNotFound handling, same commit). -func resolveAttributePreload[K interface { - comparable - Bytes() []byte -}, T interface { +// - CacheUnreachable → ErrCacheHorizonExceeded (admission rejection). +// - CacheGuaranteed → Declare (cache already has the key in Gen0 at +// apply time — no cache mutation needed). +// - CacheNeedsTouch → Touch (key sits in Gen1; the FSM promotes it back +// to Gen0 before order handlers run). +// - CacheMiss + bloom/Pebble-absent → Declare (coverage-only). +// - CacheMiss + Pebble-load-hit → Value(v) (MirrorPreload seeds gen0+gen1). +// +// Keys are resolved with bounded parallelism to amortize I/O latency. +func resolveAttributePreload[T interface { MarshalVT() ([]byte, error) }]( - keys map[K]struct{}, + keys map[string]struct{}, nextIndex, boundary, cacheEpoch uint64, attrCache *cache.AttributeCache[T], loader *preload.AttributeLoader[T], @@ -118,18 +112,16 @@ func resolveAttributePreload[K interface { sem := make(chan struct{}, resolveParallelism) for key := range keys { - canonicalKey := key.Bytes() + canonicalKey := []byte(key) id, tag := attributes.MakeKey(canonicalKey) switch attrCache.CheckCache(nextIndex, id) { case cache.CacheUnreachable: // Admission predicts ≥2 rotations between propose and apply: a - // preload computed now would be rotated out before the FSM read. - // Record the rejection but continue processing so the wg.Wait() + // preload computed now would be rotated out before the FSM reads + // it. Record the rejection but continue processing so wg.Wait() // below drains any CacheMiss loader goroutine earlier iterations - // already launched — an immediate return would race with those - // goroutines' appends to plans/tracker and leak their - // AttributeLoader entries past the caller's cleanup token. + // already launched. if logger.Enabled(logging.TraceLevel) { logger.WithFields(map[string]any{ "type": typeName, @@ -148,13 +140,9 @@ func resolveAttributePreload[K interface { continue case cache.CacheGuaranteed: - // Record the declaration so the FSM-side preload.View admits - // reads on this key. The apply path triggers no cache mutation - // — the value is already in Gen0 on every node. - // - // Hold mu: while this loop iteration is sequential, earlier - // iterations may have spawned CacheMiss goroutines that append - // to the same attributes slice concurrently. + // Key is already in Gen0 and will still be there at apply. + // Emit Declare — the FSM's coverage view admits the read; no + // mutation required. mu.Lock() plans = append(plans, declarePlan(id, attrCode, tag)) mu.Unlock() @@ -162,6 +150,10 @@ func resolveAttributePreload[K interface { continue case cache.CacheNeedsTouch: + // Key sits in Gen1 (predicted-apply lands in the next + // generation). Emit Touch so the FSM promotes Gen1 → Gen0 + // before the order runs — otherwise the next rotation would + // drop the entry. if logger.Enabled(logging.TraceLevel) { logger.WithFields(map[string]any{ "type": typeName, @@ -178,10 +170,9 @@ func resolveAttributePreload[K interface { continue case cache.CacheMiss: - // Bloom filter short-circuit: when the key is definitely not in - // Pebble, skip the goroutine + Pebble Get and emit a Declare - // (coverage-only). The FSM-side Scope.GetX returns ErrNotFound - // on read; each reader interprets that as "doesn't exist". + // Bloom filter short-circuit: when the key is definitely not + // in Pebble, skip the goroutine + Pebble Get and emit Declare + // (coverage-only, no value to seed). if bloomFilter != nil && !bloomFilter.MayContain(id) { mu.Lock() plans = append(plans, declarePlan(id, attrCode, tag)) @@ -249,13 +240,12 @@ func resolveAttributePreload[K interface { } plans = append(plans, preloadPlan(attrID, attrCode, attrValue)) - } else { - // Pebble had no value for this key but the proposer - // declared it. Emit a Declare so the FSM-side View - // admits the read; the underlying KeyStore returns - // ErrNotFound for the legitimate-absence case. - plans = append(plans, declarePlan(id, attrCode, tag)) + + return } + + // Pebble had no value either — coverage-only Declare. + plans = append(plans, declarePlan(id, attrCode, tag)) }) } }