Skip to content

feat(distributed): add pld.tensor.allreduce composite op#1746

Open
YunjiQin wants to merge 8 commits into
hw-native-sys:mainfrom
YunjiQin:incore-comm-api
Open

feat(distributed): add pld.tensor.allreduce composite op#1746
YunjiQin wants to merge 8 commits into
hw-native-sys:mainfrom
YunjiQin:incore-comm-api

Conversation

@YunjiQin

Copy link
Copy Markdown
Contributor

Summary

Adds an in-place cross-rank pld.tensor.allreduce intrinsic at the InCore IR level, lowered in LowerCompositeOps (pass 14) to the 4-phase notify / wait / remote_load+accumulate / store recipe.

User API mirrors pl.store's rebind idiom:

pub = pld.tensor.allreduce(pub, sig, op=pld.ReduceOp.Sum)
  • Lowering lives in LowerCompositeOps so distributed-collective expansion reuses the same infrastructure as tile.sin / tile.cos — no new pass slot. LoweringBuilder grows EmitFor / EmitForReduce / EmitIf / EmitIfExpr / NotEq callback-based primitives that emit ForStmt / IfStmt via a nested builder sharing the parent's temp counter, so emitted names stay unique at any nesting depth. The CompositeLoweringFn signature is collapsed to (call, args, builder).
  • First version supports ReduceOp.Sum only; Max / Min / Prod are reserved enum values rejected by the deducer until their lowerings land. Loop bounds are runtime queries (pld.system.nranks), so no CommGroup materialisation is needed and the rule stays at pass 14.
  • Barrier scheme (validated on NPU at P=2 and P=4): Phase 2 pre-reduce barrier uses race-free Set 1 / Eq 1 (single writer per signal cell); Phase 3.5 post-reduce barrier uses AtomicAdd 1 + Eq 2 to prevent fast ranks from overwriting their target slot while slow ranks are still reading it. The post-reduce barrier is the key correctness fix — without it, P=4 produced wrong sums on slower ranks.
  • Cross-layer sync: ReduceOp enum added to comm.h, bound in nanobind, mirrored in ir.pyi, re-exported at the pld top level. Shared put / get / allreduce DistributedTensor validation is extracted to _unwrap_distributed_tensors in op/_utils.py (ops with asymmetric signatures like put, whose src may be a plain pl.Tensor, validate inline).
  • Docs: pass 14 EN + zh-CN updated with the new rule, the (call, args, builder) signature, and a pointer to the structured-CF builder helpers.

Testing

  • Transforms / distributed UTs pass without regression (structural op-set invariants, ForStmt/IfStmt shape, idempotency, deducer rejection of plain Tensor target and non-Sum ReduceOp)
  • On-board ST tests/st/distributed/test_l3_tensor_allreduce_intrinsic.py validated on NPU at P=2 and P=4 against the same torch.allclose golden as the hand-rolled 4-phase reference
  • Rebased onto upstream/main; resolved a conflict in op/tensor_ops.py preserving upstream's permissive put src (Tensor or DistributedTensor) and region/offset support on get

YunjiQin added 4 commits June 11, 2026 11:50
Add an in-place cross-rank allreduce intrinsic at the InCore IR level
that lowers to the 4-phase notify / wait / remote_load+accumulate /
store recipe validated by the hand-written reference in
tests/st/distributed/test_l3_allreduce.py.

User API mirrors pl.store's rebind idiom:

    pub = pld.tensor.allreduce(pub, sig, op=pld.ReduceOp.Sum)

Lowering lives in LowerCompositeOps (pass 14) so distributed-collective
expansion shares infrastructure with tile.sin / tile.cos and no new
pass slot is required. To support the 4-phase recipe's structured
control flow, LoweringBuilder grows EmitFor / EmitForReduce / EmitIf /
EmitIfExpr / NotEq — callback-based primitives that emit ForStmt /
IfStmt via a nested builder sharing the parent's temp counter, so
emitted names stay unique across arbitrary nesting depth. The
CompositeLoweringFn signature is collapsed to (call, args, builder);
rules read kwargs / span / op_name from the original CallPtr.

Loop bounds are runtime queries (pld.system.nranks via get_comm_ctx),
so no CommGroup materialisation is required and the rule stays at
pass 14 rather than pass 36+. First-version lowering supports
ReduceOp.Sum only; Max / Min / Prod enum values are reserved and
rejected by the deducer until their lowerings land.

The shared put / get / allreduce DistributedTensor validation is
extracted to _unwrap_distributed_tensors in op/_utils.py.

Cross-layer sync: ReduceOp enum added to comm.h, bound in nanobind,
mirrored in ir.pyi stub, re-exported at pld top level.

Docs: pass 14 EN + zh-CN updated with the new rule, the (call, args,
builder) signature, and a pointer to the structured CF builder helpers
for future control-flow-bearing rules.

Tests: structural invariants (op-set, 3 ForStmts + 3 IfStmts shape),
idempotency, deducer rejection of plain Tensor target and non-Sum
ReduceOp. 1638 transforms / distributed UTs pass without regression.
…d ST

Validate the new intrinsic on real NPU (P=2 and P=4) and fix three
issues surfaced by the on-board codegen + numerics path:

1. INT32/INDEX type clash in loop bounds. The parser normalises Python
   int literals to INDEX (``_normalize_expr`` default), so loop control
   constants (start/step) must be INDEX. ``pld.system.nranks`` returns
   ScalarType(INT32); cast it to INDEX before using as the for-loop
   stop bound so all three bounds agree. Notify's ``value`` and wait's
   ``expected`` stay INT32 (matching the Python builder's int_dtype
   override for those slots).

2. tile.add result must be bound to a Var inside the reduce-loop's
   EmitIfExpr then-branch. Yielding the raw Call expression left the
   resulting ``pto.tadd ins(...) outs()`` with an empty ``outs`` slot;
   MLIR rejected it with ``error: expected SSA operand``. Bind the
   ``acc + recv`` result before returning it as the yield value.

3. Phase-3.5 post-reduce barrier — the real correctness fix. The
   intrinsic writes the reduced value back into the same ``target``
   window slot that peers are still reading via
   ``pld.tile.remote_load`` in Phase 3. A fast rank that finishes
   Phase 3 early would overwrite its slot while slow ranks were still
   accumulating, producing wrong sums on slower ranks. Symptom at P=4:
   rank 2's output was the correct sum + an extra rank-3 contribution.

   Insert a second notify-all / wait-all wave between Phase 3 and
   Phase 4 reusing the same signal cells; the second wait checks
   ``cell >= 2`` (each peer notifies twice across the two barriers).

Add an ST in tests/st/distributed/test_l3_tensor_allreduce_intrinsic.py
that mirrors test_l3_allreduce.py's harness but replaces the hand-rolled
4-phase body with a single ``pld.tensor.allreduce`` call. Validated on
NPU at P=2 and P=4 against the same torch.allclose golden.

UT update: the control-flow shape test now expects 5 ForStmts + 5
IfStmts (Phase 2a, 2b, 3, 3.5a, 3.5b) rather than 3 of each.
Lighter pre-reduce barrier with no semantic change: every signal cell
``cell[r, 0]`` has exactly one writer (rank r) so ``Set value=1`` is
race-free and avoids the atomic RMW that AtomicAdd carries. Phase 2b
correspondingly waits for ``== 1`` rather than ``>= 1``. The post-reduce
barrier (Phase 3.5) keeps AtomicAdd 1 + Ge >= 2 because the symmetric
``Set 0 / Eq 0`` reset path deadlocks under on-board ``TWAIT(==0)`` —
P=4 was reproducibly stuck on AICPU stream sync. The mixed scheme keeps
the hot path on the runtime's proven monotonic-counter barrier while
shaving the Phase 2 atomic; reentrancy of the signal buffer is still
not handled (cells end at 2, not 0) and remains a follow-up.

NPU validated (P=2 + P=4): tests/st/distributed/test_l3_tensor_allreduce_intrinsic.py
The accumulator goes 0 → 1 (Phase 2a Set 1) → 2 (Phase 3.5a
AtomicAdd 1) monotonically and is never decreased within a single
allreduce call, so cell == 2 is precisely the post-AtomicAdd state.
This makes the wait predicate uniform with Phase 2b (Eq) and matches
the single-shot signal semantics — Ge was unnecessarily lax. P=2 and
P=4 still pass on NPU.
@coderabbitai

coderabbitai Bot commented Jun 11, 2026

Copy link
Copy Markdown

Review Change Stack

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 191cb900-42b8-4d3f-943c-427368a02170

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

This PR introduces pld.tensor.allreduce, a distributed collective reduction operator spanning IR definition, Python API layers, and a sophisticated multi-phase lowering pass. The change includes ReduceOp enum across language boundaries, IR op registration with validation, control-flow-aware lowering infrastructure, and comprehensive tests.

Changes

Distributed Tensor Allreduce

Layer / File(s) Summary
ReduceOp enum definition across ABI layers
include/pypto/ir/comm.h, python/bindings/modules/ir.cpp, python/pypto/pypto_core/ir.pyi
Introduce ReduceOp enum with Sum, Max, Min, Prod variants in C++ header, Python nanobind bindings, and stub file to represent reduction operators across the ABI boundary.
IR op registration and type deduction
src/ir/op/distributed/allreduce.cpp, CMakeLists.txt
Register pld.tensor.allreduce(target, signal, *, op: int) -> DistributedTensorType with dedicated deducer validating DistributedTensorType arguments, INT32 signal dtype, and op=ReduceOp::kSum enforcement. Add source file to build system.
IR-level allreduce builder
python/pypto/ir/op/distributed/tensor_ops.py
Add allreduce(target, signal, op, *, span) IR builder packing ReduceOp as integer attribute and emitting the composite call.
Distributed module re-exports and validation helper
python/pypto/language/distributed/__init__.py, python/pypto/language/distributed/op/_utils.py
Re-export ReduceOp at package level. Introduce shared _unwrap_distributed_tensors(op_name, **named) to centralize window-bound DistributedTensor validation for DSL wrappers.
DSL-level allreduce wrapper and get refactoring
python/pypto/language/distributed/op/tensor_ops.py
Add pld.tensor.allreduce(target, signal, *, op=ReduceOp.Sum) DSL wrapper unwrapping/validating inputs and returning DistributedTensor result. Refactor pld.tensor.get to use shared validation helper.
LoweringBuilder infrastructure: control-flow emitters and rule interface
src/ir/transforms/lower_composite_ops_pass.cpp
Extend LoweringBuilder with structured control-flow methods (EmitFor, EmitForReduce, EmitIf, EmitIfExpr), scalar comparison helper, and shared nested-builder temp counters. Update CompositeLoweringFn signature to accept original CallPtr for metadata. Add WrapBodyStmts normalization.
Allreduce lowering rule and dispatch registration
src/ir/transforms/lower_composite_ops_pass.cpp
Implement multi-phase LowerTensorAllReduceRule: notify/wait signaling, conditional remote load with accumulation via reduce-if expression, post-reduce atomic barrier, and final store. Adapt sin/cos rules to new signature. Register allreduce rule in dispatch table and update call sites.
Pass documentation updates
docs/en/dev/passes/14-lower_composite_ops.md, docs/zh-cn/dev/passes/14-lower_composite_ops.md
Document allreduce lowering (4-phase notify/wait/remote_load/store), new LoweringBuilder control-flow capabilities, revised CompositeLoweringFn signature, and updated composite-op extension procedure for both languages.
Unit tests for lowering pass
tests/ut/ir/transforms/test_lower_composite_ops.py
Verify allreduce composite removal and required primitive/system ops, validate exact ForStmt/IfStmt control-flow counts, check idempotency, test no-allreduce sanity case, and validate deducer rejection of invalid target types and unsupported reduce operations.
System test for end-to-end allreduce
tests/st/distributed/test_l3_tensor_allreduce_intrinsic.py
Verify N-rank allreduce for 2 and 4 ranks: stage inputs to windowed buffers, call pld.tensor.allreduce with signal barrier, stage outputs, and assert runtime results match golden element-wise sum using torch.allclose.

Sequence Diagrams

sequenceDiagram
    participant User as DSL User
    participant DSLAllreduce as pld.tensor.allreduce<br/>(DSL wrapper)
    participant Validation as _unwrap_distributed<br/>_tensors
    participant IRAllreduce as ir.allreduce<br/>(IR builder)
    participant OpReg as OpRegistry<br/>(Deducer)
    participant Lowering as LowerCompositeOps<br/>Pass
    participant Device as Device Code<br/>(Multi-phase)

    User->>DSLAllreduce: allreduce(target, signal, op=Sum)
    DSLAllreduce->>Validation: unwrap_distributed_tensors("pld.tensor.allreduce", target=..., signal=...)
    Validation->>Validation: Validate window-bound DistributedTensor types
    Validation-->>DSLAllreduce: (target_expr, signal_expr)
    DSLAllreduce->>IRAllreduce: allreduce(target_expr, signal_expr, ReduceOp.Sum)
    IRAllreduce->>IRAllreduce: Pack op as integer attribute
    IRAllreduce-->>DSLAllreduce: pld.tensor.allreduce Call
    DSLAllreduce-->>User: DistributedTensor result
    Note over User: IR validation phase
    IRAllreduce->>OpReg: Deducer validates args
    OpReg->>OpReg: Check DistributedTensorType, INT32 signal, op=Sum
    OpReg-->>IRAllreduce: Return target type (in-place)
    Note over User: Lowering phase
    Lowering->>Lowering: LowerTensorAllReduceRule triggered
    Lowering->>Device: EmitFor notify phase (per neighbor)
    Lowering->>Device: EmitFor/EmitIf reduce phase (conditional loads/accumulate)
    Lowering->>Device: Emit post-reduce barrier (atomic)
    Lowering->>Device: Emit tile.store (accumulator → target)
    Device-->>Lowering: Multi-phase primitive ops
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • hw-native-sys/pypto#1646: Adds N-rank ST test for allreduce, building on the core pld.tensor.allreduce op registration and lowering introduced in this PR.

