[CELEBORN-2350] Support chunk level compression to optimize storage#3699
[CELEBORN-2350] Support chunk level compression to optimize storage#3699saurabhd336 wants to merge 29 commits into
Conversation
|
Hi team @SteNicholas / @s0nskar / @zaynt4606 / others I wanted to start an early discussion for these proposed changes (can share a design doc too). At a high level, our Celeborn infra costs have largely been influenced by large locally attached SSD requirements. Additionally, also looking for ways to reduce celeborn network ingress / egress. This change helps is reducing both. Wanted to start this discussion for the change thanks |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #3699 +/- ##
==========================================
+ Coverage 66.91% 67.28% +0.37%
==========================================
Files 358 360 +2
Lines 21986 22337 +351
Branches 1946 1982 +36
==========================================
+ Hits 14710 15027 +317
- Misses 6262 6285 +23
- Partials 1014 1025 +11 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
1d92a40 to
cf8d472
Compare
…ge-record chunks readChunks() was unconditionally decompressing every chunk, but large records are written raw (uncompressed) by flushLargeRecord(). The fix consults ReduceFileMeta.getChunkCompressed() per chunk and only calls ZstdInputStream on chunks that were actually compressed. Also exposes compressAndFlush() as public so the test can call it directly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
Hi team Ping on this again! So far, i've been able to get ~40% reduction in celeborn worker disk usage with a roughly 80% increase in CPU usage. A lot of times, the CPU usage of our celeborn fleets is idle, hence this tradeoff feels reasonable and worth testing at scale for us. Can you please take a look at this PR? |
There was a problem hiding this comment.
Pull request overview
This PR introduces chunk-level ZSTD compression for shuffle data written by workers (disk write path) and adds transparent per-chunk decompression on the client read path (via CelebornInputStream). It threads a new ChunkCompressionContext from client → worker (ReserveSlots) and extends stream metadata so clients can identify which chunks are compressed (to support “large record” chunks written uncompressed).
Changes:
- Add worker-side
FileChannelWriterabstraction with a newChunkCompressedFileChannelWriterthat buffers into fixed-size chunks and ZSTD-compresses each chunk before writing. - Extend wire/file metadata (
PbStreamHandler,PbFileInfo,ReduceFileMeta) to carry chunk-compressed flags and chunk compression config. - Update client readers and
CelebornInputStreamto support streaming chunk decompression while preserving existing batch-level codecs.
Reviewed changes
Copilot reviewed 49 out of 49 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerSuite.scala | Update test writer creation call to pass ChunkCompressionContext. |
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala | Update writer/file-info construction for new chunk compression context parameter. |
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala | Adjust mocked writer ctor signature for added parameter. |
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala | Adjust mocked writer ctor signature for added parameter. |
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala | Adjust mocked writer ctor signature for added parameter. |
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala | Adjust mocked writer ctor signature for added parameter. |
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala | Update writer creation for added chunk compression context parameter. |
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandlerSuite.scala | Update DiskFileInfo construction for added chunk compression context parameter. |
| worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala | New end-to-end mini-cluster tests for chunk-compressed read/write scenarios. |
| worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java | Update writer context creation for new chunk compression context parameter. |
| worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java | Update writer context creation for new chunk compression context parameter. |
| worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java | Update writer context creation for new chunk compression context parameter. |
| worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/file/chunk/compressed/MmapMemoryManagerSuiteJ.java | New unit tests for mmap-based buffer allocator. |
| worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/file/chunk/compressed/ChunkCompressedFileChannelWriterSuiteJ.java | New unit tests validating chunk writer chunking/offsets/compression behavior. |
| worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/file/chunk/compressed/ChunkBufferPoolSuiteJ.java | New unit tests for pooled buffer pair behavior and concurrency. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala | Switch local tier writer flush to use FileChannelWriter abstraction. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala | Pass chunk compression context into disk file creation. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala | Thread chunk compression context through writer creation and DiskFileInfo. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala | Use FileChannelWriter in local flush task rather than raw FileChannel. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala | Include per-chunk compression flags in stream handler metadata to clients. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala | Receive/pass chunk compression context from ReserveSlots to writer creation. |
| worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java | Explicitly reject sorting chunk-compressed shuffle files (not yet supported). |
| worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterContext.java | Add ChunkCompressionContext to writer context and accessors. |
| worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/FileWriterType.java | New enum for file writer types. |
| worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/FileChannelWriterFactory.java | Factory to choose bypass vs chunk-compressed writer based on file info. |
| worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/FileChannelWriter.java | New abstraction for file write/close. |
| worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/chunk/compressed/MmapMemoryManager.java | New mmap-backed allocator for chunk buffers. |
| worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/chunk/compressed/ChunkCompressedFileChannelWriter.java | New chunk-buffering + ZSTD-compressing local file writer. |
| worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/chunk/compressed/ChunkBufferPool.java | New pool for reusable (chunk, compressed) buffer pairs. |
| worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/BypassFileChannelWriter.java | New writer preserving existing direct-to-channel behavior. |
| worker/pom.xml | Add zstd-jni dependency for worker module. |
| project/CelebornBuild.scala | Add zstd JNI dependency for worker build. |
| docs/configuration/client.md | Document new chunk compression configs. |
| common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala | Update PB serde tests for new file info ctor + chunk compression context. |
| common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala | Serialize/deserialize chunk compression config into PbFileInfo. |
| common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala | Send/receive chunk compression config in ReserveSlots messages. |
| common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala | Add chunk compression config entries + accessors. |
| common/src/main/proto/TransportMessages.proto | Add PbChunkCompressionConfig and per-chunk compressed flags to stream handler. |
| common/src/main/java/org/apache/celeborn/common/network/buffer/FileChunkBuffers.java | Enforce no sliced reads for chunk-compressed files. |
| common/src/main/java/org/apache/celeborn/common/meta/ReduceFileMeta.java | Track per-chunk compression flags in reduce metadata. |
| common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java | Add setBytesFlushed to override file length post-close. |
| common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java | Add ChunkCompressionContext and expose chunk compression settings. |
| common/src/main/java/org/apache/celeborn/common/compression/ChunkCompressionContext.java | New context object to carry chunk compression settings. |
| client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala | Send chunk compression settings to worker via ReserveSlots. |
| client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java | Return (ByteBuf, isChunkCompressed) from next(). |
| client/src/main/java/org/apache/celeborn/client/read/PartitionReader.java | Change API to return (ByteBuf, isChunkCompressed) from next(). |
| client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java | Return per-chunk compression flag alongside the chunk buffer. |
| client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java | Return per-chunk compression flag alongside the chunk buffer. |
| client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java | Stream per-chunk ZSTD decompression and handle large-record uncompressed chunks. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
SteNicholas
left a comment
There was a problem hiding this comment.
This is a substantial, genuinely valuable feature (~40% disk reduction is a big deal), and the high-level design — local-disk-only scoping, a clean FileChannelWriter factory, appended proto fields, a guard on the sort path — shows good instincts. The core streaming-decompression machinery in CelebornInputStream is sound. But as implemented it is not safe to enable yet: there are several independent paths that cause silent data corruption or silent data loss when celeborn.chunk.compression.enabled=true. None affect the default-off path, so this isn't a regression for existing users — but the criticals below should block enabling/merging.
Critical (silent data loss / corruption when enabled)
C1 — Final-chunk flush failure is silently committed as success (data loss). ChunkCompressedFileChannelWriter.write() only flushes on overflow, so the trailing partial chunk is written only in close() → compressAndFlush(). That close() catches IOException with an empty // log and ignore block, never signals the FlushNotifier, then unconditionally runs setBytesFlushed(...) + replaceFileMeta(...). Pre-PR, the final write happened on the flush thread and a failure propagated through notifier.checkException() → Controller.commitFiles → failedIds. Now a failed last-chunk write commits the partition as success with its tail records missing — no FetchFailedException, no recompute. FileChannelWriter.close(boolean) doesn't declare throws, which is the root enabler. Fix: close() must propagate (or set the notifier) so the commit fails.
C2 — The per-chunk chunkCompressed list is never persisted → corruption after worker graceful-shutdown recovery. PbReduceFileMeta has only chunkOffsets + sorted; PbSerDeUtils.fromPbFileInfo rebuilds with new ReduceFileMeta(getChunkOffsetsList), dropping chunkCompressed/chunkSize. After recovery the stream handler sends an empty list, and the reader then defaults every chunk to compressed (see C4) — so any raw large-record chunk (flushLargeRecord records false) gets fed through ZstdInputStream → corrupt/failed reads. Fix: add chunkCompressed (and chunkSize) to PbReduceFileMeta and round-trip them; add a PbSerDeUtilsTest with an enabled context (the current test only uses disabled()).
C3 — DfsPartitionReader.next() hardcodes compressed = true. It returns Pair.of(chunk.getRight(), true) and never consults the per-chunk list. DFS/S3/OSS files are always written uncompressed (StorageManager forces disabled()), and a DFS network chunk is a dfsReadChunkSize slice, not a ZSTD frame. So enabled=true + remote storage wraps every raw slice in ZstdInputStream → total corruption. Fix: return false (DFS never chunk-compresses).
C4 — Decompression keys off the reader's global config + an "absent list ⇒ compressed" default, instead of the authoritative per-chunk flag. CelebornInputStream decompresses when (chunkCompressed && currentChunkCompressed), where chunkCompressed = conf.isChunkCompressionEnabled() (reader-side), and both readers compute getChunkCompressedCount()==0 || getChunkCompressed(idx) — i.e. absent list means "assume compressed." This is the lynchpin that turns C2/C3 and version skew into corruption: a NEW client (flag on) reading an OLD worker's uncompressed file, or a recovered file, decompresses raw bytes. Fix: make the per-chunk list the sole authority — absent/false ⇒ read raw; let the global config influence writes only.
High
H1 — LocalPartitionReader indexes the per-chunk flag with a relative index. getChunkCompressed(returnedChunks) but returnedChunks is relative to startChunkIndex; for skew/AQE sub-range reads the absolute index is startChunkIndex + returnedChunks (WorkerPartitionReader correctly uses the absolute chunk index). On a mixed compressed/raw file read via skew, the wrong flag is used → silent corruption.
H2 — AQE/skew read of a chunk-compressed file throws an uncaught UnsupportedOperationException. The PartitionFilesSorter guard throws a RuntimeException, but FetchHandler.handleReduceOpenStreamInternal/handleOpenStreamInternal catch only IOException, so it escapes: no OPEN_STREAM_FAILED reply, request hangs to timeout, no metric. Either throw IOException and surface it cleanly, or — better — reject enabled + AQE/skew at reserve time so the combination can't arise.
Medium
- M1 — Compressed buffer sized at
chunkSize, notZstd.compressBound(chunkSize). A full incompressible chunk (likely, since this can stack on batch-level LZ4/ZSTD) overflows the destination and ZSTD throws; the return code also isn't checked withZstd.isError. The client's ownZstdCompressoralready usescompressBound— follow that. Combined with C1, an overflow on the last chunk = silent loss. - M2 —
MmapMemoryManager.close()has no caller; the buffers are untracked, unbounded-until-peak, and/tmp-backed. Backing files (512 MB each) leak for the worker lifetime (reclaimed only viadeleteOnExit), are not accounted by Celeborn'sMemoryManager(bypassing congestion control), and on a tmpfs/tmpare RAM the worker doesn't know it holds. Ownership/lifecycle and memory budgeting need to be wired in. - M3 —
bytesFlusheddual semantics. It's accumulated as uncompressed during flush, then overwritten with the compressed total inclose(). Works only because of strict close-after-flush ordering and full meta replacement; fragile, and disk-usage metrics (getFileLength) now report compressed while in-flightincrementDiskBuffertracks uncompressed. - M4 —
celeborn.chunk.compression.levelhas nocheckValuedespite the documented 1–22 range; out-of-range passes straight to ZSTD.
Low / Nit
ChunkBufferPoolcomment says "allocateDirect, NOT MmapMemoryManager … would silently corrupt the source," but the code allocates the compressed buffer fromMmapMemoryManager. It's safe today (the manager hands out non-overlapping slices), but the comment is actively misleading — fix it.CHUNK_COMPRESSION_ENABLEDis taggedversion("0.3.0")(and in docs) for a feature added now — should be the current release.destroy()now runs a wasted compress+write right before deleting the file.- Read metrics (
incBytesRead) count decompressed batch sizes, over-reporting vs on-wire bytes.
Suggested framing
The cleanest fix narrative ties most of the criticals together: (1) make the per-chunk list the single source of truth for decompression (absent ⇒ raw), (2) persist that list, (3) have DFS report false, and (4) make close() propagate flush failures. With those plus compressBound and the mmap lifecycle, the feature gets much closer to safely enable-able. Given the promising results and that this was opened for early discussion, it might be worth landing it explicitly gated/experimental with these tracked as blockers-before-GA.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 50 out of 50 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (4)
common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala:1
PbFileInfonow includeschunkCompressionConfig, andfromPbFileInforeconstructsDiskFileInfousingpbFileInfo.getChunkCompressionConfig. However,toPbFileInfonever setschunkCompressionConfig, so serialized metadata will always deserialize to(enabled=false, level=0)even for chunk-compressed shuffles. Populatebuilder.setChunkCompressionConfig(...)using the source file info'sChunkCompressionContextto preserve correctness across PB round-trips.
/*
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1
- If the read-back byte stream is corrupted or mis-ordered such that no expected prefix is found, this code silently drops the remainder (
remaining = \"\"). That can let tests pass incorrectly (the loop that asserts blob content only iterates over whatever was extracted). Make this fail fast (e.g., throw/assert whenNoneis hit) and assert that all expected prefixes were extracted (e.g.,result.size == prefixes.length).
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1 - The fixed
Thread.sleepcalls will make the suite significantly slower and can be flaky under load (timing-dependent). Prefer waiting on an explicit condition/ack (e.g., rely onmapperEnd/commit completion, or poll for committed state with a bounded timeout) and avoid unconditional sleeps infinally.
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1 - The fixed
Thread.sleepcalls will make the suite significantly slower and can be flaky under load (timing-dependent). Prefer waiting on an explicit condition/ack (e.g., rely onmapperEnd/commit completion, or poll for committed state with a bounded timeout) and avoid unconditional sleeps infinally.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 50 out of 50 changed files in this pull request and generated 7 comments.
Comments suppressed due to low confidence (2)
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala:1
LocalTierWriterclosesfileChannelWriterin bothcloseStreams()andcloseResource().ChunkCompressedFileChannelWriteris idempotent via an internalclosedflag, butBypassFileChannelWriter.close()is not; a second close can throw and potentially mask the original failure / complicate cleanup. Recommendation (required): makeFileChannelWriter.close(...)idempotent for all implementations (e.g., add aclosedguard toBypassFileChannelWriter), or ensureLocalTierWriteronly closes once (e.g., track a closed flag and skip incloseResource()if already closed).
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala:1LocalTierWriterclosesfileChannelWriterin bothcloseStreams()andcloseResource().ChunkCompressedFileChannelWriteris idempotent via an internalclosedflag, butBypassFileChannelWriter.close()is not; a second close can throw and potentially mask the original failure / complicate cleanup. Recommendation (required): makeFileChannelWriter.close(...)idempotent for all implementations (e.g., add aclosedguard toBypassFileChannelWriter), or ensureLocalTierWriteronly closes once (e.g., track a closed flag and skip incloseResource()if already closed).
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 50 out of 50 changed files in this pull request and generated 6 comments.
Comments suppressed due to low confidence (4)
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:1
ChunkBufferPool(and itsMmapMemoryManager) is constructed eagerly for every workerStorageManager, which will create/check the mmap tmp directory even when chunk compression is never used. Consider lazy initialization (e.g.,lazy val chunkBufferPool = ...) or constructing the pool only when a chunk-compressed writer is actually requested, so the common path (chunk compression disabled) avoids filesystem work and extra objects.
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1- If parsing ever desynchronizes (e.g., unexpected bytes, encoding issues),
case Nonesilently drops the rest of the data and returns a partialresult. Since the assertions iterate only overreadStringMap, the test can pass while missing blobs. Make the test fail loudly onNone(or assertresult.size == prefixes.lengthand that all expected prefixes are present) so it actually detects corruption/truncation.
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1 - The test uses fixed
Thread.sleepcalls to wait for cluster progress and shutdown, which is prone to flakiness under CI load (slow machines, GC pauses). Prefer waiting on an explicit condition (e.g., polling for commit/mapperEnd completion or using a client/server signal) with a bounded timeout, and avoid sleeps infinallywhere possible.
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1 - The test uses fixed
Thread.sleepcalls to wait for cluster progress and shutdown, which is prone to flakiness under CI load (slow machines, GC pauses). Prefer waiting on an explicit condition (e.g., polling for commit/mapperEnd completion or using a client/server signal) with a bounded timeout, and avoid sleeps infinallywhere possible.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 50 out of 50 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (3)
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala:1
- Rethrowing the IOException here likely changes the close semantics: it can prevent
notifyFileCommitted()(and any subsequent cleanup/notifications inclose()) from running, which can leave the storage manager in an inconsistent state (e.g., writers not deregistered, commit not notified). Consider capturing the failure, performing the usual notifications/cleanup in afinally, and only rethrowing after state has been cleaned up (or returning a failure status upstream without skipping cleanup).
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:1 ChunkBufferPool(viaMmapMemoryManager) creates/ensures the mmap tmp directory duringStorageManagerconstruction, even when chunk compression is unused for the lifetime of the worker. This has an operational footprint (unexpected directories, potential permission failures on startup). Consider lazy-initializing the pool on first chunk-compressed writer creation (and guardingclose()accordingly), or deferring any filesystem side effects inMmapMemoryManageruntil the first allocation.
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1- Using fixed
Thread.sleep(...)in mini-cluster integration tests is prone to flakiness and increases CI runtime. Prefer waiting on a concrete condition (e.g., an ack/future/metric indicating data is committed and readable) with a bounded timeout + polling/backoff, so the test is both faster on healthy runs and more reliable under load.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 50 out of 50 changed files in this pull request and generated 8 comments.
Comments suppressed due to low confidence (2)
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1
- The test never closes
inputStream, which can leak file descriptors / network resources in the mini-cluster runs. Wrap the read loop in atry/finally(or equivalent) to ensureinputStream.close()is always called.
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1 - If the concatenated payload is corrupted or a prefix boundary is missed,
extractBlobssilently drops the remaining data (case None => remaining = \"\") and the test can still pass because it only asserts over extracted entries. Prefer failing fast here (e.g., throw/assert onNone) and assert that all expected prefixes were found (e.g.,result.size == prefixMap.size) to avoid false positives.
|
@SteNicholas Possible to take a look? I've mostly addressed all the comments from Copilot |
|
@saurabhd336 Took another full pass at Verified fixed
On the sliced-read thread: I agree with your call. Both the Java and C++ readers always fetch full chunks (offset=0, len=MAX), and if a slice ever does arrive for a compressed chunk, One new gap found this roundThe C++ native client is chunk-compression-blind. Ask: document the feature as Java-client-only in the config doc, and ideally make the C++ reader fail fast when any Non-blocking follow-ups worth tracking (fine post-merge)
With the C++ gap documented/guarded, I'd consider this mergeable as a default-off experimental feature. Nice work turning the data-safety story around so quickly. |
What changes were proposed in this pull request?
Adds chunk-level ZSTD compression on the worker write path and streaming decompression on the client read path. Records accumulate in a fixed-size chunk buffer (default 8 MB); when the buffer overflows it is compressed as a single ZSTD frame and written to disk file. On the read side, CelebornInputStream wraps each fetched chunk in a ZstdInputStream to uncompress.
This is orthogonal to the existing batch-level LZ4/ZSTD codec: both can be active simultaneously, or the batch codec can be NONE.
This change primarily helps in reducing the disk usage (~40% lower disk usage seen in tests) as well as read flow celeborn network egress.
Impl details
Writer side
FileChannelWriterinterface which supports write / close functionalities.BypassFileChannelWriteris the default and ensures the current behaviour (directly write flushBuffer to disk file channel).ChunkCompressedFileChannelWriter: Accumulates records in a directByteBufferofchunkSizebytes. On overflow, ZSTD-compresses and writes as a single frame. Records larger thanchunkSizestream directly to disk viaZstdOutputStream. Replaces compressed chunk-boundary offsets intoReduceFileMetaon close. Also updates thebytesFlushedto overwrite the FileInfo length post close. The buffers used to buffer chunkSize data before compression and flush is powered byMmapMemoryManagerandChunkBufferPoolwhich uses mmap'ed temporary files to avoid the memory overhead of buffering chunk sized data.ChunkCompressedFileChannelWriterandFileChannelWriteris made basis the new config set by client duringReserveSlots(conf.isChunkCompressionEnabled).Read side
CelebornInputStream: When reading chunkCompressed chunks, wraps the readByteBufinto a ZSTDIs to inplace decompress and read.Configs added
celeborn.chunk.compression.enabledfalseCelebornInputStream.Does this PR resolve a correctness bug?
No
Does this PR introduce any user-facing change?
celeborn.chunk.compression.enabledconfig to enable / disable chunk level compression (disabled by default)How was this patch tested?
UTs, ITs