[CELEBORN-2341] Preserve partially-committed partitions on CommitFiles timeout #3721

Open
shlomitubul wants to merge 3 commits into apache:main from shlomitubul:worker-preserve-partial-commit-on-timeout-main
Conversation

@shlomitubul shlomitubul commented Jun 7, 2026

Contributor

What changes were proposed in this pull request?

This targets main and supersedes #3706 (which targeted branch-0.6).

When celeborn.worker.commitFiles.timeout fires, Controller cancels the per-partition commit tasks (future.cancel(true) / task.cancel(true)) and runs the handleAsync error branch. Today that branch:

  1. never calls context.reply() — the originating CommitFiles RPC is left unanswered. The driver only learns the outcome after its ask times out (celeborn.client.rpc.commitFiles.askTimeout, which falls back to celeborn.rpc.askTimeout, default 60s; often raised to 240s+), after which it re-asks with the same commit epoch and the worker's dedup returns the cached response;
  2. reports empty committed lists (COMMIT_FILE_EXCEPTION with all requested ids failed), discarding partitions that finished closing before the timer fired;
  3. never stops the COMMIT_FILES_TIME timer (latent leak);
  4. never releases the reserved slots / partition locations at commit time (they were only reclaimed later by the shuffle-expiry cleanup in Worker.cleanup).

This PR makes the cancel/timeout branch:

  • call context.reply(response) immediately, so the driver doesn't wait out the ask timeout;
  • call workerSource.stopTimer(COMMIT_FILES_TIME, shuffleKey);
  • release slots and remove partition locations before replying, mirroring the normal reply() path (previously they were held until shuffle expiry);
  • build the response from the actual commit state via a new pure helper Controller.buildCommitFilesResponseOnCancel, deriving the status from the failed lists materialized in the response: SUCCESS when nothing actually failed (every requested partition committed or closed empty before the snapshot — i.e. cancellation raced with completion), the unchanged COMMIT_FILE_EXCEPTION when nothing committed and nothing is empty, and PARTIAL_SUCCESS otherwise.

The failed lists are computed as requested − committed − empty, not just the explicitly-failed ids (see correctness note below).
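
As a rough illustration of the status derivation described above, the sketch below models partition ids as plain strings and collapses the primary and replica lists into one. The names CancelResponseSketch, Outcome and onCancel are hypothetical. The real helper is Controller.buildCommitFilesResponseOnCancel, whose signature is not shown in this description, so treat this as a simplified model under those assumptions rather than the actual implementation.

```scala
object CancelResponseSketch {
  // Simplified stand-ins for the StatusCode values named in the description.
  sealed trait Status
  case object Success extends Status
  case object PartialSuccess extends Status
  case object CommitFileException extends Status

  // Hypothetical, reduced view of a CommitFilesResponse.
  final case class Outcome(status: Status, committed: Set[String], failed: Set[String])

  def onCancel(
      requested: Set[String],
      committed: Set[String],
      empty: Set[String]): Outcome = {
    // failed = requested - committed - empty, so queued or in-flight ids
    // that appear in no set are still reported as failed.
    val failed = requested -- committed -- empty
    val status =
      if (failed.isEmpty) Success // cancellation raced with completion
      else if (committed.isEmpty && empty.isEmpty) CommitFileException // nothing finished
      else PartialSuccess
    Outcome(status, committed, failed)
  }
}
```

With requested ids p0 to p4, committed p0 and p1, and p2 closed empty, this sketch yields PartialSuccess with p3 and p4 reported as failed, which matches the first unit-test scenario listed under "How was this patch tested?".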

Why are the changes needed?

Primary — eliminate the driver stall. On every commit timeout the driver currently blocks for a full commitFiles.askTimeout before the same-epoch retry retrieves the worker's cached response. Replying immediately removes that wait (and the redundant retry round). The stopTimer call fixes a timer leak on the same path. These two are unconditional wins on every timeout.
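
The shape of the fixed error branch can be sketched as follows. This is a minimal model, not the Controller code: reply and stopTimer are hypothetical callbacks standing in for context.reply and workerSource.stopTimer, the response is reduced to a string, and handle is used instead of handleAsync so that the callback runs on the completing thread and the example stays deterministic.

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.BiFunction

object ReplyOnCancelSketch {
  // Attaches a completion handler that always stops the timer and always
  // replies, on both the normal branch and the cancel/timeout branch.
  def attach(
      commit: CompletableFuture[String],
      reply: String => Unit,
      stopTimer: () => Unit): CompletableFuture[Unit] =
    commit.handle[Unit](new BiFunction[String, Throwable, Unit] {
      override def apply(result: String, error: Throwable): Unit = {
        stopTimer()
        if (error == null) {
          reply(result)
        } else {
          // Previously this branch produced no reply at all; the driver
          // only found out after its own ask timeout expired.
          reply("response built from commit state")
        }
      }
    })
}
```

Cancelling the future after attach has been called invokes the handler once with a non-null error, so the hypothetical reply callback fires immediately rather than never.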

Secondary: make the partial-success path safe. Tasks still queued (or interrupted before reaching a terminal state) when cancellation fires land in none of the worker's committed / empty / failed sets. If such a partition were reported as neither committed nor failed, the driver would record it nowhere; checkDataLost keys only off the failed sets, so it would not flag it; collectResult would omit it from the reducer file group; and on the read side CelebornShuffleReader treats a partition absent from the file group as empty-and-valid (it filters the requested range to partitionGroups.containsKey(p)), so reducers would silently produce wrong results with no FetchFailedException. Computing failed = requested − committed − empty guarantees every not-actually-committed partition is reported failed, routing it through checkDataLost → SHUFFLE_DATA_LOST → stage recompute. This is required to return PARTIAL_SUCCESS without regressing into silent data loss.
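
The silent-drop chain above can be modelled in a few lines. Everything in this sketch is a toy stand-in with hypothetical names: partitionsRead mimics the reader-side containsKey filter, dataLost mimics a loss check that looks only at the failed set (non-replicated case), and partition ids are plain integers.

```scala
object SilentDropSketch {
  // Reader side: only partitions present in the file group are read;
  // an absent partition is treated as empty and valid.
  def partitionsRead(requested: Range, fileGroup: Map[Int, Seq[String]]): Seq[Int] =
    requested.filter(fileGroup.contains)

  // Driver side: loss is detected from the failed set alone.
  def dataLost(failed: Set[Int]): Boolean = failed.nonEmpty

  // The fix: anything requested that is neither committed nor empty is failed.
  def failedOnCancel(requested: Set[Int], committed: Set[Int], empty: Set[Int]): Set[Int] =
    requested -- committed -- empty
}
```

Suppose partitions 0 and 1 committed and partition 2 was still in flight. If only explicitly failed ids are reported, the failed set is empty, dataLost returns false, and partitionsRead skips partition 2 without any error. With failedOnCancel, partition 2 is in the failed set and dataLost returns true.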

Status correctness. The driver records the worker in commitFilesFailedWorkers (→ WorkerStatusTracker.excludedWorkers) for any terminal status other than SUCCESS. When cancellation races with completion and the failed lists come out empty, the response is functionally identical to a success-path response, so the helper returns SUCCESS — otherwise the worker would be excluded from this client's slot allocations despite no actual failure. This is safe because the committed/empty sets are append-only: empty failed lists imply every requested partition had already reached a terminal good state at snapshot time.

Scope / honest limitations

Reduce-shuffle finalization is all-or-nothing: ReducePartitionCommitHandler.handleFinalCommitFiles calls collectResult (which populates the reducer file groups) only when checkDataLost returns false, and checkDataLost flags the whole shuffle if any primary failed (non-replicated) or any partition failed on both replicas. So this PR does not let reducers fetch the committed partitions while only the lost ones recompute — when a timeout leaves any partition uncommitted in a non-replicated shuffle, the whole map stage still recomputes, exactly as before. Preserving committed partitions avoids recompute only when the timeout left nothing actually uncommitted (e.g. the timer fired as the last file closed), and otherwise just keeps the worker's report truthful. The dependable, every-time benefits are the immediate reply and the timer fix.
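
A compact way to read the all-or-nothing rule is the sketch below. It assumes, for illustration only, that a partition carries the same string id on its primary and replica, and it reduces collectResult to returning the committed ids. AllOrNothingSketch, dataLost and finalizeShuffle are hypothetical names, not the ReducePartitionCommitHandler API.

```scala
object AllOrNothingSketch {
  // Non-replicated: any failed primary loses data.
  // Replicated: data is lost only if the same partition failed on both replicas.
  def dataLost(
      failedPrimary: Set[String],
      failedReplica: Set[String],
      replicated: Boolean): Boolean =
    if (replicated) failedPrimary.intersect(failedReplica).nonEmpty
    else failedPrimary.nonEmpty

  // File groups are populated only when nothing was lost; otherwise the
  // whole shuffle is flagged and no committed partition is exposed.
  def finalizeShuffle(
      committed: Set[String],
      failedPrimary: Set[String],
      failedReplica: Set[String],
      replicated: Boolean): Option[Set[String]] =
    if (dataLost(failedPrimary, failedReplica, replicated)) None
    else Some(committed)
}
```

In this model a single failed primary in a non-replicated shuffle returns None even when every other partition committed, which is why preserving committed partitions avoids a recompute only when nothing is actually left uncommitted.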

Does this PR introduce any user-facing change?

No protocol or API change. StatusCode.PARTIAL_SUCCESS is already part of CommitFilesResponse and is already handled by the driver as a terminal, non-retry status (same branch as SUCCESS in CommitHandler.doParallelCommitFiles). Note that, like any non-SUCCESS terminal status, it records the worker in commitFilesFailedWorkers — which is why the helper returns SUCCESS when the failed lists are empty.

How was this patch tested?

New ControllerSuite unit tests for buildCommitFilesResponseOnCancel:

  • some committed, one empty, two still-queued/in-flight → PARTIAL_SUCCESS, committed preserved, the in-flight ids reported failed, the empty id not failed;
  • nothing committed → COMMIT_FILE_EXCEPTION with all requested ids failed;
  • nothing committed, some empty, some in-flight → PARTIAL_SUCCESS, empty ids not reported failed;
  • all requested partitions empty → SUCCESS with no failures;
  • every requested partition committed or empty before the snapshot (cancellation raced with completion) → SUCCESS, committed preserved, no failures.

Built the worker module with Java 17 and confirmed spotless:check is clean; all 5 suite tests pass.

🤖 Generated with Claude Code

Copilot AI left a comment

Pull request overview

This PR improves the worker-side CommitFiles timeout/cancellation handling in Controller to (a) reply immediately instead of leaving the driver waiting for its ask timeout, (b) stop the COMMIT_FILES_TIME timer on the timeout path, and (c) preserve partially-completed commit state by building a response from the actual committed/empty sets.

Changes:

  • Reply immediately and stop the commit-files timer when the async commit future is cancelled/times out.
  • Introduce Controller.buildCommitFilesResponseOnCancel to compute a truthful CommitFilesResponse from the observed commit state (committed/empty vs failed).
  • Add ControllerSuite unit tests covering partial-success and all-failed cancel responses.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

File | Description
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala | Replies on cancel/timeout, stops timer, and adds helper to build cancel/timeout response from commit state.
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/ControllerSuite.scala | Adds unit tests for the new cancel/timeout response builder.

Comment thread worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala Outdated

@SteNicholas SteNicholas left a comment

Member

I traced the full data path on both the driver and reader sides — this is a solid, well-reasoned change. The two unconditional wins (immediate reply, timer-leak fix) are clearly correct, and the riskier partial-success logic is safe.

The safety argument holds (verified)

The crucial claim is that an in-flight partition reported as neither committed nor failed causes silent data loss, so failed = requested − committed − empty is required. Every link checks out:

  • CommitHandler.checkDataLost keys only off the failed sets — a partition absent from failedPrimaryPartitionIds/failedReplicaPartitionIds is never flagged.
  • collectResult only populates the reducer file group from committedPrimaryIds.
  • CelebornShuffleReader silently skips a partition absent from the file group (its "// filter empty partition" step is .filter(p => fileGroups.partitionGroups.containsKey(p))), so no FetchFailedException is raised.

So an in-flight partition absent from both sets is silently dropped at read time. Computing failed = requested − committed − empty correctly routes it through checkDataLost → SHUFFLE_DATA_LOST → recompute. PARTIAL_SUCCESS is confirmed terminal/non-retry (same branch as SUCCESS in doParallelCommitFiles), and the "honest limitations" section matches the all-or-nothing behavior in handleFinalCommitFiles.

Concurrency nuance worth a code comment

task.cancel(true) / future.cancel(true) on a CompletableFuture does not interrupt the running task — mayInterruptIfRunning is ignored by CompletableFuture. So when the timer fires, the per-partition commit tasks keep running in commitThreadPool while the error branch snapshots the committed/empty sets. I worked through the race and it's safe: failedPrimaryIds is computed from the live committedPrimaryIds before the committed-list snapshot is taken, and the committed set only grows, so a partition that commits during the window lands in both lists (over-reports failure → safe recompute) — never in "neither". And a partition reaches committedIds only after fileWriter.close() succeeds, so committed ⇒ durable.
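
The cancel behaviour is easy to confirm in isolation. The sketch below is a standalone demonstration with hypothetical names, unrelated to commitThreadPool: a task blocks on a latch, the future is cancelled with cancel(true), and the task then records whether it was interrupted or ran to completion.

```scala
import java.util.concurrent.{CompletableFuture, CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object CancelDoesNotInterrupt {
  /** Returns (futureIsCancelled, taskWasInterrupted, taskRanToCompletion). */
  def run(): (Boolean, Boolean, Boolean) = {
    val pool = Executors.newSingleThreadExecutor()
    val started = new CountDownLatch(1)
    val proceed = new CountDownLatch(1)
    val finished = new CountDownLatch(1)
    val interrupted = new AtomicBoolean(false)
    val completed = new AtomicBoolean(false)
    try {
      val future = CompletableFuture.runAsync(
        new Runnable {
          override def run(): Unit = {
            started.countDown()
            try {
              // An interrupt would surface here as InterruptedException.
              proceed.await()
              completed.set(true)
            } catch {
              case _: InterruptedException => interrupted.set(true)
            } finally {
              finished.countDown()
            }
          }
        },
        pool)
      started.await() // the task is now running
      future.cancel(true) // marks the future cancelled, sends no interrupt
      proceed.countDown() // let the still-running task continue
      finished.await(5, TimeUnit.SECONDS)
      (future.isCancelled, interrupted.get, completed.get)
    } finally {
      pool.shutdownNow()
    }
  }
}
```

The future reports itself cancelled while the underlying task finishes normally, which is the window in which a commit task can still add an id to the committed set after the timeout has fired.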

This is correct but non-obvious and the ordering is load-bearing. A one-line comment noting the snapshot is best-effort (tasks may still be running since cancel doesn't interrupt) would protect against a future reorder.

Empty-partition edge case (same as Copilot's note)

The COMMIT_FILE_EXCEPTION branch triggers on committed*Ids.isEmpty, which marks genuinely-empty partitions (tracked in emptyFile*Ids) as failed → unnecessary recompute in the rare "all partitions empty, none committed" case. Not a correctness bug — over-reporting failure is always safe, and it byte-for-byte preserves the prior no-commit response. If you'd rather optimize it, gate on committed.isEmpty && empty.isEmpty; otherwise a quick reply on the Copilot thread noting it's an intentional conservative choice would close it out.

Minor

  • Tests cover only the pure helper (the risky part — acceptable), but not the empty-only edge case above, which is exactly the Copilot scenario — worth adding.

None of this blocks merge. Nice work, and the writeup/limitations section is appreciated.

🤖 Generated with Claude Code

…s timeout

When `celeborn.worker.commitFiles.timeout` fires and `future.cancel(true)` /
`task.cancel(true)` interrupt the per-partition commit tasks, the worker's
error-path response had three problems that amplified data loss:

1. The response was built with empty committed lists, discarding all partitions
   that committed before the timer fired.
2. `context.reply()` was never called, so the driver waited out
   `celeborn.client.rpc.commitFiles.askTimeout` instead of getting the verdict.
3. The `COMMIT_FILES_TIME` timer was never stopped on this path (leak).

This builds the response from the actual committed / empty / failed state and
returns `PARTIAL_SUCCESS` when any partition committed.

Crucially, tasks still queued (or interrupted before reaching a terminal state)
when cancellation fires land in NONE of the committed / empty / failed sets, so
the failed list is computed as `requested - committed - empty` rather than only
the explicitly-failed ids. Otherwise the driver's `CommitHandler.checkDataLost`
cannot distinguish an in-flight (has data, uncommitted) partition from an empty
(no data) one -- both are absent from committed and failed -- and would silently
treat it as empty-and-valid, producing wrong reducer results with no
`FetchFailedException`. Reporting them as failed makes the driver recompute them.

The response construction is extracted into `Controller.buildCommitFilesResponseOnCancel`
and covered by `ControllerSuite`.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@shlomitubul shlomitubul force-pushed the worker-preserve-partial-commit-on-timeout-main branch from 9ae10f6 to 922eba8 Compare June 8, 2026 13:30
@shlomitubul

Contributor Author

Thanks for the thorough trace — addressed both points in the latest push (922eba8):

  1. Empty-partition edge case (also Copilot's note): the helper no longer takes a separate empty-committed branch. failed is always computed as requested − committed − empty, and COMMIT_FILE_EXCEPTION is returned only when nothing committed and nothing is empty; otherwise PARTIAL_SUCCESS. So genuinely-empty partitions are never reported as failed (consistent with the success path), and the "all-empty, none committed" case no longer forces a recompute. Added two unit tests: empty-files-not-failed-when-nothing-committed, and all-empty-reports-no-failures.

  2. Concurrency / cancel doesn't interrupt — added a comment at the snapshot making the ordering explicit: cancel(true) doesn't interrupt a running CompletableFuture, the committed/empty sets are append-only, and failed is computed (reading committed) before the committed snapshot — so a partition committing mid-window lands in both lists (safe over-report), never in neither. Calling out the load-bearing ordering so a future reorder doesn't reintroduce the silent-drop.
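
The ordering argument in point 2 can be illustrated with a single-threaded model. This is only a sketch with hypothetical names: the real sets are concurrent and are mutated by commit tasks, whereas here a midWindow callback stands in for a commit that finishes between the two reads.

```scala
import scala.collection.mutable

object SnapshotOrderingSketch {
  // Returns (committedSnapshot, failed) using the ordering the comment documents.
  def snapshot(
      requested: Set[String],
      committed: mutable.Set[String],
      empty: mutable.Set[String],
      midWindow: () => Unit): (Set[String], Set[String]) = {
    // Step 1: compute failed first, reading the live committed/empty sets.
    val failed = requested.filterNot(id => committed.contains(id) || empty.contains(id))
    // Step 2: a still-running commit task may finish here (cancel did not stop it).
    midWindow()
    // Step 3: only now take the committed snapshot. The set is append-only, so
    // the snapshot is a superset of what step 1 saw.
    val committedSnapshot = committed.toSet
    (committedSnapshot, failed)
  }
}
```

If partition b commits during the window, it appears in both the failed list and the committed snapshot, which is a safe over-report. Taking the committed snapshot before computing failed could instead leave b in neither list.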

Appreciate the review.

🤖 Generated with Claude Code

Copilot AI left a comment

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
@SteNicholas

Member

Re-checked the latest two pushes.

Both points from my earlier review are addressed in 922eba8:

  1. The best-effort-cancel comment in buildCommitFilesResponseOnCancel documents the race precisely — failed is computed before the committed snapshot and the sets are append-only, so a racing commit can only land in both failed and committed (safe over-report), never in neither.
  2. Empty partitions are no longer reported as failed: COMMIT_FILE_EXCEPTION is gated on nothing-committed-and-nothing-empty, with the empty-only unit test to match.

Also verified 59c989f's fix for the status/snapshot inconsistency Copilot flagged: status is now derived from the same committed snapshots returned in the response, so COMMIT_FILE_EXCEPTION with non-empty committed lists is no longer possible. The residual window (the empty sets are still read live in the status check) only degrades to a safe over-report of failed, consistent with the documented design.

One leftover nit: the comment "COMMIT_FILE_EXCEPTION only when nothing committed and nothing empty; empty files are a successful terminal state and must not be reported as failed." appears twice in the helper — above the committed-list snapshot and again above the status computation. Worth dropping one copy before merge.

LGTM.

Copilot AI left a comment

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

Comment thread worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala Outdated

@SteNicholas SteNicholas left a comment

Member

@shlomitubul, please take a look at Copilot's comments.

…ase slots on the cancel path

Address review feedback:
- Derive the response status from the failed lists materialized in the
  response: SUCCESS when both are empty (every requested partition was
  committed or empty before the snapshot), COMMIT_FILE_EXCEPTION when
  nothing committed and nothing empty, PARTIAL_SUCCESS otherwise. This
  keeps a cancellation that raced with completion from landing the
  worker in the client's commitFilesFailedWorkers/excludedWorkers.
- Release slots and remove partition locations before replying in the
  cancel branch, mirroring reply(), instead of holding them until
  shuffle expiry.
- Drop a duplicated comment in buildCommitFilesResponseOnCancel.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

Copilot AI left a comment

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

Comment on lines +708 to +714
// Release slots and remove partition locations before reply, mirroring reply().
// Unlike reply(), commit tasks may still be running here (cancel(true) does not
// interrupt them): a task that has not yet fetched its location will then find it
// removed and mark the partition failed, which is harmless -- the response below
// already reports every not-committed-and-not-empty partition as failed, and the
// shuffle-expiry cleanup that previously reclaimed these slots is idempotent.
val releasePrimaryLocations =
Comment on lines +963 to +973
CommitFilesResponse(
status,
committedPrimaryIdList,
committedReplicaIdList,
failedPrimaryIds,
failedReplicaIds,
new jHashMap[String, StorageInfo](committedPrimaryStorageInfos),
new jHashMap[String, StorageInfo](committedReplicaStorageInfos),
new jHashMap[String, RoaringBitmap](committedMapIdBitMap),
partitionSizeList.asScala.sum,
partitionSizeList.size())