Skip to content

[CELEBORN-2351] Partition file sorting should only be paused for PUSH_AND_REPLICATE_PAUSED#3720

Closed
s0nskar wants to merge 5 commits into
apache:mainfrom
s0nskar:sort_memory_ready
Closed

[CELEBORN-2351] Partition file sorting should only be paused for PUSH_AND_REPLICATE_PAUSED#3720
s0nskar wants to merge 5 commits into
apache:mainfrom
s0nskar:sort_memory_ready

Conversation

@s0nskar

@s0nskar s0nskar commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Partition file sorting should only be paused for PUSH_AND_REPLICATE_PAUSED, which represent very high memory pressure and cause OOM for workers. Sorting should be allowed for PUSH_PAUSED state.

Why are the changes needed?

Currently even for push pause state we stop the sorting for partition files. If pause is sustained for a longer time then sorting can timeout and reader waiting for sorting will fail or be delayed.

Does this PR resolve a correctness bug?

  • Yes

Does this PR introduce any user-facing change?

  • Yes

How was this patch tested?

Existing UTs

@codecov

codecov Bot commented Jun 5, 2026

Copy link
Copy Markdown

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 67.09%. Comparing base (b4cb5a0) to head (7e60ac2).
⚠️ Report is 56 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3720      +/-   ##
==========================================
+ Coverage   66.91%   67.09%   +0.18%     
==========================================
  Files         358      359       +1     
  Lines       21986    22304     +318     
  Branches     1946     1982      +36     
==========================================
+ Hits        14710    14962     +252     
- Misses       6262     6319      +57     
- Partials     1014     1023       +9     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates the worker-side memory gating for partition file sorting so that sorting is only paused under the highest memory-pressure state (PUSH_AND_REPLICATE_PAUSED), while allowing sorting to continue under PUSH_PAUSED. This aligns sorting behavior with the ServingState semantics in MemoryManager and aims to prevent prolonged pause-induced sort timeouts that can delay or fail readers.

Changes:

  • Adjust MemoryManager.sortMemoryReady() to block sorting only when servingState == PUSH_AND_REPLICATE_PAUSED.
  • Allow partition file sorting to proceed during PUSH_PAUSED, subject to the existing maxSortMemory threshold.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@SteNicholas SteNicholas left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, well-targeted, low-risk change. The motivation and safety both check out — approve. My only real ask is a unit test.

The motivation checks out

I traced the read path: getSortedFileInfo enqueues a FileSorter onto the single-threaded worker-file-sorter-scheduler, and isPrefetch = !isDfs && prefetchEnabled. So with prefetch enabled, reader-triggered local-disk sorts are themselves the "prefetch" tasks that pass through the while (!sortMemoryReady()) sleep(20) gate. Under the old code, any pause — including a sustained PUSH_PAUSED — stalls that gate, and the reader blocks up to the ~220s sort timeout and then fails. The change correctly narrows the stall to genuine OOM-risk pressure.

Why it's safe

Allowing more sort memory under pause is self-limiting because sort memory is itself part of the pause signal: getMemoryUsage() = getNettyUsedDirectMemory() + sortMemoryCounter.get(), and the state thresholds key off getMemoryUsage(). So if sorting in PUSH_PAUSED accumulates enough sortMemoryCounter to cross pauseReplicateThreshold, the next switchServingState() tick (~10ms) flips to PUSH_AND_REPLICATE_PAUSED and new prefetch sorts re-block. Combined with the unchanged < maxSortMemory cap, the extra pressure can't run away.

Notes (none blocking)

  1. Please add a unit test. MemoryManagerSuite already drives states by setting sortMemoryCounter, so this is cheap. One nuance: sortMemoryReady() reads the cached servingState field, not currentServingState() — so a test must call the @VisibleForTesting switchServingState() after setting the counter, then assert sortMemoryReady() is true in PUSH_PAUSED and false in PUSH_AND_REPLICATE_PAUSED. Worth doing to prevent a silent regression back to the old gating.

  2. Mild resume delay / possible state flutter (minor). Holding up to maxSortMemory of sort buffers in PUSH_PAUSED keeps getMemoryUsage() elevated, which can marginally delay resume to NONE_PAUSED and, if sort memory is the marginal contributor near pauseReplicateThreshold, cause some PUSH_PAUSED ↔ PUSH_AND_REPLICATE_PAUSED toggling (each toggle fires replicate pause/resume callbacks). Bounded by maxSortMemory and self-correcting, so acceptable — just flagging it's a real if small behavior change.

  3. Residual, not introduced here: under PUSH_AND_REPLICATE_PAUSED a blocked prefetch task still parks the single scheduler thread inside the sleep loop, head-of-line-blocking every queued task (including already-sortable or DFS ones) behind it. This PR makes that rarer but doesn't remove it — fine for this PR's scope.

🤖 Generated with Claude Code

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

@s0nskar s0nskar force-pushed the sort_memory_ready branch from d77ab69 to 20b5659 Compare June 9, 2026 05:58
@s0nskar

s0nskar commented Jun 9, 2026

Copy link
Copy Markdown
Contributor Author

Residual, not introduced here: PUSH_AND_REPLICATE_PAUSED a blocked prefetch task still parks the single scheduler thread inside the sleep loop, head-of-line-blocking every queued task (including already-sortable or DFS ones) behind it. This PR makes that rarer but doesn't remove it — fine for this PR's scope.

Also, this seems like a valid issue. I will try to fix this in a follow up PR.
Filed this – https://issues.apache.org/jira/browse/CELEBORN-2352

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
@SteNicholas

Copy link
Copy Markdown
Member

Heads-up @s0nskar: b7e086b ("Apply suggestions from code review") accidentally broke the new test — applying the comment reword also deleted the adjacent stub line:

Mockito.when(memoryManager.getMemoryUsage).thenReturn(pushThreshold + 1)

Diff of MemoryManagerSuite.scala between 20b5659 and b7e086b:

-    // back-pressured. Before Fix 1 this incorrectly returned false.
-    Mockito.when(memoryManager.getMemoryUsage).thenReturn(pushThreshold + 1)
+    // back-pressured (previously sorting was also blocked in this state).

That line is what drives the transition into PUSH_PAUSED. With getMemoryUsage still stubbed to 0L, currentServingState() can only return NONE_PAUSED (usage exceeds neither pause threshold and is below the resume ratio), so the next assertion assert(memoryManager.servingState == ServingState.PUSH_PAUSED) in "sortMemoryReady allows sorting in PUSH_PAUSED but blocks in PUSH_AND_REPLICATE_PAUSED" cannot pass — the currently-pending CI run will fail on it. pushThreshold is also left as an unused val.

Restoring the stub line under the new comment fixes it. The production change itself is unaffected — my approval from the earlier review stands.

@s0nskar

s0nskar commented Jun 10, 2026

Copy link
Copy Markdown
Contributor Author

Thanks @SteNicholas for checking. Fixed them now.

@RexXiong RexXiong closed this in 90b9b44 Jun 11, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants