Support multi-program DistributedWorker dispatch#1699
Conversation
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthrough
ChangesMulti-program DistributedWorker support
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request updates DistributedWorker to support executing multiple compiled programs by accepting a sequence of DistributedCompiledProgram objects and maintaining their states in a dictionary. It also refactors the entry function loading logic to ensure the module name matches. The feedback highlights a critical issue where using id(prog) as a dictionary key can lead to silent bugs if programs are garbage collected and their IDs are reused; using the program objects directly as keys is recommended instead. Additionally, the sequence check during initialization should explicitly exclude str and bytes to prevent strings from being incorrectly parsed as sequences of programs.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| "param_infos": param_infos, | ||
| "device_nums": len(dc.device_ids), | ||
| } | ||
| self._states[id(prog)] = state |
There was a problem hiding this comment.
Using id(prog) as a dictionary key is dangerous because id() is only guaranteed to be unique for the lifetime of the object. Since DistributedWorker does not store strong references to the other compiled programs (only self._compiled = first is kept), those programs can be garbage collected. If they are GC'd, their id can be reused by Python for newly created objects, leading to silent correctness bugs where the wrong program's state is retrieved.\n\nUsing the prog object itself as the dictionary key is much safer and more Pythonic. Since standard Python dictionaries hold strong references to their keys, this will also automatically keep the compiled programs alive for the lifetime of the worker.
| self._states[id(prog)] = state | |
| self._states[prog] = state |
| from pypto.ir.compiled_program import _validate_device_tensor # noqa: PLC0415 | ||
|
|
||
| param_infos = self._param_infos | ||
| state = self._states.get(id(compiled)) |
| """ | ||
| self._require_open("register") | ||
| if compiled is not self._compiled: | ||
| if id(compiled) not in self._states: |
| if isinstance(compiled, Sequence): | ||
| compiled_programs = list(compiled) | ||
| else: | ||
| compiled_programs = [compiled] |
There was a problem hiding this comment.
A string is a Sequence in Python. If a user accidentally passes a string (e.g., a path to the compiled program directory), it will be treated as a sequence of characters, leading to a confusing AttributeError later. We should exclude str and bytes from the sequence check to handle single program inputs correctly.
| if isinstance(compiled, Sequence): | |
| compiled_programs = list(compiled) | |
| else: | |
| compiled_programs = [compiled] | |
| if isinstance(compiled, Sequence) and not isinstance(compiled, (str, bytes)): | |
| compiled_programs = list(compiled) | |
| else: | |
| compiled_programs = [compiled] |
585aca3 to
108f640
Compare
Summary
DistributedWorkerto prepare multiple compatibleDistributedCompiledProgramobjects on one L3 workerworker.run(compiled, *args)dispatch through per-program runtime stateDistributedWorkerfrompypto.runtimeMotivation
This supports serving-style generation where prefill and decode are separate HOST JIT programs but must share one L3 worker and worker-resident
DeviceTensorKV cache.Closes #1698
Checks
python -m compileall -q python/pypto/runtime/__init__.py python/pypto/runtime/distributed_runner.pypython - <<PY ... from pypto.runtime import DistributedWorker, DeviceTensor ... PY