Skip to content

Support multi-program DistributedWorker dispatch#1699

Open
ndleslx wants to merge 1 commit into
hw-native-sys:mainfrom
ndleslx:support-distributed-worker-multiprogram
Open

Support multi-program DistributedWorker dispatch#1699
ndleslx wants to merge 1 commit into
hw-native-sys:mainfrom
ndleslx:support-distributed-worker-multiprogram

Conversation

@ndleslx

@ndleslx ndleslx commented Jun 8, 2026

Copy link
Copy Markdown

Summary

  • allow DistributedWorker to prepare multiple compatible DistributedCompiledProgram objects on one L3 worker
  • add worker.run(compiled, *args) dispatch through per-program runtime state
  • export DistributedWorker from pypto.runtime

Motivation

This supports serving-style generation where prefill and decode are separate HOST JIT programs but must share one L3 worker and worker-resident DeviceTensor KV cache.

Closes #1698

Checks

  • python -m compileall -q python/pypto/runtime/__init__.py python/pypto/runtime/distributed_runner.py
  • python - <<PY ... from pypto.runtime import DistributedWorker, DeviceTensor ... PY

@coderabbitai

coderabbitai Bot commented Jun 8, 2026

Copy link
Copy Markdown

Review Change Stack

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 02ff39c9-7e51-4da4-bbf1-096566d15d59

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

DistributedWorker now accepts either a single DistributedCompiledProgram or a Sequence of them. The initializer validates platform/device compatibility, loads orchestration artifacts per program, and maintains per-program dispatch state. A new _run_compiled helper routes both __call__ and run(compiled, ...) through a unified flow that dispatches the correct program by identity.

Changes

Multi-program DistributedWorker support

Layer / File(s) Summary
Public API and module imports
python/pypto/runtime/__init__.py, python/pypto/runtime/distributed_runner.py
DistributedWorker is re-exported from the pypto.runtime package. Sequence from collections.abc is added to support the new multi-program constructor type annotation.
Orchestration entry resolution
python/pypto/runtime/distributed_runner.py
_load_orch_entry now filters candidate callables by matching __module__ to the generated module name, reducing unintended callable selection.
Multi-program initialization
python/pypto/runtime/distributed_runner.py
Constructor accepts single or Sequence[DistributedCompiledProgram], validates shared platform and device IDs across all programs, assembles orchestration artifacts and sub-worker callables per program, constructs a single L3 worker sized for maximum sub-worker count, and stores per-program dispatch state in an internal dictionary keyed by program identity while preserving first-program convenience attributes.
Unified dispatch routing
python/pypto/runtime/distributed_runner.py
New _run_compiled(compiled, *args, config) helper validates argument counts/types and tensor mapping against per-program state. __call__ delegates to _run_compiled using the primary program. run(compiled, ...) and register(compiled) now route through per-program state lookup by identity instead of single-program equality checks. Dispatch invocation uses program-specific entry function, tensors, callables, and configuration from internal state.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

A rabbit hops through worker threads so bright,
Multi-programs now dance in shared device light,
One L3 stays steady, state flows just right,
Prefill and decode together take flight! 🐰✨

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly describes the main change: adding multi-program dispatch support to DistributedWorker, which aligns with the core objective of allowing multiple compatible programs on a single worker.
Description check ✅ Passed The description is directly related to the changeset, covering the three main changes: multi-program support for DistributedWorker, the per-program dispatch API, and the DistributedWorker export.
Linked Issues check ✅ Passed The code changes meet all primary coding requirements from issue #1698: allowing multiple compatible DistributedCompiledProgram objects, providing worker.run(compiled, *args) dispatch, maintaining single-program compatibility, and exporting DistributedWorker.
Out of Scope Changes check ✅ Passed All changes are directly aligned with the linked issue requirements; the two modified files only contain changes supporting multi-program dispatch, export functionality, and per-program runtime state management.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request updates DistributedWorker to support executing multiple compiled programs by accepting a sequence of DistributedCompiledProgram objects and maintaining their states in a dictionary. It also refactors the entry function loading logic to ensure the module name matches. The feedback highlights a critical issue where using id(prog) as a dictionary key can lead to silent bugs if programs are garbage collected and their IDs are reused; using the program objects directly as keys is recommended instead. Additionally, the sequence check during initialization should explicitly exclude str and bytes to prevent strings from being incorrectly parsed as sequences of programs.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

"param_infos": param_infos,
"device_nums": len(dc.device_ids),
}
self._states[id(prog)] = state

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using id(prog) as a dictionary key is dangerous because id() is only guaranteed to be unique for the lifetime of the object. Since DistributedWorker does not store strong references to the other compiled programs (only self._compiled = first is kept), those programs can be garbage collected. If they are GC'd, their id can be reused by Python for newly created objects, leading to silent correctness bugs where the wrong program's state is retrieved.\n\nUsing the prog object itself as the dictionary key is much safer and more Pythonic. Since standard Python dictionaries hold strong references to their keys, this will also automatically keep the compiled programs alive for the lifetime of the worker.

Suggested change
self._states[id(prog)] = state
self._states[prog] = state

from pypto.ir.compiled_program import _validate_device_tensor # noqa: PLC0415

param_infos = self._param_infos
state = self._states.get(id(compiled))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Update to use compiled directly as the key instead of id(compiled) to match the safe dictionary key pattern.

Suggested change
state = self._states.get(id(compiled))
state = self._states.get(compiled)

"""
self._require_open("register")
if compiled is not self._compiled:
if id(compiled) not in self._states:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Update to use compiled directly as the key instead of id(compiled) to match the safe dictionary key pattern.

Suggested change
if id(compiled) not in self._states:
if compiled not in self._states:

Comment on lines +412 to +415
if isinstance(compiled, Sequence):
compiled_programs = list(compiled)
else:
compiled_programs = [compiled]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

A string is a Sequence in Python. If a user accidentally passes a string (e.g., a path to the compiled program directory), it will be treated as a sequence of characters, leading to a confusing AttributeError later. We should exclude str and bytes from the sequence check to handle single program inputs correctly.

Suggested change
if isinstance(compiled, Sequence):
compiled_programs = list(compiled)
else:
compiled_programs = [compiled]
if isinstance(compiled, Sequence) and not isinstance(compiled, (str, bytes)):
compiled_programs = list(compiled)
else:
compiled_programs = [compiled]

@ndleslx ndleslx force-pushed the support-distributed-worker-multiprogram branch from 585aca3 to 108f640 Compare June 9, 2026 02:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

Support multi-program dispatch for DistributedCompiledProgram

1 participant