Suggested labels

enhancement

Poem

🐰 A rabbit's ode to collective dreams,
Where allreduce flows in data streams.
Sum across the ranks so wide,
With structured loops, reduce with pride!
From IR to device, the signal's call,
Multi-phase magic—we lower them all. 🎯

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 66.67% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly summarizes the main feature addition: a new distributed all-reduce composite operation.
Description check ✅ Passed The description comprehensively explains the feature, implementation approach, testing, and cross-layer integration related to the all-reduce composite op.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces pld.tensor.allreduce, a new composite collective operation for distributed tensors. The implementation includes the C++ op definition, Python bindings, and a lowering rule in LowerCompositeOps that utilizes new structured control-flow helpers (EmitFor, EmitIf, etc.). Comprehensive tests are added to verify the decomposition. A review comment identifies a potential synchronization issue, suggesting the use of WaitCmp::kGe instead of WaitCmp::kEq for the post-reduce barrier to align with established best practices.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread src/ir/transforms/lower_composite_ops_pass.cpp Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@python/pypto/ir/op/distributed/tensor_ops.py`:
- Line 183: The __all__ export list is not alphabetically sorted; update the
__all__ variable (the list named __all__ in this module) to the isort-style
alphabetical order so it reads ["alloc_window_buffer", "allreduce", "get",
"put", "window"] instead of the current ordering.

In `@python/pypto/language/distributed/op/tensor_ops.py`:
- Line 302: Update the module-level __all__ list in tensor_ops.py so its entries
are alphabetically sorted; change the current __all__ = ["allreduce",
"alloc_window_buffer", "get", "put", "window"] to the isort-style order
["alloc_window_buffer", "allreduce", "get", "put", "window"] so exports are
deterministic and consistent with project conventions (edit the __all__ variable
in this file).

In `@src/ir/transforms/lower_composite_ops_pass.cpp`:
- Around line 558-599: The comment claims the wait checks "cell >= 2" and
references the runtime's "AtomicAdd + Ge" pattern, but the code uses
WaitCmp::kEq; fix by making the implementation match the comment: change the
comparator passed to the pld.system.wait Op in the bind that currently uses
WaitCmp::kEq with two_i32 to WaitCmp::kGe (replace WaitCmp::kEq ->
WaitCmp::kGe), and ensure the surrounding comment remains accurate (or adjust it
if you prefer the exact-equals semantics instead). This targets the wait
creation site using OpRegistry::GetInstance().Create("pld.system.wait", ...)
that currently binds "wait2_ret".

In `@tests/ut/ir/transforms/test_lower_composite_ops.py`:
- Line 555: Replace the ambiguous multiplication sign '×' with a plain ASCII 'x'
in the docstring that contains the phrase "a write-after-read race that
manifests as off-by-N×peer drift on" (found in test_lower_composite_ops.py
docstring), i.e., change "off-by-N×peer" to "off-by-Nxpeer" (or "off-by-N x
peer" if spacing preferred) so Ruff no longer flags the character.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 1a97aafb-0fa7-45c8-bd4d-88df8512198c

📥 Commits

Reviewing files that changed from the base of the PR and between 3cc256c and 4a5e8c7.

📒 Files selected for processing (14)
  • CMakeLists.txt
  • docs/en/dev/passes/14-lower_composite_ops.md
  • docs/zh-cn/dev/passes/14-lower_composite_ops.md
  • include/pypto/ir/comm.h
  • python/bindings/modules/ir.cpp
  • python/pypto/ir/op/distributed/tensor_ops.py
  • python/pypto/language/distributed/__init__.py
  • python/pypto/language/distributed/op/_utils.py
  • python/pypto/language/distributed/op/tensor_ops.py
  • python/pypto/pypto_core/ir.pyi
  • src/ir/op/distributed/allreduce.cpp
  • src/ir/transforms/lower_composite_ops_pass.cpp
  • tests/st/distributed/test_l3_tensor_allreduce_intrinsic.py
  • tests/ut/ir/transforms/test_lower_composite_ops.py

Comment thread python/pypto/ir/op/distributed/tensor_ops.py Outdated
Comment thread python/pypto/language/distributed/op/tensor_ops.py Outdated
Comment thread src/ir/transforms/lower_composite_ops_pass.cpp Outdated
Comment thread tests/ut/ir/transforms/test_lower_composite_ops.py Outdated
YunjiQin added 4 commits June 11, 2026 14:41
- Sort __all__ alphabetically in both pld.tensor.* tensor_ops.py files
  (Ruff RUF022; CodeRabbit feedback).
- Replace ambiguous `×` with `*` in test_lower_composite_ops.py
  docstring (Ruff RUF002; CodeRabbit feedback).
- Update Phase 3.5 comments in lower_composite_ops_pass.cpp to describe
  WaitCmp::kEq (the actual emitted op) instead of the stale "Ge / >= 2"
  reference; explain why kEq is equivalent-but-tighter within a single
  call and link to PTOAS issue hw-native-sys#797 for the deferred TWAIT(==0) path
  (CodeRabbit feedback).

Skipped: gemini-code-assist's kEq→kGe suggestion at line 594 — the kEq
choice is intentional (single-shot buffer, deterministic monotonic
0→1→2 cell value), tightened in 4a5e8c7 to match Phase 2b style.
Rationale now documented in the rewritten Phase 3.5 comment block.
…educe

ConvertTensorToTileOps (pass 12) runs upstream of LowerCompositeOps
(pass 14), so it sees ``pld.tensor.allreduce`` as a single composite
Call before the 4-phase decomposition exists. Register the op in both
sites of the pass's param-direction analysis:

* ``GetWriteTargetExpr``: return ``args_[0]`` (target) as the primary
  data write target — matches the put/get/remote_store convention.

* Per-op read/write marker: mark both ``target`` (args_[0]) and
  ``signal`` (args_[1]) as read AND written. Both are InOut across
  the eventual decomposition (target read in Phase 3, written in
  Phase 4; signal written in Phase 2a/3.5a, read in Phase 2b/3.5b).
  Marking both args on both sides surfaces the enclosing window
  params as InOut without depending on LowerCompositeOps order.

NPU dist-system-tests at P=2 and P=4 still pass; 1655 transforms UTs
pass without regression.
Add Before/Expected UT mirroring the existing pld.tile.put / pld.tile.get
tests in the same class: verify ConvertTensorToTileOps upgrades both
``target`` and ``signal`` from In to InOut on the kernel signature when
the kernel calls ``pld.tensor.allreduce``. Pins the read+write marker
added in fdb2e36 so a future regression that drops the InOut
annotation surfaces as a structural diff.
Reviewer flagged a real race window in the Phase 2b and Phase 3.5b
waits I had switched from kGe to kEq in 4a5e8c7 / 24a1939e: the cell
is monotonic within a single call, but a slow rank's first poll is NOT
guaranteed to land on the post-notify value. If a faster peer races
ahead (its Phase 2a, then its Phase 2b, then its Phase 3 — microseconds
of remote loads — then its Phase 3.5a AtomicAdd), the slow rank's
cell[peer] has already advanced past 1 before the slow rank even
enters its Phase 2b. ``kEq(==1)`` deadlocks; ``kGe(>=1)`` survives.

The hand-written reference at tests/st/distributed/test_l3_allreduce.py
uses Ge for exactly this reason — my "kEq matches the post-notify
state exactly" argument was wrong because it assumed the observer
reads before the cell can advance.

Both Phase 2b and Phase 3.5b are now kGe again, matching the proven
reference. Cell ranges still cap at 2 within a single call (Set 1 +
AtomicAdd 1, no peer adds twice), so Ge stays tight.

Also surface the single-shot buffer contract to users:

* DSL docstring (pld.tensor.allreduce) gains a prominent warning that
  the same signal buffer cannot be reused for back-to-back allreduces
  — a stale ``2`` would make the next call's wait>=1 pass immediately
  on the leftover value and break the barrier. Callers must allocate
  a fresh signal buffer per call.
* Pass 14 docs (EN + zh-CN) add a new section describing the rule's
  signal scheme, why kGe is load-bearing, and the same reuse warning.

NPU P=2 / P=4 still pass; 114 lowering + convert UTs pass.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

1 participant