[CELEBORN-2353] Align C++ shuffle client push/merge data path with Java ShuffleClient #3724
SteNicholas wants to merge 1 commit into
9b6b5fe to 5ac8415
…va ShuffleClient

Bring the C++ push paths in line with the Java ShuffleClient:

- Refactor pushData/mergeData/pushMergedData to share one push prologue (prepareBatch) and a uniform synchronous-failure route (pushWithFailureRouting), and move throttling (limitMaxInFlight) to the caller before the initial push so the retry path never re-throttles.
- Exclude push workers on connection/timeout failures (celeborn.client.push.excludeWorkerOnFailure.enabled), routing pushes away from them until a successful revive or re-assignment lifts the exclusion, and clear it wholesale on shutdown.
- Track push-failed batches and report them at MapperEnd; the reader dedups duplicate batches for the adaptive skewed-partition read optimization.
- Derive specific failure causes from transport error messages (getPushDataFailCause) and propagate them through revive/retry; add StatusCode::toString.
5ac8415 to 3ceefce
Ping @RexXiong, @afterincomparableyum.
afterincomparableyum left a comment

Overall LGTM; I just have a couple of comments.
    CELEBORN_DCHECK_LT(partitionIndex, numBatches);
    int statusCode = partitionInfo.statuscodes(i);
    if (statusCode ==
        static_cast<int>(protocol::StatusCode::HARD_SPLIT)) {
Here, Java's loop is `if (status == SOFT_SPLIT) { revive } else { batchesNeedResubmit.add(......) }` (around ShuffleClientImpl.java:1563), so anything that isn't SOFT_SPLIT is resubmitted.
I would suggest mirroring Java's else-resubmit.
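
The control flow suggested above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the PR's code: the `StatusCode` enum, the `SplitRouting` struct, and `routeSplitBatches` are hypothetical stand-ins. The only behavior taken from the comment is that `SOFT_SPLIT` leads to a revive while every other status is queued for resubmission.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for protocol::StatusCode; the members are illustrative only.
enum class StatusCode { SOFT_SPLIT, HARD_SPLIT, OTHER };

// Hypothetical result type: indices of batches, grouped by how they are handled.
struct SplitRouting {
  std::vector<std::size_t> reviveOnly;  // SOFT_SPLIT: request a revive
  std::vector<std::size_t> resubmit;    // any other status: resubmit the batch
};

// Mirrors the "if SOFT_SPLIT then revive, else resubmit" shape described above.
SplitRouting routeSplitBatches(const std::vector<StatusCode>& statuses) {
  SplitRouting routing;
  for (std::size_t i = 0; i < statuses.size(); ++i) {
    if (statuses[i] == StatusCode::SOFT_SPLIT) {
      routing.reviveOnly.push_back(i);
    } else {
      // Not limited to HARD_SPLIT: every non-SOFT_SPLIT status lands here.
      routing.resubmit.push_back(i);
    }
  }
  return routing;
}
```

With this shape, a status that is neither `SOFT_SPLIT` nor `HARD_SPLIT` is still resubmitted instead of being silently dropped, which is the difference the comment points out.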
      return true;
    }

    void ShuffleClientImpl::shutdown() {
shutdown() clears only pushExcludedWorkers.
Java clears both exclusion sets: Java's shutdown() (ShuffleClientImpl.java:2052–2077) also clears fetchExcludedWorkers (and closes the revive manager, retry pool, and client factory).
    @Override
    public void shutdown() {
      if (null != reviveManager) {
        reviveManager.close();
      }
      if (null != rpcEnv) {
        rpcEnv.shutdown();
      }
      if (null != dataClientFactory) {
        dataClientFactory.close();
      }
      if (null != transportContext) {
        transportContext.close();
      }
      if (null != pushDataRetryPool) {
        pushDataRetryPool.shutdown();
      }
      if (null != lifecycleManagerRef) {
        lifecycleManagerRef = null;
      }
      shuffleIdCache.clear();
      pushExcludedWorkers.clear();
      fetchExcludedWorkers.clear();
      messagesHelper.close();
      logger.warn("Shuffle client has been shutdown!");
    }
If, in C++, tearing down the pools and the revive manager is the destructor's job, that's fine; I would just double-check that this is the case. But fetchExcludedWorkers_ is the same category of state that Java clears. Clear it here too, or note why not.
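
As a rough sketch of what clearing both sets together could look like, assuming the two exclusion sets are plain string sets guarded by one mutex: the `ExcludedWorkers` class, its method names, and the host-and-port key format are all hypothetical and are not taken from the C++ client.

```cpp
#include <mutex>
#include <string>
#include <unordered_set>

// Hypothetical holder for the push and fetch exclusion sets; not the PR's class.
class ExcludedWorkers {
 public:
  void excludePush(const std::string& worker) {
    std::lock_guard<std::mutex> lock(mutex_);
    push_.insert(worker);
  }

  void excludeFetch(const std::string& worker) {
    std::lock_guard<std::mutex> lock(mutex_);
    fetch_.insert(worker);
  }

  bool isPushExcluded(const std::string& worker) const {
    std::lock_guard<std::mutex> lock(mutex_);
    return push_.count(worker) > 0;
  }

  bool isFetchExcluded(const std::string& worker) const {
    std::lock_guard<std::mutex> lock(mutex_);
    return fetch_.count(worker) > 0;
  }

  // Called from shutdown(): drops both sets, as the Java shutdown() does.
  void clearAll() {
    std::lock_guard<std::mutex> lock(mutex_);
    push_.clear();
    fetch_.clear();
  }

 private:
  mutable std::mutex mutex_;
  std::unordered_set<std::string> push_;
  std::unordered_set<std::string> fetch_;
};
```

Keeping both sets behind a single `clearAll()` makes it harder for a later change to clear one set on shutdown and forget the other.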
What changes were proposed in this pull request?
This PR aligns the C++ shuffle client's push/merge data path with the Java `ShuffleClient`:

- Refactor `pushData`/`mergeData`/`pushMergedData` to share one push prologue (`prepareBatch`) and a uniform synchronous-failure route (`pushWithFailureRouting`), and move throttling (`limitMaxInFlight`) to the caller before the initial push so the retry path never re-throttles.
- Exclude push workers on connection/timeout failures (`celeborn.client.push.excludeWorkerOnFailure.enabled`), routing pushes away from them until a successful revive or re-assignment lifts the exclusion, and clear it wholesale on shutdown.
- Track push-failed batches and report them at `MapperEnd`; the reader dedups duplicate batches for the adaptive skewed-partition read optimization.
- Derive specific failure causes from transport error messages (`getPushDataFailCause`) and propagate them through revive/retry; add `StatusCode::toString`.

Why are the changes needed?
The C++ shuffle client lacked the push-side resiliency the Java `ShuffleClient` already provides: excluding failed push workers and tracking data-push failures so the reader can dedup duplicate batches for the adaptive skewed-partition read optimization. Bringing the C++ push/merge data path to parity keeps the native and JVM clients behaviorally consistent and honors the existing `celeborn.client.push.excludeWorkerOnFailure.enabled` / adaptive skewed-partition read configs.

Does this PR resolve a correctness bug?
Does this PR introduce any user-facing change?
How was this patch tested?
Covered by new/extended C++ unit tests:

- `PushDataCallbackTest`, `PushMergedDataCallbackTest`: push / merged-push callback failure routing and cause propagation.
- `ShuffleClientImplTest`: push worker exclusion and data-push-failure tracking.
- `CelebornInputStreamRetryTest`: reader-side dedup of duplicate push-failed batches.
- `PushStateTest`, `ControlMessagesTest`: failed-batch state and `MapperEnd` serialization round-trip.
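
For readers unfamiliar with the reader-side dedup these tests exercise, the idea can be illustrated with a small sketch. It assumes a batch is identified by the triple (mapId, attemptId, batchId); that key choice, the `BatchDeduper` class, and its `accept` method are hypothetical and are not the PR's implementation.

```cpp
#include <set>
#include <tuple>

// Hypothetical dedup helper: remembers which batches the reader has already consumed.
class BatchDeduper {
 public:
  // Returns true the first time a batch key is seen and false for any repeat,
  // so the caller can skip a batch that was pushed again after a failed push.
  bool accept(int mapId, int attemptId, int batchId) {
    return seen_.emplace(mapId, attemptId, batchId).second;
  }

 private:
  // Assumed key: (mapId, attemptId, batchId).
  std::set<std::tuple<int, int, int>> seen_;
};
```

A reader built along these lines would call `accept` for each incoming batch and discard those for which it returns false.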