
[CELEBORN-2353] Align C++ shuffle client push/merge data path with Java ShuffleClient #3724

Open
SteNicholas wants to merge 1 commit into apache:main from SteNicholas:CELEBORN-2353

@SteNicholas SteNicholas commented Jun 10, 2026

Member

What changes were proposed in this pull request?

This PR aligns the C++ shuffle client's push/merge data path with the Java ShuffleClient:

  • Refactor pushData/mergeData/pushMergedData to share one push prologue (prepareBatch) and a uniform synchronous-failure route (pushWithFailureRouting), and move throttling (limitMaxInFlight) to the caller before the initial push so the retry path never re-throttles.
  • Exclude push workers on connection/timeout failures (celeborn.client.push.excludeWorkerOnFailure.enabled), routing pushes away from them until a successful revive or re-assignment lifts the exclusion, and clear it wholesale on shutdown.
  • Track push-failed batches and report them at MapperEnd; the reader dedups duplicate batches for the adaptive skewed-partition read optimization.
  • Derive specific failure causes from transport error messages (getPushDataFailCause) and propagate them through revive/retry; add StatusCode::toString.
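
As a rough illustration of deriving a failure cause from a transport error message, the sketch below matches marker substrings and maps them to an enum. Everything in it is an assumption made for illustration: `PushFailCause`, `deriveFailCause`, `isWorkerFault`, and the marker strings are hypothetical stand-ins, not the PR's actual `getPushDataFailCause` or `StatusCode::toString`.

```cpp
#include <string_view>

// Hypothetical, trimmed-down set of causes; the real StatusCode enum is larger.
enum class PushFailCause {
  kConnectionPrimary,
  kConnectionReplica,
  kTimeoutPrimary,
  kTimeoutReplica,
  kUnknown,
};

// Maps a transport error message to a cause by substring match.
// The marker strings are assumptions chosen for this sketch.
inline PushFailCause deriveFailCause(std::string_view message) {
  struct Marker {
    std::string_view text;
    PushFailCause cause;
  };
  static constexpr Marker kMarkers[] = {
      {"PUSH_DATA_CONNECTION_EXCEPTION_PRIMARY", PushFailCause::kConnectionPrimary},
      {"PUSH_DATA_CONNECTION_EXCEPTION_REPLICA", PushFailCause::kConnectionReplica},
      {"PUSH_DATA_TIMEOUT_PRIMARY", PushFailCause::kTimeoutPrimary},
      {"PUSH_DATA_TIMEOUT_REPLICA", PushFailCause::kTimeoutReplica},
  };
  for (const Marker& marker : kMarkers) {
    if (message.find(marker.text) != std::string_view::npos) {
      return marker.cause;
    }
  }
  return PushFailCause::kUnknown;
}

// Readable name for logging, in the spirit of a toString helper.
inline const char* toString(PushFailCause cause) {
  switch (cause) {
    case PushFailCause::kConnectionPrimary: return "CONNECTION_PRIMARY";
    case PushFailCause::kConnectionReplica: return "CONNECTION_REPLICA";
    case PushFailCause::kTimeoutPrimary: return "TIMEOUT_PRIMARY";
    case PushFailCause::kTimeoutReplica: return "TIMEOUT_REPLICA";
    case PushFailCause::kUnknown: return "UNKNOWN";
  }
  return "UNKNOWN";
}

// Connection and timeout causes are the ones that would trigger exclusion.
inline bool isWorkerFault(PushFailCause cause) {
  return cause != PushFailCause::kUnknown;
}
```

Matching on substrings keeps the cause available to the revive/retry path even when the transport layer only surfaces an exception message.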

Why are the changes needed?

The C++ shuffle client lacked the push-side resiliency the Java ShuffleClient already provides: excluding failed push workers and tracking data-push failures so the reader can dedup duplicate batches for the adaptive skewed-partition read optimization. Bringing the C++ push/merge data path to parity keeps the native and JVM clients behaviorally consistent and honors the existing celeborn.client.push.excludeWorkerOnFailure.enabled / adaptive skewed-partition read configs.
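
A minimal sketch of the push worker exclusion idea, assuming workers are identified by a host:port string and that a single boolean mirrors celeborn.client.push.excludeWorkerOnFailure.enabled. The `PushExclusionSet` class and its method names are hypothetical and do not reflect the PR's actual data structures.

```cpp
#include <mutex>
#include <string>
#include <unordered_set>

// Hypothetical thread-safe set of workers excluded from pushes.
class PushExclusionSet {
 public:
  // `enabled` stands in for the excludeWorkerOnFailure config flag.
  explicit PushExclusionSet(bool enabled) : enabled_(enabled) {}

  // Called on a connection/timeout failure; a no-op when the feature is off.
  void excludeOnFailure(const std::string& worker) {
    if (!enabled_) {
      return;
    }
    std::lock_guard<std::mutex> lock(mutex_);
    excluded_.insert(worker);
  }

  // Called after a successful revive or re-assignment.
  void lift(const std::string& worker) {
    std::lock_guard<std::mutex> lock(mutex_);
    excluded_.erase(worker);
  }

  bool isExcluded(const std::string& worker) const {
    std::lock_guard<std::mutex> lock(mutex_);
    return excluded_.count(worker) > 0;
  }

  // Clears the exclusion wholesale, as on shutdown.
  void clear() {
    std::lock_guard<std::mutex> lock(mutex_);
    excluded_.clear();
  }

 private:
  const bool enabled_;
  mutable std::mutex mutex_;
  std::unordered_set<std::string> excluded_;
};
```

A push path would check `isExcluded` before sending and route to a revive instead when the target worker is in the set.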

Does this PR resolve a correctness bug?

  • Yes

Does this PR introduce any user-facing change?

  • Yes

How was this patch tested?

Covered by new/extended C++ unit tests:

  • PushDataCallbackTest, PushMergedDataCallbackTest — push / merged-push callback failure routing and cause propagation.
  • ShuffleClientImplTest — push worker exclusion and data-push-failure tracking.
  • CelebornInputStreamRetryTest — reader-side dedup of duplicate push-failed batches.
  • PushStateTest, ControlMessagesTest — failed-batch state and MapperEnd serialization round-trip.
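
For readers unfamiliar with the reader-side dedup that CelebornInputStreamRetryTest exercises, here is a generic seen-set sketch. It assumes a batch is identified by (mapId, attemptId, batchId); the `BatchDeduper` class is hypothetical, and the actual reader logic in the PR may differ.

```cpp
#include <set>
#include <tuple>

// Hypothetical helper: remembers which batches were already consumed.
class BatchDeduper {
 public:
  // Returns true the first time a batch is seen and false for any repeat,
  // so the caller can skip duplicate batches.
  bool markSeen(int mapId, int attemptId, int batchId) {
    return seen_.emplace(mapId, attemptId, batchId).second;
  }

 private:
  std::set<std::tuple<int, int, int>> seen_;
};
```

A reader loop would call `markSeen` per batch header and drop the payload whenever it returns false.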

@SteNicholas SteNicholas force-pushed the CELEBORN-2353 branch 6 times, most recently from 9b6b5fe to 5ac8415 on June 10, 2026 05:04
…va ShuffleClient

Bring the C++ push paths in line with the Java ShuffleClient:

- Refactor pushData/mergeData/pushMergedData to share one push prologue
  (prepareBatch) and a uniform synchronous-failure route
  (pushWithFailureRouting), and move throttling (limitMaxInFlight) to the
  caller before the initial push so the retry path never re-throttles.
- Exclude push workers on connection/timeout failures
  (celeborn.client.push.excludeWorkerOnFailure.enabled), routing pushes away
  from them until a successful revive or re-assignment lifts the exclusion,
  and clear it wholesale on shutdown.
- Track push-failed batches and report them at MapperEnd; the reader dedups
  duplicate batches for the adaptive skewed-partition read optimization.
- Derive specific failure causes from transport error messages
  (getPushDataFailCause) and propagate them through revive/retry; add
  StatusCode::toString.
@SteNicholas

Member Author

Ping @RexXiong, @afterincomparableyum.

@SteNicholas SteNicholas requested a review from HolyLow June 10, 2026 06:37

@afterincomparableyum afterincomparableyum left a comment

Contributor

Overall LGTM, I just have a couple of comments.

CELEBORN_DCHECK_LT(partitionIndex, numBatches);
int statusCode = partitionInfo.statuscodes(i);
if (statusCode ==
    static_cast<int>(protocol::StatusCode::HARD_SPLIT)) {

Contributor

Here, Java's loop is if (status == SOFT_SPLIT) { revive } else { batchesNeedResubmit.add(......) } (around ShuffleClientImpl.java:1563), so anything that isn't SOFT_SPLIT is resubmitted.

I would suggest mirroring Java's else-resubmit.
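
A rough C++ shape for that else-resubmit branch, as a sketch only: `SplitStatus`, `SplitOutcome`, and `classifyBatches` are hypothetical names, and the status values are a simplified stand-in for protocol::StatusCode.

```cpp
#include <vector>

// Simplified stand-in for the per-batch status codes in the response.
enum class SplitStatus { kSoftSplit, kHardSplit, kOther };

struct SplitOutcome {
  std::vector<int> revived;      // batch indexes that only request a revive
  std::vector<int> resubmitted;  // batch indexes queued for resubmission
};

// Mirrors the described Java branch: SOFT_SPLIT revives, everything else
// (HARD_SPLIT included) is resubmitted.
inline SplitOutcome classifyBatches(const std::vector<SplitStatus>& statuses) {
  SplitOutcome outcome;
  for (int i = 0; i < static_cast<int>(statuses.size()); ++i) {
    if (statuses[i] == SplitStatus::kSoftSplit) {
      outcome.revived.push_back(i);
    } else {
      outcome.resubmitted.push_back(i);
    }
  }
  return outcome;
}
```

With an explicit HARD_SPLIT check instead of the else branch, any other non-SOFT_SPLIT status would fall through without being resubmitted.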

return true;
}

void ShuffleClientImpl::shutdown() {

Contributor

shutdown() clears only pushExcludedWorkers.

Java clears both exclusion sets. Java's shutdown() (ShuffleClientImpl.java:2052–2077) also clears fetchExcludedWorkers (and closes the revive manager, retry pool, client factory).

@Override
public void shutdown() {
    if (null != reviveManager) {
        reviveManager.close();
    }
    if (null != rpcEnv) {
        rpcEnv.shutdown();
    }
    if (null != dataClientFactory) {
        dataClientFactory.close();
    }
    if (null != transportContext) {
        transportContext.close();
    }
    if (null != pushDataRetryPool) {
        pushDataRetryPool.shutdown();
    }
    if (null != lifecycleManagerRef) {
        lifecycleManagerRef = null;
    }

    shuffleIdCache.clear();
    pushExcludedWorkers.clear();
    fetchExcludedWorkers.clear();
    messagesHelper.close();
    logger.warn("Shuffle client has been shutdown!");
}

If the intent is that tearing down the pools/manager is the destructor's job in C++, that's fine; I would just double-check that this is the case. But fetchExcludedWorkers_ is the same category of state that Java clears here. Clear it here too, or note why not.
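
Something along these lines, as a sketch rather than the real class: `ClientState` and its helper methods are hypothetical, locking is omitted, and only the two member names come from the discussion above.

```cpp
#include <cstddef>
#include <string>
#include <unordered_set>

// Hypothetical stand-in for the client's exclusion state (no locking shown).
class ClientState {
 public:
  void excludePush(const std::string& worker) {
    pushExcludedWorkers_.insert(worker);
  }
  void excludeFetch(const std::string& worker) {
    fetchExcludedWorkers_.insert(worker);
  }

  // Clears both exclusion sets, matching what Java's shutdown() does.
  void shutdown() {
    pushExcludedWorkers_.clear();
    fetchExcludedWorkers_.clear();
  }

  std::size_t excludedCount() const {
    return pushExcludedWorkers_.size() + fetchExcludedWorkers_.size();
  }

 private:
  std::unordered_set<std::string> pushExcludedWorkers_;
  std::unordered_set<std::string> fetchExcludedWorkers_;
};
```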
