Skip to content

[CELEBORN-2344] Support to force set serving state via HTTP APIs#3710

Open
s0nskar wants to merge 1 commit into
apache:mainfrom
s0nskar:push_pause_api
Open

[CELEBORN-2344] Support to force set serving state via HTTP APIs#3710
s0nskar wants to merge 1 commit into
apache:mainfrom
s0nskar:push_pause_api

Conversation

@s0nskar

@s0nskar s0nskar commented May 28, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Added /servingState which supports GET and POST http methods.

  • GET will just return the current serving state.

  • POST can be used to force override the serving state of a worker. It takes two params – state (override for serving state) and timeout (after which overridden state should clear up). If timeout is not present forced state will not clear up, unless someone overrides it to empty state.

  • handling live migration scenarios and other cases where we don't want the worker to receive new data but still want to keep it running. Or maybe where we want to force unpause the worker.

Why are the changes needed?

This can be used for planned maintenance of worker or cases where worker is degraded or under high load but not having high memory pressure. This can also be used for cases to force resume worker which can be useful in cases like #3696

We are using this specifically during GCP live migration – https://docs.cloud.google.com/compute/docs/instances/live-migration-process

Does this PR resolve a correctness bug?

  • Yes

Does this PR introduce any user-facing change?

  • Yes

How was this patch tested?

Tested in our dev setup

>>> curl <host:port>/servingState
====================== Worker Serving State ==========================
Current state: NONE_PAUSED.

>>> curl -X POST "<host:port>/servingState"   -H "Content-Type: application/x-www-form-urlencoded"   -d "state=PUSH_PAUSED" -d "timeout=1m"
====================== Set Serving State ============================
Serving state forced to: PUSH_PAUSED
Override will auto-clear after 1m.

>>> curl <host:port>/servingState
====================== Worker Serving State ==========================
Current state: PUSH_PAUSED.
Manual override active.

// Cleared after 1 min
>>> curl <host:port>/servingState
====================== Worker Serving State ==========================
Current state: NONE_PAUSED.

// Forced without timeout
>>> curl -X POST "<host:port>/servingState"   -H "Content-Type: application/x-www-form-urlencoded"   -d "state=PUSH_PAUSED"
====================== Set Serving State ============================
Serving state forced to: PUSH_PAUSED
Override will persist until explicitly cleared.

>>> curl <host:port>/servingState
====================== Worker Serving State ==========================
Current state: PUSH_PAUSED.
Manual override active.

>>> curl -X POST "<host:port>/servingState"   -H "Content-Type: application/x-www-form-urlencoded"   -d "state="
====================== Set Serving State ============================
Manual servingState override cleared.

>>> curl <host:port>/servingState
====================== Worker Serving State ==========================
Current state: NONE_PAUSED.

// Invalid scenarios 
>>> curl -X POST "<host:port>/servingState"   -H "Content-Type: application/x-www-form-urlencoded"   -d "state=PUSH_PAUSED" -d "timeout=1k"
Invalid timeout '1k'. java.lang.NumberFormatException: Time must be specified as seconds (s), milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day (d). E.g. 50s, 100ms, or 250us.
Invalid suffix: "k"

>>> curl -X POST "<host:port>/servingState"   -H "Content-Type: application/x-www-form-urlencoded"   -d "state=INVALID"
Invalid state 'INVALID'. Legal values: PUSH_AND_REPLICATE_PAUSED, PUSH_PAUSED, NONE_PAUSED

@SteNicholas

Copy link
Copy Markdown
Member

@s0nskar, could you also add a cli for /servingState interface?

@RexXiong RexXiong left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use case described (GCP live migration) can already be covered by the existing DecommissionThenIdle + Recommission workflow via the Master's /sendWorkerEvent API:

# Before migration: stop accepting new shuffle slots, drain existing ones
curl -X POST "http://<master>:<port>/sendWorkerEvent" \
  -d "type=DecommissionThenIdle" \
  -d "workers=<host>:<rpcPort>:<pushPort>:<fetchPort>:<replicatePort>"

# Check if all shuffles have been drained
curl http://<worker>:<port>/isDecommissioning

# ... perform live migration ...

# After migration: bring worker back
curl -X POST "http://<master>:<port>/sendWorkerEvent" \
  -d "type=Recommission" \
  -d "workers=<host>:<rpcPort>:<pushPort>:<fetchPort>:<replicatePort>"

State flow: NormalInDecommissionThenIdle → (all shuffles released) → Idle → (Recommission) → Normal.

This approach already maintains the excluded list on the Master side (no new slots allocated), has a proper state machine with Recommission support, and does not interfere with MemoryManager internals.


Regarding the current implementation, the forced state is injected at the top of currentServingState(), which is consumed by switchServingState() along with its side-effect logic (resume replicate, pinned memory handling, pause timing). This creates several issues:

  1. Safety conflict: If real memory exceeds the replicate threshold (PUSH_AND_REPLICATE_PAUSED) but the forced state is PUSH_PAUSED, switchServingState() will call resumeReplicate() under genuine memory pressure. Conversely, forcing NONE_PAUSED during real memory pressure bypasses all protection and risks OOM.

  2. Unintended eviction: shouldEvict() checks servingState != NONE_PAUSED. Forcing PUSH_PAUSED for maintenance (no actual memory pressure) would trigger unnecessary memory file eviction.

  3. Stale isPaused flag: currentServingState() returns early when forced state is active, skipping the isPaused assignment. After the forced state clears, the hysteresis behavior between resume and pause thresholds depends on the stale isPaused value.

If a forced serving state API is still desired beyond what DecommissionThenIdle provides, I'd suggest restricting it to only allow making the state more restrictive (i.e., max(forcedState, realState)), never less restrictive, to avoid overriding memory safety mechanisms.

Reviewed with Claude Code

@s0nskar

s0nskar commented Jun 9, 2026

Copy link
Copy Markdown
Contributor Author

Hi @RexXiong, Thanks for the review.

The use case described (GCP live migration) can already be covered by the existing DecommissionThenIdle + Recommission workflow via the Master's /sendWorkerEvent API.

This does not work for us as the notification time we get for such events is very less (Less then 5mins). It is is not possible to use decommission flow in such case.

For the other points i definitely agree on your concerns but making it more restrictive might not cover cases like this – #3696 where we have to force the NONE_PAUSE state. We are using this API for such cases as well.

Should i introduce a new flow, just to pause receiving new data on channels and do not have any side effect? I am open to discuss more on this.

@RexXiong

Copy link
Copy Markdown
Contributor

Thanks for the explanation. I agree this is useful as an ops escape hatch for scenarios like GCP live migration.

One suggestion: could you expose a metric (e.g., isForcedOverride) so that monitoring can distinguish between a genuine memory-driven pause and a manual override? This would help operators avoid confusion when debugging serving state issues.

Reviewed with Claude Code

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants