Skip to content

[CELEBORN-2350] Support chunk level compression to optimize storage#3699

Open
saurabhd336 wants to merge 29 commits into
apache:mainfrom
saurabhd336:chunkCompressedWriter
Open

[CELEBORN-2350] Support chunk level compression to optimize storage#3699
saurabhd336 wants to merge 29 commits into
apache:mainfrom
saurabhd336:chunkCompressedWriter

Conversation

@saurabhd336

@saurabhd336 saurabhd336 commented May 23, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Adds chunk-level ZSTD compression on the worker write path and streaming decompression on the client read path. Records accumulate in a fixed-size chunk buffer (default 8 MB); when the buffer overflows it is compressed as a single ZSTD frame and written to disk file. On the read side, CelebornInputStream wraps each fetched chunk in a ZstdInputStream to uncompress.

This is orthogonal to the existing batch-level LZ4/ZSTD codec: both can be active simultaneously, or the batch codec can be NONE.

This change primarily helps in reducing the disk usage (~40% lower disk usage seen in tests) as well as read flow celeborn network egress.

Impl details

Writer side

  1. Added a new FileChannelWriter interface which supports write / close functionalities. BypassFileChannelWriter is the default and ensures the current behaviour (directly write flushBuffer to disk file channel).
  2. Added ChunkCompressedFileChannelWriter: Accumulates records in a direct ByteBuffer of chunkSize bytes. On overflow, ZSTD-compresses and writes as a single frame. Records larger than chunkSize stream directly to disk via ZstdOutputStream. Replaces compressed chunk-boundary offsets into ReduceFileMeta on close. Also updates the bytesFlushed to overwrite the FileInfo length post close. The buffers used to buffer chunkSize data before compression and flush is powered by MmapMemoryManager and ChunkBufferPool which uses mmap'ed temporary files to avoid the memory overhead of buffering chunk sized data.
  3. Choice b/w ChunkCompressedFileChannelWriter and FileChannelWriter is made basis the new config set by client during ReserveSlots (conf.isChunkCompressionEnabled).

Read side

  1. No changes in the worker (YET TO IMPLEMENT: Partition sorting during AQE flow)
  2. CelebornInputStream: When reading chunkCompressed chunks, wraps the read ByteBuf into a ZSTDIs to inplace decompress and read.

Configs added

Key Default Meaning
celeborn.chunk.compression.enabled false Client side config. Enables chunk-level ZSTD compression on the worker write path and transparent decompression in CelebornInputStream.

Does this PR resolve a correctness bug?

No

Does this PR introduce any user-facing change?

celeborn.chunk.compression.enabled config to enable / disable chunk level compression (disabled by default)

How was this patch tested?

UTs, ITs

@saurabhd336 saurabhd336 changed the title [FEATURE] [WIP] Support chunk level compression to optimize storage [CELEBORN-XXXX] [FEATURE] [WIP] Support chunk level compression to optimize storage May 25, 2026
@saurabhd336

Copy link
Copy Markdown
Contributor Author

Hi team @SteNicholas / @s0nskar / @zaynt4606 / others

I wanted to start an early discussion for these proposed changes (can share a design doc too).

At a high level, our Celeborn infra costs have largely been influenced by large locally attached SSD requirements. Additionally, also looking for ways to reduce celeborn network ingress / egress. This change helps is reducing both.

Wanted to start this discussion for the change thanks

@codecov

codecov Bot commented May 26, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 95.00000% with 3 lines in your changes missing coverage. Please review.
✅ Project coverage is 67.28%. Comparing base (b4cb5a0) to head (39595da).
⚠️ Report is 54 commits behind head on main.

Files with missing lines Patch % Lines
.../org/apache/celeborn/common/meta/DiskFileInfo.java 85.72% 1 Missing ⚠️
...rg/apache/celeborn/common/meta/ReduceFileMeta.java 88.89% 0 Missing and 1 partial ⚠️
...leborn/common/network/buffer/FileChunkBuffers.java 50.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3699      +/-   ##
==========================================
+ Coverage   66.91%   67.28%   +0.37%     
==========================================
  Files         358      360       +2     
  Lines       21986    22337     +351     
  Branches     1946     1982      +36     
==========================================
+ Hits        14710    15027     +317     
- Misses       6262     6285      +23     
- Partials     1014     1025      +11     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@SteNicholas SteNicholas force-pushed the main branch 2 times, most recently from 1d92a40 to cf8d472 Compare May 27, 2026 02:11
saurabhd336 and others added 9 commits June 1, 2026 12:48
…ge-record chunks

readChunks() was unconditionally decompressing every chunk, but large
records are written raw (uncompressed) by flushLargeRecord(). The fix
consults ReduceFileMeta.getChunkCompressed() per chunk and only calls
ZstdInputStream on chunks that were actually compressed. Also exposes
compressAndFlush() as public so the test can call it directly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@saurabhd336 saurabhd336 changed the title [CELEBORN-XXXX] [FEATURE] [WIP] Support chunk level compression to optimize storage [CELEBORN-XXXX] Support chunk level compression to optimize storage Jun 5, 2026
@saurabhd336

Copy link
Copy Markdown
Contributor Author

Hi team
@SteNicholas / @s0nskar / @zaynt4606 / others

Ping on this again!

So far, i've been able to get ~40% reduction in celeborn worker disk usage with a roughly 80% increase in CPU usage. A lot of times, the CPU usage of our celeborn fleets is idle, hence this tradeoff feels reasonable and worth testing at scale for us. Can you please take a look at this PR?

@saurabhd336 saurabhd336 changed the title [CELEBORN-XXXX] Support chunk level compression to optimize storage [CELEBORN-2350] Support chunk level compression to optimize storage Jun 5, 2026
@SteNicholas SteNicholas requested a review from Copilot June 8, 2026 06:19

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces chunk-level ZSTD compression for shuffle data written by workers (disk write path) and adds transparent per-chunk decompression on the client read path (via CelebornInputStream). It threads a new ChunkCompressionContext from client → worker (ReserveSlots) and extends stream metadata so clients can identify which chunks are compressed (to support “large record” chunks written uncompressed).

Changes:

  • Add worker-side FileChannelWriter abstraction with a new ChunkCompressedFileChannelWriter that buffers into fixed-size chunks and ZSTD-compresses each chunk before writing.
  • Extend wire/file metadata (PbStreamHandler, PbFileInfo, ReduceFileMeta) to carry chunk-compressed flags and chunk compression config.
  • Update client readers and CelebornInputStream to support streaming chunk decompression while preserving existing batch-level codecs.

Reviewed changes

Copilot reviewed 49 out of 49 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerSuite.scala Update test writer creation call to pass ChunkCompressionContext.
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala Update writer/file-info construction for new chunk compression context parameter.
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala Adjust mocked writer ctor signature for added parameter.
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala Adjust mocked writer ctor signature for added parameter.
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala Adjust mocked writer ctor signature for added parameter.
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala Adjust mocked writer ctor signature for added parameter.
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala Update writer creation for added chunk compression context parameter.
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandlerSuite.scala Update DiskFileInfo construction for added chunk compression context parameter.
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala New end-to-end mini-cluster tests for chunk-compressed read/write scenarios.
worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java Update writer context creation for new chunk compression context parameter.
worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java Update writer context creation for new chunk compression context parameter.
worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java Update writer context creation for new chunk compression context parameter.
worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/file/chunk/compressed/MmapMemoryManagerSuiteJ.java New unit tests for mmap-based buffer allocator.
worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/file/chunk/compressed/ChunkCompressedFileChannelWriterSuiteJ.java New unit tests validating chunk writer chunking/offsets/compression behavior.
worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/file/chunk/compressed/ChunkBufferPoolSuiteJ.java New unit tests for pooled buffer pair behavior and concurrency.
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala Switch local tier writer flush to use FileChannelWriter abstraction.
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala Pass chunk compression context into disk file creation.
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala Thread chunk compression context through writer creation and DiskFileInfo.
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala Use FileChannelWriter in local flush task rather than raw FileChannel.
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala Include per-chunk compression flags in stream handler metadata to clients.
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala Receive/pass chunk compression context from ReserveSlots to writer creation.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java Explicitly reject sorting chunk-compressed shuffle files (not yet supported).
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterContext.java Add ChunkCompressionContext to writer context and accessors.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/FileWriterType.java New enum for file writer types.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/FileChannelWriterFactory.java Factory to choose bypass vs chunk-compressed writer based on file info.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/FileChannelWriter.java New abstraction for file write/close.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/chunk/compressed/MmapMemoryManager.java New mmap-backed allocator for chunk buffers.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/chunk/compressed/ChunkCompressedFileChannelWriter.java New chunk-buffering + ZSTD-compressing local file writer.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/chunk/compressed/ChunkBufferPool.java New pool for reusable (chunk, compressed) buffer pairs.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/BypassFileChannelWriter.java New writer preserving existing direct-to-channel behavior.
worker/pom.xml Add zstd-jni dependency for worker module.
project/CelebornBuild.scala Add zstd JNI dependency for worker build.
docs/configuration/client.md Document new chunk compression configs.
common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala Update PB serde tests for new file info ctor + chunk compression context.
common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala Serialize/deserialize chunk compression config into PbFileInfo.
common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala Send/receive chunk compression config in ReserveSlots messages.
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala Add chunk compression config entries + accessors.
common/src/main/proto/TransportMessages.proto Add PbChunkCompressionConfig and per-chunk compressed flags to stream handler.
common/src/main/java/org/apache/celeborn/common/network/buffer/FileChunkBuffers.java Enforce no sliced reads for chunk-compressed files.
common/src/main/java/org/apache/celeborn/common/meta/ReduceFileMeta.java Track per-chunk compression flags in reduce metadata.
common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java Add setBytesFlushed to override file length post-close.
common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java Add ChunkCompressionContext and expose chunk compression settings.
common/src/main/java/org/apache/celeborn/common/compression/ChunkCompressionContext.java New context object to carry chunk compression settings.
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala Send chunk compression settings to worker via ReserveSlots.
client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java Return (ByteBuf, isChunkCompressed) from next().
client/src/main/java/org/apache/celeborn/client/read/PartitionReader.java Change API to return (ByteBuf, isChunkCompressed) from next().
client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java Return per-chunk compression flag alongside the chunk buffer.
client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java Return per-chunk compression flag alongside the chunk buffer.
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java Stream per-chunk ZSTD decompression and handle large-record uncompressed chunks.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java Outdated
Comment thread common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala

@SteNicholas SteNicholas left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a substantial, genuinely valuable feature (~40% disk reduction is a big deal), and the high-level design — local-disk-only scoping, a clean FileChannelWriter factory, appended proto fields, a guard on the sort path — shows good instincts. The core streaming-decompression machinery in CelebornInputStream is sound. But as implemented it is not safe to enable yet: there are several independent paths that cause silent data corruption or silent data loss when celeborn.chunk.compression.enabled=true. None affect the default-off path, so this isn't a regression for existing users — but the criticals below should block enabling/merging.

Critical (silent data loss / corruption when enabled)

C1 — Final-chunk flush failure is silently committed as success (data loss). ChunkCompressedFileChannelWriter.write() only flushes on overflow, so the trailing partial chunk is written only in close()compressAndFlush(). That close() catches IOException with an empty // log and ignore block, never signals the FlushNotifier, then unconditionally runs setBytesFlushed(...) + replaceFileMeta(...). Pre-PR, the final write happened on the flush thread and a failure propagated through notifier.checkException()Controller.commitFilesfailedIds. Now a failed last-chunk write commits the partition as success with its tail records missing — no FetchFailedException, no recompute. FileChannelWriter.close(boolean) doesn't declare throws, which is the root enabler. Fix: close() must propagate (or set the notifier) so the commit fails.

