Skip to content

[CELEBORN-2336] Support decommission shutdown for worker scale-down scenarios#3698

Open
chenghuichen wants to merge 8 commits into
apache:mainfrom
chenghuichen:exit-type
Open

[CELEBORN-2336] Support decommission shutdown for worker scale-down scenarios#3698
chenghuichen wants to merge 8 commits into
apache:mainfrom
chenghuichen:exit-type

Conversation

@chenghuichen

Copy link
Copy Markdown

What changes were proposed in this pull request?

Add celeborn.worker.decommission.shutdown.enabled configuration. When set to true, the worker will walk the decommission path (send ReportWorkerDecommission to master, wait for all shuffle data to be consumed or expired) upon receiving SIGTERM, instead of the graceful shutdown path.

When enabled, this overrides celeborn.worker.graceful.shutdown.enabled. The shutdown hook timeout is also extended to celeborn.worker.decommission.forceExitTimeout to match the decommission wait window.

Why are the changes needed?

In scheduled auto-scaling scenarios (e.g., scale up at peak hours, scale down at off-peak), operators want to simply shrink the node pool and let the PaaS layer send SIGTERM without writing custom preStop scripts or manually invoking the decommission REST API.

The existing graceful shutdown (celeborn.worker.graceful.shutdown.enabled=true) is designed for rolling upgrades. For permanent scale-down, the correct semantic is decommission. Previously this could only be triggered via REST API or master-pushed events, requiring additional scripting in the teardown workflow.

With this change, operators only need to set one config and align celeborn.worker.decommission.forceExitTimeout with the pod's terminationGracePeriodSeconds.

Does this PR resolve a correctness bug?

  • Yes

Does this PR introduce any user-facing change?

  • Yes

How was this patch tested?

  • Verified default behavior unchanged (config defaults to false, existing graceful shutdown unaffected)
  • Verified that when enabled, WorkerStatusManager.exitEventType is set to Decommission and workerGracefulShutdown returns false (suppressing recovery DB initialization)
  • Verified shutdown hook extends timeout to decommission.forceExitTimeout before calling decommissionWorker()

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds an opt-in “decommission-on-SIGTERM” mode for workers to better support scale-down scenarios, by introducing a new config flag that makes SIGTERM follow the decommission flow (report to master and wait for shuffle data consumption/expiration) instead of the graceful-restart flow.

Changes:

  • Add celeborn.worker.decommission.shutdown.enabled config and surface it via CelebornConf.
  • Make workerGracefulShutdown effectively disabled when decommission-on-shutdown is enabled, and set WorkerStatusManager.exitEventType accordingly.
  • Attempt to extend shutdown hook timeout to workerDecommissionForceExitTimeout before decommissioning during shutdown.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

File Description
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala Initialize exitEventType to Decommission when decommission-on-shutdown is enabled (overriding graceful).
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala On decommission shutdown path, call ShutdownHookManager.updateTimeout before decommissionWorker().
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala Add new config entry + getter, and ensure graceful shutdown is suppressed when decommission shutdown is enabled.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala Outdated
Comment thread common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala

@SteNicholas SteNicholas left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chenghuichen, thanks for first contribution. Could you please add test cases for this support?

@chenghuichen chenghuichen changed the title [CELEBORN-xxxx] Support decommission shutdown for worker scale-down scenarios [CELEBORN-2336] Support decommission shutdown for worker scale-down scenarios May 25, 2026
@chenghuichen

Copy link
Copy Markdown
Author

@SteNicholas Thanks for the review. Fixed the timeout issue, added tests, and updated worker.md.

@SteNicholas

Copy link
Copy Markdown
Member

@chenghuichen, please use dev/reformat command to format code for failure of style check.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Comment thread worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala Outdated
Comment thread worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala Outdated
Comment thread common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
Comment thread common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala Outdated
Comment thread worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala Outdated

@SteNicholas SteNicholas left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chenghuichen, thanks for update. I left some comments for udpates. PTAL.

@RexXiong

Copy link
Copy Markdown
Contributor

Overall: Clean, well-scoped change. The decommission-on-SIGTERM approach is the right abstraction for scale-down — avoids requiring custom preStop scripts. The Copilot/SteNicholas feedback has been addressed well (explicit timeout registration, WORKER_DECOMMISSION exit kind, assertion order fix, config override test).

Config version should likely be 0.7.0

The new config celeborn.worker.decommission.shutdown.enabled is versioned 0.6.0, but other new configs in recent PRs (e.g., PR #3695) use 0.7.0. If main is targeting 0.7.0, this should be updated to match.

Reviewed with Claude Code

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.

Comment thread common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala Outdated
Comment thread docs/configuration/worker.md Outdated
@afterincomparableyum

Copy link
Copy Markdown
Contributor

this is awesome and a known issue with auto-scaling down, thank you for your contribution @chenghuichen. I will review this PR soon.

@afterincomparableyum afterincomparableyum left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall lgtm. Could you pls address my comments and the other comments.

@chenghuichen

Copy link
Copy Markdown
Author

@afterincomparableyum Thanks for the review! Updated: added assertions, fixed version to 0.7.0, clarified docs.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.

// Graceful Shutdown & Recover //
// //////////////////////////////////////////////////////
def workerGracefulShutdown: Boolean = get(WORKER_GRACEFUL_SHUTDOWN_ENABLED)
def workerDecommissionShutdownEnabled: Boolean = get(WORKER_DECOMMISSION_SHUTDOWN_ENABLED)

@SteNicholas SteNicholas left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chenghuichen, thanks for updates. I left some comments for the updates. PTAL.

// //////////////////////////////////////////////////////
def workerGracefulShutdown: Boolean = get(WORKER_GRACEFUL_SHUTDOWN_ENABLED)
def workerDecommissionShutdownEnabled: Boolean = get(WORKER_DECOMMISSION_SHUTDOWN_ENABLED)
def workerGracefulShutdownEnabled: Boolean =

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, decommission shutdown could not affect graceful shutdown behavior. Therefore, this change could be reverted.

},
"worker-shutdown-hook-thread")

if (conf.workerDecommissionShutdownEnabled) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this add two shutdown hook for decommission shutdown and graceful shutdown?

@afterincomparableyum afterincomparableyum left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall lgtm, thanks for contributing. Could you address the last comments by CoPilot please and @SteNicholas.

@SteNicholas

Copy link
Copy Markdown
Member

@chenghuichen, any update?

@SteNicholas SteNicholas left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed the decommission-shutdown path. The state-machine wiring and the new WORKER_DECOMMISSION exit-kind plumbing through stop()/close() look correct, and the config version + docs row check out. One correctness concern (the shutdown-hook timeout budget doesn't account for stop()), one behavior change worth confirming, and a few cleanup/altitude nits inline.

ShutdownHookManager.get().addShutdownHook(
shutdownHookThread,
WORKER_SHUTDOWN_PRIORITY,
conf.workerDecommissionForceExitTimeout + conf.workerDecommissionCheckInterval,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hook future is given forceExitTimeout + checkInterval, but decommissionWorker()'s wait loop is bounded by waitTime < forceExitTimeout with the bound checked before the sleep, so the loop alone can run up to ~forceExitTimeout + checkInterval. stop(WORKER_DECOMMISSION) then runs after the loop and isn't cheap — each of the three TransportServer.shutdown calls does channel().close().awaitUninterruptibly(10s), plus flusher shutdown. When shuffle never drains, the loop consumes the whole budget and executeShutdown fires future.cancel(true) mid-stop(), truncating teardown.

The doc has the same gap: it tells operators to set terminationGracePeriodSeconds to forceExitTimeout + checkInterval, so the K8s SIGKILL lands at the same point. Consider sizing both the hook timeout and the recommended grace period to include a stop() budget, or bounding the wait loop strictly below forceExitTimeout to reserve headroom.

}
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN ||
exitKind == CelebornExitKind.WORKER_DECOMMISSION) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Grouping WORKER_DECOMMISSION with WORKER_GRACEFUL_SHUTDOWN here means the thread pools now get an orderly shutdown() (drains in-flight tasks). Before this PR the decommission path went through stop(EXIT_IMMEDIATELY) -> shutdownNow(), so in-flight replicate/commit tasks (e.g. replicating to a dead peer) are now drained rather than force-cancelled, and the wait is deferred to EXECUTOR.awaitTermination(workerGracefulShutdownTimeoutMs) in shutdownExecutor. Probably intended since decommission already waited for shuffle to drain — but worth confirming, and note the decommission path now silently depends on workerGracefulShutdownTimeoutMs even though the feature disables graceful shutdown.

private val WORKER_SHUTDOWN_PRIORITY = 100
val shutdown = new AtomicBoolean(false)
private val gracefulShutdown = conf.workerGracefulShutdown
private val gracefulShutdown = conf.workerGracefulShutdown && !conf.workerDecommissionShutdown

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conf.workerGracefulShutdown && !conf.workerDecommissionShutdown is now hand-copied in three places — here, StorageManager.scala:298, and PartitionFilesSorter.java:116. Consider a single derived accessor on CelebornConf (e.g. effectiveWorkerGracefulShutdown) so the override rule can't drift across Scala/Java if the semantics ever change.

new Runnable {
override def run(): Unit = {
logInfo("Shutdown hook called.")
workerStatusManager.exitEventType match {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hook matches exitEventType twice back-to-back — once to pick the action (L1082) and once to pick the stop() exit-kind (L1091). Folding into a single match (case Decommission => decommissionWorker(); stop(WORKER_DECOMMISSION), etc.) keeps each event's action and exit-kind from drifting apart when a new type is added.

},
"worker-shutdown-hook-thread")

if (conf.workerDecommissionShutdown) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if/else registers the hook twice just to vary the timeout. The codebase already has the idiom: register once with the 2-arg overload and then ShutdownHookManager.get().updateTimeout(...) — which is exactly what the REST exit("DECOMMISSION") path does (Worker.scala:955).

if (sendHeartbeatTask != null) {
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN ||
exitKind == CelebornExitKind.WORKER_DECOMMISSION) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exitKind == WORKER_GRACEFUL_SHUTDOWN || exitKind == WORKER_DECOMMISSION is repeated three times in stop() (here, L639, L647). A single val gracefulLike = … computed once would avoid the next exit-kind needing three coordinated edits.


test("Test exitEventType initialization based on config") {
// Default: neither graceful nor decommission → Immediately
val conf1 = new CelebornConf()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conf1's assertEquals(Immediately, mgr1.exitEventType) relies on no celeborn.worker.*.shutdown.enabled system property being present; new CelebornConf() loads sys-props, so leakage from another test/CI could flake it. Setting the relevant keys explicitly (as conf2–conf4 do) keeps it hermetic. Minor: the test never calls init(worker), so only construction-time exitEventType is covered.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants