test(replication): authoritative-table blob byte-integrity (drop #1281 repair, stacks on #368)#371
test(replication): authoritative-table blob byte-integrity (drop #1281 repair, stacks on #368)#371kriszyp wants to merge 2 commits into
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a receive-side resume-cursor clamp mechanism in replicationConnection.ts to prevent the replication resume cursor from advancing past un-saved or failed blobs, alongside new integration tests and diagnostics to verify recovery from transient blob save failures. The reviewer identified a critical issue where initializing lastDurableSequenceId to 0 on startup could incorrectly reset the replication resume cursor to 0 if a blob is outstanding or fails on the first transaction after reconnecting. To resolve this, the reviewer suggested lazily initializing lastDurableSequenceId from the persisted sequence ID in dbisDB and provided code suggestions to update the sequence-id update branches and onCommit handler.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| let hasBlobGap = false; | ||
| let lastDurableSequenceId = 0; | ||
| // The cursor must not advance past a sequence id whose blobs are not yet durable — whether a blob has | ||
| // already failed (`hasBlobGap`) OR is still in flight and might fail. A sequence-update can arrive while | ||
| // a slow blob (e.g. one about to hit the stream timeout) is still saving; advancing on it would push the | ||
| // cursor past a blob that then fails, before `hasBlobGap` is ever set. The data-record onCommit awaits | ||
| // `outstandingBlobsToFinish` before it clamps, so it only needs `hasBlobGap`; the sequence-update sites | ||
| // don't await, so they also gate on outstanding blobs. | ||
| const cursorBlockedByBlob = () => hasBlobGap || outstandingBlobsToFinish.length > 0; |
There was a problem hiding this comment.
Initializing lastDurableSequenceId to 0 on startup can cause the replication resume cursor to be incorrectly reset to 0 if a blob is outstanding or fails on the very first transaction after a reconnect. Since cursorBlockedByBlob() will be true, the connection will clamp the cursor to lastDurableSequenceId (which is 0), overwriting the persisted resume cursor in the database and forcing replication to start over from the beginning of time.
We should lazily initialize lastDurableSequenceId from the persisted sequence ID in dbisDB for the connected node(s) when it is first needed.
let hasBlobGap = false;
let lastDurableSequenceId = 0;
const getLastDurableSequenceId = () => {
if (lastDurableSequenceId === 0 && tableSubscriptionToReplicator?.dbisDB && receivingDataFromNodeIds) {
for (const nodeId of receivingDataFromNodeIds) {
const sequenceEntry = tableSubscriptionToReplicator.dbisDB.get([Symbol.for('seq'), nodeId]);
if (sequenceEntry?.seqId > lastDurableSequenceId) {
lastDurableSequenceId = sequenceEntry.seqId;
}
}
}
return lastDurableSequenceId;
};
// The cursor must not advance past a sequence id whose blobs are not yet durable — whether a blob has
// already failed (hasBlobGap) OR is still in flight and might fail. A sequence-update can arrive while
// a slow blob (e.g. one about to hit the stream timeout) is still saving; advancing on it would push the
// cursor past a blob that then fails, before hasBlobGap is ever set. The data-record onCommit awaits
// outstandingBlobsToFinish before it clamps, so it only needs hasBlobGap; the sequence-update sites
// don't await, so they also gate on outstanding blobs.
const cursorBlockedByBlob = () => hasBlobGap || outstandingBlobsToFinish.length > 0;| lastSequenceIdReceived = data; | ||
| tableSubscriptionToReplicator.send({ | ||
| type: 'end_txn', | ||
| localTime: lastSequenceIdReceived, | ||
| // Clamp: a sequence-id update carries no commit/blob-durability gate, so while any blob is | ||
| // not yet durable it must not push the resume cursor past the last fully-durable point (same | ||
| // as the inline REMOTE_SEQUENCE_UPDATE branch below). | ||
| localTime: cursorBlockedByBlob() ? lastDurableSequenceId : lastSequenceIdReceived, | ||
| remoteNodeIds: receivingDataFromNodeIds, | ||
| }); |
There was a problem hiding this comment.
Update the sequence-id update branch to use getLastDurableSequenceId() instead of lastDurableSequenceId to ensure the cursor is clamped to the correct last durable sequence ID rather than 0 on reconnect.
| lastSequenceIdReceived = data; | |
| tableSubscriptionToReplicator.send({ | |
| type: 'end_txn', | |
| localTime: lastSequenceIdReceived, | |
| // Clamp: a sequence-id update carries no commit/blob-durability gate, so while any blob is | |
| // not yet durable it must not push the resume cursor past the last fully-durable point (same | |
| // as the inline REMOTE_SEQUENCE_UPDATE branch below). | |
| localTime: cursorBlockedByBlob() ? lastDurableSequenceId : lastSequenceIdReceived, | |
| remoteNodeIds: receivingDataFromNodeIds, | |
| }); | |
| lastSequenceIdReceived = data; | |
| tableSubscriptionToReplicator.send({ | |
| type: 'end_txn', | |
| // Clamp: a sequence-id update carries no commit/blob-durability gate, so while any blob is | |
| // not yet durable it must not push the resume cursor past the last fully-durable point (same | |
| // as the inline REMOTE_SEQUENCE_UPDATE branch below). | |
| localTime: cursorBlockedByBlob() ? getLastDurableSequenceId() : lastSequenceIdReceived, | |
| remoteNodeIds: receivingDataFromNodeIds, | |
| }); |
| replicationSharedStatus[RECEIVING_STATUS_POSITION] = RECEIVING_STATUS_WAITING; | ||
| tableSubscriptionToReplicator.send({ | ||
| type: 'end_txn', | ||
| localTime: lastSequenceIdReceived, | ||
| // Clamp: an empty sequence update carries no commit/blob-durability gate, so while any | ||
| // blob is not yet durable it must not push the resume cursor past the last fully-durable point. | ||
| localTime: cursorBlockedByBlob() ? lastDurableSequenceId : lastSequenceIdReceived, | ||
| remoteNodeIds: receivingDataFromNodeIds, | ||
| }); |
There was a problem hiding this comment.
Update the remote sequence update branch to use getLastDurableSequenceId() instead of lastDurableSequenceId to ensure the cursor is clamped to the correct last durable sequence ID rather than 0 on reconnect.
| replicationSharedStatus[RECEIVING_STATUS_POSITION] = RECEIVING_STATUS_WAITING; | |
| tableSubscriptionToReplicator.send({ | |
| type: 'end_txn', | |
| localTime: lastSequenceIdReceived, | |
| // Clamp: an empty sequence update carries no commit/blob-durability gate, so while any | |
| // blob is not yet durable it must not push the resume cursor past the last fully-durable point. | |
| localTime: cursorBlockedByBlob() ? lastDurableSequenceId : lastSequenceIdReceived, | |
| remoteNodeIds: receivingDataFromNodeIds, | |
| }); | |
| replicationSharedStatus[RECEIVING_STATUS_POSITION] = RECEIVING_STATUS_WAITING; | |
| tableSubscriptionToReplicator.send({ | |
| type: 'end_txn', | |
| // Clamp: an empty sequence update carries no commit/blob-durability gate, so while any | |
| // blob is not yet durable it must not push the resume cursor past the last fully-durable point. | |
| localTime: cursorBlockedByBlob() ? getLastDurableSequenceId() : lastSequenceIdReceived, | |
| remoteNodeIds: receivingDataFromNodeIds, | |
| }); |
| if (hasBlobGap) { | ||
| // A blob save in (or before) this batch failed (swallowed by the receiveBlobs .catch so it | ||
| // never escapes as an uncaughtException, which is why this Promise.all resolved cleanly). | ||
| // Clamp the persisted resume cursor to the last fully-durable transaction instead of letting | ||
| // it advance over the gap: core Table.ts reads `event.localTime` right after this onCommit to | ||
| // persist the cursor, so lowering it here pins the cursor at `lastDurableSequenceId`. Records | ||
| // keep flowing live (no drops, no teardown); the next reconnect/restart resumes from the | ||
| // clamped cursor and the normal stream re-delivers — and re-saves — the disrupted blob. Without | ||
| // the clamp the cursor advances past the missing blob and it is lost permanently across restarts. | ||
| endTxnEvent.localTime = lastDurableSequenceId; | ||
| } else { | ||
| // This batch (and everything before it) is committed with all blobs durable; it's safe to let | ||
| // the resume cursor advance to here. | ||
| lastDurableSequenceId = Math.max(lastDurableSequenceId, endTxnEvent.localTime ?? 0); | ||
| } |
There was a problem hiding this comment.
Update the onCommit handler to use getLastDurableSequenceId() instead of lastDurableSequenceId to ensure the cursor is clamped to the correct last durable sequence ID rather than 0 on reconnect.
| if (hasBlobGap) { | |
| // A blob save in (or before) this batch failed (swallowed by the receiveBlobs .catch so it | |
| // never escapes as an uncaughtException, which is why this Promise.all resolved cleanly). | |
| // Clamp the persisted resume cursor to the last fully-durable transaction instead of letting | |
| // it advance over the gap: core Table.ts reads `event.localTime` right after this onCommit to | |
| // persist the cursor, so lowering it here pins the cursor at `lastDurableSequenceId`. Records | |
| // keep flowing live (no drops, no teardown); the next reconnect/restart resumes from the | |
| // clamped cursor and the normal stream re-delivers — and re-saves — the disrupted blob. Without | |
| // the clamp the cursor advances past the missing blob and it is lost permanently across restarts. | |
| endTxnEvent.localTime = lastDurableSequenceId; | |
| } else { | |
| // This batch (and everything before it) is committed with all blobs durable; it's safe to let | |
| // the resume cursor advance to here. | |
| lastDurableSequenceId = Math.max(lastDurableSequenceId, endTxnEvent.localTime ?? 0); | |
| } | |
| if (hasBlobGap) { | |
| // A blob save in (or before) this batch failed (swallowed by the receiveBlobs .catch so it | |
| // never escapes as an uncaughtException, which is why this Promise.all resolved cleanly). | |
| // Clamp the persisted resume cursor to the last fully-durable transaction instead of letting | |
| // it advance over the gap: core Table.ts reads event.localTime right after this onCommit to | |
| // persist the cursor, so lowering it here pins the cursor at lastDurableSequenceId. Records | |
| // keep flowing live (no drops, no teardown); the next reconnect/restart resumes from the | |
| // clamped cursor and the normal stream re-delivers — and re-saves — the disrupted blob. Without | |
| // the clamp the cursor advances past the missing blob and it is lost permanently across restarts. | |
| endTxnEvent.localTime = getLastDurableSequenceId(); | |
| } else { | |
| // This batch (and everything before it) is committed with all blobs durable; it's safe to let | |
| // the resume cursor advance to here. | |
| lastDurableSequenceId = Math.max(getLastDurableSequenceId(), endTxnEvent.localTime ?? 0); | |
| } |
|
Reviewed; no blockers found. |
…o deadlock)
Receive-side blob-gap handling now uses an async durability watermark that
fixes BOTH the original data-loss bug AND the receive/apply deadlock.
Previously, the no-data-loss clamp ran a synchronous
`await Promise.all(outstandingBlobsToFinish)` inside the end_txn onCommit
before persisting the resume cursor. Combined with receive backpressure
(`waitForDrain` / `ws.pause()` at the high-water mark), this produced a
circular wait: onCommit waited for blobs <- blobs waited for BLOB_CHUNK
frames <- those frames were paused behind a drain-blocked data frame <-
the apply loop never drained because it was parked in onCommit. A blob-gap
during heavy catch-up could wedge the receiver permanently.
This replaces the synchronous wait with a durability watermark:
* committedSequence - highest sequence (end_txn localTime/version) the
apply loop has committed. Commit == visibility; advances synchronously.
* lastDurableSequenceId - the durable watermark = highest committed
sequence whose blobs (and all earlier blobs) are durably saved. This is
what we persist as the resume cursor.
The watermark advances to committedSequence only when there is no in-flight
blob AND no gap, in exactly two places:
- non-copy onCommit, when outstandingBlobsToFinish is empty (the common
blob-less / already-saved case advances the cursor immediately); and
- the blob save `.finally` success path, after the splice, when the last
in-flight blob drains.
A failed save sets hasBlobGap in the `.catch`, which holds the watermark
(advance gated on !hasBlobGap) until a reconnect re-streams the blob.
The apply loop no longer blocks on blobs, so the deadlock cannot occur. The
cursor only ever advances to a sequence whose blobs are all durable, so the
no-data-loss guarantee is preserved: on crash/restart, resume from the
watermark re-streams any record whose blob wasn't durable. Backpressure is
unchanged and BLOB_CHUNK frames stay on the serialized message chain (no
off-chain dispatch), so there is no unbounded buffering / OOM.
COPY MODE retains the synchronous blob-wait before the copy-cursor put /
maybeFinishCopy(): copy is the lower-frequency initial bulk-copy path and the
deadlock was observed in non-copy catch-up, so copy durability is preserved
exactly as before. Only the non-copy catch-up/live path uses the watermark.
The COMMITTED_UPDATE receipt is now clamped to the durable watermark
(min(committed, lastDurableSequenceId)) so we never tell the sender we
durably hold past the watermark now that onCommit no longer awaits blobs.
The two sequence-update sites continue to clamp to lastDurableSequenceId via
cursorBlockedByBlob(); that remains correct under the watermark.
Tests:
- integrationTests/cluster/blobGapDeadlock.test.mjs (ported, heavy/stress-
gated): drives a caching blob table with a transient receive-side blob
save failure and asserts B converges (no wedge). Converges with 8 real
injected blob-save failures and zero stall.
- replicationBlobResyncOnFailure.test.mjs (existing) still passes: the
follower re-streams + re-saves the disrupted blob after restart.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ive-side save failure Adds a stress-gated integration regression guard under integrationTests/cluster/ for the authoritative (non-caching) blob path: a receive-side blob save fails on the follower mid-stream, the follower restarts, and after the watermark-driven re-stream every record's file-backed blob must be present, full-size, and byte-for-byte correct -- verified with the SOURCE node offline so a read cannot re-source and mask a missing blob. - fixture-large-blob-authoritative: a plain @table @export AuthLocation (NO sourcedFrom) with a SeedAuthLocation GET endpoint that writes deterministic 50 KB file-backed blobs, and an AuthLocationImage resource serving the raw bytes for byte-exact verification. The component is deployed to BOTH nodes (replicated to the leader for schema+data, and explicitly to the follower so it serves the REST export used by the integrity check). - reuses #368's fixture-blob-fail-transient injector to fail one receive-side blob save. Stacks on #368 (the blob-gap durability watermark): this test passes on the watermark receive path -- the follower converges with no wedge and the disrupted record's blob is re-saved by the natural same-version overwrite of the re-streamed record. NOTE: this commit drops the core-side repair from harper PR #1281. That PR added a dedicated repair at the identity-tie duplicate-drop in core Table._writeUpdate, on the theory that the re-streamed authoritative record arrives as an identity-tie duplicate and is dropped, leaving the row's blob reference dangling. Empirical testing on the watermark-based #368 path showed otherwise: across repeated runs the disrupted record's blob is reliably re-saved by the natural same-version overwrite (the audit-walk auditStore.get lookup that gated the repair branch reliably misses, so the record never reaches the tie-drop), and the repair branch never fired. The core submodule pointer is therefore reverted to #368's base (no repair), and this test is retained as the lasting value: it guards the data-integrity OUTCOME rather than the mechanism. See PR #1281 for the disposition. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
19f8ec1 to
8fa7c5b
Compare
|
Scope note for review: the core repair (harper #1281) was dropped — authoritative blob integrity is already maintained by #368's watermark + the natural same-version overwrite (verified byte-exact across runs). This PR is now purely an authoritative-table byte-integrity regression test, stacked on #368. Two residuals worth a reviewer eye (neither blocks; the test guards the outcome regardless of internal path):
|
|
Marking ready for review. CI note: the failing |
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
Summary
Adds a stress-gated integration byte-integrity regression test for the authoritative
(non-caching) blob path, and — based on the empirical result below — drops the core-side
repair (harper #1281): the
coresubmodule pointer is reverted to #368's base (norepair). This PR stacks on #368 (the blob-gap durability watermark) and reuses its
fixture-blob-fail-transientinjector.The decision: drop the dedicated repair, keep the test
The original plan added a dedicated repair in core
Table._writeUpdate(#1281) at theidentity-tie duplicate-drop, on the theory that the re-streamed authoritative record arrives
as an identity-tie duplicate, is dropped, and leaves the row's blob reference dangling.
With the receive-path deadlock now fixed by #368's async durability watermark, I rebased
this work onto #368 and tested the authoritative path empirically. The repair turned out to be
unnecessary:
natural same-version overwrite of the re-streamed record. The follower converges (80/80, no
wedge) and, with the source node offline, every blob is byte-for-byte intact — including
the one whose receive-side save was injected to fail.
walk-break / overwrite showed the existing-row-has-missing-blob condition was never true
at any apply-commit decision: the audit-walk
auditStore.get(...)lookup that gated therepair reliably misses on this live re-stream, so the record never reaches the tie-drop
and falls through to the natural overwrite (which re-saves the blob to a fresh fileId).
the row points at a new, present fileId.
So the repair sat behind a lookup that reliably misses, and the path it guarded already
self-heals via the natural overwrite. The lasting value is the test, which asserts the
data-integrity outcome (every authoritative blob intact, no re-source masking) rather than
the mechanism.
What's here
fixture-large-blob-authoritative/— a plain@table @exportAuthLocation(nosourcedFrom) with aSeedAuthLocationGET endpoint writing deterministic 50 KBfile-backed blobs, and an
AuthLocationImageresource serving raw bytes for byte-exactverification. Authoritative so a read on the follower cannot re-source/mask a missing blob.
replicationBlobRepairAuthoritative.test.mjs— drives A→B replication, injects onetransient blob-save failure on B, restarts B disarmed, waits for convergence, and asserts
every blob is byte-for-byte intact on B with A offline. The component is deployed to
both nodes (replicated to A for schema+data; explicitly to B so it serves the REST export
the integrity check reads). Now passing; gated on
HARPER_RUN_STRESS_TESTS=1.coresubmodule pointer reverted to fix(replication): watermark-based blob-gap handling (no data loss + no deadlock) #368's base (the repair commit is not referenced).Disposition of harper #1281
#1281 (the core repair) is now recommended to be closed / converted — its repair is
redundant on the #368 watermark path. Left in draft for that decision; this PR no longer
depends on it.
Test result
3 consecutive validation runs after dropping the repair: PASS —
injected=1, B converges80/80,
byte-integrity verified=80/80, failures(0).🤖 Generated with Claude Code
Cross-model review pending (main session)