Add KV cache CPU offload support#35
Conversation
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughThis PR implements Phase 0 KV cache CPU offload for the NPU serving engine. It introduces a new ChangesKV Cache CPU Offload
Sequence Diagram(s)sequenceDiagram
participant AsyncEngine
participant Scheduler
participant KvCacheManager
participant WorkerProcess
participant CPUKvOffloadBackend
rect rgba(70, 130, 180, 0.5)
note over AsyncEngine,Scheduler: Per-step schedule phase
AsyncEngine->>Scheduler: schedule()
Scheduler->>KvCacheManager: build_cpu_load_job(cpu_block_ids) [prefix-cache hit on CPU block]
Scheduler->>KvCacheManager: build_cpu_store_job(freed_block_ids) [on request completion]
Scheduler-->>AsyncEngine: SchedulerOutput(kv_transfer_jobs=[...])
end
rect rgba(60, 179, 113, 0.5)
note over AsyncEngine,CPUKvOffloadBackend: Worker execution phase
AsyncEngine->>WorkerProcess: WorkerCommand(kv_transfer_jobs=[...])
WorkerProcess->>CPUKvOffloadBackend: submit(job) for each job
CPUKvOffloadBackend->>CPUKvOffloadBackend: _copy_npu_to_cpu / _copy_cpu_to_npu
WorkerProcess->>CPUKvOffloadBackend: wait(job_ids)
CPUKvOffloadBackend-->>WorkerProcess: list[TransferResult]
WorkerProcess-->>AsyncEngine: StepOutput(kv_transfer_results=[...])
end
rect rgba(210, 105, 30, 0.5)
note over AsyncEngine,KvCacheManager: Transfer completion phase
AsyncEngine->>Scheduler: complete_transfer_results(kv_transfer_results)
Scheduler->>KvCacheManager: complete_transfer_result(result) for each result
KvCacheManager->>KvCacheManager: update block location/slot/page_id, free CPU slot if NPU load
end
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Possibly related issues
Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces KV cache CPU offload support to PyPTO, allowing logical KV blocks to be transferred between NPU pages and CPU slots. It adds a CLI option --max-cpu-offload-blocks, implements the transfer logic and backend in python/core/kv_offload.py, and integrates the offloading mechanism into the scheduler, executor, and serving worker. A critical issue was identified in python/core/kv_cache.py where a transfer failure leaves block residency metadata and CPU slots in a corrupted or leaked state; a robust reversion of block states is suggested to handle failures properly.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| if not result.success: | ||
| for block in blocks: | ||
| block.location = KVBlockLocation.NPU if block.physical_page_id is not None else KVBlockLocation.CPU | ||
| raise RuntimeError(result.error or "KV offload transfer failed") |
There was a problem hiding this comment.
On transfer failure, the block residency metadata and CPU slots are left in a corrupted state or leaked. Specifically:\n- For Store Jobs (NPU -> CPU): if the transfer fails, the block's cpu_slot_id is not cleared, the CPU slot is leaked, and if ref_cnt == 0, the block is leaked from the free_queue.\n- For Load Jobs (CPU -> NPU): if the transfer fails, the block's location is incorrectly set to KVBlockLocation.NPU and physical_page_id remains set, which will cause the engine to read uninitialized/stale NPU pages.\n\nWe should check the transfer direction and correctly revert the state of the blocks and CPU slots on failure.
if not result.success:\n if isinstance(result.src, NPULoadStoreSpec) and isinstance(result.dst, CPULoadStoreSpec):\n for block in blocks:\n block.location = KVBlockLocation.NPU\n if block.cpu_slot_id is not None:\n self._free_cpu_slots.append(block.cpu_slot_id)\n block.cpu_slot_id = None\n if block.ref_cnt == 0:\n self.free_queue.append(block)\n elif isinstance(result.src, CPULoadStoreSpec) and isinstance(result.dst, NPULoadStoreSpec):\n for block in blocks:\n block.location = KVBlockLocation.CPU\n block.physical_page_id = None\n else:\n for block in blocks:\n block.location = KVBlockLocation.NPU if block.physical_page_id is not None else KVBlockLocation.CPU\n raise RuntimeError(result.error or "KV offload transfer failed")There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
python/core/async_engine.py (1)
275-282:⚠️ Potential issue | 🟠 Major | ⚡ Quick winTransfer failure exceptions propagate uncaught and can crash the engine loop.
complete_transfer_resultscan raiseRuntimeErrorif a transfer fails (fromkv_cache.complete_transfer_result). This exception propagates out of_process_step_outputand crashes the engine loop. Consider wrapping this in error handling similar to howstep_output.erroris handled.Proposed fix: add error handling
def _process_step_output( self, scheduler_output: SchedulerOutput, step_output: StepOutput ) -> None: """Process worker results: update scheduler state, push tokens to request queues.""" - self.scheduler.complete_transfer_results(step_output.kv_transfer_results) + try: + self.scheduler.complete_transfer_results(step_output.kv_transfer_results) + except RuntimeError as e: + logger.error(f"KV transfer completion failed: {e}") + self._handle_step_error(scheduler_output) + return request_outputs = self.scheduler.update_from_output( scheduler_output, step_output.new_tokens )🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@python/core/async_engine.py` around lines 275 - 282, The `complete_transfer_results` call in the `_process_step_output` method can raise an uncaught `RuntimeError` that crashes the engine loop. Wrap the call to `self.scheduler.complete_transfer_results(step_output.kv_transfer_results)` in a try-except block to catch `RuntimeError`, and handle the error gracefully (log it and continue, or handle it similarly to how `step_output.error` is already being handled elsewhere in the method) instead of allowing the exception to propagate and crash the engine.
🧹 Nitpick comments (3)
python/core/kv_offload.py (1)
250-253: 💤 Low value
waitsilently ignores unknown job IDs.If
waitis called with ajob_idthat was never submitted or already retrieved, it's silently skipped. While this may be intentional for a synchronous backend, it could mask bugs where callers pass incorrect job IDs.Consider logging a warning or raising for unknown IDs during development/debug builds.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@python/core/kv_offload.py` around lines 250 - 253, The wait method silently skips job IDs that are not found in self._completed, which could mask bugs where callers pass incorrect job IDs. Modify the wait method to log a warning (or raise an exception for debug builds) when a job_id is not present in self._completed, rather than silently ignoring it. This will help catch issues during development where incorrect job IDs are being passed to the method.tests/test_cli.py (1)
87-95: ⚡ Quick winAdd a regression test for negative
--max-cpu-offload-blocks.Current tests cover enabled/default paths, but not invalid negative input. Add a parser rejection test to lock in the CLI contract once non-negative validation is enforced.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/test_cli.py` around lines 87 - 95, Add a new test function in test_cli.py to validate that the argument parser rejects negative values for the --max-cpu-offload-blocks flag. The test should call _parse_args with a negative value (e.g., --max-cpu-offload-blocks -1) and assert that an appropriate error is raised (such as SystemExit or argparse.ArgumentTypeError), ensuring the CLI contract properly validates non-negative input for this parameter.python/core/kv_cache.py (1)
340-370: ⚡ Quick winConsider validating that blocks have
ref_cnt == 0before storing to CPU.The method allows storing blocks with
ref_cnt > 0to CPU, which could corrupt active request state. While the scheduler currently guards this (line 459 inscheduler.py), adding validation here would prevent misuse from other callers.Proposed defensive check
def build_cpu_store_job(self, block_ids: list[int], request_id: str | None = None) -> TransferJob: """Build a transfer job that stores resident NPU blocks into CPU slots.""" blocks = [self.blocks[block_id] for block_id in block_ids] + for block in blocks: + if block.ref_cnt > 0: + raise RuntimeError(f"Cannot offload block {block.block_id} with active references") if len(blocks) > self.max_cpu_offload_blocks: raise RuntimeError("Insufficient CPU KV offload slots.")🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@python/core/kv_cache.py` around lines 340 - 370, The build_cpu_store_job method does not validate that blocks have ref_cnt equal to zero before allowing them to be stored to CPU, which could corrupt active request state. Add a validation check early in the method after retrieving the blocks to ensure all blocks in the blocks list have ref_cnt == 0. If any block has a non-zero ref_cnt, raise a RuntimeError with a descriptive message indicating that blocks with active references cannot be offloaded to CPU. This defensive check should be placed before the loop that processes the blocks and pops CPU slots.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@python/cli/main.py`:
- Around line 82-87: The `--max-cpu-offload-blocks` argument parser currently
accepts any integer via `type=int`, including negative values, and then silently
disables offload when negative values are encountered elsewhere in the code.
Instead of allowing invalid input to pass through, add validation to the
argument parser itself to reject negative values at parse time. Use a custom
type function or the choices parameter in the parser.add_argument call for the
max-cpu-offload-blocks argument to ensure only non-negative integers (0 or
greater) are accepted, failing the CLI validation immediately with an
appropriate error message if a negative value is provided.
In `@python/core/kv_cache.py`:
- Around line 395-423: The failure recovery logic in the
complete_transfer_result method incorrectly restores block location when a
transfer fails. Currently it checks only whether physical_page_id is not None,
but for CPU→NPU transfers that fail, the physical_page_id is already set from
build_cpu_load_job yet the data remains on CPU. Replace the simple
physical_page_id check at line 403 with logic that examines the transfer
direction (using result.src and result.dst type checks) to determine whether the
block should be restored to NPU or CPU, ensuring you correctly restore blocks to
their original location before the failed transfer attempt.
In `@python/core/kv_offload.py`:
- Around line 202-214: The ensure_num_cpu_slots method allocates a new tensor
and replaces self.cpu_slots, which breaks shared-memory references held by other
processes (such as forked worker children) that still point to the old tensor.
Fix this by either preventing resizes after initialization by raising an
exception if ensure_num_cpu_slots is called post-fork, or by preallocating
sufficient capacity upfront during initialization based on
max_cpu_offload_blocks so that ensure_num_cpu_slots is never called after the
CPU slots have been shared with child processes. This ensures all processes
reference the same underlying shared-memory pool.
In `@tests/run_npu_kv_cpu_offload.py`:
- Line 40: Add validation to ensure the --max-cpu-offload-blocks argument is a
positive integer. After parsing arguments, check that the parsed
max_cpu_offload_blocks value is greater than 0, and raise an error with a clear
message if it is not. This validation should fail fast before reaching the code
at lines 129-130 where enable_kv_cpu_offload is set to True, preventing
timeout-driven failures when non-positive block counts are provided.
---
Outside diff comments:
In `@python/core/async_engine.py`:
- Around line 275-282: The `complete_transfer_results` call in the
`_process_step_output` method can raise an uncaught `RuntimeError` that crashes
the engine loop. Wrap the call to
`self.scheduler.complete_transfer_results(step_output.kv_transfer_results)` in a
try-except block to catch `RuntimeError`, and handle the error gracefully (log
it and continue, or handle it similarly to how `step_output.error` is already
being handled elsewhere in the method) instead of allowing the exception to
propagate and crash the engine.
---
Nitpick comments:
In `@python/core/kv_cache.py`:
- Around line 340-370: The build_cpu_store_job method does not validate that
blocks have ref_cnt equal to zero before allowing them to be stored to CPU,
which could corrupt active request state. Add a validation check early in the
method after retrieving the blocks to ensure all blocks in the blocks list have
ref_cnt == 0. If any block has a non-zero ref_cnt, raise a RuntimeError with a
descriptive message indicating that blocks with active references cannot be
offloaded to CPU. This defensive check should be placed before the loop that
processes the blocks and pops CPU slots.
In `@python/core/kv_offload.py`:
- Around line 250-253: The wait method silently skips job IDs that are not found
in self._completed, which could mask bugs where callers pass incorrect job IDs.
Modify the wait method to log a warning (or raise an exception for debug builds)
when a job_id is not present in self._completed, rather than silently ignoring
it. This will help catch issues during development where incorrect job IDs are
being passed to the method.
In `@tests/test_cli.py`:
- Around line 87-95: Add a new test function in test_cli.py to validate that the
argument parser rejects negative values for the --max-cpu-offload-blocks flag.
The test should call _parse_args with a negative value (e.g.,
--max-cpu-offload-blocks -1) and assert that an appropriate error is raised
(such as SystemExit or argparse.ArgumentTypeError), ensuring the CLI contract
properly validates non-negative input for this parameter.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4bf0603c-5d38-4dd6-9a5a-f04980eccbcb
📒 Files selected for processing (12)
examples/model/qwen3_14b/runner/npu_runner.pypython/cli/main.pypython/core/async_engine.pypython/core/kv_cache.pypython/core/kv_offload.pypython/core/model_runner.pypython/core/pypto_executor.pypython/core/scheduler.pypython/core/serving_worker.pypython/core/types.pytests/run_npu_kv_cpu_offload.pytests/test_cli.py
| parser.add_argument( | ||
| "--max-cpu-offload-blocks", | ||
| type=int, | ||
| default=0, | ||
| help="Maximum number of KV blocks to keep in CPU offload storage. 0 disables CPU offload (default: 0).", | ||
| ) |
There was a problem hiding this comment.
Reject negative --max-cpu-offload-blocks values at parse time.
Line 83 currently accepts any integer, and Lines 123-124 propagate negative values into EngineConfig while silently disabling offload via > 0. This accepts invalid input instead of failing fast on CLI validation.
Proposed fix
+def _non_negative_int(value: str) -> int:
+ v = int(value)
+ if v < 0:
+ raise argparse.ArgumentTypeError("--max-cpu-offload-blocks must be >= 0")
+ return v
+
def build_parser() -> argparse.ArgumentParser:
@@
parser.add_argument(
"--max-cpu-offload-blocks",
- type=int,
+ type=_non_negative_int,
default=0,
help="Maximum number of KV blocks to keep in CPU offload storage. 0 disables CPU offload (default: 0).",
)Also applies to: 123-124
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@python/cli/main.py` around lines 82 - 87, The `--max-cpu-offload-blocks`
argument parser currently accepts any integer via `type=int`, including negative
values, and then silently disables offload when negative values are encountered
elsewhere in the code. Instead of allowing invalid input to pass through, add
validation to the argument parser itself to reject negative values at parse
time. Use a custom type function or the choices parameter in the
parser.add_argument call for the max-cpu-offload-blocks argument to ensure only
non-negative integers (0 or greater) are accepted, failing the CLI validation
immediately with an appropriate error message if a negative value is provided.
| def complete_transfer_result(self, result: TransferResult) -> None: | ||
| """Apply a completed CPU offload transfer to block residency metadata.""" | ||
| block_ids = self._pending_transfer_blocks.pop(result.job_id, []) | ||
| if not block_ids: | ||
| return | ||
| blocks = [self.blocks[block_id] for block_id in block_ids] | ||
| if not result.success: | ||
| for block in blocks: | ||
| block.location = KVBlockLocation.NPU if block.physical_page_id is not None else KVBlockLocation.CPU | ||
| raise RuntimeError(result.error or "KV offload transfer failed") | ||
|
|
||
| if isinstance(result.src, NPULoadStoreSpec) and isinstance(result.dst, CPULoadStoreSpec): | ||
| for block, slot_id in zip(blocks, result.dst.slot_ids, strict=True): | ||
| block.location = KVBlockLocation.CPU | ||
| block.physical_page_id = None | ||
| block.cpu_slot_id = slot_id | ||
| self._touch_cpu_block(block) | ||
| if block.ref_cnt == 0: | ||
| self.free_queue.append(block) | ||
| elif isinstance(result.src, CPULoadStoreSpec) and isinstance(result.dst, NPULoadStoreSpec): | ||
| for block, page_id in zip(blocks, result.dst.page_ids, strict=True): | ||
| block.location = KVBlockLocation.NPU | ||
| block.physical_page_id = page_id | ||
| if block.cpu_slot_id is not None: | ||
| self._free_cpu_slots.append(block.cpu_slot_id) | ||
| block.cpu_slot_id = None | ||
| block.cpu_last_access = 0 | ||
| else: | ||
| raise TypeError("Unsupported KV offload transfer result") |
There was a problem hiding this comment.
Failure recovery logic incorrectly restores CPU→NPU load jobs to NPU location.
When a CPU→NPU load transfer fails, the block's physical_page_id was already set to block_id in build_cpu_load_job (line 381). The recovery at line 403 checks physical_page_id is not None and incorrectly restores the location to NPU, but the actual data is still on CPU.
Proposed fix: track original location or check transfer direction
if not result.success:
for block in blocks:
- block.location = KVBlockLocation.NPU if block.physical_page_id is not None else KVBlockLocation.CPU
+ # Restore based on transfer direction, not physical_page_id
+ if isinstance(result.src, NPULoadStoreSpec):
+ # Store job failed: block was on NPU
+ block.location = KVBlockLocation.NPU
+ else:
+ # Load job failed: block was on CPU
+ block.location = KVBlockLocation.CPU
+ block.physical_page_id = None # Clear the prematurely set page_id
raise RuntimeError(result.error or "KV offload transfer failed")🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@python/core/kv_cache.py` around lines 395 - 423, The failure recovery logic
in the complete_transfer_result method incorrectly restores block location when
a transfer fails. Currently it checks only whether physical_page_id is not None,
but for CPU→NPU transfers that fail, the physical_page_id is already set from
build_cpu_load_job yet the data remains on CPU. Replace the simple
physical_page_id check at line 403 with logic that examines the transfer
direction (using result.src and result.dst type checks) to determine whether the
block should be restored to NPU or CPU, ensuring you correctly restore blocks to
their original location before the failed transfer attempt.
| def ensure_num_cpu_slots(self, num_cpu_slots: int) -> None: | ||
| """Grow the CPU slot pool while preserving existing contents.""" | ||
|
|
||
| num_cpu_slots = int(num_cpu_slots) | ||
| if num_cpu_slots <= self.cpu_slots.shape[0]: | ||
| return | ||
| new_slots = torch.empty( | ||
| (num_cpu_slots, self.page_view.page_size_bytes), | ||
| dtype=torch.uint8, | ||
| device="cpu", | ||
| ).share_memory_() | ||
| new_slots[: self.cpu_slots.shape[0]].copy_(self.cpu_slots) | ||
| self.cpu_slots = new_slots |
There was a problem hiding this comment.
Resizing shared-memory pool breaks cross-process references.
ensure_num_cpu_slots allocates a new tensor and copies data, but the old shared-memory tensor reference is discarded. If other processes (e.g., forked chip children) hold references to the original cpu_slots, they will not see the resized pool and will continue using stale memory.
From serving_worker.py, the CPU slots are preallocated before forking (_preallocate_kv_offload_cpu_slots), so any post-fork resize would violate the shared-memory contract. Consider either:
- Preventing resizes after initialization (raise if called post-init)
- Preallocating sufficient capacity upfront based on
max_cpu_offload_blocks
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@python/core/kv_offload.py` around lines 202 - 214, The ensure_num_cpu_slots
method allocates a new tensor and replaces self.cpu_slots, which breaks
shared-memory references held by other processes (such as forked worker
children) that still point to the old tensor. Fix this by either preventing
resizes after initialization by raising an exception if ensure_num_cpu_slots is
called post-fork, or by preallocating sufficient capacity upfront during
initialization based on max_cpu_offload_blocks so that ensure_num_cpu_slots is
never called after the CPU slots have been shared with child processes. This
ensures all processes reference the same underlying shared-memory pool.
| parser.add_argument("--max-seq-len", type=int, default=512) | ||
| parser.add_argument("--max-new-tokens", type=int, default=2) | ||
| parser.add_argument("--long-prefill-token-threshold", type=int, default=512) | ||
| parser.add_argument("--max-cpu-offload-blocks", type=int, default=2) |
There was a problem hiding this comment.
Fail fast when --max-cpu-offload-blocks is non-positive in this verifier script.
Line 129 forces enable_kv_cpu_offload=True regardless of the value from Line 40. Passing 0 or a negative value can lead to timeout-driven false failures instead of immediate argument validation.
Proposed fix
async def main() -> None:
args = _parse_args()
+ if args.max_cpu_offload_blocks <= 0:
+ raise ValueError("--max-cpu-offload-blocks must be > 0 for this offload verification script.")
@@
engine_config = EngineConfig(
@@
- enable_kv_cpu_offload=True,
+ enable_kv_cpu_offload=True,
max_cpu_offload_blocks=args.max_cpu_offload_blocks,
)Also applies to: 129-130
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/run_npu_kv_cpu_offload.py` at line 40, Add validation to ensure the
--max-cpu-offload-blocks argument is a positive integer. After parsing
arguments, check that the parsed max_cpu_offload_blocks value is greater than 0,
and raise an error with a clear message if it is not. This validation should
fail fast before reaching the code at lines 129-130 where enable_kv_cpu_offload
is set to True, preventing timeout-driven failures when non-positive block
counts are provided.
c484d5f to
4c9f5d7
Compare
Summary
--max-cpu-offload-blocksCLI control;0disables CPU offload.Validation
ruff check --cache-dir /tmp/pypto-ruff-cache --config /data/xufeng/pypto-serving/ruff.toml <changed files>passed.python -m pytest /data/xufeng/pypto-serving/tests/test_cli.py -qpassed: 7 passed.python -m pytest /data/xufeng/pypto-serving/tests/test_batching.py -qpassed: 9 passed.--long-prefill-token-threshold 64.AI assistance was used for this change.