[CELEBORN-2336] Support decommission shutdown for worker scale-down scenarios#3698
[CELEBORN-2336] Support decommission shutdown for worker scale-down scenarios#3698chenghuichen wants to merge 8 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
Adds an opt-in “decommission-on-SIGTERM” mode for workers to better support scale-down scenarios, by introducing a new config flag that makes SIGTERM follow the decommission flow (report to master and wait for shuffle data consumption/expiration) instead of the graceful-restart flow.
Changes:
- Add
celeborn.worker.decommission.shutdown.enabledconfig and surface it viaCelebornConf. - Make
workerGracefulShutdowneffectively disabled when decommission-on-shutdown is enabled, and setWorkerStatusManager.exitEventTypeaccordingly. - Attempt to extend shutdown hook timeout to
workerDecommissionForceExitTimeoutbefore decommissioning during shutdown.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala | Initialize exitEventType to Decommission when decommission-on-shutdown is enabled (overriding graceful). |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala | On decommission shutdown path, call ShutdownHookManager.updateTimeout before decommissionWorker(). |
| common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala | Add new config entry + getter, and ensure graceful shutdown is suppressed when decommission shutdown is enabled. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
SteNicholas
left a comment
There was a problem hiding this comment.
@chenghuichen, thanks for first contribution. Could you please add test cases for this support?
|
@SteNicholas Thanks for the review. Fixed the timeout issue, added tests, and updated worker.md. |
|
@chenghuichen, please use |
SteNicholas
left a comment
There was a problem hiding this comment.
@chenghuichen, thanks for update. I left some comments for udpates. PTAL.
|
Overall: Clean, well-scoped change. The decommission-on-SIGTERM approach is the right abstraction for scale-down — avoids requiring custom preStop scripts. The Copilot/SteNicholas feedback has been addressed well (explicit timeout registration, Config version should likely be The new config Reviewed with Claude Code |
|
this is awesome and a known issue with auto-scaling down, thank you for your contribution @chenghuichen. I will review this PR soon. |
afterincomparableyum
left a comment
There was a problem hiding this comment.
Overall lgtm. Could you pls address my comments and the other comments.
|
@afterincomparableyum Thanks for the review! Updated: added assertions, fixed version to 0.7.0, clarified docs. |
1d92a40 to
cf8d472
Compare
| // Graceful Shutdown & Recover // | ||
| // ////////////////////////////////////////////////////// | ||
| def workerGracefulShutdown: Boolean = get(WORKER_GRACEFUL_SHUTDOWN_ENABLED) | ||
| def workerDecommissionShutdownEnabled: Boolean = get(WORKER_DECOMMISSION_SHUTDOWN_ENABLED) |
SteNicholas
left a comment
There was a problem hiding this comment.
@chenghuichen, thanks for updates. I left some comments for the updates. PTAL.
| // ////////////////////////////////////////////////////// | ||
| def workerGracefulShutdown: Boolean = get(WORKER_GRACEFUL_SHUTDOWN_ENABLED) | ||
| def workerDecommissionShutdownEnabled: Boolean = get(WORKER_DECOMMISSION_SHUTDOWN_ENABLED) | ||
| def workerGracefulShutdownEnabled: Boolean = |
There was a problem hiding this comment.
IMO, decommission shutdown could not affect graceful shutdown behavior. Therefore, this change could be reverted.
| }, | ||
| "worker-shutdown-hook-thread") | ||
|
|
||
| if (conf.workerDecommissionShutdownEnabled) { |
There was a problem hiding this comment.
Could this add two shutdown hook for decommission shutdown and graceful shutdown?
There was a problem hiding this comment.
overall lgtm, thanks for contributing. Could you address the last comments by CoPilot please and @SteNicholas.
|
@chenghuichen, any update? |
SteNicholas
left a comment
There was a problem hiding this comment.
Reviewed the decommission-shutdown path. The state-machine wiring and the new WORKER_DECOMMISSION exit-kind plumbing through stop()/close() look correct, and the config version + docs row check out. One correctness concern (the shutdown-hook timeout budget doesn't account for stop()), one behavior change worth confirming, and a few cleanup/altitude nits inline.
| ShutdownHookManager.get().addShutdownHook( | ||
| shutdownHookThread, | ||
| WORKER_SHUTDOWN_PRIORITY, | ||
| conf.workerDecommissionForceExitTimeout + conf.workerDecommissionCheckInterval, |
There was a problem hiding this comment.
The hook future is given forceExitTimeout + checkInterval, but decommissionWorker()'s wait loop is bounded by waitTime < forceExitTimeout with the bound checked before the sleep, so the loop alone can run up to ~forceExitTimeout + checkInterval. stop(WORKER_DECOMMISSION) then runs after the loop and isn't cheap — each of the three TransportServer.shutdown calls does channel().close().awaitUninterruptibly(10s), plus flusher shutdown. When shuffle never drains, the loop consumes the whole budget and executeShutdown fires future.cancel(true) mid-stop(), truncating teardown.
The doc has the same gap: it tells operators to set terminationGracePeriodSeconds to forceExitTimeout + checkInterval, so the K8s SIGKILL lands at the same point. Consider sizing both the hook timeout and the recommended grace period to include a stop() budget, or bounding the wait loop strictly below forceExitTimeout to reserve headroom.
| } | ||
| if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) { | ||
| if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN || | ||
| exitKind == CelebornExitKind.WORKER_DECOMMISSION) { |
There was a problem hiding this comment.
Grouping WORKER_DECOMMISSION with WORKER_GRACEFUL_SHUTDOWN here means the thread pools now get an orderly shutdown() (drains in-flight tasks). Before this PR the decommission path went through stop(EXIT_IMMEDIATELY) -> shutdownNow(), so in-flight replicate/commit tasks (e.g. replicating to a dead peer) are now drained rather than force-cancelled, and the wait is deferred to EXECUTOR.awaitTermination(workerGracefulShutdownTimeoutMs) in shutdownExecutor. Probably intended since decommission already waited for shuffle to drain — but worth confirming, and note the decommission path now silently depends on workerGracefulShutdownTimeoutMs even though the feature disables graceful shutdown.
| private val WORKER_SHUTDOWN_PRIORITY = 100 | ||
| val shutdown = new AtomicBoolean(false) | ||
| private val gracefulShutdown = conf.workerGracefulShutdown | ||
| private val gracefulShutdown = conf.workerGracefulShutdown && !conf.workerDecommissionShutdown |
There was a problem hiding this comment.
conf.workerGracefulShutdown && !conf.workerDecommissionShutdown is now hand-copied in three places — here, StorageManager.scala:298, and PartitionFilesSorter.java:116. Consider a single derived accessor on CelebornConf (e.g. effectiveWorkerGracefulShutdown) so the override rule can't drift across Scala/Java if the semantics ever change.
| new Runnable { | ||
| override def run(): Unit = { | ||
| logInfo("Shutdown hook called.") | ||
| workerStatusManager.exitEventType match { |
There was a problem hiding this comment.
The hook matches exitEventType twice back-to-back — once to pick the action (L1082) and once to pick the stop() exit-kind (L1091). Folding into a single match (case Decommission => decommissionWorker(); stop(WORKER_DECOMMISSION), etc.) keeps each event's action and exit-kind from drifting apart when a new type is added.
| }, | ||
| "worker-shutdown-hook-thread") | ||
|
|
||
| if (conf.workerDecommissionShutdown) { |
There was a problem hiding this comment.
This if/else registers the hook twice just to vary the timeout. The codebase already has the idiom: register once with the 2-arg overload and then ShutdownHookManager.get().updateTimeout(...) — which is exactly what the REST exit("DECOMMISSION") path does (Worker.scala:955).
| if (sendHeartbeatTask != null) { | ||
| if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) { | ||
| if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN || | ||
| exitKind == CelebornExitKind.WORKER_DECOMMISSION) { |
There was a problem hiding this comment.
exitKind == WORKER_GRACEFUL_SHUTDOWN || exitKind == WORKER_DECOMMISSION is repeated three times in stop() (here, L639, L647). A single val gracefulLike = … computed once would avoid the next exit-kind needing three coordinated edits.
|
|
||
| test("Test exitEventType initialization based on config") { | ||
| // Default: neither graceful nor decommission → Immediately | ||
| val conf1 = new CelebornConf() |
There was a problem hiding this comment.
conf1's assertEquals(Immediately, mgr1.exitEventType) relies on no celeborn.worker.*.shutdown.enabled system property being present; new CelebornConf() loads sys-props, so leakage from another test/CI could flake it. Setting the relevant keys explicitly (as conf2–conf4 do) keeps it hermetic. Minor: the test never calls init(worker), so only construction-time exitEventType is covered.
What changes were proposed in this pull request?
Add
celeborn.worker.decommission.shutdown.enabledconfiguration. When set to true, the worker will walk the decommission path (sendReportWorkerDecommissionto master, wait for all shuffle data to be consumed or expired) upon receiving SIGTERM, instead of the graceful shutdown path.When enabled, this overrides
celeborn.worker.graceful.shutdown.enabled. The shutdown hook timeout is also extended toceleborn.worker.decommission.forceExitTimeoutto match the decommission wait window.Why are the changes needed?
In scheduled auto-scaling scenarios (e.g., scale up at peak hours, scale down at off-peak), operators want to simply shrink the node pool and let the PaaS layer send SIGTERM without writing custom preStop scripts or manually invoking the decommission REST API.
The existing graceful shutdown (
celeborn.worker.graceful.shutdown.enabled=true) is designed for rolling upgrades. For permanent scale-down, the correct semantic is decommission. Previously this could only be triggered via REST API or master-pushed events, requiring additional scripting in the teardown workflow.With this change, operators only need to set one config and align
celeborn.worker.decommission.forceExitTimeoutwith the pod'sterminationGracePeriodSeconds.Does this PR resolve a correctness bug?
Does this PR introduce any user-facing change?
How was this patch tested?
WorkerStatusManager.exitEventTypeis set toDecommissionandworkerGracefulShutdownreturns false (suppressing recovery DB initialization)decommission.forceExitTimeoutbefore callingdecommissionWorker()