C2 — The per-chunk chunkCompressed list is never persisted → corruption after worker graceful-shutdown recovery. PbReduceFileMeta has only chunkOffsets + sorted; PbSerDeUtils.fromPbFileInfo rebuilds with new ReduceFileMeta(getChunkOffsetsList), dropping chunkCompressed/chunkSize. After recovery the stream handler sends an empty list, and the reader then defaults every chunk to compressed (see C4) — so any raw large-record chunk (flushLargeRecord records false) gets fed through ZstdInputStream → corrupt/failed reads. Fix: add chunkCompressed (and chunkSize) to PbReduceFileMeta and round-trip them; add a PbSerDeUtilsTest with an enabled context (the current test only uses disabled()).

C3 — DfsPartitionReader.next() hardcodes compressed = true. It returns Pair.of(chunk.getRight(), true) and never consults the per-chunk list. DFS/S3/OSS files are always written uncompressed (StorageManager forces disabled()), and a DFS network chunk is a dfsReadChunkSize slice, not a ZSTD frame. So enabled=true + remote storage wraps every raw slice in ZstdInputStream → total corruption. Fix: return false (DFS never chunk-compresses).

C4 — Decompression keys off the reader's global config + an "absent list ⇒ compressed" default, instead of the authoritative per-chunk flag. CelebornInputStream decompresses when (chunkCompressed && currentChunkCompressed), where chunkCompressed = conf.isChunkCompressionEnabled() (reader-side), and both readers compute getChunkCompressedCount()==0 || getChunkCompressed(idx) — i.e. absent list means "assume compressed." This is the lynchpin that turns C2/C3 and version skew into corruption: a NEW client (flag on) reading an OLD worker's uncompressed file, or a recovered file, decompresses raw bytes. Fix: make the per-chunk list the sole authority — absent/false ⇒ read raw; let the global config influence writes only.

High

H1 — LocalPartitionReader indexes the per-chunk flag with a relative index. getChunkCompressed(returnedChunks) but returnedChunks is relative to startChunkIndex; for skew/AQE sub-range reads the absolute index is startChunkIndex + returnedChunks (WorkerPartitionReader correctly uses the absolute chunk index). On a mixed compressed/raw file read via skew, the wrong flag is used → silent corruption.

H2 — AQE/skew read of a chunk-compressed file throws an uncaught UnsupportedOperationException. The PartitionFilesSorter guard throws a RuntimeException, but FetchHandler.handleReduceOpenStreamInternal/handleOpenStreamInternal catch only IOException, so it escapes: no OPEN_STREAM_FAILED reply, request hangs to timeout, no metric. Either throw IOException and surface it cleanly, or — better — reject enabled + AQE/skew at reserve time so the combination can't arise.

Medium

  • M1 — Compressed buffer sized at chunkSize, not Zstd.compressBound(chunkSize). A full incompressible chunk (likely, since this can stack on batch-level LZ4/ZSTD) overflows the destination and ZSTD throws; the return code also isn't checked with Zstd.isError. The client's own ZstdCompressor already uses compressBound — follow that. Combined with C1, an overflow on the last chunk = silent loss.
  • M2 — MmapMemoryManager.close() has no caller; the buffers are untracked, unbounded-until-peak, and /tmp-backed. Backing files (512 MB each) leak for the worker lifetime (reclaimed only via deleteOnExit), are not accounted by Celeborn's MemoryManager (bypassing congestion control), and on a tmpfs /tmp are RAM the worker doesn't know it holds. Ownership/lifecycle and memory budgeting need to be wired in.
  • M3 — bytesFlushed dual semantics. It's accumulated as uncompressed during flush, then overwritten with the compressed total in close(). Works only because of strict close-after-flush ordering and full meta replacement; fragile, and disk-usage metrics (getFileLength) now report compressed while in-flight incrementDiskBuffer tracks uncompressed.
  • M4 — celeborn.chunk.compression.level has no checkValue despite the documented 1–22 range; out-of-range passes straight to ZSTD.

