[CELEBORN-2351] Partition file sorting should only be paused for PUSH_AND_REPLICATE_PAUSED #3720
s0nskar wants to merge 5 commits into
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3720      +/-   ##
==========================================
+ Coverage   66.91%   67.09%   +0.18%
==========================================
  Files         358      359       +1
  Lines       21986    22304     +318
  Branches     1946     1982      +36
==========================================
+ Hits        14710    14962     +252
- Misses       6262     6319      +57
- Partials     1014     1023       +9
Pull request overview
This PR updates the worker-side memory gating for partition file sorting so that sorting is only paused under the highest memory-pressure state (PUSH_AND_REPLICATE_PAUSED), while allowing sorting to continue under PUSH_PAUSED. This aligns sorting behavior with the ServingState semantics in MemoryManager and aims to prevent prolonged pause-induced sort timeouts that can delay or fail readers.
Changes:
- Adjust MemoryManager.sortMemoryReady() to block sorting only when servingState == PUSH_AND_REPLICATE_PAUSED.
- Allow partition file sorting to proceed during PUSH_PAUSED, subject to the existing maxSortMemory threshold.
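The two changes listed above can be sketched as a before/after pair of predicates. This is a minimal illustration, not the actual Celeborn source: the class SortGateSketch and the method sortMemoryReadyOld() are hypothetical, and the shape of the old check is an assumption inferred from the description that any pause state used to block sorting. Only ServingState, servingState, sortMemoryCounter, maxSortMemory and sortMemoryReady() are names taken from this PR.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the worker's MemoryManager (illustration only).
final class SortGateSketch {
    enum ServingState { NONE_PAUSED, PUSH_PAUSED, PUSH_AND_REPLICATE_PAUSED }

    volatile ServingState servingState = ServingState.NONE_PAUSED;
    final AtomicLong sortMemoryCounter = new AtomicLong();
    final long maxSortMemory;

    SortGateSketch(long maxSortMemory) {
        this.maxSortMemory = maxSortMemory;
    }

    // Assumed shape of the old gate: any pause state blocks sorting.
    boolean sortMemoryReadyOld() {
        return servingState == ServingState.NONE_PAUSED
            && sortMemoryCounter.get() < maxSortMemory;
    }

    // Gate as described in this PR: only the highest-pressure state blocks
    // sorting, and the maxSortMemory cap still applies in every state.
    boolean sortMemoryReady() {
        return servingState != ServingState.PUSH_AND_REPLICATE_PAUSED
            && sortMemoryCounter.get() < maxSortMemory;
    }
}
```

Under this sketch the two predicates differ only in the PUSH_PAUSED state; in NONE_PAUSED and PUSH_AND_REPLICATE_PAUSED they agree.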
SteNicholas left a comment
Correct, well-targeted, low-risk change. The motivation and safety both check out — approve. My only real ask is a unit test.
The motivation checks out
I traced the read path: getSortedFileInfo enqueues a FileSorter onto the single-threaded worker-file-sorter-scheduler, and isPrefetch = !isDfs && prefetchEnabled. So with prefetch enabled, reader-triggered local-disk sorts are themselves the "prefetch" tasks that pass through the while (!sortMemoryReady()) sleep(20) gate. Under the old code, any pause — including a sustained PUSH_PAUSED — stalls that gate, and the reader blocks up to the ~220s sort timeout and then fails. The change correctly narrows the stall to genuine OOM-risk pressure.
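The stall described above can be reproduced with a small stand-alone simulation. The class SortSchedulerSketch and the method awaitSort are hypothetical and the timeout is passed in by the caller rather than fixed at the ~220s mentioned above; the only parts taken from the review are the single-threaded scheduler and the `while (!sortMemoryReady()) sleep(20)` gate.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical simulation of a reader waiting on a gated sort task.
final class SortSchedulerSketch {
    // Submits one gated "sort" to a single-threaded scheduler and waits up to
    // timeoutMs for it. Returns true if the sort finished in time.
    static boolean awaitSort(BooleanSupplier sortMemoryReady, long timeoutMs) {
        ExecutorService scheduler = Executors.newSingleThreadExecutor();
        try {
            Future<?> sort = scheduler.submit(() -> {
                // The memory gate: poll every 20 ms until sorting is allowed.
                while (!sortMemoryReady.getAsBoolean()) {
                    try {
                        Thread.sleep(20);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                // The actual partition file sort would run here.
            });
            sort.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // the reader gives up while the gate is still closed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            scheduler.shutdownNow(); // interrupts a task still parked in the gate
        }
    }
}
```

With a gate that never opens, the call returns false once the timeout elapses, which corresponds to the reader failure described above; with an open gate it returns true almost immediately.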
Why it's safe
Allowing more sort memory under pause is self-limiting because sort memory is itself part of the pause signal: getMemoryUsage() = getNettyUsedDirectMemory() + sortMemoryCounter.get(), and the state thresholds key off getMemoryUsage(). So if sorting in PUSH_PAUSED accumulates enough sortMemoryCounter to cross pauseReplicateThreshold, the next switchServingState() tick (~10ms) flips to PUSH_AND_REPLICATE_PAUSED and new prefetch sorts re-block. Combined with the unchanged < maxSortMemory cap, the extra pressure can't run away.
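The self-limiting argument can be checked with a few lines of arithmetic. This is a simplified sketch under stated assumptions: the class ServingStateSketch, the method stateFor and the parameter name pausePushThreshold are hypothetical, the numbers in the usage note are made up, and any resume hysteresis the real MemoryManager applies is deliberately left out.

```java
// Hypothetical model of how the serving state follows memory usage.
final class ServingStateSketch {
    enum ServingState { NONE_PAUSED, PUSH_PAUSED, PUSH_AND_REPLICATE_PAUSED }

    // Mirrors the formula quoted above: Netty direct memory plus sort memory.
    static long memoryUsage(long nettyUsedDirectMemory, long sortMemory) {
        return nettyUsedDirectMemory + sortMemory;
    }

    // Simplified state selection keyed off usage (no hysteresis modeled).
    static ServingState stateFor(long usage, long pausePushThreshold,
                                 long pauseReplicateThreshold) {
        if (usage > pauseReplicateThreshold) {
            return ServingState.PUSH_AND_REPLICATE_PAUSED;
        }
        if (usage > pausePushThreshold) {
            return ServingState.PUSH_PAUSED;
        }
        return ServingState.NONE_PAUSED;
    }
}
```

For example, with illustrative thresholds of 60 and 90 and 70 units of Netty memory, the worker sits in PUSH_PAUSED with no sort memory in use; once sorts accumulate 25 units, usage reaches 95 and the next evaluation yields PUSH_AND_REPLICATE_PAUSED, at which point the gate closes again for new sorts.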
Notes (none blocking)
- Please add a unit test. MemoryManagerSuite already drives states by setting sortMemoryCounter, so this is cheap. One nuance: sortMemoryReady() reads the cached servingState field, not currentServingState(), so a test must call the @VisibleForTesting switchServingState() after setting the counter, then assert sortMemoryReady() is true in PUSH_PAUSED and false in PUSH_AND_REPLICATE_PAUSED. Worth doing to prevent a silent regression back to the old gating.
- Mild resume delay / possible state flutter (minor). Holding up to maxSortMemory of sort buffers in PUSH_PAUSED keeps getMemoryUsage() elevated, which can marginally delay resume to NONE_PAUSED and, if sort memory is the marginal contributor near pauseReplicateThreshold, cause some PUSH_PAUSED ↔ PUSH_AND_REPLICATE_PAUSED toggling (each toggle fires replicate pause/resume callbacks). Bounded by maxSortMemory and self-correcting, so acceptable; just flagging that it is a real, if small, behavior change.
- Residual, not introduced here: under PUSH_AND_REPLICATE_PAUSED a blocked prefetch task still parks the single scheduler thread inside the sleep loop, head-of-line-blocking every queued task (including already-sortable or DFS ones) behind it. This PR makes that rarer but doesn't remove it, which is fine for this PR's scope.
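The cached-state nuance from the first note is easy to miss, so here is a stand-alone sketch of why a test has to refresh the state explicitly. The class CachedStateSketch is hypothetical and is not the real MemoryManager or MemoryManagerSuite; it only borrows the method names mentioned in the review, assumes zero Netty usage, and uses made-up thresholds.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model: the gate reads a cached state that is only refreshed
// when switchServingState() runs.
final class CachedStateSketch {
    enum ServingState { NONE_PAUSED, PUSH_PAUSED, PUSH_AND_REPLICATE_PAUSED }

    final AtomicLong sortMemoryCounter = new AtomicLong();
    private final long pausePushThreshold;      // hypothetical name
    private final long pauseReplicateThreshold;
    private final long maxSortMemory;
    private volatile ServingState servingState = ServingState.NONE_PAUSED;

    CachedStateSketch(long pausePushThreshold, long pauseReplicateThreshold,
                      long maxSortMemory) {
        this.pausePushThreshold = pausePushThreshold;
        this.pauseReplicateThreshold = pauseReplicateThreshold;
        this.maxSortMemory = maxSortMemory;
    }

    // Computes the state from current usage (sort memory only in this sketch).
    ServingState currentServingState() {
        long usage = sortMemoryCounter.get();
        if (usage > pauseReplicateThreshold) {
            return ServingState.PUSH_AND_REPLICATE_PAUSED;
        }
        if (usage > pausePushThreshold) {
            return ServingState.PUSH_PAUSED;
        }
        return ServingState.NONE_PAUSED;
    }

    // Refreshes the cached field; the real worker does this periodically.
    void switchServingState() {
        servingState = currentServingState();
    }

    // Reads the cached field, so it can lag behind the counter.
    boolean sortMemoryReady() {
        return servingState != ServingState.PUSH_AND_REPLICATE_PAUSED
            && sortMemoryCounter.get() < maxSortMemory;
    }
}
```

In this model, raising sortMemoryCounter above pauseReplicateThreshold leaves sortMemoryReady() returning true until switchServingState() is called, so an assertion made without the refresh would pass or fail for the wrong reason.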
🤖 Generated with Claude Code
72302d1 to d77ab69
d77ab69 to 20b5659
Also, this seems like a valid issue. I will try to fix it in a follow-up PR.
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Heads-up @s0nskar: Mockito.when(memoryManager.getMemoryUsage).thenReturn(pushThreshold + 1)
Diff of
- // back-pressured. Before Fix 1 this incorrectly returned false.
- Mockito.when(memoryManager.getMemoryUsage).thenReturn(pushThreshold + 1)
+ // back-pressured (previously sorting was also blocked in this state).
That line is what drives the transition into
Restoring the stub line under the new comment fixes it. The production change itself is unaffected; my approval from the earlier review stands.
Thanks @SteNicholas for checking. Fixed them now.
What changes were proposed in this pull request?
Partition file sorting should only be paused for PUSH_AND_REPLICATE_PAUSED, which represents very high memory pressure and can cause OOM for workers. Sorting should be allowed in the PUSH_PAUSED state.
Why are the changes needed?
Currently we stop sorting partition files even in the push-paused state. If the pause is sustained for a long time, sorting can time out, and a reader waiting for the sorted file will fail or be delayed.
Does this PR resolve a correctness bug?
Does this PR introduce any user-facing change?
How was this patch tested?
Existing UTs