Low / Nit

  • ChunkBufferPool comment says "allocateDirect, NOT MmapMemoryManager … would silently corrupt the source," but the code allocates the compressed buffer from MmapMemoryManager. It's safe today (the manager hands out non-overlapping slices), but the comment is actively misleading — fix it.
  • CHUNK_COMPRESSION_ENABLED is tagged version("0.3.0") (and in docs) for a feature added now — should be the current release.
  • destroy() now runs a wasted compress+write right before deleting the file.
  • Read metrics (incBytesRead) count decompressed batch sizes, over-reporting vs on-wire bytes.

Suggested framing

The cleanest fix narrative ties most of the criticals together: (1) make the per-chunk list the single source of truth for decompression (absent ⇒ raw), (2) persist that list, (3) have DFS report false, and (4) make close() propagate flush failures. With those plus compressBound and the mmap lifecycle, the feature gets much closer to safely enable-able. Given the promising results and that this was opened for early discussion, it might be worth landing it explicitly gated/experimental with these tracked as blockers-before-GA.

@saurabhd336 saurabhd336 requested a review from Copilot June 9, 2026 08:03

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 50 out of 50 changed files in this pull request and generated 4 comments.

Comments suppressed due to low confidence (4)

common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala:1

  • PbFileInfo now includes chunkCompressionConfig, and fromPbFileInfo reconstructs DiskFileInfo using pbFileInfo.getChunkCompressionConfig. However, toPbFileInfo never sets chunkCompressionConfig, so serialized metadata will always deserialize to (enabled=false, level=0) even for chunk-compressed shuffles. Populate builder.setChunkCompressionConfig(...) using the source file info's ChunkCompressionContext to preserve correctness across PB round-trips.
/*

worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1

  • If the read-back byte stream is corrupted or mis-ordered such that no expected prefix is found, this code silently drops the remainder (remaining = \"\"). That can let tests pass incorrectly (the loop that asserts blob content only iterates over whatever was extracted). Make this fail fast (e.g., throw/assert when None is hit) and assert that all expected prefixes were extracted (e.g., result.size == prefixes.length).
    worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1
  • The fixed Thread.sleep calls will make the suite significantly slower and can be flaky under load (timing-dependent). Prefer waiting on an explicit condition/ack (e.g., rely on mapperEnd/commit completion, or poll for committed state with a bounded timeout) and avoid unconditional sleeps in finally.
    worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1
  • The fixed Thread.sleep calls will make the suite significantly slower and can be flaky under load (timing-dependent). Prefer waiting on an explicit condition/ack (e.g., rely on mapperEnd/commit completion, or poll for committed state with a bounded timeout) and avoid unconditional sleeps in finally.

@saurabhd336 saurabhd336 requested a review from Copilot June 9, 2026 08:15

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 50 out of 50 changed files in this pull request and generated 7 comments.

Comments suppressed due to low confidence (2)

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala:1

  • LocalTierWriter closes fileChannelWriter in both closeStreams() and closeResource(). ChunkCompressedFileChannelWriter is idempotent via an internal closed flag, but BypassFileChannelWriter.close() is not; a second close can throw and potentially mask the original failure / complicate cleanup. Recommendation (required): make FileChannelWriter.close(...) idempotent for all implementations (e.g., add a closed guard to BypassFileChannelWriter), or ensure LocalTierWriter only closes once (e.g., track a closed flag and skip in closeResource() if already closed).
    worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala:1
  • LocalTierWriter closes fileChannelWriter in both closeStreams() and closeResource(). ChunkCompressedFileChannelWriter is idempotent via an internal closed flag, but BypassFileChannelWriter.close() is not; a second close can throw and potentially mask the original failure / complicate cleanup. Recommendation (required): make FileChannelWriter.close(...) idempotent for all implementations (e.g., add a closed guard to BypassFileChannelWriter), or ensure LocalTierWriter only closes once (e.g., track a closed flag and skip in closeResource() if already closed).

Comment thread client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java Outdated
Comment thread client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java Outdated
Comment thread client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java Outdated
Comment thread client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java Outdated
@saurabhd336 saurabhd336 requested a review from Copilot June 10, 2026 05:39

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 50 out of 50 changed files in this pull request and generated 6 comments.

Comments suppressed due to low confidence (4)

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:1

  • ChunkBufferPool (and its MmapMemoryManager) is constructed eagerly for every worker StorageManager, which will create/check the mmap tmp directory even when chunk compression is never used. Consider lazy initialization (e.g., lazy val chunkBufferPool = ...) or constructing the pool only when a chunk-compressed writer is actually requested, so the common path (chunk compression disabled) avoids filesystem work and extra objects.
    worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1
  • If parsing ever desynchronizes (e.g., unexpected bytes, encoding issues), case None silently drops the rest of the data and returns a partial result. Since the assertions iterate only over readStringMap, the test can pass while missing blobs. Make the test fail loudly on None (or assert result.size == prefixes.length and that all expected prefixes are present) so it actually detects corruption/truncation.
    worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1
  • The test uses fixed Thread.sleep calls to wait for cluster progress and shutdown, which is prone to flakiness under CI load (slow machines, GC pauses). Prefer waiting on an explicit condition (e.g., polling for commit/mapperEnd completion or using a client/server signal) with a bounded timeout, and avoid sleeps in finally where possible.
    worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1
  • The test uses fixed Thread.sleep calls to wait for cluster progress and shutdown, which is prone to flakiness under CI load (slow machines, GC pauses). Prefer waiting on an explicit condition (e.g., polling for commit/mapperEnd completion or using a client/server signal) with a bounded timeout, and avoid sleeps in finally where possible.

Comment thread client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java Outdated
Comment thread docs/configuration/worker.md Outdated
@saurabhd336 saurabhd336 requested a review from Copilot June 10, 2026 07:03

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 50 out of 50 changed files in this pull request and generated 4 comments.

Comments suppressed due to low confidence (3)

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala:1

  • Rethrowing the IOException here likely changes the close semantics: it can prevent notifyFileCommitted() (and any subsequent cleanup/notifications in close()) from running, which can leave the storage manager in an inconsistent state (e.g., writers not deregistered, commit not notified). Consider capturing the failure, performing the usual notifications/cleanup in a finally, and only rethrowing after state has been cleaned up (or returning a failure status upstream without skipping cleanup).
    worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:1
  • ChunkBufferPool (via MmapMemoryManager) creates/ensures the mmap tmp directory during StorageManager construction, even when chunk compression is unused for the lifetime of the worker. This has an operational footprint (unexpected directories, potential permission failures on startup). Consider lazy-initializing the pool on first chunk-compressed writer creation (and guarding close() accordingly), or deferring any filesystem side effects in MmapMemoryManager until the first allocation.
    worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1
  • Using fixed Thread.sleep(...) in mini-cluster integration tests is prone to flakiness and increases CI runtime. Prefer waiting on a concrete condition (e.g., an ack/future/metric indicating data is committed and readable) with a bounded timeout + polling/backoff, so the test is both faster on healthy runs and more reliable under load.

@saurabhd336 saurabhd336 requested a review from Copilot June 10, 2026 07:24

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 50 out of 50 changed files in this pull request and generated 8 comments.

Comments suppressed due to low confidence (2)

worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1

  • The test never closes inputStream, which can leak file descriptors / network resources in the mini-cluster runs. Wrap the read loop in a try/finally (or equivalent) to ensure inputStream.close() is always called.
    worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1
  • If the concatenated payload is corrupted or a prefix boundary is missed, extractBlobs silently drops the remaining data (case None => remaining = \"\") and the test can still pass because it only asserts over extracted entries. Prefer failing fast here (e.g., throw/assert on None) and assert that all expected prefixes were found (e.g., result.size == prefixMap.size) to avoid false positives.

Comment thread common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
@saurabhd336

Copy link
Copy Markdown
Contributor Author

@SteNicholas Possible to take a look? I've mostly addressed all the comments from Copilot

@SteNicholas

Copy link
Copy Markdown
Member

@saurabhd336 Took another full pass at c96df79 — re-verified each of my June 8 findings against the current code rather than the commit messages. This has improved a lot: all four criticals and both highs are genuinely fixed.

Verified fixed

  • C1 (final-flush failure silently committed)close(boolean) now throws, propagates compress/fsync/close failures, and only runs setBytesFlushed/replaceFileMeta on success, with buffer release and channel close in finally. A failed trailing flush now fails the commit as it should.
  • C2 (per-chunk flags lost on recovery)PbFileInfo round-trips chunkCompressed, and I confirmed graceful-shutdown recovery actually goes through to/fromPbFileInfoMapPbFileInfo. (The PbReduceFileMeta message I originally pointed at turns out to be dead proto — zero source usages — so this is the right place.) Thanks for extending PbSerDeUtilsTest.
  • C3DfsPartitionReader.next() returns false unconditionally.
  • C4 (decompression authority) — the read path no longer consults conf.isChunkCompressionEnabled() at all; both readers compute getChunkCompressedCount() > chunkIdx && getChunkCompressed(chunkIdx), so an absent list now means raw. Exactly the inversion needed — this also kills the version-skew corruption scenarios.
  • H1LocalPartitionReader uses the absolute index; WorkerPartitionReader keying off the fetched chunk id is even more robust. H2 — the sorter guard now throws IOException, which FetchHandler converts to a clean stream-open failure instead of a hang.
  • M1 (compressBound + isError), M2 (configurable mmap tmpDir, MmapMemoryManager.close() deletes backing files, wired into StorageManager.close()), M4 (level checkValue, 0.6.4 version tags) — all good.

On the sliced-read thread: I agree with your call. Both the Java and C++ readers always fetch full chunks (offset=0, len=MAX), and if a slice ever does arrive for a compressed chunk, FileChunkBuffers throws and handleChunkFetchRequest catches it → ChunkFetchFailure — clean failure, no hang, no corruption. Fail-fast is right here.

One new gap found this round

The C++ native client is chunk-compression-blind. PbStreamHandler now carries chunkCompressed (field 5), but nothing under cpp/ reads it, and the C++ CelebornInputStream has no ZSTD chunk decompression. A native-engine deployment (Java driver/LifecycleManager + C++ readers) that enables celeborn.chunk.compression.enabled would have workers writing ZSTD frames that C++ readers consume as raw shuffle data — batch-codec failures at best, silent garbage if the batch codec is NONE.

Ask: document the feature as Java-client-only in the config doc, and ideally make the C++ reader fail fast when any chunkCompressed flag is set in the stream handler it receives. That keeps the failure loud instead of silent.

Non-blocking follow-ups worth tracking (fine post-merge)

  • mmap buffer pool never shrinks (bounded by peak concurrent writers) and remains invisible to MemoryManager congestion control;
  • AQE/skew reads of compressed files unsupported — now fails cleanly, acceptable for experimental;
  • bytesFlushed uncompressed-then-compressed dual semantics still fragile-but-working.

With the C++ gap documented/guarded, I'd consider this mergeable as a default-off experimental feature. Nice work turning the data-safety story around so quickly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants